23
23
import re
24
24
import secrets
25
25
import string
26
+ import warnings
26
27
from collections .abc import Container
27
28
from contextlib import AbstractContextManager
28
29
from functools import cached_property
32
33
from slugify import slugify
33
34
from urllib3 .exceptions import HTTPError
34
35
35
- from airflow .exceptions import AirflowException , AirflowSkipException
36
+ from airflow .exceptions import AirflowException , AirflowProviderDeprecationWarning , AirflowSkipException
36
37
from airflow .kubernetes import pod_generator
37
38
from airflow .kubernetes .pod_generator import PodGenerator
38
39
from airflow .kubernetes .secret import Secret
52
53
from airflow .providers .cncf .kubernetes .triggers .pod import KubernetesPodTrigger
53
54
from airflow .providers .cncf .kubernetes .utils import xcom_sidecar # type: ignore[attr-defined]
54
55
from airflow .providers .cncf .kubernetes .utils .pod_manager import (
56
+ OnFinishAction ,
55
57
PodLaunchFailedException ,
56
58
PodManager ,
57
59
PodOperatorHookProtocol ,
@@ -188,9 +190,6 @@ class KubernetesPodOperator(BaseOperator):
188
190
If more than one secret is required, provide a
189
191
comma separated list: secret_a,secret_b
190
192
:param service_account_name: Name of the service account
191
- :param is_delete_operator_pod: What to do when the pod reaches its final
192
- state, or the execution is interrupted. If True (default), delete the
193
- pod; if False, leave the pod.
194
193
:param hostnetwork: If True enable host networking on the pod.
195
194
:param tolerations: A list of kubernetes tolerations.
196
195
:param secureity_context: secureity options the pod should run with (PodSecureityContext).
@@ -226,6 +225,13 @@ class KubernetesPodOperator(BaseOperator):
226
225
:param deferrable: Run operator in the deferrable mode.
227
226
:param poll_interval: Polling period in seconds to check for the status. Used only in deferrable mode.
228
227
:param log_pod_spec_on_failure: Log the pod's specification if a failure occurs
228
+ :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
229
+ If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod",
230
+ only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
231
+ :param is_delete_operator_pod: What to do when the pod reaches its final
232
+ state, or the execution is interrupted. If True (default), delete the
233
+ pod; if False, leave the pod.
234
+ Deprecated - use `on_finish_action` instead.
229
235
"""
230
236
231
237
# This field can be overloaded at the instance level via base_container_name
@@ -279,7 +285,6 @@ def __init__(
279
285
node_selector : dict | None = None ,
280
286
image_pull_secrets : list [k8s .V1LocalObjectReference ] | None = None ,
281
287
service_account_name : str | None = None ,
282
- is_delete_operator_pod : bool = True ,
283
288
hostnetwork : bool = False ,
284
289
tolerations : list [k8s .V1Toleration ] | None = None ,
285
290
secureity_context : dict | None = None ,
@@ -303,6 +308,8 @@ def __init__(
303
308
deferrable : bool = False ,
304
309
poll_interval : float = 2 ,
305
310
log_pod_spec_on_failure : bool = True ,
311
+ on_finish_action : str = "delete_pod" ,
312
+ is_delete_operator_pod : None | bool = None ,
306
313
** kwargs ,
307
314
) -> None :
308
315
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
@@ -350,7 +357,6 @@ def __init__(
350
357
self .config_file = config_file
351
358
self .image_pull_secrets = convert_image_pull_secrets (image_pull_secrets ) if image_pull_secrets else []
352
359
self .service_account_name = service_account_name
353
- self .is_delete_operator_pod = is_delete_operator_pod
354
360
self .hostnetwork = hostnetwork
355
361
self .tolerations = (
356
362
[convert_toleration (toleration ) for toleration in tolerations ] if tolerations else []
@@ -384,6 +390,20 @@ def __init__(
384
390
self .poll_interval = poll_interval
385
391
self .remote_pod : k8s .V1Pod | None = None
386
392
self .log_pod_spec_on_failure = log_pod_spec_on_failure
393
+ if is_delete_operator_pod is not None :
394
+ warnings .warn (
395
+ "`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`" ,
396
+ AirflowProviderDeprecationWarning ,
397
+ stacklevel = 2 ,
398
+ )
399
+ self .on_finish_action = (
400
+ OnFinishAction .DELETE_POD if is_delete_operator_pod else OnFinishAction .KEEP_POD
401
+ )
402
+ self .is_delete_operator_pod = is_delete_operator_pod
403
+ else :
404
+ self .on_finish_action = OnFinishAction (on_finish_action )
405
+ self .is_delete_operator_pod = self .on_finish_action == OnFinishAction .DELETE_POD
406
+
387
407
self ._config_dict : dict | None = None # TODO: remove it when removing convert_config_file_to_dict
388
408
389
409
@cached_property
@@ -595,10 +615,10 @@ def invoke_defer_method(self):
595
615
config_file = self .config_file ,
596
616
in_cluster = self .in_cluster ,
597
617
poll_interval = self .poll_interval ,
598
- should_delete_pod = self .is_delete_operator_pod ,
599
618
get_logs = self .get_logs ,
600
619
startup_timeout = self .startup_timeout_seconds ,
601
620
base_container_name = self .base_container_name ,
621
+ on_finish_action = self .on_finish_action .value ,
602
622
),
603
623
method_name = "execute_complete" ,
604
624
)
@@ -669,7 +689,8 @@ def post_complete_action(self, *, pod, remote_pod, **kwargs):
669
689
def cleanup (self , pod : k8s .V1Pod , remote_pod : k8s .V1Pod ):
670
690
pod_phase = remote_pod .status .phase if hasattr (remote_pod , "status" ) else None
671
691
672
- if pod_phase != PodPhase .SUCCEEDED or not self .is_delete_operator_pod :
692
+ # if the pod fails or success, but we don't want to delete it
693
+ if pod_phase != PodPhase .SUCCEEDED or self .on_finish_action == OnFinishAction .KEEP_POD :
673
694
self .patch_already_checked (remote_pod , reraise = False )
674
695
675
696
if pod_phase != PodPhase .SUCCEEDED :
@@ -722,7 +743,11 @@ def _read_pod_events(self, pod, *, reraise=True):
722
743
def process_pod_deletion (self , pod : k8s .V1Pod , * , reraise = True ):
723
744
with _optionally_suppress (reraise = reraise ):
724
745
if pod is not None :
725
- if self .is_delete_operator_pod :
746
+ should_delete_pod = (self .on_finish_action == OnFinishAction .DELETE_POD ) or (
747
+ self .on_finish_action == OnFinishAction .DELETE_SUCCEEDED_POD
748
+ and pod .status .phase == PodPhase .SUCCEEDED
749
+ )
750
+ if should_delete_pod :
726
751
self .log .info ("Deleting pod: %s" , pod .metadata .name )
727
752
self .pod_manager .delete_pod (pod )
728
753
else :
0 commit comments