@@ -98,6 +98,68 @@ def test_passing_arguments_to_hook(self, mock_hook):
98
98
partition_id = TEST_PARTITION_ID ,
99
99
)
100
100
101
+ def test_execute_with_deferrable_mode (self ):
102
+ """
103
+ Asserts that a task is deferred and a BigQueryTablePartitionExistenceTrigger will be fired
104
+ when the BigQueryTablePartitionExistenceSensor is executed and deferrable is set to True.
105
+ """
106
+ task = BigQueryTablePartitionExistenceSensor (
107
+ task_id = "test_task_id" ,
108
+ project_id = TEST_PROJECT_ID ,
109
+ dataset_id = TEST_DATASET_ID ,
110
+ table_id = TEST_TABLE_ID ,
111
+ partition_id = TEST_PARTITION_ID ,
112
+ deferrable = True ,
113
+ )
114
+ with pytest .raises (TaskDeferred ) as exc :
115
+ task .execute (context = {})
116
+ assert isinstance (
117
+ exc .value .trigger , BigQueryTablePartitionExistenceTrigger
118
+ ), "Trigger is not a BigQueryTablePartitionExistenceTrigger"
119
+
120
+ def test_execute_with_deferrable_mode_execute_failure (self ):
121
+ """Tests that an AirflowException is raised in case of error event"""
122
+ task = BigQueryTablePartitionExistenceSensor (
123
+ task_id = "test_task_id" ,
124
+ project_id = TEST_PROJECT_ID ,
125
+ dataset_id = TEST_DATASET_ID ,
126
+ table_id = TEST_TABLE_ID ,
127
+ partition_id = TEST_PARTITION_ID ,
128
+ deferrable = True ,
129
+ )
130
+ with pytest .raises (AirflowException ):
131
+ task .execute_complete (context = {}, event = {"status" : "error" , "message" : "test failure message" })
132
+
133
+ def test_execute_complete_event_none (self ):
134
+ """Asserts that logging occurs as expected"""
135
+ task = BigQueryTablePartitionExistenceSensor (
136
+ task_id = "task-id" ,
137
+ project_id = TEST_PROJECT_ID ,
138
+ dataset_id = TEST_DATASET_ID ,
139
+ table_id = TEST_TABLE_ID ,
140
+ partition_id = TEST_PARTITION_ID ,
141
+ deferrable = True ,
142
+ )
143
+ with pytest .raises (AirflowException , match = "No event received in trigger callback" ):
144
+ task .execute_complete (context = {}, event = None )
145
+
146
+ def test_execute_complete (self ):
147
+ """Asserts that logging occurs as expected"""
148
+ task = BigQueryTablePartitionExistenceSensor (
149
+ task_id = "task-id" ,
150
+ project_id = TEST_PROJECT_ID ,
151
+ dataset_id = TEST_DATASET_ID ,
152
+ table_id = TEST_TABLE_ID ,
153
+ partition_id = TEST_PARTITION_ID ,
154
+ deferrable = True ,
155
+ )
156
+ table_uri = f"{ TEST_PROJECT_ID } :{ TEST_DATASET_ID } .{ TEST_TABLE_ID } "
157
+ with mock .patch .object (task .log , "info" ) as mock_log_info :
158
+ task .execute_complete (context = {}, event = {"status" : "success" , "message" : "test" })
159
+ mock_log_info .assert_called_with (
160
+ 'Sensor checks existence of partition: "%s" in table: %s' , TEST_PARTITION_ID , table_uri
161
+ )
162
+
101
163
102
164
@pytest .fixture ()
103
165
def context ():
@@ -163,18 +225,26 @@ def test_big_query_sensor_async_execute_complete_event_none(self):
163
225
164
226
165
227
class TestBigQueryTableExistencePartitionAsyncSensor :
228
+ depcrecation_message = (
229
+ "Class `BigQueryTableExistencePartitionAsyncSensor` is deprecated and "
230
+ "will be removed in a future release. "
231
+ "Please use `BigQueryTableExistencePartitionSensor` and "
232
+ "set `deferrable` attribute to `True` instead"
233
+ )
234
+
166
235
def test_big_query_table_existence_partition_sensor_async (self ):
167
236
"""
168
237
Asserts that a task is deferred and a BigQueryTablePartitionExistenceTrigger will be fired
169
238
when the BigQueryTableExistencePartitionAsyncSensor is executed.
170
239
"""
171
- task = BigQueryTableExistencePartitionAsyncSensor (
172
- task_id = "test_task_id" ,
173
- project_id = TEST_PROJECT_ID ,
174
- dataset_id = TEST_DATASET_ID ,
175
- table_id = TEST_TABLE_ID ,
176
- partition_id = TEST_PARTITION_ID ,
177
- )
240
+ with pytest .warns (DeprecationWarning , match = self .depcrecation_message ):
241
+ task = BigQueryTableExistencePartitionAsyncSensor (
242
+ task_id = "test_task_id" ,
243
+ project_id = TEST_PROJECT_ID ,
244
+ dataset_id = TEST_DATASET_ID ,
245
+ table_id = TEST_TABLE_ID ,
246
+ partition_id = TEST_PARTITION_ID ,
247
+ )
178
248
with pytest .raises (TaskDeferred ) as exc :
179
249
task .execute (context = {})
180
250
assert isinstance (
@@ -183,37 +253,40 @@ def test_big_query_table_existence_partition_sensor_async(self):
183
253
184
254
def test_big_query_table_existence_partition_sensor_async_execute_failure (self ):
185
255
"""Tests that an AirflowException is raised in case of error event"""
186
- task = BigQueryTableExistencePartitionAsyncSensor (
187
- task_id = "test_task_id" ,
188
- project_id = TEST_PROJECT_ID ,
189
- dataset_id = TEST_DATASET_ID ,
190
- table_id = TEST_TABLE_ID ,
191
- partition_id = TEST_PARTITION_ID ,
192
- )
256
+ with pytest .warns (DeprecationWarning , match = self .depcrecation_message ):
257
+ task = BigQueryTableExistencePartitionAsyncSensor (
258
+ task_id = "test_task_id" ,
259
+ project_id = TEST_PROJECT_ID ,
260
+ dataset_id = TEST_DATASET_ID ,
261
+ table_id = TEST_TABLE_ID ,
262
+ partition_id = TEST_PARTITION_ID ,
263
+ )
193
264
with pytest .raises (AirflowException ):
194
265
task .execute_complete (context = {}, event = {"status" : "error" , "message" : "test failure message" })
195
266
196
267
def test_big_query_table_existence_partition_sensor_async_execute_complete_event_none (self ):
197
268
"""Asserts that logging occurs as expected"""
198
- task = BigQueryTableExistencePartitionAsyncSensor (
199
- task_id = "task-id" ,
200
- project_id = TEST_PROJECT_ID ,
201
- dataset_id = TEST_DATASET_ID ,
202
- table_id = TEST_TABLE_ID ,
203
- partition_id = TEST_PARTITION_ID ,
204
- )
269
+ with pytest .warns (DeprecationWarning , match = self .depcrecation_message ):
270
+ task = BigQueryTableExistencePartitionAsyncSensor (
271
+ task_id = "task-id" ,
272
+ project_id = TEST_PROJECT_ID ,
273
+ dataset_id = TEST_DATASET_ID ,
274
+ table_id = TEST_TABLE_ID ,
275
+ partition_id = TEST_PARTITION_ID ,
276
+ )
205
277
with pytest .raises (AirflowException , match = "No event received in trigger callback" ):
206
278
task .execute_complete (context = {}, event = None )
207
279
208
280
def test_big_query_table_existence_partition_sensor_async_execute_complete (self ):
209
281
"""Asserts that logging occurs as expected"""
210
- task = BigQueryTableExistencePartitionAsyncSensor (
211
- task_id = "task-id" ,
212
- project_id = TEST_PROJECT_ID ,
213
- dataset_id = TEST_DATASET_ID ,
214
- table_id = TEST_TABLE_ID ,
215
- partition_id = TEST_PARTITION_ID ,
216
- )
282
+ with pytest .warns (DeprecationWarning , match = self .depcrecation_message ):
283
+ task = BigQueryTableExistencePartitionAsyncSensor (
284
+ task_id = "task-id" ,
285
+ project_id = TEST_PROJECT_ID ,
286
+ dataset_id = TEST_DATASET_ID ,
287
+ table_id = TEST_TABLE_ID ,
288
+ partition_id = TEST_PARTITION_ID ,
289
+ )
217
290
table_uri = f"{ TEST_PROJECT_ID } :{ TEST_DATASET_ID } .{ TEST_TABLE_ID } "
218
291
with mock .patch .object (task .log , "info" ) as mock_log_info :
219
292
task .execute_complete (context = {}, event = {"status" : "success" , "message" : "test" })
0 commit comments