Content-Length: 656294 | pFad | https://github.com/apache/airflow/commit/c88e746494a0ccc718687fe230b02390309c0ea7

36 Dynamic setting up of artifact versions for Datafusion pipelines (#34… · apache/airflow@c88e746 · GitHub
Skip to content

Commit c88e746

Browse files
authored
Dynamic setting up of artifact versions for Datafusion pipelines (#34068)
1 parent ba59f34 commit c88e746

File tree

5 files changed

+80
-294
lines changed

5 files changed

+80
-294
lines changed

airflow/providers/google/cloud/hooks/datafusion.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ def _base_url(instance_url: str, namespace: str) -> str:
153153
return os.path.join(instance_url, "v3", "namespaces", quote(namespace), "apps")
154154

155155
def _cdap_request(
156-
self, url: str, method: str, body: list | dict | None = None
156+
self, url: str, method: str, body: list | dict | None = None, params: dict | None = None
157157
) -> google.auth.transport.Response:
158158
headers: dict[str, str] = {"Content-Type": "application/json"}
159159
request = google.auth.transport.requests.Request()
@@ -163,7 +163,7 @@ def _cdap_request(
163163

164164
payload = json.dumps(body) if body else None
165165

166-
response = request(method=method, url=url, headers=headers, body=payload)
166+
response = request(method=method, url=url, headers=headers, body=payload, params=params)
167167
return response
168168

169169
@staticmethod
@@ -282,6 +282,23 @@ def get_instance(self, instance_name: str, location: str, project_id: str) -> di
282282
)
283283
return instance
284284

285+
def get_instance_artifacts(
286+
self, instance_url: str, namespace: str = "default", scope: str = "SYSTEM"
287+
) -> Any:
288+
url = os.path.join(
289+
instance_url,
290+
"v3",
291+
"namespaces",
292+
quote(namespace),
293+
"artifacts",
294+
)
295+
response = self._cdap_request(url=url, method="GET", params={"scope": scope})
296+
self._check_response_status_and_data(
297+
response, f"Retrieving an instance artifacts failed with code {response.status}"
298+
)
299+
content = json.loads(response.data)
300+
return content
301+
285302
@GoogleBaseHook.fallback_to_default_project_id
286303
def patch_instance(
287304
self,

docs/apache-airflow-providers-google/operators/cloud/datafusion.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ It is not possible to use both asynchronous and deferrable parameters at the sam
179179
Please, check the example of using deferrable mode:
180180
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionStartPipelineOperator`.
181181

182-
.. exampleinclude:: /../../tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
182+
.. exampleinclude:: /../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
183183
:language: python
184184
:dedent: 4
185185
:start-after: [START howto_cloud_data_fusion_start_pipeline_def]

tests/providers/google/cloud/hooks/test_datafusion.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,24 @@ def test_get_instance(self, get_conn_mock, hook):
168168
assert result == "value"
169169
method_mock.assert_called_once_with(name=hook._name(PROJECT_ID, LOCATION, INSTANCE_NAME))
170170

171+
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
172+
def test_get_instance_artifacts(self, mock_request, hook):
173+
scope = "SYSTEM"
174+
artifact = {
175+
"name": "test-artifact",
176+
"version": "1.2.3",
177+
"scope": scope,
178+
}
179+
mock_request.return_value = mock.MagicMock(status=200, data=json.dumps([artifact]))
180+
181+
hook.get_instance_artifacts(instance_url=INSTANCE_URL, scope=scope)
182+
183+
mock_request.assert_called_with(
184+
url=f"{INSTANCE_URL}/v3/namespaces/default/artifacts",
185+
method="GET",
186+
params={"scope": scope},
187+
)
188+
171189
@mock.patch("google.auth.transport.requests.Request")
172190
@mock.patch(HOOK_STR.format("DataFusionHook.get_credentials"))
173191
def test_cdap_request(self, get_credentials_mock, mock_request, hook):
@@ -177,14 +195,17 @@ def test_cdap_request(self, get_credentials_mock, mock_request, hook):
177195
request = mock_request.return_value
178196
request.return_value = mock.MagicMock()
179197
body = {"data": "value"}
198+
params = {"param_key": "param_value"}
180199

181-
result = hook._cdap_request(url=url, method=method, body=body)
200+
result = hook._cdap_request(url=url, method=method, body=body, params=params)
182201
mock_request.assert_called_once_with()
183202
get_credentials_mock.assert_called_once_with()
184203
get_credentials_mock.return_value.before_request.assert_called_once_with(
185204
request=request, method=method, url=url, headers=headers
186205
)
187-
request.assert_called_once_with(method=method, url=url, headers=headers, body=json.dumps(body))
206+
request.assert_called_once_with(
207+
method=method, url=url, headers=headers, body=json.dumps(body), params=params
208+
)
188209
assert result == request.return_value
189210

190211
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))

tests/system/providers/google/cloud/datafusion/example_datafusion.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from datetime import datetime
2424

2525
from airflow import models
26+
from airflow.decorators import task
27+
from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
2628
from airflow.providers.google.cloud.operators.datafusion import (
2729
CloudDataFusionCreateInstanceOperator,
2830
CloudDataFusionCreatePipelineOperator,
@@ -61,7 +63,7 @@
6163
PIPELINE = {
6264
"artifact": {
6365
"name": "cdap-data-pipeline",
64-
"version": "6.8.3",
66+
"version": "{{ task_instance.xcom_pull(task_ids='get_artifacts_versions')['cdap-data-pipeline'] }}",
6567
"scope": "SYSTEM",
6668
},
6769
"description": "Data Pipeline Application",
@@ -82,7 +84,12 @@
8284
"name": "GCSFile",
8385
"type": "batchsource",
8486
"label": "GCS",
85-
"artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
87+
"artifact": {
88+
"name": "google-cloud",
89+
"version": "{{ task_instance.xcom_pull(task_ids='get_artifacts_versions')\
90+
['google-cloud'] }}",
91+
"scope": "SYSTEM",
92+
},
8693
"properties": {
8794
"project": "auto-detect",
8895
"format": "text",
@@ -111,7 +118,12 @@
111118
"name": "GCS",
112119
"type": "batchsink",
113120
"label": "GCS2",
114-
"artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
121+
"artifact": {
122+
"name": "google-cloud",
123+
"version": "{{ task_instance.xcom_pull(task_ids='get_artifacts_versions')\
124+
['google-cloud'] }}",
125+
"scope": "SYSTEM",
126+
},
115127
"properties": {
116128
"project": "auto-detect",
117129
"suffix": "yyyy-MM-dd-HH-mm",
@@ -147,6 +159,9 @@
147159
}
148160
# [END howto_data_fusion_env_variables]
149161

162+
CloudDataFusionCreatePipelineOperator.template_fields += ("pipeline",)
163+
164+
150165
with models.DAG(
151166
DAG_ID,
152167
start_date=datetime(2021, 1, 1),
@@ -196,6 +211,13 @@
196211
)
197212
# [END howto_cloud_data_fusion_update_instance_operator]
198213

214+
@task(task_id="get_artifacts_versions")
215+
def get_artifacts_versions(ti) -> dict:
216+
hook = DataFusionHook()
217+
instance_url = ti.xcom_pull(task_ids="get_instance", key="return_value")["apiEndpoint"]
218+
artifacts = hook.get_instance_artifacts(instance_url=instance_url, namespace="default")
219+
return {item["name"]: item["version"] for item in artifacts}
220+
199221
# [START howto_cloud_data_fusion_create_pipeline]
200222
create_pipeline = CloudDataFusionCreatePipelineOperator(
201223
location=LOCATION,
@@ -221,6 +243,16 @@
221243
)
222244
# [END howto_cloud_data_fusion_start_pipeline]
223245

246+
# [START howto_cloud_data_fusion_start_pipeline_def]
247+
start_pipeline_def = CloudDataFusionStartPipelineOperator(
248+
location=LOCATION,
249+
pipeline_name=PIPELINE_NAME,
250+
instance_name=INSTANCE_NAME,
251+
task_id="start_pipeline_def",
252+
deferrable=True,
253+
)
254+
# [END howto_cloud_data_fusion_start_pipeline_def]
255+
224256
# [START howto_cloud_data_fusion_start_pipeline_async]
225257
start_pipeline_async = CloudDataFusionStartPipelineOperator(
226258
location=LOCATION,
@@ -284,10 +316,12 @@
284316
# TEST BODY
285317
>> create_instance
286318
>> get_instance
319+
>> get_artifacts_versions()
287320
>> restart_instance
288321
>> update_instance
289322
>> create_pipeline
290323
>> list_pipelines
324+
>> start_pipeline_def
291325
>> start_pipeline_async
292326
>> start_pipeline_sensor
293327
>> start_pipeline

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/c88e746494a0ccc718687fe230b02390309c0ea7

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy