Fix datastore system tests. (#34215) · apache/airflow@cad983d

https://github.com/apache/airflow/commit/cad983dac9f240a2d0658531235acfd003b64760

Commit cad983d

Authored by Beata Kossakowska (bkossakowska)
Co-authored-by: Beata Kossakowska <bkossakowska@google.com>

1 parent 9d4e69b · commit cad983d

File tree

6 files changed: +81, -141 lines


docs/apache-airflow-providers-google/operators/cloud/datastore.rst

Lines changed: 4 additions & 4 deletions
@@ -38,7 +38,7 @@ Export Entities
 To export entities from Google Cloud Datastore to Cloud Storage use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreExportEntitiesOperator`
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_export_task]
@@ -52,7 +52,7 @@ Import Entities
 To import entities from Cloud Storage to Google Cloud Datastore use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreImportEntitiesOperator`
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_import_task]
@@ -168,7 +168,7 @@ Get operation state
 To get the current state of a long-running operation use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator`
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
    :language: python
    :dedent: 4
    :start-after: [START get_operation_state]
@@ -182,7 +182,7 @@ Delete operation
 To delete an operation use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator`
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
    :language: python
    :dedent: 4
    :start-after: [START delete_operation]
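All four references above now pull their snippets from example_datastore_commit.py, since example_datastore_export_import.py is deleted in this commit (see below). For readers unfamiliar with the directive: exampleinclude renders only the region between the named markers in the referenced file. A minimal sketch of that convention, with placeholder values for the two identifiers:

from airflow.providers.google.cloud.operators.datastore import CloudDatastoreExportEntitiesOperator

PROJECT_ID = "example-project"  # placeholder
BUCKET_NAME = "example-bucket"  # placeholder

# Sphinx renders only the lines between the markers; :dedent: 4 strips
# the DAG-body indentation when the snippet appears in the docs.
# [START how_to_export_task]
export_task = CloudDatastoreExportEntitiesOperator(
    task_id="export_task",
    bucket=BUCKET_NAME,
    project_id=PROJECT_ID,
    overwrite_existing=True,
)
# [END how_to_export_task]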

tests/system/providers/google/cloud/datastore/example_datastore_commit.py

Lines changed: 64 additions & 6 deletions
@@ -25,17 +25,24 @@
 from typing import Any
 
 from airflow import models
+from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.datastore import (
     CloudDatastoreAllocateIdsOperator,
     CloudDatastoreBeginTransactionOperator,
     CloudDatastoreCommitOperator,
+    CloudDatastoreDeleteOperationOperator,
+    CloudDatastoreExportEntitiesOperator,
+    CloudDatastoreGetOperationOperator,
+    CloudDatastoreImportEntitiesOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 DAG_ID = "datastore_commit"
-
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 # [START how_to_keys_def]
 KEYS = [
     {
@@ -57,20 +64,21 @@
     catchup=False,
     tags=["datastore", "example"],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, location="EU"
+    )
     # [START how_to_allocate_ids]
     allocate_ids = CloudDatastoreAllocateIdsOperator(
         task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
     )
     # [END how_to_allocate_ids]
-
     # [START how_to_begin_transaction]
     begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
         task_id="begin_transaction_commit",
         transaction_options=TRANSACTION_OPTIONS,
         project_id=PROJECT_ID,
     )
     # [END how_to_begin_transaction]
-
     # [START how_to_commit_def]
     COMMIT_BODY = {
         "mode": "TRANSACTIONAL",
@@ -82,15 +90,65 @@
                 }
             }
         ],
-        "transaction": begin_transaction_commit.output,
+        "singleUseTransaction": {"readWrite": {}},
     }
     # [END how_to_commit_def]
-
     # [START how_to_commit_task]
     commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
     # [END how_to_commit_task]
+    # [START how_to_export_task]
+    export_task = CloudDatastoreExportEntitiesOperator(
+        task_id="export_task",
+        bucket=BUCKET_NAME,
+        project_id=PROJECT_ID,
+        overwrite_existing=True,
+    )
+    # [END how_to_export_task]
+    # [START how_to_import_task]
+    import_task = CloudDatastoreImportEntitiesOperator(
+        task_id="import_task",
+        bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
+        file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_import_task]
+    # [START get_operation_state]
+    get_operation = CloudDatastoreGetOperationOperator(
+        task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
+    )
+    # [END get_operation_state]
+    # [START delete_operation]
+    delete_export_operation = CloudDatastoreDeleteOperationOperator(
+        task_id="delete_export_operation",
+        name="{{ task_instance.xcom_pull('export_task')['name'] }}",
+    )
+    # [END delete_operation]
+    delete_export_operation.trigger_rule = TriggerRule.ALL_DONE
+    delete_import_operation = CloudDatastoreDeleteOperationOperator(
+        task_id="delete_import_operation",
+        name="{{ task_instance.xcom_pull('import_task')['name'] }}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    chain(
+        create_bucket,
+        allocate_ids,
+        begin_transaction_commit,
+        commit_task,
+        export_task,
+        import_task,
+        get_operation,
+        [delete_bucket, delete_export_operation, delete_import_operation],
+    )
+
+    from tests.system.utils.watcher import watcher
 
-    allocate_ids >> begin_transaction_commit >> commit_task
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 
 from tests.system.utils import get_test_run  # noqa: E402
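The substantive fix above is in COMMIT_BODY: rather than pulling a transaction handle from a separate CloudDatastoreBeginTransactionOperator task over XCom, the commit request now asks Datastore to open and commit a transaction in one call. A minimal sketch of the two request-body shapes (the mutation is abbreviated from the example DAG, and the old handle is shown as a placeholder string):

# Shared mutation, trimmed from the example DAG's KEYS/COMMIT_BODY.
mutation = {
    "insert": {
        "key": {"partitionId": {"projectId": "example-project"}, "path": [{"kind": "task"}]},
        "properties": {"string": {"stringValue": "airflow is awesome!"}},
    }
}

# Before: required a begin_transaction task plus XCom plumbing between tasks.
body_with_explicit_transaction = {
    "mode": "TRANSACTIONAL",
    "mutations": [mutation],
    "transaction": "<handle produced by begin_transaction_commit>",  # placeholder
}

# After: Datastore opens and commits the transaction in a single request,
# so the commit task no longer depends on another task's output.
body_with_single_use_transaction = {
    "mode": "TRANSACTIONAL",
    "mutations": [mutation],
    "singleUseTransaction": {"readWrite": {}},
}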

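The templated bucket and file arguments of import_task slice up the outputUrl that export_task returns through XCom. What those Jinja expressions compute, as plain Python (the URL below is illustrative, not a real export path):

# Illustrative outputUrl, shaped like "gs://<bucket>/<path to metadata file>".
output_url = "gs://bucket_datastore_commit_env/2024-01-01T00:00:00_1234/2024-01-01T00:00:00_1234.overall_export_metadata"

parts = output_url.split("/")
bucket = parts[2]           # "bucket_datastore_commit_env"
file = "/".join(parts[3:])  # "2024-01-01T00:00:00_1234/...overall_export_metadata"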
tests/system/providers/google/cloud/datastore/example_datastore_export_import.py

Lines changed: 0 additions & 114 deletions
This file was deleted.
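Its export, import, get-operation, and delete-operation snippets were folded into example_datastore_commit.py above, which is why the datastore.rst references now point there.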

tests/system/providers/google/cloud/datastore/example_datastore_query.py

Lines changed: 6 additions & 0 deletions
@@ -77,6 +77,12 @@
 
     allocate_ids >> begin_transaction_query >> run_query
 
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
 
 from tests.system.utils import get_test_run  # noqa: E402
 
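The `list(dag.tasks) >> watcher()` line wires one extra task downstream of every task in the DAG, so a failure anywhere fails the test run even when teardown tasks with permissive trigger rules succeed. Roughly what tests/system/utils/watcher.py provides (paraphrased from Airflow's system-test utilities, not verbatim):

from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule

@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    # Runs only if at least one upstream task failed, and then fails the DAG run.
    raise AirflowException("Failing task because one or more upstream tasks failed.")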

tests/system/providers/google/cloud/datastore/example_datastore_rollback.py

Lines changed: 6 additions & 0 deletions
@@ -60,6 +60,12 @@
 
     begin_transaction_to_rollback >> rollback_transaction
 
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
 
 from tests.system.utils import get_test_run  # noqa: E402
 
tests/system/providers/google/cloud/gcs/example_firestore.py

Lines changed: 1 addition & 17 deletions
@@ -36,7 +36,6 @@
 )
 from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
 from airflow.providers.google.cloud.operators.datastore import (
-    CloudDatastoreBeginTransactionOperator,
     CloudDatastoreCommitOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
@@ -74,20 +73,12 @@
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, location=DATASET_LOCATION
     )
-
     create_dataset = BigQueryCreateEmptyDatasetOperator(
         task_id="create_dataset",
         dataset_id=DATASET_NAME,
         location=DATASET_LOCATION,
         project_id=PROJECT_ID,
     )
-
-    begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
-        task_id="begin_transaction_commit",
-        transaction_options={"readWrite": {}},
-        project_id=PROJECT_ID,
-    )
-
     commit_task = CloudDatastoreCommitOperator(
         task_id="commit_task",
         body={
@@ -100,19 +91,17 @@
                 }
             }
         ],
-            "transaction": begin_transaction_commit.output,
+            "singleUseTransaction": {"readWrite": {}},
         },
         project_id=PROJECT_ID,
     )
-
     # [START howto_operator_export_database_to_gcs]
     export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
         task_id="export_database_to_gcs",
         project_id=PROJECT_ID,
         body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
     )
     # [END howto_operator_export_database_to_gcs]
-
     # [START howto_operator_create_external_table_multiple_types]
     create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
         task_id="create_external_table",
@@ -131,7 +120,6 @@
         },
     )
     # [END howto_operator_create_external_table_multiple_types]
-
     read_data_from_gcs_multiple_types = BigQueryInsertJobOperator(
         task_id="execute_query",
         configuration={
@@ -141,7 +129,6 @@
             }
         },
     )
-
     delete_entity = DataflowTemplatedJobStartOperator(
         task_id="delete-entity-firestore",
         project_id=PROJECT_ID,
@@ -158,23 +145,20 @@
         append_job_name=False,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-
     delete_dataset = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset",
         dataset_id=DATASET_NAME,
         project_id=PROJECT_ID,
         delete_contents=True,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
 
     (
         # TEST SETUP
         [create_bucket, create_dataset]
-        >> begin_transaction_commit
         >> commit_task
         # TEST BODY
         >> export_database_to_gcs
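Both files express dependencies with lists: a list on either side of `>>` (or inside `chain()`) fans dependencies out to, or in from, every task in the list, which is how the setup pair and the three teardown tasks are wired. A minimal, self-contained sketch with stand-in tasks (all names hypothetical):

from datetime import datetime

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="deps_sketch", schedule=None, start_date=datetime(2024, 1, 1)) as dag:
    setup_a = EmptyOperator(task_id="setup_a")
    setup_b = EmptyOperator(task_id="setup_b")
    body = EmptyOperator(task_id="body")
    teardown_a = EmptyOperator(task_id="teardown_a")
    teardown_b = EmptyOperator(task_id="teardown_b")

    [setup_a, setup_b] >> body             # body waits for both setup tasks
    chain(body, [teardown_a, teardown_b])  # both teardowns wait for body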

Comments (0)







