http://github.com/apache/airflow/pull/42572/commits/97c64467e56f573c1823b2484a73743e83c2881d

Implemented streaming functionality with IterableOperator and DeferredIterable by dabla · Pull Request #42572 · apache/airflow · GitHub

Implemented streaming functionality with IterableOperator and DeferredIterable #42572

Draft
wants to merge 167 commits into base: main
Changes from 1 commit
167 commits
60d3065
refactor: Implemented StreamedOperator which runs all mapped tasks wi…
davidblain-infrabel Sep 29, 2024
effb0ea
refactor: Changed return type of next_callable method
davidblain-infrabel Sep 29, 2024
d013e3d
refactor: Changed operator type to BaseOperator in OperatorMethodExec…
davidblain-infrabel Sep 29, 2024
2a827d2
refactor: Fixed some static checks
davidblain-infrabel Sep 30, 2024
e9e0ad4
refactor: Initialise Semaphore correctly
davidblain-infrabel Sep 30, 2024
20f02ed
refactor: Reformatted StreamedOperator
davidblain-infrabel Sep 30, 2024
5d97c31
refactor: Fixed some mypy issues
davidblain-infrabel Sep 30, 2024
513edd0
refactor: Reformatted StreamOperator
davidblain-infrabel Sep 30, 2024
d142ebf
refactor: Changed next_callable to instance method in BaseOperator
davidblain-infrabel Sep 30, 2024
985dbbe
refactor: Added docstrings in StreamedOperator
davidblain-infrabel Sep 30, 2024
542727a
refactor: Changed next_callable method in StreamedOperator back to cl…
davidblain-infrabel Oct 1, 2024
cb00425
refactor: Changed typing of next_callable method in StreamedOperator
davidblain-infrabel Oct 1, 2024
08a7c7f
refactor: Fixed some typing issues StreamedOperator
davidblain-infrabel Oct 1, 2024
2712b85
refactor: Initialise reschedule_date as utcnow by default
davidblain-infrabel Oct 1, 2024
33bb6b7
refactor: Reorganized imports in StreamedOperator
Oct 1, 2024
8906fa9
refactor: Force cast task to BaseOperator
davidblain-infrabel Oct 1, 2024
eaa734a
refactor: Make sure semaphore is correctly initialized with int value
davidblain-infrabel Oct 1, 2024
d2e93c3
refactor: Cast operator to type[BaseOperator]
davidblain-infrabel Oct 1, 2024
de3f314
refactor: Check if expand_input value is of type dict
davidblain-infrabel Oct 1, 2024
ceb024f
refactor: Operator class is of type BaseOperator only
davidblain-infrabel Oct 1, 2024
c7ec7c0
refactor: Close the event loop if it was a newly created one in event…
davidblain-infrabel Oct 1, 2024
555b68b
refactor: Refactored run_trigger method
davidblain-infrabel Oct 1, 2024
b2b9cee
refactor: Moved run_trigger method to BaseTrigger module so it can be…
davidblain-infrabel Oct 1, 2024
cb6f0de
refactor: It is possible the run_trigger method doesn't yield any Tri…
davidblain-infrabel Oct 1, 2024
6c18b74
refactor: Return None in run_trigger method of no events are returned
davidblain-infrabel Oct 1, 2024
be49bfe
refactor: Check if returned event in _run_deferrable method isn't None
davidblain-infrabel Oct 1, 2024
97a68c0
Merge branch 'main' into feature/streamed-operator
dabla Oct 1, 2024
f96cfbb
Merge branch 'main' into feature/streamed-operator
dabla Oct 2, 2024
f514f90
refactor: Added stream method on PartialOperator
davidblain-infrabel Oct 2, 2024
c0e8c46
refactor: Added unit test for StreamedOperator
davidblain-infrabel Oct 2, 2024
e18b066
refactor: Added stream method to _TaskDecorator class
davidblain-infrabel Oct 2, 2024
a688e4c
refactor: Suppress AttributeError when close on loop fails as some ev…
davidblain-infrabel Oct 2, 2024
a7c79c1
refactor: Removed invocation of validate_mapping_kwargs in stream met…
davidblain-infrabel Oct 2, 2024
90cf03c
refactor: test_mapped_expand_against_params should check on instance …
davidblain-infrabel Oct 2, 2024
6f8399f
refactor: expand_input and partial_kwargs should be public instead of…
davidblain-infrabel Oct 2, 2024
51e3d0c
refactor: Skip test related to stream on TaskGroups as this isn't sup…
davidblain-infrabel Oct 2, 2024
cd6fd36
Merge branch 'main' into feature/streamed-operator
dabla Oct 2, 2024
d607ab1
refactor: Use pytest.mark.skip instead of unittest.skip
davidblain-infrabel Oct 3, 2024
ad3022e
refactor: Added def _get_specified_expand_input method on StreamedOpe…
davidblain-infrabel Oct 3, 2024
c0eb1c6
refactor: Fix logging of number of workers being used for exception i…
davidblain-infrabel Oct 3, 2024
26782ff
refactor: Pop task_group with None when key doesn't exist
davidblain-infrabel Oct 3, 2024
8cdbd46
refactor: Pop dag or task_group with None when key doesn't exist and …
davidblain-infrabel Oct 3, 2024
3409993
refactor: Added info logging statement when a task completed successf…
davidblain-infrabel Oct 3, 2024
4d29dfb
Merge branch 'main' into feature/streamed-operator
dabla Oct 3, 2024
26a557a
refactor: Fixed info logging statement when a task completed successf…
davidblain-infrabel Oct 3, 2024
8321564
Merge branch 'main' into feature/streamed-operator
dabla Oct 3, 2024
57d6086
Merge branch 'main' into feature/streamed-operator
dabla Oct 3, 2024
4e9aa8d
Merge branch 'main' into feature/streamed-operator
dabla Oct 3, 2024
b04c322
Merge branch 'main' into feature/streamed-operator
dabla Oct 4, 2024
52fb43c
refactor: expand_input attribute in StreamedOperator should be a shal…
davidblain-infrabel Oct 4, 2024
bedda87
Merge branch 'main' into feature/streamed-operator
dabla Oct 4, 2024
97dcfa3
Merge branch 'main' into feature/streamed-operator
dabla Oct 8, 2024
cf54d74
refactor: task_id can be directly retrieved from itself in unmap_oper…
davidblain-infrabel Oct 8, 2024
00a598c
refactor: Updated shallow_copy_attrs in StreamedOperator
davidblain-infrabel Oct 8, 2024
7d11512
refactor: Use existing is_mappable method instead of custom isinstanc…
davidblain-infrabel Oct 8, 2024
b76ccf5
refactor: Reorganized imports of StreamedOperator
davidblain-infrabel Oct 9, 2024
14be647
refactor: Ignore type when evaluating value as list
davidblain-infrabel Oct 9, 2024
f641d57
Merge branch 'main' into feature/streamed-operator
dabla Oct 9, 2024
19863e9
Merge branch 'main' into feature/streamed-operator
dabla Oct 9, 2024
45bc01f
Merge branch 'main' into feature/streamed-operator
dabla Oct 9, 2024
cd06b5e
Merge branch 'main' into feature/streamed-operator
dabla Oct 10, 2024
9f400d3
Merge branch 'main' into feature/streamed-operator
dabla Oct 10, 2024
c0365d5
Merge branch 'main' into feature/streamed-operator
dabla Oct 10, 2024
54c0468
Merge branch 'main' into feature/streamed-operator
dabla Oct 10, 2024
ad4ead8
Merge branch 'main' into feature/streamed-operator
dabla Oct 11, 2024
aad14fd
Merge branch 'main' into feature/streamed-operator
dabla Oct 11, 2024
c20ad0f
Merge branch 'main' into feature/streamed-operator
dabla Oct 15, 2024
84ce1eb
Merge branch 'main' into feature/streamed-operator
dabla Oct 18, 2024
2050fac
Merge branch 'main' into feature/streamed-operator
dabla Oct 21, 2024
4fa419c
Merge branch 'main' into feature/streamed-operator
dabla Oct 22, 2024
c306084
Merge branch 'main' into feature/streamed-operator
dabla Oct 22, 2024
42d5f16
Merge branch 'main' into feature/streamed-operator
dabla Oct 22, 2024
4e5f437
Merge branch 'main' into feature/streamed-operator
dabla Oct 22, 2024
723ab5e
Merge branch 'main' into feature/streamed-operator
dabla Oct 23, 2024
55cec8d
Merge branch 'main' into feature/streamed-operator
dabla Oct 23, 2024
870ea81
Merge branch 'main' into feature/streamed-operator
dabla Oct 25, 2024
f9b1d33
Merge branch 'main' into feature/streamed-operator
dabla Nov 4, 2024
7400e95
Merge branch 'main' into feature/streamed-operator
dabla Nov 21, 2024
7577be8
Merge branch 'main' into feature/streamed-operator
dabla Nov 21, 2024
d9ba001
refactor: Use ThreadPoolExecutor to execute tasks concurrently instea…
davidblain-infrabel Nov 22, 2024
7082c11
Merge branch 'main' into feature/streamed-operator
dabla Nov 28, 2024
1e10f27
refactor: Reorganized imports in StreamedOperator
davidblain-infrabel Nov 25, 2024
11200e4
refactor: Refactored StreamedOperator using ThreadPool so we can also…
davidblain-infrabel Nov 28, 2024
41381f9
fix: Don't set the newly created loop event, just use it locally and …
davidblain-infrabel Nov 28, 2024
c3c06f7
refactor: Refactored the StreamedOperator even more
davidblain-infrabel Nov 30, 2024
493972a
feature: Fix StreamedOperator so it can run triggers in async mode an…
Dec 2, 2024
5ddb950
feature: Re-added semaphore for async execution
Dec 2, 2024
6626671
feature: Import gather from asyncio StreamedOperator
davidblain-infrabel Dec 3, 2024
2092923
Merge branch 'main' into feature/streamed-operator
dabla Dec 3, 2024
bc75484
refactored: Refactored BaseExecutor as a context manager to handle co…
davidblain-infrabel Dec 3, 2024
065dc6f
refactored: Renamed StreamedOperator to IterableOperator
davidblain-infrabel Dec 4, 2024
1fd075f
refactor: Make sure the run_deferrable method uses the semaphore to p…
davidblain-infrabel Dec 4, 2024
73a7779
refactor: Make sure the run_deferrable method uses the semaphore to p…
davidblain-infrabel Dec 4, 2024
a5b238a
refactor: Updated IterableOperator
davidblain-infrabel Dec 4, 2024
53c1488
Merge branch 'main' into feature/streamed-operator
dabla Dec 4, 2024
b0c0559
Merge branch 'main' into feature/streamed-operator
dabla Dec 4, 2024
e45b49c
Merge branch 'main' into feature/streamed-operator
dabla Dec 13, 2024
ed0a9bb
refactor: Reorganized imports TestIterableOperator
davidblain-infrabel Dec 16, 2024
ad73a8b
refactor: Fixed import IterableOperator in MappedOperator
davidblain-infrabel Dec 16, 2024
e2f3065
refactor: Removed unused import of DAG in exceptions module
davidblain-infrabel Dec 16, 2024
00d1de1
refactor: Reformatted IterableOperator
davidblain-infrabel Dec 16, 2024
3aca032
Merge branch 'main' into feature/streamed-operator
dabla Dec 16, 2024
8a48456
refactor: Current time should be taken before execution of tasks in I…
davidblain-infrabel Dec 16, 2024
dd39e1c
Merge branch 'main' into feature/streamed-operator
dabla Jan 10, 2025
7e6824c
Merge branch 'main' into feature/streamed-operator
dabla Jan 10, 2025
b4ba204
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Jan 28, 2025
cd73aff
refactor: Fixed some static checks
davidblain-infrabel Jan 28, 2025
9cb1442
refactor: Fixed static checks
davidblain-infrabel Jan 28, 2025
0f43135
refactor: Fixed import of from _MapResult
davidblain-infrabel Jan 28, 2025
75d1680
refactor: Fixed import of BaseOperator
davidblain-infrabel Jan 28, 2025
67f1a14
Merge branch 'main' into feature/streamed-operator
dabla Jan 28, 2025
4f36167
refactor: Added next_callable method on BaseOperator
davidblain-infrabel Jan 28, 2025
5a40363
refactor: Fixed more static checks
davidblain-infrabel Jan 28, 2025
d620c36
Merge branch 'main' into feature/streamed-operator
dabla Jan 28, 2025
87034e7
refactor: Fixed import BaseOperator
davidblain-infrabel Jan 29, 2025
8020251
refactor: Fixed some imports
davidblain-infrabel Jan 29, 2025
8bcffe6
refactor: Try to fix some mypy issues
davidblain-infrabel Jan 29, 2025
dae0a91
refactor: Removed test not applicable on IterableOperator
davidblain-infrabel Jan 29, 2025
e2727bb
refactor: Cannot assign state anymore from context to TaskInstance
davidblain-infrabel Jan 29, 2025
0c80f9a
refactor: Removed stream from decorators as this method is deprecated…
davidblain-infrabel Jan 29, 2025
1f311a9
Merge branch 'main' into feature/streamed-operator
dabla Jan 29, 2025
77489e1
refactor: Next callable method should be instance method instead of c…
davidblain-infrabel Jan 29, 2025
10ff45d
Merge branch 'main' into feature/streamed-operator
dabla Jan 30, 2025
96ea9da
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Jan 31, 2025
ceaffcf
Merge remote-tracking branch 'origin/feature/streamed-operator' into …
davidblain-infrabel Jan 31, 2025
b30b597
Merge branch 'main' into feature/streamed-operator
dabla Jan 31, 2025
454b8f4
refactor: Only keep sdk imports
davidblain-infrabel Jan 31, 2025
aa793b6
Merge branch 'main' into feature/streamed-operator
dabla Feb 5, 2025
888ef3b
Merge branch 'main' into feature/streamed-operator
dabla Feb 5, 2025
596314a
Merge branch 'main' into feature/streamed-operator
dabla Feb 5, 2025
2075b01
Merge branch 'main' into feature/streamed-operator
dabla Feb 6, 2025
f8eb9b2
Merge branch 'main' into feature/streamed-operator
dabla Feb 10, 2025
04167b6
Merge branch 'main' into feature/streamed-operator
dabla Feb 11, 2025
8df86c9
Merge branch 'main' into feature/streamed-operator
dabla Feb 11, 2025
44876f3
refactor: Updated IterableOperator which optimizes execution of defer…
Feb 13, 2025
30ed196
Merge branch 'main' into feature/streamed-operator
dabla Feb 18, 2025
0900d5c
Merge branch 'main' into feature/streamed-operator
dabla Feb 18, 2025
d77f6a5
Merge branch 'main' into feature/streamed-operator
dabla Feb 19, 2025
d30c568
refactor: Set state on TaskInstance otherwise xcom push and pulls won…
davidblain-infrabel Feb 14, 2025
f18cd4d
refactor: Fixed task_id generation without duplicated nested task gro…
davidblain-infrabel Feb 25, 2025
0850b29
Merge branch 'main' into feature/streamed-operator
dabla Feb 25, 2025
1375476
refactor: Refactored IterableOperator which now returns an XComIterab…
davidblain-infrabel Feb 26, 2025
cc2d875
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Feb 26, 2025
4c6aabe
Merge branch 'main' into feature/streamed-operator
dabla Feb 26, 2025
85f511e
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Mar 19, 2025
0eefacd
refactor: Refactored IterableOperator to support streaming of iterabl…
davidblain-infrabel Mar 19, 2025
169a8ae
Merge branch 'main' into feature/streamed-operator
dabla Mar 19, 2025
d20b24e
refactor: Don't need to copy context in resolve method of DeferrableI…
davidblain-infrabel Mar 19, 2025
41856fd
refactor: Inherit LoggingMixin for DeferrableIterable
Mar 20, 2025
ea769cb
refactor: Moved DeferrableIterable to dedicated iterable module under…
Mar 20, 2025
9958bec
refactor: Also use ThreadPool to execute task producer and consumer i…
Mar 20, 2025
03d8312
refactor: Reformatted files
Mar 20, 2025
7ee67cb
refactor: Refactored way to get operator after deserialization trigge…
davidblain-infrabel Mar 23, 2025
15f9b5c
refactor: Refactored producing of tasks in IterableOperator
davidblain-infrabel Mar 23, 2025
ad43728
refactor: Improved IterableOperator and DeferredIterable
davidblain-infrabel Mar 24, 2025
d9d3dac
refactor: Made next_kwargs optional for next_callable method in BaseO…
davidblain-infrabel Mar 26, 2025
07f98cc
refactor: Improved lazy evaluation of tasks passed to the _run_tasks …
Mar 27, 2025
04195bc
Merge branch 'main' into feature/streamed-operator
Mar 27, 2025
e70691a
refactor: Improved run_tasks method of IterableOperator
davidblain-infrabel Mar 28, 2025
f416744
Merge branch 'main' into feature/streamed-operator
dabla Mar 28, 2025
90ab543
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Apr 7, 2025
97c6446
refactor: Updated IterableOperator with DeferredIterable
davidblain-infrabel Apr 7, 2025
a0f945a
Merge branch 'main' into feature/streamed-operator
dabla Apr 7, 2025
8fd3b90
refactor: Refactored IterableOperator to be Airflow 3 compliant
davidblain-infrabel May 6, 2025
89758e4
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel May 7, 2025
fa58c0c
Merge branch 'main' into feature/streamed-operator
dabla May 7, 2025
87d3a94
Merge branch 'main' into feature/streamed-operator
dabla May 19, 2025
refactor: Updated IterableOperator with DeferredIterable
davidblain-infrabel committed Apr 7, 2025
commit 97c64467e56f573c1823b2484a73743e83c2881d
53 changes: 39 additions & 14 deletions airflow-core/src/airflow/models/iterable.py
@@ -18,13 +18,14 @@
 from __future__ import annotations

 import asyncio
-from collections.abc import Generator, Iterator, Sequence
+from collections.abc import Generator, Iterable, Iterator, Sequence
 from contextlib import contextmanager, suppress
 from typing import TYPE_CHECKING, Any

 from airflow.exceptions import AirflowException
 from airflow.serialization import serde
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.mixins import ResolveMixin
 from airflow.utils.module_loading import import_string
 from airflow.utils.xcom import XCOM_RETURN_KEY

@@ -117,7 +118,7 @@ def deserialize(cls, data: dict, version: int):
         return XComIterable(**data)


-class DeferredIterable(Iterator, LoggingMixin):
+class DeferredIterable(Iterator, ResolveMixin, LoggingMixin):
     """An iterable that lazily fetches XCom values one by one instead of loading all at once."""

     def __init__(
@@ -136,7 +137,12 @@ def __init__(
         self.context = context
         self.index = 0

-    def resolve(self, context: Context) -> DeferredIterable:
+    def iter_references(self) -> Iterable[tuple[Operator, str]]:
+        yield self.operator, XCOM_RETURN_KEY
+
+    def resolve(
+        self, context: Context, *, include_xcom: bool = True
+    ) -> DeferredIterable:
         return DeferredIterable(
             results=self.results,
             trigger=self.trigger,
@@ -159,10 +165,25 @@ def __next__(self):

         self.log.info("No more results. Running trigger: %s", self.trigger)

+        if not self.context:
+            raise AirflowException("Context is required to run the trigger.")
+
+        results = self._execute_trigger()
+
+        if isinstance(results, (list, set)):
+            self.results.extend(results)
+        else:
+            self.results.append(results)
+
+        self.index += 1
+        return self.results[-1]
+
+    def _execute_trigger(self):
         try:
             with event_loop() as loop:
                 self.log.info("Running trigger: %s", self.trigger)
                 event = loop.run_until_complete(run_trigger(self.trigger))
+                self.operator.render_template_fields(context=self.context)
                 next_method = getattr(self.operator, self.next_method)
                 self.log.info("Triggering next method: %s", self.next_method)
                 results = next_method(self.context, event.payload)
@@ -172,15 +193,13 @@ def __next__(self):

         if isinstance(results, DeferredIterable):
             self.trigger = results.trigger
-            self.results.extend(results.results)
-        else:
-            self.trigger = None
-            self.results.extend(results)
+            return results.results

-        self.index += 1
-        return self.results[-1]
+        self.trigger = None
+        return results

     def __len__(self):
+        # TODO: maybe we should raise an exception here as you can't know the total length of an iterable in advance
         return len(self.results)

     def __getitem__(self, index: int):
@@ -201,14 +220,15 @@ def serialize(self):
         }

     @classmethod
-    def get_operator_from_dag(cls, dag_fileloc: str, dag_id: str, task_id: str) -> Operator:
+    def get_operator_from_dag(
+        cls, dag_fileloc: str, dag_id: str, task_id: str
+    ) -> Operator:
         """Loads a DAG using DagBag and gets the operator by task_id."""

         from airflow.models import DagBag

         dag_bag = DagBag(dag_folder=None)  # Avoid loading all DAGs
-        processed_dags = dag_bag.process_file(dag_fileloc)
-        cls.logger().info("processed_dags: %s", processed_dags)
+        dag_bag.process_file(dag_fileloc)
         cls.logger().info("dag_bag: %s", dag_bag)
         cls.logger().info("dags: %s", dag_bag.dags)
         return dag_bag.dags[dag_id].get_task(task_id)
@@ -219,9 +239,14 @@ def deserialize(cls, data: dict, version: int):

         trigger_class = import_string(data["trigger"][0])
         trigger = trigger_class(**data["trigger"][1])
-        operator = cls.get_operator_from_dag(data["dag_fileloc"], data["dag_id"], data["task_id"])
+        operator = cls.get_operator_from_dag(
+            data["dag_fileloc"], data["dag_id"], data["task_id"]
+        )
         return DeferredIterable(
-            results=data["results"], trigger=trigger, operator=operator, next_method=data["next_method"]
+            results=data["results"],
+            trigger=trigger,
+            operator=operator,
+            next_method=data["next_method"],
         )


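The lazy-fetch idea behind `DeferredIterable` can be illustrated outside Airflow. The following is only a minimal sketch under stated assumptions: `LazyBatchIterator`, `fetch_next`, `page_one`, and `page_two` are hypothetical names, and a plain callable stands in for the trigger. It is not the PR's implementation.

```python
from collections.abc import Iterator


class LazyBatchIterator(Iterator):
    """Hypothetical stand-in: serve buffered results first, fetch more only when needed."""

    def __init__(self, fetch_next=None, results=None):
        # fetch_next plays the role of the trigger: a callable returning
        # (batch, next_fetch_or_None). This protocol is an assumption of the sketch.
        self.fetch_next = fetch_next
        self.results = list(results or [])
        self.index = 0

    def __next__(self):
        while True:
            if self.index < len(self.results):
                value = self.results[self.index]
                self.index += 1
                return value
            if self.fetch_next is None:
                raise StopIteration
            # Buffer exhausted: fetch the next batch and remember how to continue.
            batch, self.fetch_next = self.fetch_next()
            if isinstance(batch, (list, tuple, set)):
                self.results.extend(batch)
            else:
                self.results.append(batch)


def page_two():
    return [3, 4], None  # last page: nothing left to fetch


def page_one():
    return [1, 2], page_two


items = LazyBatchIterator(fetch_next=page_one)
collected = list(items)
```

Unlike the hunk above, which returns `self.results[-1]` right after a fetch, this sketch resumes at the stored index so that multi-item batches are yielded in order. Whether that matches the intent of the PR is an assumption.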
76 changes: 54 additions & 22 deletions airflow-core/src/airflow/models/iterableoperator.py
@@ -77,6 +77,7 @@ def __init__(
         self.__context = dict(context.items())
         self._task_instance = task_instance
         self._is_async_mode: bool = False  # Flag to track sync/async mode
+        self._result: Any | None = None

     @property
     def task_instance(self) -> TaskInstance:
@@ -99,8 +100,16 @@ def mode(self) -> str:
         return "async" if self._is_async_mode else "sync"

     @abstractmethod
+    def execute(self, *args, **kwargs):
+        raise NotImplementedError
+
     def run(self, *args, **kwargs):
-        raise NotImplementedError()
+        self._result = self.execute(*args, **kwargs)
+        return self._result
+
+    async def run_deferred(self, *args, **kwargs):
+        self._result = await self.execute(*args, **kwargs)
+        return self._result

     def __enter__(self):
         if self.log.isEnabledFor(logging.INFO):
@@ -145,7 +154,9 @@ def __exit__(self, exc_type, exc_value, traceback):

             self.task_instance.set_state(TaskInstanceState.FAILED)
             raise exc_value
-        self.operator.post_execute(context=self.context)
+        if self.operator.do_xcom_push:
+            self.task_instance.xcom_push(key=XCOM_RETURN_KEY, value=self._result)
+        self.operator.post_execute(context=self.context, result=self._result)
         self.task_instance.set_state(TaskInstanceState.SUCCESS)
         if self.log.isEnabledFor(logging.INFO):
             self.log.info(
@@ -170,7 +181,7 @@ class OperatorExecutor(TaskExecutor):
     :meta private:
     """

-    def run(self, *args, **kwargs):
+    def execute(self, *args, **kwargs):
         outlet_events = context_get_outlet_events(self.context)
         # TODO: change back to operator.execute once ExecutorSafeguard is fixed
         if hasattr(self.operator.execute, "__wrapped__"):
@@ -197,7 +208,7 @@ class TriggerExecutor(TaskExecutor):
     :meta private:
     """

-    async def run(self, task_deferred: TaskDeferred):
+    async def execute(self, task_deferred: TaskDeferred):
         event = await run_trigger(task_deferred.trigger)

         self.log.debug("event: %s", event)
@@ -217,7 +228,7 @@ async def run(self, task_deferred: TaskDeferred):
                     logger=self.log,
                 ).run(self.context, event.payload)
             except TaskDeferred as task_deferred:
-                return await self.run(task_deferred=task_deferred)
+                return await self.execute(task_deferred=task_deferred)


 class IterableOperator(BaseOperator):
@@ -321,9 +332,18 @@ def _resolve_expand_input(self, context: Context, session: Session):
         self.log.debug("resolved_input: %s", resolved_input)

         if isinstance(resolved_input, _MapResult):
-            self._mapped_kwargs = map(lambda value: self._resolve(value=value, context=context, session=session), resolved_input)
+            self._mapped_kwargs = map(
+                lambda value: self._resolve(
+                    value=value, context=context, session=session
+                ),
+                resolved_input,
+            )
         else:
-            self._mapped_kwargs = iter(self._lazy_mapped_kwargs(input=resolved_input, context=context, session=session))
+            self._mapped_kwargs = iter(
+                self._lazy_mapped_kwargs(
+                    input=resolved_input, context=context, session=session
+                )
+            )

         self.log.debug("mapped_kwargs: %s", self._mapped_kwargs)

@@ -345,7 +365,9 @@ def _run_tasks(
         prev_futures_count = 0
         futures: dict[ApplyResult, TaskInstance] = {}
         failed_tasks: list[TaskInstance] = []
-        chunked_tasks: Iterator[Iterable[TaskInstance]] = ichunked(tasks, (self.max_active_tis_per_dag * 2))
+        chunked_tasks: Iterator[Iterable[TaskInstance]] = ichunked(
+            tasks, (self.max_active_tis_per_dag * 2)
+        )

         with ThreadPool(processes=self.max_active_tis_per_dag) as pool:
             for task in next(chunked_tasks, []):
@@ -385,7 +407,9 @@ def _run_tasks(
                         if task.next_try_number > self.retries:
                             exception = AirflowTaskTimeout(e)
                         else:
-                            reschedule_date = min(reschedule_date, task.next_retry_datetime())
+                            reschedule_date = min(
+                                reschedule_date, task.next_retry_datetime()
+                            )
                             failed_tasks.append(task)
                     except AirflowRescheduleTaskInstanceException as e:
                         reschedule_date = min(reschedule_date, e.reschedule_date)
@@ -405,15 +429,20 @@ def _run_tasks(

                 with event_loop() as loop:
                     for result in loop.run_until_complete(
-                        gather(*[loop.create_task(task) for task in deferred_tasks], return_exceptions=True)
+                        gather(
+                            *[loop.create_task(task) for task in deferred_tasks],
+                            return_exceptions=True,
+                        )
                     ):
                         self.log.debug("result: %s", result)

                         if isinstance(result, Exception):
                             if isinstance(
                                 result, AirflowRescheduleTaskInstanceException
                             ):
-                                reschedule_date = min(reschedule_date, result.reschedule_date)
+                                reschedule_date = min(
+                                    reschedule_date, result.reschedule_date
+                                )
                                 failed_tasks.append(result.task)
                             else:
                                 exception = result
@@ -449,7 +478,9 @@ def _run_tasks(
     @classmethod
     def _xcom_pull(cls, task_instance: TaskInstance):
         with suppress(JSONDecodeError):
-            return task_instance.xcom_pull(task_ids=task_instance.task_id, dag_id=task_instance.dag_id)
+            return task_instance.xcom_pull(
+                task_ids=task_instance.task_id, dag_id=task_instance.dag_id
+            )
         return None

     def _run_operator(self, context: Context, task_instance: TaskInstance):
@@ -462,11 +493,11 @@ def _run_operator(self, context: Context, task_instance: TaskInstance):
                 with OperatorExecutor(
                     context=context, task_instance=task_instance
                 ) as executor:
-                    result = executor.run()
-                    if self.do_xcom_push:
-                        task_instance.xcom_push(key=XCOM_RETURN_KEY, value=result)
+                    return executor.run()
             else:
-                self.log.info("Task instance %s already completed.", task_instance.task_id)
+                self.log.info(
+                    "Task instance %s already completed.", task_instance.task_id
+                )
             return result
         except TaskDeferred as task_deferred:
             return task_deferred
@@ -478,12 +509,11 @@ async def _run_deferrable(
         async with TriggerExecutor(
             context=context, task_instance=task_instance
         ) as executor:
-            result = await executor.run(task_deferred)
-            if self.do_xcom_push:
-                task_instance.xcom_push(key=XCOM_RETURN_KEY, value=result)
-            return result
+            return await executor.run_deferred(task_deferred)

-    def _create_task(self, run_id: str, index: int, mapped_kwargs: dict) -> TaskInstance:
+    def _create_task(
+        self, run_id: str, index: int, mapped_kwargs: dict
+    ) -> TaskInstance:
         operator = self._unmap_operator(index, mapped_kwargs)
         return TaskInstance(
             task=operator,
@@ -495,7 +525,9 @@ def execute(self, context: Context):
             context=context,
             tasks=iter(
                 map(
-                    lambda mapped_kwargs: self._create_task(context["ti"].run_id, mapped_kwargs[0], mapped_kwargs[1]),
+                    lambda mapped_kwargs: self._create_task(
+                        context["ti"].run_id, mapped_kwargs[0], mapped_kwargs[1]
+                    ),
                     enumerate(self._mapped_kwargs),
                 )
             ),
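The executor changes in this file follow a template-method shape: subclasses implement `execute()`, `run()` records the result, and the context manager's `__exit__` publishes it once on success. A minimal sketch of that shape, assuming hypothetical names (`SketchExecutor`, `AddExecutor`, `do_push`, and a `pushed` dict standing in for an XCom backend), might look like this. It only illustrates the pattern and is not the PR's `TaskExecutor`.

```python
from abc import ABC, abstractmethod


class SketchExecutor(ABC):
    """Hypothetical executor: run() stores what execute() returns; __exit__ publishes it."""

    def __init__(self, do_push=True):
        self.do_push = do_push
        self._result = None
        self.pushed = {}  # stands in for an XCom backend (assumption)

    @abstractmethod
    def execute(self, *args, **kwargs):
        raise NotImplementedError

    def run(self, *args, **kwargs):
        # Record the result so that __exit__ can publish it later.
        self._result = self.execute(*args, **kwargs)
        return self._result

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_value is None and self.do_push:
            self.pushed["return_value"] = self._result
        return False  # never swallow exceptions


class AddExecutor(SketchExecutor):
    def execute(self, a, b):
        return a + b


with AddExecutor() as executor:
    total = executor.run(2, 3)
```

Keeping the push in one place means callers such as `_run_operator` only need to return `executor.run()`, which is what the later hunks in this file do.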
6 changes: 3 additions & 3 deletions task-sdk/src/airflow/sdk/definitions/xcom_arg.py
@@ -33,8 +33,6 @@
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.xcom import XCOM_RETURN_KEY

-from airflow.models.iterable import DeferredIterable
-
 if TYPE_CHECKING:
     from airflow.sdk.bases.operator import BaseOperator
     from airflow.sdk.definitions.edges import EdgeModifier
@@ -358,6 +356,8 @@ def resolve(self, context: Mapping[str, Any]) -> Any:
             default=NOTSET,
         )
         if not isinstance(result, ArgNotSet):
+            if isinstance(result, ResolveMixin):
+                return result.resolve(context)
             return result
         if self.key == XCOM_RETURN_KEY:
             return None
@@ -450,7 +450,7 @@ def resolve(self, context: Mapping[str, Any]) -> Any:
         value = self.arg.resolve(context)
         if not isinstance(value, (Sequence, Iterable, dict)):
             raise ValueError(f"XCom map expects sequence or dict, not {type(value).__name__}")
-        if isinstance(value, DeferredIterable):
+        if isinstance(value, ResolveMixin):
             value = value.resolve(context)
         return _MapResult(value, self.callables)
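Replacing the `DeferredIterable` check with a `ResolveMixin` check moves the dispatch from a concrete class to a marker base class, so `xcom_arg.py` no longer needs to import the model. A rough sketch of that dispatch, with hypothetical names (`Resolvable` standing in for `ResolveMixin`, plus `LazyValue` and `resolve_value`), could be:

```python
from typing import Any


class Resolvable:
    """Hypothetical marker base class, standing in for ResolveMixin."""

    def resolve(self, context: dict) -> Any:
        raise NotImplementedError


class LazyValue(Resolvable):
    """Looks its value up in the context only when asked to resolve."""

    def __init__(self, key: str):
        self.key = key

    def resolve(self, context: dict) -> Any:
        return context[self.key]


def resolve_value(value: Any, context: dict) -> Any:
    # Anything that opts in via the marker class is resolved;
    # plain values pass through unchanged.
    if isinstance(value, Resolvable):
        return value.resolve(context)
    return value


context = {"run_id": "manual__1"}
resolved = [resolve_value(v, context) for v in (LazyValue("run_id"), 42)]
```

Any future type that subclasses the marker is picked up by the same check without touching the caller.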
