

Commit 4dcdc34

Add explicit support of stream (realtime) pipelines for CloudDataFusionStartPipelineOperator (#34271)

1 parent 6042e76 · commit 4dcdc34

File tree

8 files changed: +219 -10 lines changed

airflow/providers/google/cloud/hooks/datafusion.py

Lines changed: 42 additions & 6 deletions
@@ -31,6 +31,7 @@
 from googleapiclient.discovery import Resource, build
 
 from airflow.exceptions import AirflowException, AirflowNotFoundException
+from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType
 from airflow.providers.google.common.hooks.base_google import (
     PROVIDE_PROJECT_ID,
     GoogleBaseAsyncHook,
@@ -105,6 +106,7 @@ def wait_for_pipeline_state(
         pipeline_name: str,
         pipeline_id: str,
         instance_url: str,
+        pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
         namespace: str = "default",
         success_states: list[str] | None = None,
         failure_states: list[str] | None = None,
@@ -120,6 +122,7 @@ def wait_for_pipeline_state(
         workflow = self.get_pipeline_workflow(
             pipeline_name=pipeline_name,
             pipeline_id=pipeline_id,
+            pipeline_type=pipeline_type,
             instance_url=instance_url,
             namespace=namespace,
         )
@@ -432,13 +435,14 @@ def get_pipeline_workflow(
         pipeline_name: str,
         instance_url: str,
         pipeline_id: str,
+        pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
         namespace: str = "default",
     ) -> Any:
         url = os.path.join(
             self._base_url(instance_url, namespace),
             quote(pipeline_name),
-            "workflows",
-            "DataPipelineWorkflow",
+            f"{self.cdap_program_type(pipeline_type=pipeline_type)}s",
+            self.cdap_program_id(pipeline_type=pipeline_type),
             "runs",
             quote(pipeline_id),
         )
@@ -453,13 +457,15 @@ def start_pipeline(
         self,
         pipeline_name: str,
         instance_url: str,
+        pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
         namespace: str = "default",
         runtime_args: dict[str, Any] | None = None,
     ) -> str:
         """
         Starts a Cloud Data Fusion pipeline. Works for both batch and stream pipelines.
 
         :param pipeline_name: Your pipeline name.
+        :param pipeline_type: Optional pipeline type (BATCH by default).
         :param instance_url: Endpoint on which the REST APIs is accessible for the instance.
         :param runtime_args: Optional runtime JSON args to be passed to the pipeline
         :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
@@ -480,9 +486,9 @@ def start_pipeline(
         body = [
             {
                 "appId": pipeline_name,
-                "programType": "workflow",
-                "programId": "DataPipelineWorkflow",
                 "runtimeargs": runtime_args,
+                "programType": self.cdap_program_type(pipeline_type=pipeline_type),
+                "programId": self.cdap_program_id(pipeline_type=pipeline_type),
             }
         ]
         response = self._cdap_request(url=url, method="POST", body=body)
@@ -514,6 +520,30 @@ def stop_pipeline(self, pipeline_name: str, instance_url: str, namespace: str =
             response, f"Stopping a pipeline failed with code {response.status}"
         )
 
+    @staticmethod
+    def cdap_program_type(pipeline_type: DataFusionPipelineType) -> str:
+        """Retrieves CDAP Program type depending on the pipeline type.
+
+        :param pipeline_type: Pipeline type.
+        """
+        program_types = {
+            DataFusionPipelineType.BATCH: "workflow",
+            DataFusionPipelineType.STREAM: "spark",
+        }
+        return program_types.get(pipeline_type, "")
+
+    @staticmethod
+    def cdap_program_id(pipeline_type: DataFusionPipelineType) -> str:
+        """Retrieves CDAP Program id depending on the pipeline type.
+
+        :param pipeline_type: Pipeline type.
+        """
+        program_ids = {
+            DataFusionPipelineType.BATCH: "DataPipelineWorkflow",
+            DataFusionPipelineType.STREAM: "DataStreamsSparkStreaming",
+        }
+        return program_ids.get(pipeline_type, "")
+
 
 class DataFusionAsyncHook(GoogleBaseAsyncHook):
     """Class to get asynchronous hook for DataFusion."""
@@ -561,10 +591,13 @@ async def get_pipeline(
         pipeline_name: str,
         pipeline_id: str,
         session,
+        pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
     ):
+        program_type = self.sync_hook_class.cdap_program_type(pipeline_type=pipeline_type)
+        program_id = self.sync_hook_class.cdap_program_id(pipeline_type=pipeline_type)
         base_url_link = self._base_url(instance_url, namespace)
         url = urljoin(
-            base_url_link, f"{quote(pipeline_name)}/workflows/DataPipelineWorkflow/runs/{quote(pipeline_id)}"
+            base_url_link, f"{quote(pipeline_name)}/{program_type}s/{program_id}/runs/{quote(pipeline_id)}"
         )
         return await self._get_link(url=url, session=session)
 
@@ -573,6 +606,7 @@ async def get_pipeline_status(
         pipeline_name: str,
         instance_url: str,
         pipeline_id: str,
+        pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
         namespace: str = "default",
         success_states: list[str] | None = None,
     ) -> str:
@@ -581,7 +615,8 @@ async def get_pipeline_status(
 
         :param pipeline_name: Your pipeline name.
         :param instance_url: Endpoint on which the REST APIs is accessible for the instance.
-        :param pipeline_id: Unique pipeline ID associated with specific pipeline
+        :param pipeline_id: Unique pipeline ID associated with specific pipeline.
+        :param pipeline_type: Optional pipeline type (by default batch).
         :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
             is always default. If your pipeline belongs to an Enterprise edition instance, you
             can create a namespace.
@@ -596,6 +631,7 @@ async def get_pipeline_status(
             namespace=namespace,
             pipeline_name=pipeline_name,
             pipeline_id=pipeline_id,
+            pipeline_type=pipeline_type,
             session=session,
         )
         pipeline = await pipeline.json(content_type=None)
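
Note on the routing change above: batch pipelines run as the CDAP "workflow" program DataPipelineWorkflow, while stream (realtime) pipelines run as the "spark" program DataStreamsSparkStreaming, and the run URL is assembled from those two segments. A minimal standalone sketch of the mapping (the base URL and run ID below are hypothetical placeholders):

from enum import Enum


class DataFusionPipelineType(Enum):
    # Mirrors the enum introduced by this commit.
    BATCH = "batch"
    STREAM = "stream"


# Same lookup tables as the new cdap_program_type / cdap_program_id helpers.
PROGRAM_TYPES = {DataFusionPipelineType.BATCH: "workflow", DataFusionPipelineType.STREAM: "spark"}
PROGRAM_IDS = {
    DataFusionPipelineType.BATCH: "DataPipelineWorkflow",
    DataFusionPipelineType.STREAM: "DataStreamsSparkStreaming",
}

base = "https://example-instance.datafusion.example.com/api/v3/namespaces/default/apps/my_pipeline"
for pipeline_type in DataFusionPipelineType:
    # BATCH  -> .../workflows/DataPipelineWorkflow/runs/<run_id>
    # STREAM -> .../sparks/DataStreamsSparkStreaming/runs/<run_id>
    print(f"{base}/{PROGRAM_TYPES[pipeline_type]}s/{PROGRAM_IDS[pipeline_type]}/runs/<run_id>")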

airflow/providers/google/cloud/operators/datafusion.py

Lines changed: 7 additions & 0 deletions
@@ -33,6 +33,7 @@
 )
 from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
 from airflow.providers.google.cloud.triggers.datafusion import DataFusionStartPipelineTrigger
+from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -708,6 +709,7 @@ class CloudDataFusionStartPipelineOperator(GoogleCloudBaseOperator):
         :ref:`howto/operator:CloudDataFusionStartPipelineOperator`
 
     :param pipeline_name: Your pipeline name.
+    :param pipeline_type: Optional pipeline type (BATCH by default).
     :param instance_name: The name of the instance.
     :param success_states: If provided the operator will wait for pipeline to be in one of
         the provided states.
@@ -752,6 +754,7 @@ def __init__(
         pipeline_name: str,
         instance_name: str,
         location: str,
+        pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
         runtime_args: dict[str, Any] | None = None,
         success_states: list[str] | None = None,
         namespace: str = "default",
@@ -767,6 +770,7 @@ def __init__(
     ) -> None:
         super().__init__(**kwargs)
         self.pipeline_name = pipeline_name
+        self.pipeline_type = pipeline_type
         self.runtime_args = runtime_args
         self.namespace = namespace
         self.instance_name = instance_name
@@ -800,6 +804,7 @@ def execute(self, context: Context) -> str:
         api_url = instance["apiEndpoint"]
         pipeline_id = hook.start_pipeline(
             pipeline_name=self.pipeline_name,
+            pipeline_type=self.pipeline_type,
             instance_url=api_url,
             namespace=self.namespace,
             runtime_args=self.runtime_args,
@@ -824,6 +829,7 @@ def execute(self, context: Context) -> str:
                 instance_url=api_url,
                 namespace=self.namespace,
                 pipeline_name=self.pipeline_name,
+                pipeline_type=self.pipeline_type.value,
                 pipeline_id=pipeline_id,
                 poll_interval=self.poll_interval,
                 gcp_conn_id=self.gcp_conn_id,
@@ -839,6 +845,7 @@ def execute(self, context: Context) -> str:
             success_states=self.success_states,
             pipeline_id=pipeline_id,
             pipeline_name=self.pipeline_name,
+            pipeline_type=self.pipeline_type,
             namespace=self.namespace,
             instance_url=api_url,
             timeout=self.pipeline_timeout,
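
For reference, a minimal DAG sketch using the new argument; the instance, region, and pipeline names are hypothetical placeholders, while everything else follows the operator signature in the diff above:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator
from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType

with DAG(dag_id="example_datafusion_stream", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    start_stream = CloudDataFusionStartPipelineOperator(
        task_id="start_stream_pipeline",
        pipeline_name="my-stream-pipeline",  # hypothetical pipeline name
        pipeline_type=DataFusionPipelineType.STREAM,  # new argument; defaults to BATCH
        instance_name="my-instance",  # hypothetical instance
        location="europe-west1",  # hypothetical region
    )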

airflow/providers/google/cloud/triggers/datafusion.py

Lines changed: 6 additions & 0 deletions
@@ -20,6 +20,7 @@
 from typing import Any, AsyncIterator, Sequence
 
 from airflow.providers.google.cloud.hooks.datafusion import DataFusionAsyncHook
+from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 
@@ -30,6 +31,7 @@ class DataFusionStartPipelineTrigger(BaseTrigger):
     :param pipeline_name: Your pipeline name.
     :param instance_url: Endpoint on which the REST APIs is accessible for the instance.
     :param pipeline_id: Unique pipeline ID associated with specific pipeline
+    :param pipeline_type: Your pipeline type.
     :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
         is always default. If your pipeline belongs to an Enterprise edition instance, you
         can create a namespace.
@@ -51,6 +53,7 @@ def __init__(
         namespace: str,
         pipeline_name: str,
         pipeline_id: str,
+        pipeline_type: str,
         poll_interval: float = 3.0,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -61,6 +64,7 @@ def __init__(
         self.namespace = namespace
         self.pipeline_name = pipeline_name
         self.pipeline_id = pipeline_id
+        self.pipeline_type = pipeline_type
         self.poll_interval = poll_interval
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
@@ -76,6 +80,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "namespace": self.namespace,
                 "pipeline_name": self.pipeline_name,
                 "pipeline_id": self.pipeline_id,
+                "pipeline_type": self.pipeline_type,
                 "success_states": self.success_states,
             },
         )
@@ -92,6 +97,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
                 namespace=self.namespace,
                 pipeline_name=self.pipeline_name,
                 pipeline_id=self.pipeline_id,
+                pipeline_type=DataFusionPipelineType.from_str(self.pipeline_type),
             )
             if response_from_hook == "success":
                 yield TriggerEvent(
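
Design note on the trigger: pipeline_type is kept as a plain string here (the operator passes self.pipeline_type.value) so that serialize() emits JSON-serializable kwargs, and run() rebuilds the enum via DataFusionPipelineType.from_str. A minimal sketch of that round trip:

from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType

serialized = DataFusionPipelineType.STREAM.value  # "stream", safe to store in trigger kwargs
rehydrated = DataFusionPipelineType.from_str(serialized)  # back to the enum member
assert rehydrated is DataFusionPipelineType.STREAM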
airflow/providers/google/cloud/utils/datafusion.py (new file)

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from enum import Enum
+
+
+class DataFusionPipelineType(Enum):
+    """Enum for Data Fusion pipeline types."""
+
+    BATCH = "batch"
+    STREAM = "stream"
+
+    @staticmethod
+    def from_str(value: str) -> DataFusionPipelineType:
+        value_to_item = {item.value: item for item in DataFusionPipelineType}
+        if value in value_to_item:
+            return value_to_item[value]
+        raise ValueError(f"Invalid value '{value}'. Valid values are: {[i for i in value_to_item.keys()]}")
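
Behavior of the new from_str helper, per the implementation above:

from airflow.providers.google.cloud.utils.datafusion import DataFusionPipelineType

DataFusionPipelineType.from_str("batch")   # DataFusionPipelineType.BATCH
DataFusionPipelineType.from_str("stream")  # DataFusionPipelineType.STREAM
DataFusionPipelineType.from_str("spark")   # ValueError: Invalid value 'spark'. Valid values are: ['batch', 'stream']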
