Content-Length: 418733 | pFad | http://github.com/apache/airflow/commit/ea1ae1963ecf1b543e4f5e8deb59d623df42d44a

76 Fix cancel_on_kill after execution timeout for DataprocSubmitJobOpera… · apache/airflow@ea1ae19 · GitHub
Skip to content

Commit ea1ae19

Browse files
authored
Fix cancel_on_kill after execution timeout for DataprocSubmitJobOperator (#22955)
Synchronous tasks killed by execution timeout weren't canceled due to wrong assignment of job_id property.
1 parent 52b724e commit ea1ae19

File tree

2 files changed

+38
-7
lines changed

2 files changed

+38
-7
lines changed

airflow/providers/google/cloud/operators/dataproc.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,19 +1858,21 @@ def execute(self, context: 'Context'):
18581858
timeout=self.timeout,
18591859
metadata=self.metadata,
18601860
)
1861-
job_id = job_object.reference.job_id
1862-
self.log.info('Job %s submitted successfully.', job_id)
1861+
new_job_id: str = job_object.reference.job_id
1862+
self.log.info('Job %s submitted successfully.', new_job_id)
18631863
# Save data required by extra links no matter what the job status will be
1864-
DataprocLink.persist(context=context, task_instance=self, url=DATAPROC_JOB_LOG_LINK, resource=job_id)
1864+
DataprocLink.persist(
1865+
context=context, task_instance=self, url=DATAPROC_JOB_LOG_LINK, resource=new_job_id
1866+
)
18651867

1868+
self.job_id = new_job_id
18661869
if not self.asynchronous:
1867-
self.log.info('Waiting for job %s to complete', job_id)
1870+
self.log.info('Waiting for job %s to complete', new_job_id)
18681871
self.hook.wait_for_job(
1869-
job_id=job_id, region=self.region, project_id=self.project_id, timeout=self.wait_timeout
1872+
job_id=new_job_id, region=self.region, project_id=self.project_id, timeout=self.wait_timeout
18701873
)
1871-
self.log.info('Job %s completed successfully.', job_id)
1874+
self.log.info('Job %s completed successfully.', new_job_id)
18721875

1873-
self.job_id = job_id
18741876
return self.job_id
18751877

18761878
def on_kill(self):

tests/providers/google/cloud/operators/test_dataproc.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from google.api_core.retry import Retry
2626

2727
from airflow import AirflowException
28+
from airflow.exceptions import AirflowTaskTimeout
2829
from airflow.models import DAG, DagBag
2930
from airflow.providers.google.cloud.operators.dataproc import (
3031
DATAPROC_CLUSTER_LINK,
@@ -877,6 +878,34 @@ def test_on_kill(self, mock_hook):
877878
project_id=GCP_PROJECT, region=GCP_LOCATION, job_id=job_id
878879
)
879880

881+
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
882+
def test_on_kill_after_execution_timeout(self, mock_hook):
883+
job = {}
884+
job_id = "job_id"
885+
mock_hook.return_value.wait_for_job.side_effect = AirflowTaskTimeout()
886+
mock_hook.return_value.submit_job.return_value.reference.job_id = job_id
887+
888+
op = DataprocSubmitJobOperator(
889+
task_id=TASK_ID,
890+
region=GCP_LOCATION,
891+
project_id=GCP_PROJECT,
892+
job=job,
893+
gcp_conn_id=GCP_CONN_ID,
894+
retry=RETRY,
895+
timeout=TIMEOUT,
896+
metadata=METADATA,
897+
request_id=REQUEST_ID,
898+
impersonation_chain=IMPERSONATION_CHAIN,
899+
cancel_on_kill=True,
900+
)
901+
with pytest.raises(AirflowTaskTimeout):
902+
op.execute(context=self.mock_context)
903+
904+
op.on_kill()
905+
mock_hook.return_value.cancel_job.assert_called_once_with(
906+
project_id=GCP_PROJECT, region=GCP_LOCATION, job_id=job_id
907+
)
908+
880909
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
881910
def test_location_deprecation_warning(self, mock_hook):
882911
xcom_push_call = call.ti.xcom_push(execution_date=None, key='conf', value=DATAPROC_JOB_CONF_EXPECTED)

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/airflow/commit/ea1ae1963ecf1b543e4f5e8deb59d623df42d44a

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy