Content-Length: 412036 | pFad | https://github.com/apache/airflow/commit/3a5b583c916fff4603cdb2f2be815ccc5c281750

08 Optimize deferrable mode in `GCSObjectExistenceSensor` (#30901) · apache/airflow@3a5b583 · GitHub
Skip to content

Commit 3a5b583

Browse files
authored
Optimize deferrable mode in GCSObjectExistenceSensor (#30901)
* optimize deferrable mode in GCSObjectExistenceSensor
1 parent 1132da1 commit 3a5b583

File tree

2 files changed

+34
-15
lines changed
  • airflow/providers/google/cloud/sensors
  • tests/providers/google/cloud/sensors

2 files changed

+34
-15
lines changed

airflow/providers/google/cloud/sensors/gcs.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -101,19 +101,20 @@ def execute(self, context: Context) -> None:
101101
if not self.deferrable:
102102
super().execute(context)
103103
else:
104-
self.defer(
105-
timeout=timedelta(seconds=self.timeout),
106-
trigger=GCSBlobTrigger(
107-
bucket=self.bucket,
108-
object_name=self.object,
109-
poke_interval=self.poke_interval,
110-
google_cloud_conn_id=self.google_cloud_conn_id,
111-
hook_params={
112-
"impersonation_chain": self.impersonation_chain,
113-
},
114-
),
115-
method_name="execute_complete",
116-
)
104+
if not self.poke(context=context):
105+
self.defer(
106+
timeout=timedelta(seconds=self.timeout),
107+
trigger=GCSBlobTrigger(
108+
bucket=self.bucket,
109+
object_name=self.object,
110+
poke_interval=self.poke_interval,
111+
google_cloud_conn_id=self.google_cloud_conn_id,
112+
hook_params={
113+
"impersonation_chain": self.impersonation_chain,
114+
},
115+
),
116+
method_name="execute_complete",
117+
)
117118

118119
def execute_complete(self, context: Context, event: dict[str, str]) -> str:
119120
"""

tests/providers/google/cloud/sensors/test_gcs.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,22 @@ def test_should_pass_argument_to_hook(self, mock_hook):
9999
)
100100
mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT, DEFAULT_RETRY)
101101

102-
def test_gcs_object_existence_sensor_deferred(self):
102+
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
103+
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor.defer")
104+
def test_gcs_object_existence_sensor_finish_before_deferred(self, mock_defer, mock_hook):
105+
task = GCSObjectExistenceSensor(
106+
task_id="task-id",
107+
bucket=TEST_BUCKET,
108+
object=TEST_OBJECT,
109+
google_cloud_conn_id=TEST_GCP_CONN_ID,
110+
deferrable=True,
111+
)
112+
mock_hook.return_value.exists.return_value = True
113+
task.execute(mock.MagicMock())
114+
assert not mock_defer.called
115+
116+
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
117+
def test_gcs_object_existence_sensor_deferred(self, mock_hook):
103118
"""
104119
Asserts that a task is deferred and a GCSBlobTrigger will be fired
105120
when the GCSObjectExistenceSensor is executed and deferrable is set to True.
@@ -111,6 +126,7 @@ def test_gcs_object_existence_sensor_deferred(self):
111126
google_cloud_conn_id=TEST_GCP_CONN_ID,
112127
deferrable=True,
113128
)
129+
mock_hook.return_value.exists.return_value = False
114130
with pytest.raises(TaskDeferred) as exc:
115131
task.execute(context)
116132
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
@@ -147,7 +163,8 @@ class TestGoogleCloudStorageObjectSensorAsync:
147163
"Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead"
148164
)
149165

150-
def test_gcs_object_existence_sensor_async(self):
166+
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
167+
def test_gcs_object_existence_sensor_async(self, mock_hook):
151168
"""
152169
Asserts that a task is deferred and a GCSBlobTrigger will be fired
153170
when the GCSObjectExistenceAsyncSensor is executed.
@@ -159,6 +176,7 @@ def test_gcs_object_existence_sensor_async(self):
159176
object=TEST_OBJECT,
160177
google_cloud_conn_id=TEST_GCP_CONN_ID,
161178
)
179+
mock_hook.return_value.exists.return_value = False
162180
with pytest.raises(TaskDeferred) as exc:
163181
task.execute(context)
164182
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/3a5b583c916fff4603cdb2f2be815ccc5c281750

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy