Content-Length: 1076705 | pFad | https://github.com/apache/airflow/commit/448e50bd23b4493980a41a5d4241ad3ecef087fb

FA Updating Google Cloud example DAGs to use XComArgs (#16875) · apache/airflow@448e50b · GitHub
Skip to content

Commit 448e50b

Browse files
authored
Updating Google Cloud example DAGs to use XComArgs (#16875)
1 parent aaf44cc commit 448e50b

26 files changed

+271
-177
lines changed

airflow/providers/google/cloud/example_dags/example_automl_nl_text_classification.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
6868
)
6969

70-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
70+
dataset_id = create_dataset_task.output['dataset_id']
7171

7272
import_dataset_task = AutoMLImportDataOperator(
7373
task_id="import_dataset_task",
@@ -80,7 +80,7 @@
8080

8181
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
8282

83-
model_id = "{{ task_instance.xcom_pull('create_model', key='model_id') }}"
83+
model_id = create_model.output['model_id']
8484

8585
delete_model_task = AutoMLDeleteModelOperator(
8686
task_id="delete_model_task",
@@ -96,4 +96,10 @@
9696
project_id=GCP_PROJECT_ID,
9797
)
9898

99-
create_dataset_task >> import_dataset_task >> create_model >> delete_model_task >> delete_datasets_task
99+
import_dataset_task >> create_model
100+
delete_model_task >> delete_datasets_task
101+
102+
# Task dependencies created via `XComArgs`:
103+
# create_dataset_task >> import_dataset_task
104+
# create_dataset_task >> create_model
105+
# create_dataset_task >> delete_datasets_task

airflow/providers/google/cloud/example_dags/example_automl_nl_text_extraction.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
6868
)
6969

70-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
70+
dataset_id = create_dataset_task.output['dataset_id']
7171

7272
import_dataset_task = AutoMLImportDataOperator(
7373
task_id="import_dataset_task",
@@ -80,7 +80,7 @@
8080

8181
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
8282

83-
model_id = "{{ task_instance.xcom_pull('create_model', key='model_id') }}"
83+
model_id = create_model.output['model_id']
8484

8585
delete_model_task = AutoMLDeleteModelOperator(
8686
task_id="delete_model_task",
@@ -96,4 +96,11 @@
9696
project_id=GCP_PROJECT_ID,
9797
)
9898

99-
create_dataset_task >> import_dataset_task >> create_model >> delete_model_task >> delete_datasets_task
99+
import_dataset_task >> create_model
100+
delete_model_task >> delete_datasets_task
101+
102+
# Task dependencies created via `XComArgs`:
103+
# create_dataset_task >> import_dataset_task
104+
# create_dataset_task >> create_model
105+
# create_model >> delete_model_task
106+
# create_dataset_task >> delete_datasets_task

airflow/providers/google/cloud/example_dags/example_automl_nl_text_sentiment.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
6969
)
7070

71-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
71+
dataset_id = create_dataset_task.output['dataset_id']
7272

7373
import_dataset_task = AutoMLImportDataOperator(
7474
task_id="import_dataset_task",
@@ -81,7 +81,7 @@
8181

8282
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
8383

84-
model_id = "{{ task_instance.xcom_pull('create_model', key='model_id') }}"
84+
model_id = create_model.output['model_id']
8585

8686
delete_model_task = AutoMLDeleteModelOperator(
8787
task_id="delete_model_task",
@@ -97,4 +97,11 @@
9797
project_id=GCP_PROJECT_ID,
9898
)
9999

100-
create_dataset_task >> import_dataset_task >> create_model >> delete_model_task >> delete_datasets_task
100+
import_dataset_task >> create_model
101+
delete_model_task >> delete_datasets_task
102+
103+
# Task dependencies created via `XComArgs`:
104+
# create_dataset_task >> import_dataset_task
105+
# create_dataset_task >> create_model
106+
# create_model >> delete_model_task
107+
# create_dataset_task >> delete_datasets_task

airflow/providers/google/cloud/example_dags/example_automl_tables.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
101101
project_id=GCP_PROJECT_ID,
102102
)
103103

104-
dataset_id = "{{ task_instance.xcom_pull('create_dataset_task', key='dataset_id') }}"
104+
dataset_id = create_dataset_task.output['dataset_id']
105105
# [END howto_operator_automl_create_dataset]
106106

107107
MODEL["dataset_id"] = dataset_id
@@ -156,7 +156,7 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
156156
project_id=GCP_PROJECT_ID,
157157
)
158158

159-
model_id = "{{ task_instance.xcom_pull('create_model_task', key='model_id') }}"
159+
model_id = create_model_task.output['model_id']
160160
# [END howto_operator_automl_create_model]
161161

162162
# [START howto_operator_automl_delete_model]
@@ -176,15 +176,21 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
176176
)
177177

178178
(
179-
create_dataset_task
180-
>> import_dataset_task
179+
import_dataset_task
181180
>> list_tables_spec_task
182181
>> list_columns_spec_task
183182
>> update_dataset_task
184183
>> create_model_task
185-
>> delete_model_task
186-
>> delete_datasets_task
187184
)
185+
delete_model_task >> delete_datasets_task
186+
187+
# Task dependencies created via `XComArgs`:
188+
# create_dataset_task >> import_dataset_task
189+
# create_dataset_task >> list_tables_spec_task
190+
# create_dataset_task >> list_columns_spec_task
191+
# create_dataset_task >> create_model_task
192+
# create_model_task >> delete_model_task
193+
# create_dataset_task >> delete_datasets_task
188194

189195

190196
# Example DAG for AutoML datasets operations
@@ -201,7 +207,7 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
201207
project_id=GCP_PROJECT_ID,
202208
)
203209

204-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
210+
dataset_id = create_dataset_task.output['dataset_id']
205211

206212
import_dataset_task = AutoMLImportDataOperator(
207213
task_id="import_dataset_task",
@@ -243,14 +249,19 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
243249
# [END howto_operator_delete_dataset]
244250

245251
(
246-
create_dataset_task
247-
>> import_dataset_task
252+
import_dataset_task
248253
>> list_tables_spec_task
249254
>> list_columns_spec_task
250255
>> list_datasets_task
251256
>> delete_datasets_task
252257
)
253258

259+
# Task dependencies created via `XComArgs`:
260+
# create_dataset_task >> import_dataset_task
261+
# create_dataset_task >> list_tables_spec_task
262+
# create_dataset_task >> list_columns_spec_task
263+
264+
254265
with models.DAG(
255266
"example_gcp_get_deploy",
256267
schedule_interval=None, # Override to match your needs

airflow/providers/google/cloud/example_dags/example_automl_translation.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
7575
)
7676

77-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
77+
dataset_id = create_dataset_task.output["dataset_id"]
7878

7979
import_dataset_task = AutoMLImportDataOperator(
8080
task_id="import_dataset_task",
@@ -87,7 +87,7 @@
8787

8888
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
8989

90-
model_id = "{{ task_instance.xcom_pull('create_model', key='model_id') }}"
90+
model_id = create_model.output["model_id"]
9191

9292
delete_model_task = AutoMLDeleteModelOperator(
9393
task_id="delete_model_task",
@@ -103,4 +103,11 @@
103103
project_id=GCP_PROJECT_ID,
104104
)
105105

106-
create_dataset_task >> import_dataset_task >> create_model >> delete_model_task >> delete_datasets_task
106+
import_dataset_task >> create_model
107+
delete_model_task >> delete_datasets_task
108+
109+
# Task dependencies created via `XComArgs`:
110+
# create_dataset_task >> import_dataset_task
111+
# create_dataset_task >> create_model
112+
# create_model >> delete_model_task
113+
# create_dataset_task >> delete_datasets_task

airflow/providers/google/cloud/example_dags/example_automl_video_intelligence_classification.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
7272
)
7373

74-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
74+
dataset_id = create_dataset_task.output["dataset_id"]
7575

7676
import_dataset_task = AutoMLImportDataOperator(
7777
task_id="import_dataset_task",
@@ -84,7 +84,7 @@
8484

8585
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
8686

87-
model_id = "{{ task_instance.xcom_pull('create_model', key='model_id') }}"
87+
model_id = create_model.output["model_id"]
8888

8989
delete_model_task = AutoMLDeleteModelOperator(
9090
task_id="delete_model_task",
@@ -100,4 +100,11 @@
100100
project_id=GCP_PROJECT_ID,
101101
)
102102

103-
create_dataset_task >> import_dataset_task >> create_model >> delete_model_task >> delete_datasets_task
103+
import_dataset_task >> create_model
104+
delete_model_task >> delete_datasets_task
105+
106+
# Task dependencies created via `XComArgs`:
107+
# create_dataset_task >> import_dataset_task
108+
# create_dataset_task >> create_model
109+
# create_model >> delete_model_task
110+
# create_dataset_task >> delete_datasets_task

airflow/providers/google/cloud/example_dags/example_automl_video_intelligence_tracking.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
7373
)
7474

75-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
75+
dataset_id = create_dataset_task.output["dataset_id"]
7676

7777
import_dataset_task = AutoMLImportDataOperator(
7878
task_id="import_dataset_task",
@@ -85,7 +85,7 @@
8585

8686
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
8787

88-
model_id = "{{ task_instance.xcom_pull('create_model', key='model_id') }}"
88+
model_id = create_model.output["model_id"]
8989

9090
delete_model_task = AutoMLDeleteModelOperator(
9191
task_id="delete_model_task",
@@ -101,4 +101,11 @@
101101
project_id=GCP_PROJECT_ID,
102102
)
103103

104-
create_dataset_task >> import_dataset_task >> create_model >> delete_model_task >> delete_datasets_task
104+
import_dataset_task >> create_model
105+
delete_model_task >> delete_datasets_task
106+
107+
# Task dependencies created via `XComArgs`:
108+
# create_dataset_task >> import_dataset_task
109+
# create_dataset_task >> create_model
110+
# create_model >> delete_model_task
111+
# create_dataset_task >> delete_datasets_task

airflow/providers/google/cloud/example_dags/example_automl_vision_classification.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
7070
)
7171

72-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
72+
dataset_id = create_dataset_task.output["dataset_id"]
7373

7474
import_dataset_task = AutoMLImportDataOperator(
7575
task_id="import_dataset_task",
@@ -82,7 +82,7 @@
8282

8383
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
8484

85-
model_id = "{{ task_instance.xcom_pull('create_model', key='model_id') }}"
85+
model_id = create_model.output["model_id"]
8686

8787
delete_model_task = AutoMLDeleteModelOperator(
8888
task_id="delete_model_task",
@@ -98,4 +98,11 @@
9898
project_id=GCP_PROJECT_ID,
9999
)
100100

101-
create_dataset_task >> import_dataset_task >> create_model >> delete_model_task >> delete_datasets_task
101+
import_dataset_task >> create_model
102+
delete_model_task >> delete_datasets_task
103+
104+
# Task dependencies created via `XComArgs`:
105+
# create_dataset_task >> import_dataset_task
106+
# create_dataset_task >> create_model
107+
# create_model >> delete_model_task
108+
# create_dataset_task >> delete_datasets_task

airflow/providers/google/cloud/example_dags/example_automl_vision_object_detection.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION
7272
)
7373

74-
dataset_id = '{{ task_instance.xcom_pull("create_dataset_task", key="dataset_id") }}'
74+
dataset_id = create_dataset_task.output["dataset_id"]
7575

7676
import_dataset_task = AutoMLImportDataOperator(
7777
task_id="import_dataset_task",
@@ -84,7 +84,7 @@
8484

8585
create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION)
8686

87-
model_id = "{{ task_instance.xcom_pull('create_model', key='model_id') }}"
87+
model_id = create_model.output["model_id"]
8888

8989
delete_model_task = AutoMLDeleteModelOperator(
9090
task_id="delete_model_task",
@@ -100,4 +100,11 @@
100100
project_id=GCP_PROJECT_ID,
101101
)
102102

103-
create_dataset_task >> import_dataset_task >> create_model >> delete_model_task >> delete_datasets_task
103+
import_dataset_task >> create_model
104+
delete_model_task >> delete_datasets_task
105+
106+
# Task dependencies created via `XComArgs`:
107+
# create_dataset_task >> import_dataset_task
108+
# create_dataset_task >> create_model
109+
# create_model >> delete_model_task
110+
# create_dataset_task >> delete_datasets_task

airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,18 @@
2424
AZURE_SHARE_NAME = os.environ.get('AZURE_SHARE_NAME', 'test-azure-share')
2525
AZURE_DIRECTORY_NAME = "test-azure-dir"
2626

27-
default_args = {
28-
'owner': 'airflow',
29-
'depends_on_past': False,
30-
'email': ['airflow@example.com'],
31-
'email_on_failure': False,
32-
'email_on_retry': False,
33-
'retries': 1,
34-
'retry_delay': timedelta(minutes=5),
35-
}
3627

3728
with DAG(
3829
dag_id='azure_fileshare_to_gcs_example',
39-
default_args=default_args,
30+
default_args={
31+
'owner': 'airflow',
32+
'depends_on_past': False,
33+
'email': ['airflow@example.com'],
34+
'email_on_failure': False,
35+
'email_on_retry': False,
36+
'retries': 1,
37+
'retry_delay': timedelta(minutes=5),
38+
},
4039
schedule_interval=None,
4140
start_date=datetime(2018, 11, 1),
4241
tags=['example'],

airflow/providers/google/cloud/example_dags/example_bigquery_dts.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,7 @@
7575
task_id="gcp_bigquery_create_transfer",
7676
)
7777

78-
transfer_config_id = (
79-
"{{ task_instance.xcom_pull('gcp_bigquery_create_transfer', key='transfer_config_id') }}"
80-
)
78+
transfer_config_id = gcp_bigquery_create_transfer.output["transfer_config_id"]
8179
# [END howto_bigquery_create_data_transfer]
8280

8381
# [START howto_bigquery_start_transfer]
@@ -86,14 +84,13 @@
8684
transfer_config_id=transfer_config_id,
8785
requested_run_time={"seconds": int(time.time() + 60)},
8886
)
89-
run_id = "{{ task_instance.xcom_pull('gcp_bigquery_start_transfer', key='run_id') }}"
9087
# [END howto_bigquery_start_transfer]
9188

9289
# [START howto_bigquery_dts_sensor]
9390
gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
9491
task_id="gcp_run_sensor",
9592
transfer_config_id=transfer_config_id,
96-
run_id=run_id,
93+
run_id=gcp_bigquery_start_transfer.output["run_id"],
9794
expected_statuses={"SUCCEEDED"},
9895
)
9996
# [END howto_bigquery_dts_sensor]
@@ -104,9 +101,10 @@
104101
)
105102
# [END howto_bigquery_delete_data_transfer]
106103

107-
(
108-
gcp_bigquery_create_transfer
109-
>> gcp_bigquery_start_transfer
110-
>> gcp_run_sensor
111-
>> gcp_bigquery_delete_transfer
112-
)
104+
gcp_run_sensor >> gcp_bigquery_delete_transfer
105+
106+
# Task dependencies created via `XComArgs`:
107+
# gcp_bigquery_create_transfer >> gcp_bigquery_start_transfer
108+
# gcp_bigquery_create_transfer >> gcp_run_sensor
109+
# gcp_bigquery_start_transfer >> gcp_run_sensor
110+
# gcp_bigquery_create_transfer >> gcp_bigquery_delete_transfer

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/448e50bd23b4493980a41a5d4241ad3ecef087fb

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy