Content-Length: 873758 | pFad | https://github.com/apache/airflow/commit/29eba167eac5d08862a85dd1358a8b6c43f61200

83 Merge BigQueryTableExistencePartitionAsyncSensor into BigQueryTableEx… · apache/airflow@29eba16 · GitHub
Skip to content

Commit 29eba16

Browse files
authored
Merge BigQueryTableExistencePartitionAsyncSensor into BigQueryTableExistencePartitionSensor (#30231)
1 parent c63836c commit 29eba16

File tree

4 files changed

+171
-65
lines changed

4 files changed

+171
-65
lines changed

airflow/providers/google/cloud/sensors/bigquery.py

Lines changed: 51 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,11 @@ def __init__(
147147
gcp_conn_id: str = "google_cloud_default",
148148
delegate_to: str | None = None,
149149
impersonation_chain: str | Sequence[str] | None = None,
150+
deferrable: bool = False,
150151
**kwargs,
151152
) -> None:
153+
if deferrable and "poke_interval" not in kwargs:
154+
kwargs["poke_interval"] = 5
152155
super().__init__(**kwargs)
153156

154157
self.project_id = project_id
@@ -163,6 +166,8 @@ def __init__(
163166
self.delegate_to = delegate_to
164167
self.impersonation_chain = impersonation_chain
165168

169+
self.deferrable = deferrable
170+
166171
def poke(self, context: Context) -> bool:
167172
table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
168173
self.log.info('Sensor checks existence of partition: "%s" in table: %s', self.partition_id, table_uri)
@@ -178,6 +183,44 @@ def poke(self, context: Context) -> bool:
178183
partition_id=self.partition_id,
179184
)
180185

186+
def execute(self, context: Context) -> None:
187+
"""
188+
Airflow runs this method on the worker and defers using the triggers
189+
if deferrable is set to True.
190+
"""
191+
if not self.deferrable:
192+
super().execute(context)
193+
else:
194+
self.defer(
195+
timeout=timedelta(seconds=self.timeout),
196+
trigger=BigQueryTablePartitionExistenceTrigger(
197+
dataset_id=self.dataset_id,
198+
table_id=self.table_id,
199+
project_id=self.project_id,
200+
partition_id=self.partition_id,
201+
poll_interval=self.poke_interval,
202+
gcp_conn_id=self.gcp_conn_id,
203+
hook_params={
204+
"impersonation_chain": self.impersonation_chain,
205+
},
206+
),
207+
method_name="execute_complete",
208+
)
209+
210+
def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None = None) -> str:
211+
"""
212+
Callback for when the trigger fires - returns immediately.
213+
Relies on trigger to throw an exception, otherwise it assumes execution was
214+
successful.
215+
"""
216+
table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
217+
self.log.info('Sensor checks existence of partition: "%s" in table: %s', self.partition_id, table_uri)
218+
if event:
219+
if event["status"] == "success":
220+
return event["message"]
221+
raise AirflowException(event["message"])
222+
raise AirflowException("No event received in trigger callback")
223+
181224

182225
class BigQueryTableExistenceAsyncSensor(BigQueryTableExistenceSensor):
183226
"""
@@ -274,38 +317,12 @@ class BigQueryTableExistencePartitionAsyncSensor(BigQueryTablePartitionExistence
274317
:param poke_interval: The interval in seconds to wait between checks table existence.
275318
"""
276319

277-
def __init__(self, poke_interval: int = 5, **kwargs):
278-
super().__init__(**kwargs)
279-
self.poke_interval = poke_interval
280-
281-
def execute(self, context: Context) -> None:
282-
"""Airflow runs this method on the worker and defers using the trigger."""
283-
self.defer(
284-
timeout=timedelta(seconds=self.timeout),
285-
trigger=BigQueryTablePartitionExistenceTrigger(
286-
dataset_id=self.dataset_id,
287-
table_id=self.table_id,
288-
project_id=self.project_id,
289-
partition_id=self.partition_id,
290-
poll_interval=self.poke_interval,
291-
gcp_conn_id=self.gcp_conn_id,
292-
hook_params={
293-
"impersonation_chain": self.impersonation_chain,
294-
},
295-
),
296-
method_name="execute_complete",
320+
def __init__(self, **kwargs):
321+
warnings.warn(
322+
"Class `BigQueryTableExistencePartitionAsyncSensor` is deprecated and "
323+
"will be removed in a future release. "
324+
"Please use `BigQueryTableExistencePartitionSensor` and "
325+
"set `deferrable` attribute to `True` instead",
326+
DeprecationWarning,
297327
)
298-
299-
def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None = None) -> str:
300-
"""
301-
Callback for when the trigger fires - returns immediately.
302-
Relies on trigger to throw an exception, otherwise it assumes execution was
303-
successful.
304-
"""
305-
table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
306-
self.log.info('Sensor checks existence of partition: "%s" in table: %s', self.partition_id, table_uri)
307-
if event:
308-
if event["status"] == "success":
309-
return event["message"]
310-
raise AirflowException(event["message"])
311-
raise AirflowException("No event received in trigger callback")
328+
super().__init__(deferrable=True, **kwargs)

docs/apache-airflow-providers-google/operators/cloud/bigquery.rst

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -509,10 +509,15 @@ To check that a table exists and has a partition you can use.
509509

510510
For DAY partitioned tables, the partition_id parameter is a string on the "%Y%m%d" format
511511

512-
Use the :class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistencePartitionAsyncSensor`
513-
(deferrable version) if you would like to free up the worker slots while the sensor is running.
512+
You can also use the deferrable mode in this operator if you would like to free up the worker slots while the sensor is running.
513+
514+
.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
515+
:language: python
516+
:dedent: 4
517+
:start-after: [START howto_sensor_bigquery_table_partition_defered]
518+
:end-before: [END howto_sensor_bigquery_table_partition_defered]
514519

515-
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistencePartitionAsyncSensor`.
520+
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistencePartitionAsyncSensor` is deprecated and will be removed in a future release. Please use :class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTablePartitionExistenceSensor` with its deferrable mode enabled instead.
516521

517522
.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
518523
:language: python

tests/providers/google/cloud/sensors/test_bigquery.py

Lines changed: 101 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,68 @@ def test_passing_arguments_to_hook(self, mock_hook):
9898
partition_id=TEST_PARTITION_ID,
9999
)
100100

101+
def test_execute_with_deferrable_mode(self):
102+
"""
103+
Asserts that a task is deferred and a BigQueryTablePartitionExistenceTrigger will be fired
104+
when the BigQueryTablePartitionExistenceSensor is executed and deferrable is set to True.
105+
"""
106+
task = BigQueryTablePartitionExistenceSensor(
107+
task_id="test_task_id",
108+
project_id=TEST_PROJECT_ID,
109+
dataset_id=TEST_DATASET_ID,
110+
table_id=TEST_TABLE_ID,
111+
partition_id=TEST_PARTITION_ID,
112+
deferrable=True,
113+
)
114+
with pytest.raises(TaskDeferred) as exc:
115+
task.execute(context={})
116+
assert isinstance(
117+
exc.value.trigger, BigQueryTablePartitionExistenceTrigger
118+
), "Trigger is not a BigQueryTablePartitionExistenceTrigger"
119+
120+
def test_execute_with_deferrable_mode_execute_failure(self):
121+
"""Tests that an AirflowException is raised in case of error event"""
122+
task = BigQueryTablePartitionExistenceSensor(
123+
task_id="test_task_id",
124+
project_id=TEST_PROJECT_ID,
125+
dataset_id=TEST_DATASET_ID,
126+
table_id=TEST_TABLE_ID,
127+
partition_id=TEST_PARTITION_ID,
128+
deferrable=True,
129+
)
130+
with pytest.raises(AirflowException):
131+
task.execute_complete(context={}, event={"status": "error", "message": "test failure message"})
132+
133+
def test_execute_complete_event_none(self):
134+
"""Asserts that logging occurs as expected"""
135+
task = BigQueryTablePartitionExistenceSensor(
136+
task_id="task-id",
137+
project_id=TEST_PROJECT_ID,
138+
dataset_id=TEST_DATASET_ID,
139+
table_id=TEST_TABLE_ID,
140+
partition_id=TEST_PARTITION_ID,
141+
deferrable=True,
142+
)
143+
with pytest.raises(AirflowException, match="No event received in trigger callback"):
144+
task.execute_complete(context={}, event=None)
145+
146+
def test_execute_complete(self):
147+
"""Asserts that logging occurs as expected"""
148+
task = BigQueryTablePartitionExistenceSensor(
149+
task_id="task-id",
150+
project_id=TEST_PROJECT_ID,
151+
dataset_id=TEST_DATASET_ID,
152+
table_id=TEST_TABLE_ID,
153+
partition_id=TEST_PARTITION_ID,
154+
deferrable=True,
155+
)
156+
table_uri = f"{TEST_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
157+
with mock.patch.object(task.log, "info") as mock_log_info:
158+
task.execute_complete(context={}, event={"status": "success", "message": "test"})
159+
mock_log_info.assert_called_with(
160+
'Sensor checks existence of partition: "%s" in table: %s', TEST_PARTITION_ID, table_uri
161+
)
162+
101163

102164
@pytest.fixture()
103165
def context():
@@ -163,18 +225,26 @@ def test_big_query_sensor_async_execute_complete_event_none(self):
163225

164226

165227
class TestBigQueryTableExistencePartitionAsyncSensor:
228+
depcrecation_message = (
229+
"Class `BigQueryTableExistencePartitionAsyncSensor` is deprecated and "
230+
"will be removed in a future release. "
231+
"Please use `BigQueryTableExistencePartitionSensor` and "
232+
"set `deferrable` attribute to `True` instead"
233+
)
234+
166235
def test_big_query_table_existence_partition_sensor_async(self):
167236
"""
168237
Asserts that a task is deferred and a BigQueryTablePartitionExistenceTrigger will be fired
169238
when the BigQueryTableExistencePartitionAsyncSensor is executed.
170239
"""
171-
task = BigQueryTableExistencePartitionAsyncSensor(
172-
task_id="test_task_id",
173-
project_id=TEST_PROJECT_ID,
174-
dataset_id=TEST_DATASET_ID,
175-
table_id=TEST_TABLE_ID,
176-
partition_id=TEST_PARTITION_ID,
177-
)
240+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
241+
task = BigQueryTableExistencePartitionAsyncSensor(
242+
task_id="test_task_id",
243+
project_id=TEST_PROJECT_ID,
244+
dataset_id=TEST_DATASET_ID,
245+
table_id=TEST_TABLE_ID,
246+
partition_id=TEST_PARTITION_ID,
247+
)
178248
with pytest.raises(TaskDeferred) as exc:
179249
task.execute(context={})
180250
assert isinstance(
@@ -183,37 +253,40 @@ def test_big_query_table_existence_partition_sensor_async(self):
183253

184254
def test_big_query_table_existence_partition_sensor_async_execute_failure(self):
185255
"""Tests that an AirflowException is raised in case of error event"""
186-
task = BigQueryTableExistencePartitionAsyncSensor(
187-
task_id="test_task_id",
188-
project_id=TEST_PROJECT_ID,
189-
dataset_id=TEST_DATASET_ID,
190-
table_id=TEST_TABLE_ID,
191-
partition_id=TEST_PARTITION_ID,
192-
)
256+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
257+
task = BigQueryTableExistencePartitionAsyncSensor(
258+
task_id="test_task_id",
259+
project_id=TEST_PROJECT_ID,
260+
dataset_id=TEST_DATASET_ID,
261+
table_id=TEST_TABLE_ID,
262+
partition_id=TEST_PARTITION_ID,
263+
)
193264
with pytest.raises(AirflowException):
194265
task.execute_complete(context={}, event={"status": "error", "message": "test failure message"})
195266

196267
def test_big_query_table_existence_partition_sensor_async_execute_complete_event_none(self):
197268
"""Asserts that logging occurs as expected"""
198-
task = BigQueryTableExistencePartitionAsyncSensor(
199-
task_id="task-id",
200-
project_id=TEST_PROJECT_ID,
201-
dataset_id=TEST_DATASET_ID,
202-
table_id=TEST_TABLE_ID,
203-
partition_id=TEST_PARTITION_ID,
204-
)
269+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
270+
task = BigQueryTableExistencePartitionAsyncSensor(
271+
task_id="task-id",
272+
project_id=TEST_PROJECT_ID,
273+
dataset_id=TEST_DATASET_ID,
274+
table_id=TEST_TABLE_ID,
275+
partition_id=TEST_PARTITION_ID,
276+
)
205277
with pytest.raises(AirflowException, match="No event received in trigger callback"):
206278
task.execute_complete(context={}, event=None)
207279

208280
def test_big_query_table_existence_partition_sensor_async_execute_complete(self):
209281
"""Asserts that logging occurs as expected"""
210-
task = BigQueryTableExistencePartitionAsyncSensor(
211-
task_id="task-id",
212-
project_id=TEST_PROJECT_ID,
213-
dataset_id=TEST_DATASET_ID,
214-
table_id=TEST_TABLE_ID,
215-
partition_id=TEST_PARTITION_ID,
216-
)
282+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
283+
task = BigQueryTableExistencePartitionAsyncSensor(
284+
task_id="task-id",
285+
project_id=TEST_PROJECT_ID,
286+
dataset_id=TEST_DATASET_ID,
287+
table_id=TEST_TABLE_ID,
288+
partition_id=TEST_PARTITION_ID,
289+
)
217290
table_uri = f"{TEST_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
218291
with mock.patch.object(task.log, "info") as mock_log_info:
219292
task.execute_complete(context={}, event={"status": "success", "message": "test"})

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,17 @@
118118
)
119119
# [END howto_sensor_bigquery_table_partition]
120120

121+
# [START howto_sensor_bigquery_table_partition_defered]
122+
check_table_partition_exists: BaseSensorOperator = BigQueryTablePartitionExistenceSensor(
123+
task_id="check_table_partition_exists_defered",
124+
project_id=PROJECT_ID,
125+
dataset_id=DATASET_NAME,
126+
table_id=TABLE_NAME,
127+
partition_id=PARTITION_NAME,
128+
deferrable=True,
129+
)
130+
# [END howto_sensor_bigquery_table_partition_defered]
131+
121132
# [START howto_sensor_bigquery_table_partition_async]
122133
check_table_partition_exists_async: BaseSensorOperator = BigQueryTableExistencePartitionAsyncSensor(
123134
task_id="check_table_partition_exists_async",

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/29eba167eac5d08862a85dd1358a8b6c43f61200

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy