19
19
20
20
from unittest import mock
21
21
22
+ import pytest
23
+
24
+ from airflow .exceptions import AirflowException , AirflowSkipException
22
25
from airflow .providers .google .marketing_platform .sensors .display_video import (
23
26
GoogleDisplayVideo360GetSDFDownloadOperationSensor ,
24
27
GoogleDisplayVideo360RunQuerySensor ,
25
28
)
26
29
30
+ MODULE_NAME = "airflow.providers.google.marketing_platform.sensors.display_video"
31
+
27
32
API_VERSION = "api_version"
28
33
GCP_CONN_ID = "google_cloud_default"
29
34
30
35
31
36
class TestGoogleDisplayVideo360RunQuerySensor :
32
- @mock .patch ("airflow.providers.google.marketing_platform.sensors.display_video .GoogleDisplayVideo360Hook" )
33
- @mock .patch ("airflow.providers.google.marketing_platform.sensors.display_video .BaseSensorOperator" )
37
+ @mock .patch (f" { MODULE_NAME } .GoogleDisplayVideo360Hook" )
38
+ @mock .patch (f" { MODULE_NAME } .BaseSensorOperator" )
34
39
def test_poke (self , mock_base_op , hook_mock ):
35
40
query_id = "QUERY_ID"
36
41
report_id = "REPORT_ID"
@@ -46,8 +51,8 @@ def test_poke(self, mock_base_op, hook_mock):
46
51
47
52
48
53
class TestGoogleDisplayVideo360Sensor :
49
- @mock .patch ("airflow.providers.google.marketing_platform.sensors.display_video .GoogleDisplayVideo360Hook" )
50
- @mock .patch ("airflow.providers.google.marketing_platform.sensors.display_video .BaseSensorOperator" )
54
+ @mock .patch (f" { MODULE_NAME } .GoogleDisplayVideo360Hook" )
55
+ @mock .patch (f" { MODULE_NAME } .BaseSensorOperator" )
51
56
def test_poke (self , mock_base_op , hook_mock ):
52
57
operation_name = "operation_name"
53
58
op = GoogleDisplayVideo360GetSDFDownloadOperationSensor (
@@ -65,3 +70,23 @@ def test_poke(self, mock_base_op, hook_mock):
65
70
hook_mock .return_value .get_sdf_download_operation .assert_called_once_with (
66
71
operation_name = operation_name
67
72
)
73
+
74
+ @pytest .mark .parametrize (
75
+ "soft_fail, expected_exception" , ((False , AirflowException ), (True , AirflowSkipException ))
76
+ )
77
+ @mock .patch (f"{ MODULE_NAME } .GoogleDisplayVideo360Hook" )
78
+ @mock .patch (f"{ MODULE_NAME } .BaseSensorOperator" )
79
+ def test_poke_with_exception (
80
+ self , mock_base_op , hook_mock , soft_fail : bool , expected_exception : AirflowException
81
+ ):
82
+ operation_name = "operation_name"
83
+ op = GoogleDisplayVideo360GetSDFDownloadOperationSensor (
84
+ operation_name = operation_name ,
85
+ api_version = API_VERSION ,
86
+ task_id = "test_task" ,
87
+ soft_fail = soft_fail ,
88
+ )
89
+ hook_mock .return_value .get_sdf_download_operation .return_value = {"error" : "error" }
90
+
91
+ with pytest .raises (expected_exception , match = "The operation finished in error with error" ):
92
+ op .poke (context = None )
0 commit comments