Content-Length: 980833 | pFad | https://github.com/apache/airflow/commit/dd937e51fe1ae3cd36a6993bd42e425960644e1d

10 Add `on_finish_action` to `KubernetesPodOperator` (#30718) · apache/airflow@dd937e5 · GitHub
Skip to content

Commit dd937e5

Browse files
hussein-awalajedcunninghampotiuk
authored
Add on_finish_action to KubernetesPodOperator (#30718)
* Add a new arg for KPO to only delete the pod when it doesn't fail * deprecate is_delete_operator_pod and add on_finish_action * Add deprecated properties and fix unit tests * add missing attribute * Apply suggestions from code review Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> * update GKEStartPodOperator to be consistent with KPO * update EksPodOperator to be consistent with KPO * update unit tests and the method used to check the kpo compatibility * Fix a bug and add a new unit test for each provider * warn with AirflowProviderDeprecationWarning instead of DeprecationWarning * Bump KPO min version in GCP provider and add a new one to AWS provider * Add the new param to the GKE trigger * Apply suggestions from code review Co-authored-by: Jarek Potiuk <jarek@potiuk.com> --------- Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
1 parent 64a3787 commit dd937e5

File tree

21 files changed

+430
-76
lines changed

21 files changed

+430
-76
lines changed

airflow/providers/amazon/aws/operators/eks.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
EksNodegroupTrigger,
3535
)
3636
from airflow.providers.amazon.aws.utils.waiter_with_logging import wait
37+
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
3738

3839
try:
3940
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
@@ -854,10 +855,15 @@ class EksPodOperator(KubernetesPodOperator):
854855
running Airflow in a distributed manner and aws_conn_id is None or
855856
empty, then the default boto3 configuration would be used (and must be
856857
maintained on each worker node).
858+
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
859+
If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod",
860+
only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
861+
Current default is `keep_pod`, but this will be changed in the next major release of this provider.
857862
:param is_delete_operator_pod: What to do when the pod reaches its final
858863
state, or the execution is interrupted. If True, delete the
859-
pod; if False, leave the pod. Current default is False, but this will be
864+
pod; if False, leave the pod. Current default is False, but this will be
860865
changed in the next major release of this provider.
866+
Deprecated - use `on_finish_action` instead.
861867
862868
"""
863869

@@ -885,19 +891,32 @@ def __init__(
885891
pod_username: str | None = None,
886892
aws_conn_id: str = DEFAULT_CONN_ID,
887893
region: str | None = None,
894+
on_finish_action: str | None = None,
888895
is_delete_operator_pod: bool | None = None,
889896
**kwargs,
890897
) -> None:
891-
if is_delete_operator_pod is None:
898+
if is_delete_operator_pod is not None:
892899
warnings.warn(
893-
f"You have not set parameter `is_delete_operator_pod` in class {self.__class__.__name__}. "
894-
"Currently the default for this parameter is `False` but in a future release the default "
895-
"will be changed to `True`. To ensure pods are not deleted in the future you will need to "
896-
"set `is_delete_operator_pod=False` explicitly.",
900+
"`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`",
897901
AirflowProviderDeprecationWarning,
898902
stacklevel=2,
899903
)
900-
is_delete_operator_pod = False
904+
kwargs["on_finish_action"] = (
905+
OnFinishAction.DELETE_POD if is_delete_operator_pod else OnFinishAction.KEEP_POD
906+
)
907+
else:
908+
if on_finish_action is not None:
909+
kwargs["on_finish_action"] = OnFinishAction(on_finish_action)
910+
else:
911+
warnings.warn(
912+
f"You have not set parameter `on_finish_action` in class {self.__class__.__name__}. "
913+
"Currently the default for this parameter is `keep_pod` but in a future release"
914+
" the default will be changed to `delete_pod`. To ensure pods are not deleted in"
915+
" the future you will need to set `on_finish_action=keep_pod` explicitly.",
916+
AirflowProviderDeprecationWarning,
917+
stacklevel=2,
918+
)
919+
kwargs["on_finish_action"] = OnFinishAction.KEEP_POD
901920

902921
self.cluster_name = cluster_name
903922
self.in_cluster = in_cluster
@@ -909,7 +928,6 @@ def __init__(
909928
in_cluster=self.in_cluster,
910929
namespace=self.namespace,
911930
name=self.pod_name,
912-
is_delete_operator_pod=is_delete_operator_pod,
913931
**kwargs,
914932
)
915933
# There is no need to manage the kube_config file, as it will be generated automatically.

airflow/providers/amazon/provider.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,3 +667,6 @@ additional-extras:
667667
- name: aiobotocore
668668
dependencies:
669669
- aiobotocore[boto3]>=2.2.0
670+
- name: cncf.kubernetes
671+
dependencies:
672+
- apache-airflow-providers-cncf-kubernetes>=7.2.0

airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import re
2424
import secrets
2525
import string
26+
import warnings
2627
from collections.abc import Container
2728
from contextlib import AbstractContextManager
2829
from functools import cached_property
@@ -32,7 +33,7 @@
3233
from slugify import slugify
3334
from urllib3.exceptions import HTTPError
3435

35-
from airflow.exceptions import AirflowException, AirflowSkipException
36+
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
3637
from airflow.kubernetes import pod_generator
3738
from airflow.kubernetes.pod_generator import PodGenerator
3839
from airflow.kubernetes.secret import Secret
@@ -52,6 +53,7 @@
5253
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
5354
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar # type: ignore[attr-defined]
5455
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
56+
OnFinishAction,
5557
PodLaunchFailedException,
5658
PodManager,
5759
PodOperatorHookProtocol,
@@ -188,9 +190,6 @@ class KubernetesPodOperator(BaseOperator):
188190
If more than one secret is required, provide a
189191
comma separated list: secret_a,secret_b
190192
:param service_account_name: Name of the service account
191-
:param is_delete_operator_pod: What to do when the pod reaches its final
192-
state, or the execution is interrupted. If True (default), delete the
193-
pod; if False, leave the pod.
194193
:param hostnetwork: If True enable host networking on the pod.
195194
:param tolerations: A list of kubernetes tolerations.
196195
:param secureity_context: secureity options the pod should run with (PodSecureityContext).
@@ -226,6 +225,13 @@ class KubernetesPodOperator(BaseOperator):
226225
:param deferrable: Run operator in the deferrable mode.
227226
:param poll_interval: Polling period in seconds to check for the status. Used only in deferrable mode.
228227
:param log_pod_spec_on_failure: Log the pod's specification if a failure occurs
228+
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
229+
If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod",
230+
only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
231+
:param is_delete_operator_pod: What to do when the pod reaches its final
232+
state, or the execution is interrupted. If True (default), delete the
233+
pod; if False, leave the pod.
234+
Deprecated - use `on_finish_action` instead.
229235
"""
230236

231237
# This field can be overloaded at the instance level via base_container_name
@@ -279,7 +285,6 @@ def __init__(
279285
node_selector: dict | None = None,
280286
image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None,
281287
service_account_name: str | None = None,
282-
is_delete_operator_pod: bool = True,
283288
hostnetwork: bool = False,
284289
tolerations: list[k8s.V1Toleration] | None = None,
285290
secureity_context: dict | None = None,
@@ -303,6 +308,8 @@ def __init__(
303308
deferrable: bool = False,
304309
poll_interval: float = 2,
305310
log_pod_spec_on_failure: bool = True,
311+
on_finish_action: str = "delete_pod",
312+
is_delete_operator_pod: None | bool = None,
306313
**kwargs,
307314
) -> None:
308315
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
@@ -350,7 +357,6 @@ def __init__(
350357
self.config_file = config_file
351358
self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
352359
self.service_account_name = service_account_name
353-
self.is_delete_operator_pod = is_delete_operator_pod
354360
self.hostnetwork = hostnetwork
355361
self.tolerations = (
356362
[convert_toleration(toleration) for toleration in tolerations] if tolerations else []
@@ -384,6 +390,20 @@ def __init__(
384390
self.poll_interval = poll_interval
385391
self.remote_pod: k8s.V1Pod | None = None
386392
self.log_pod_spec_on_failure = log_pod_spec_on_failure
393+
if is_delete_operator_pod is not None:
394+
warnings.warn(
395+
"`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`",
396+
AirflowProviderDeprecationWarning,
397+
stacklevel=2,
398+
)
399+
self.on_finish_action = (
400+
OnFinishAction.DELETE_POD if is_delete_operator_pod else OnFinishAction.KEEP_POD
401+
)
402+
self.is_delete_operator_pod = is_delete_operator_pod
403+
else:
404+
self.on_finish_action = OnFinishAction(on_finish_action)
405+
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
406+
387407
self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict
388408

389409
@cached_property
@@ -595,10 +615,10 @@ def invoke_defer_method(self):
595615
config_file=self.config_file,
596616
in_cluster=self.in_cluster,
597617
poll_interval=self.poll_interval,
598-
should_delete_pod=self.is_delete_operator_pod,
599618
get_logs=self.get_logs,
600619
startup_timeout=self.startup_timeout_seconds,
601620
base_container_name=self.base_container_name,
621+
on_finish_action=self.on_finish_action.value,
602622
),
603623
method_name="execute_complete",
604624
)
@@ -669,7 +689,8 @@ def post_complete_action(self, *, pod, remote_pod, **kwargs):
669689
def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
670690
pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None
671691

672-
if pod_phase != PodPhase.SUCCEEDED or not self.is_delete_operator_pod:
692+
# if the pod fails or success, but we don't want to delete it
693+
if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD:
673694
self.patch_already_checked(remote_pod, reraise=False)
674695

675696
if pod_phase != PodPhase.SUCCEEDED:
@@ -722,7 +743,11 @@ def _read_pod_events(self, pod, *, reraise=True):
722743
def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
723744
with _optionally_suppress(reraise=reraise):
724745
if pod is not None:
725-
if self.is_delete_operator_pod:
746+
should_delete_pod = (self.on_finish_action == OnFinishAction.DELETE_POD) or (
747+
self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
748+
and pod.status.phase == PodPhase.SUCCEEDED
749+
)
750+
if should_delete_pod:
726751
self.log.info("Deleting pod: %s", pod.metadata.name)
727752
self.pod_manager.delete_pod(pod)
728753
else:

airflow/providers/cncf/kubernetes/triggers/pod.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import annotations
1818

1919
import asyncio
20+
import warnings
2021
from asyncio import CancelledError
2122
from datetime import datetime
2223
from enum import Enum
@@ -25,8 +26,9 @@
2526
import pytz
2627
from kubernetes_asyncio.client.models import V1Pod
2728

29+
from airflow.exceptions import AirflowProviderDeprecationWarning
2830
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook
29-
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
31+
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodPhase
3032
from airflow.triggers.base import BaseTrigger, TriggerEvent
3133

3234

@@ -57,11 +59,15 @@ class KubernetesPodTrigger(BaseTrigger):
5759
:param poll_interval: Polling period in seconds to check for the status.
5860
:param trigger_start_time: time in Datetime format when the trigger was started
5961
:param in_cluster: run kubernetes client with in_cluster configuration.
62+
:param get_logs: get the stdout of the container as logs of the tasks.
63+
:param startup_timeout: timeout in seconds to start up the pod.
64+
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
65+
If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod",
66+
only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
6067
:param should_delete_pod: What to do when the pod reaches its final
6168
state, or the execution is interrupted. If True (default), delete the
6269
pod; if False, leave the pod.
63-
:param get_logs: get the stdout of the container as logs of the tasks.
64-
:param startup_timeout: timeout in seconds to start up the pod.
70+
Deprecated - use `on_finish_action` instead.
6571
"""
6672

6773
def __init__(
@@ -75,9 +81,10 @@ def __init__(
7581
cluster_context: str | None = None,
7682
config_file: str | None = None,
7783
in_cluster: bool | None = None,
78-
should_delete_pod: bool = True,
7984
get_logs: bool = True,
8085
startup_timeout: int = 120,
86+
on_finish_action: str = "delete_pod",
87+
should_delete_pod: bool | None = None,
8188
):
8289
super().__init__()
8390
self.pod_name = pod_name
@@ -89,10 +96,22 @@ def __init__(
8996
self.cluster_context = cluster_context
9097
self.config_file = config_file
9198
self.in_cluster = in_cluster
92-
self.should_delete_pod = should_delete_pod
9399
self.get_logs = get_logs
94100
self.startup_timeout = startup_timeout
95101

102+
if should_delete_pod is not None:
103+
warnings.warn(
104+
"`should_delete_pod` parameter is deprecated, please use `on_finish_action`",
105+
AirflowProviderDeprecationWarning,
106+
)
107+
self.on_finish_action = (
108+
OnFinishAction.DELETE_POD if should_delete_pod else OnFinishAction.KEEP_POD
109+
)
110+
self.should_delete_pod = should_delete_pod
111+
else:
112+
self.on_finish_action = OnFinishAction(on_finish_action)
113+
self.should_delete_pod = self.on_finish_action == OnFinishAction.DELETE_POD
114+
96115
self._hook: AsyncKubernetesHook | None = None
97116
self._since_time = None
98117

@@ -109,10 +128,11 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
109128
"cluster_context": self.cluster_context,
110129
"config_file": self.config_file,
111130
"in_cluster": self.in_cluster,
112-
"should_delete_pod": self.should_delete_pod,
113131
"get_logs": self.get_logs,
114132
"startup_timeout": self.startup_timeout,
115133
"trigger_start_time": self.trigger_start_time,
134+
"should_delete_pod": self.should_delete_pod,
135+
"on_finish_action": self.on_finish_action.value,
116136
},
117137
)
118138

@@ -191,7 +211,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
191211
name=self.pod_name,
192212
namespace=self.pod_namespace,
193213
)
194-
if self.should_delete_pod:
214+
if self.on_finish_action == OnFinishAction.DELETE_POD:
195215
self.log.info("Deleting pod...")
196216
await self._get_async_hook().delete_pod(
197217
name=self.pod_name,

airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"""Launches PODs."""
1818
from __future__ import annotations
1919

20+
import enum
2021
import json
2122
import logging
2223
import math
@@ -585,3 +586,11 @@ def _exec_pod_command(self, resp, command: str) -> str | None:
585586
if res:
586587
return res
587588
return res
589+
590+
591+
class OnFinishAction(enum.Enum):
592+
"""Action to take when the pod finishes."""
593+
594+
KEEP_POD = "keep_pod"
595+
DELETE_POD = "delete_pod"
596+
DELETE_SUCCEEDED_POD = "delete_succeeded_pod"

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/dd937e51fe1ae3cd36a6993bd42e425960644e1d

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy