Implemented streaming functionality with IterableOperator and DeferredIterable by dabla · Pull Request #42572 · apache/airflow · GitHub

Implemented streaming functionality with IterableOperator and DeferredIterable #42572

Draft
wants to merge 167 commits into base: main
Changes from 1 commit
167 commits
60d3065
refactor: Implemented StreamedOperator which runs all mapped tasks wi…
davidblain-infrabel Sep 29, 2024
effb0ea
refactor: Changed return type of next_callable method
davidblain-infrabel Sep 29, 2024
d013e3d
refactor: Changed operator type to BaseOperator in OperatorMethodExec…
davidblain-infrabel Sep 29, 2024
2a827d2
refactor: Fixed some static checks
davidblain-infrabel Sep 30, 2024
e9e0ad4
refactor: Initialise Semaphore correctly
davidblain-infrabel Sep 30, 2024
20f02ed
refactor: Reformatted StreamedOperator
davidblain-infrabel Sep 30, 2024
5d97c31
refactor: Fixed some mypy issues
davidblain-infrabel Sep 30, 2024
513edd0
refactor: Reformatted StreamOperator
davidblain-infrabel Sep 30, 2024
d142ebf
refactor: Changed next_callable to instance method in BaseOperator
davidblain-infrabel Sep 30, 2024
985dbbe
refactor: Added docstrings in StreamedOperator
davidblain-infrabel Sep 30, 2024
542727a
refactor: Changed next_callable method in StreamedOperator back to cl…
davidblain-infrabel Oct 1, 2024
cb00425
refactor: Changed typing of next_callable method in StreamedOperator
davidblain-infrabel Oct 1, 2024
08a7c7f
refactor: Fixed some typing issues StreamedOperator
davidblain-infrabel Oct 1, 2024
2712b85
refactor: Initialise reschedule_date as utcnow by default
davidblain-infrabel Oct 1, 2024
33bb6b7
refactor: Reorganized imports in StreamedOperator
Oct 1, 2024
8906fa9
refactor: Force cast task to BaseOperator
davidblain-infrabel Oct 1, 2024
eaa734a
refactor: Make sure semaphore is correctly initialized with int value
davidblain-infrabel Oct 1, 2024
d2e93c3
refactor: Cast operator to type[BaseOperator]
davidblain-infrabel Oct 1, 2024
de3f314
refactor: Check if expand_input value is of type dict
davidblain-infrabel Oct 1, 2024
ceb024f
refactor: Operator class is of type BaseOperator only
davidblain-infrabel Oct 1, 2024
c7ec7c0
refactor: Close the event loop if it was a newly created one in event…
davidblain-infrabel Oct 1, 2024
555b68b
refactor: Refactored run_trigger method
davidblain-infrabel Oct 1, 2024
b2b9cee
refactor: Moved run_trigger method to BaseTrigger module so it can be…
davidblain-infrabel Oct 1, 2024
cb6f0de
refactor: It is possible the run_trigger method doesn't yield any Tri…
davidblain-infrabel Oct 1, 2024
6c18b74
refactor: Return None in run_trigger method of no events are returned
davidblain-infrabel Oct 1, 2024
be49bfe
refactor: Check if returned event in _run_deferrable method isn't None
davidblain-infrabel Oct 1, 2024
97a68c0
Merge branch 'main' into feature/streamed-operator
dabla Oct 1, 2024
f96cfbb
Merge branch 'main' into feature/streamed-operator
dabla Oct 2, 2024
f514f90
refactor: Added stream method on PartialOperator
davidblain-infrabel Oct 2, 2024
c0e8c46
refactor: Added unit test for StreamedOperator
davidblain-infrabel Oct 2, 2024
e18b066
refactor: Added stream method to _TaskDecorator class
davidblain-infrabel Oct 2, 2024
a688e4c
refactor: Suppress AttributeError when close on loop fails as some ev…
davidblain-infrabel Oct 2, 2024
a7c79c1
refactor: Removed invocation of validate_mapping_kwargs in stream met…
davidblain-infrabel Oct 2, 2024
90cf03c
refactor: test_mapped_expand_against_params should check on instance …
davidblain-infrabel Oct 2, 2024
6f8399f
refactor: expand_input and partial_kwargs should be public instead of…
davidblain-infrabel Oct 2, 2024
51e3d0c
refactor: Skip test related to stream on TaskGroups as this isn't sup…
davidblain-infrabel Oct 2, 2024
cd6fd36
Merge branch 'main' into feature/streamed-operator
dabla Oct 2, 2024
d607ab1
refactor: Use pytest.mark.skip instead of unittest.skip
davidblain-infrabel Oct 3, 2024
ad3022e
refactor: Added def _get_specified_expand_input method on StreamedOpe…
davidblain-infrabel Oct 3, 2024
c0eb1c6
refactor: Fix logging of number of workers being used for exception i…
davidblain-infrabel Oct 3, 2024
26782ff
refactor: Pop task_group with None when key doesn't exist
davidblain-infrabel Oct 3, 2024
8cdbd46
refactor: Pop dag or task_group with None when key doesn't exist and …
davidblain-infrabel Oct 3, 2024
3409993
refactor: Added info logging statement when a task completed successf…
davidblain-infrabel Oct 3, 2024
4d29dfb
Merge branch 'main' into feature/streamed-operator
dabla Oct 3, 2024
26a557a
refactor: Fixed info logging statement when a task completed successf…
davidblain-infrabel Oct 3, 2024
8321564
Merge branch 'main' into feature/streamed-operator
dabla Oct 3, 2024
57d6086
Merge branch 'main' into feature/streamed-operator
dabla Oct 3, 2024
4e9aa8d
Merge branch 'main' into feature/streamed-operator
dabla Oct 3, 2024
b04c322
Merge branch 'main' into feature/streamed-operator
dabla Oct 4, 2024
52fb43c
refactor: expand_input attribute in StreamedOperator should be a shal…
davidblain-infrabel Oct 4, 2024
bedda87
Merge branch 'main' into feature/streamed-operator
dabla Oct 4, 2024
97dcfa3
Merge branch 'main' into feature/streamed-operator
dabla Oct 8, 2024
cf54d74
refactor: task_id can be directly retrieved from itself in unmap_oper…
davidblain-infrabel Oct 8, 2024
00a598c
refactor: Updated shallow_copy_attrs in StreamedOperator
davidblain-infrabel Oct 8, 2024
7d11512
refactor: Use existing is_mappable method instead of custom isinstanc…
davidblain-infrabel Oct 8, 2024
b76ccf5
refactor: Reorganized imports of StreamedOperator
davidblain-infrabel Oct 9, 2024
14be647
refactor: Ignore type when evaluating value as list
davidblain-infrabel Oct 9, 2024
f641d57
Merge branch 'main' into feature/streamed-operator
dabla Oct 9, 2024
19863e9
Merge branch 'main' into feature/streamed-operator
dabla Oct 9, 2024
45bc01f
Merge branch 'main' into feature/streamed-operator
dabla Oct 9, 2024
cd06b5e
Merge branch 'main' into feature/streamed-operator
dabla Oct 10, 2024
9f400d3
Merge branch 'main' into feature/streamed-operator
dabla Oct 10, 2024
c0365d5
Merge branch 'main' into feature/streamed-operator
dabla Oct 10, 2024
54c0468
Merge branch 'main' into feature/streamed-operator
dabla Oct 10, 2024
ad4ead8
Merge branch 'main' into feature/streamed-operator
dabla Oct 11, 2024
aad14fd
Merge branch 'main' into feature/streamed-operator
dabla Oct 11, 2024
c20ad0f
Merge branch 'main' into feature/streamed-operator
dabla Oct 15, 2024
84ce1eb
Merge branch 'main' into feature/streamed-operator
dabla Oct 18, 2024
2050fac
Merge branch 'main' into feature/streamed-operator
dabla Oct 21, 2024
4fa419c
Merge branch 'main' into feature/streamed-operator
dabla Oct 22, 2024
c306084
Merge branch 'main' into feature/streamed-operator
dabla Oct 22, 2024
42d5f16
Merge branch 'main' into feature/streamed-operator
dabla Oct 22, 2024
4e5f437
Merge branch 'main' into feature/streamed-operator
dabla Oct 22, 2024
723ab5e
Merge branch 'main' into feature/streamed-operator
dabla Oct 23, 2024
55cec8d
Merge branch 'main' into feature/streamed-operator
dabla Oct 23, 2024
870ea81
Merge branch 'main' into feature/streamed-operator
dabla Oct 25, 2024
f9b1d33
Merge branch 'main' into feature/streamed-operator
dabla Nov 4, 2024
7400e95
Merge branch 'main' into feature/streamed-operator
dabla Nov 21, 2024
7577be8
Merge branch 'main' into feature/streamed-operator
dabla Nov 21, 2024
d9ba001
refactor: Use ThreadPoolExecutor to execute tasks concurrently instea…
davidblain-infrabel Nov 22, 2024
7082c11
Merge branch 'main' into feature/streamed-operator
dabla Nov 28, 2024
1e10f27
refactor: Reorganized imports in StreamedOperator
davidblain-infrabel Nov 25, 2024
11200e4
refactor: Refactored StreamedOperator using ThreadPool so we can also…
davidblain-infrabel Nov 28, 2024
41381f9
fix: Don't set the newly created loop event, just use it locally and …
davidblain-infrabel Nov 28, 2024
c3c06f7
refactor: Refactored the StreamedOperator even more
davidblain-infrabel Nov 30, 2024
493972a
feature: Fix StreamedOperator so it can run triggers in async mode an…
Dec 2, 2024
5ddb950
feature: Re-added semaphore for async execution
Dec 2, 2024
6626671
feature: Import gather from asyncio StreamedOperator
davidblain-infrabel Dec 3, 2024
2092923
Merge branch 'main' into feature/streamed-operator
dabla Dec 3, 2024
bc75484
refactored: Refactored BaseExecutor as a context manager to handle co…
davidblain-infrabel Dec 3, 2024
065dc6f
refactored: Renamed StreamedOperator to IterableOperator
davidblain-infrabel Dec 4, 2024
1fd075f
refactor: Make sure the run_deferrable method uses the semaphore to p…
davidblain-infrabel Dec 4, 2024
73a7779
refactor: Make sure the run_deferrable method uses the semaphore to p…
davidblain-infrabel Dec 4, 2024
a5b238a
refactor: Updated IterableOperator
davidblain-infrabel Dec 4, 2024
53c1488
Merge branch 'main' into feature/streamed-operator
dabla Dec 4, 2024
b0c0559
Merge branch 'main' into feature/streamed-operator
dabla Dec 4, 2024
e45b49c
Merge branch 'main' into feature/streamed-operator
dabla Dec 13, 2024
ed0a9bb
refactor: Reorganized imports TestIterableOperator
davidblain-infrabel Dec 16, 2024
ad73a8b
refactor: Fixed import IterableOperator in MappedOperator
davidblain-infrabel Dec 16, 2024
e2f3065
refactor: Removed unused import of DAG in exceptions module
davidblain-infrabel Dec 16, 2024
00d1de1
refactor: Reformatted IterableOperator
davidblain-infrabel Dec 16, 2024
3aca032
Merge branch 'main' into feature/streamed-operator
dabla Dec 16, 2024
8a48456
refactor: Current time should be taken before execution of tasks in I…
davidblain-infrabel Dec 16, 2024
dd39e1c
Merge branch 'main' into feature/streamed-operator
dabla Jan 10, 2025
7e6824c
Merge branch 'main' into feature/streamed-operator
dabla Jan 10, 2025
b4ba204
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Jan 28, 2025
cd73aff
refactor: Fixed some static checks
davidblain-infrabel Jan 28, 2025
9cb1442
refactor: Fixed static checks
davidblain-infrabel Jan 28, 2025
0f43135
refactor: Fixed import of from _MapResult
davidblain-infrabel Jan 28, 2025
75d1680
refactor: Fixed import of BaseOperator
davidblain-infrabel Jan 28, 2025
67f1a14
Merge branch 'main' into feature/streamed-operator
dabla Jan 28, 2025
4f36167
refactor: Added next_callable method on BaseOperator
davidblain-infrabel Jan 28, 2025
5a40363
refactor: Fixed more static checks
davidblain-infrabel Jan 28, 2025
d620c36
Merge branch 'main' into feature/streamed-operator
dabla Jan 28, 2025
87034e7
refactor: Fixed import BaseOperator
davidblain-infrabel Jan 29, 2025
8020251
refactor: Fixed some imports
davidblain-infrabel Jan 29, 2025
8bcffe6
refactor: Try to fix some mypy issues
davidblain-infrabel Jan 29, 2025
dae0a91
refactor: Removed test not applicable on IterableOperator
davidblain-infrabel Jan 29, 2025
e2727bb
refactor: Cannot assign state anymore from context to TaskInstance
davidblain-infrabel Jan 29, 2025
0c80f9a
refactor: Removed stream from decorators as this method is deprecated…
davidblain-infrabel Jan 29, 2025
1f311a9
Merge branch 'main' into feature/streamed-operator
dabla Jan 29, 2025
77489e1
refactor: Next callable method should be instance method instead of c…
davidblain-infrabel Jan 29, 2025
10ff45d
Merge branch 'main' into feature/streamed-operator
dabla Jan 30, 2025
96ea9da
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Jan 31, 2025
ceaffcf
Merge remote-tracking branch 'origin/feature/streamed-operator' into …
davidblain-infrabel Jan 31, 2025
b30b597
Merge branch 'main' into feature/streamed-operator
dabla Jan 31, 2025
454b8f4
refactor: Only keep sdk imports
davidblain-infrabel Jan 31, 2025
aa793b6
Merge branch 'main' into feature/streamed-operator
dabla Feb 5, 2025
888ef3b
Merge branch 'main' into feature/streamed-operator
dabla Feb 5, 2025
596314a
Merge branch 'main' into feature/streamed-operator
dabla Feb 5, 2025
2075b01
Merge branch 'main' into feature/streamed-operator
dabla Feb 6, 2025
f8eb9b2
Merge branch 'main' into feature/streamed-operator
dabla Feb 10, 2025
04167b6
Merge branch 'main' into feature/streamed-operator
dabla Feb 11, 2025
8df86c9
Merge branch 'main' into feature/streamed-operator
dabla Feb 11, 2025
44876f3
refactor: Updated IterableOperator which optimizes execution of defer…
Feb 13, 2025
30ed196
Merge branch 'main' into feature/streamed-operator
dabla Feb 18, 2025
0900d5c
Merge branch 'main' into feature/streamed-operator
dabla Feb 18, 2025
d77f6a5
Merge branch 'main' into feature/streamed-operator
dabla Feb 19, 2025
d30c568
refactor: Set state on TaskInstance otherwise xcom push and pulls won…
davidblain-infrabel Feb 14, 2025
f18cd4d
refactor: Fixed task_id generation without duplicated nested task gro…
davidblain-infrabel Feb 25, 2025
0850b29
Merge branch 'main' into feature/streamed-operator
dabla Feb 25, 2025
1375476
refactor: Refactored IterableOperator which now returns an XComIterab…
davidblain-infrabel Feb 26, 2025
cc2d875
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Feb 26, 2025
4c6aabe
Merge branch 'main' into feature/streamed-operator
dabla Feb 26, 2025
85f511e
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Mar 19, 2025
0eefacd
refactor: Refactored IterableOperator to support streaming of iterabl…
davidblain-infrabel Mar 19, 2025
169a8ae
Merge branch 'main' into feature/streamed-operator
dabla Mar 19, 2025
d20b24e
refactor: Don't need to copy context in resolve method of DeferrableI…
davidblain-infrabel Mar 19, 2025
41856fd
refactor: Inherit LoggingMixin for DeferrableIterable
Mar 20, 2025
ea769cb
refactor: Moved DeferrableIterable to dedicated iterable module under…
Mar 20, 2025
9958bec
refactor: Also use ThreadPool to execute task producer and consumer i…
Mar 20, 2025
03d8312
refactor: Reformatted files
Mar 20, 2025
7ee67cb
refactor: Refactored way to get operator after deserialization trigge…
davidblain-infrabel Mar 23, 2025
15f9b5c
refactor: Refactored producing of tasks in IterableOperator
davidblain-infrabel Mar 23, 2025
ad43728
refactor: Improved IterableOperator and DeferredIterable
davidblain-infrabel Mar 24, 2025
d9d3dac
refactor: Made next_kwargs optional for next_callable method in BaseO…
davidblain-infrabel Mar 26, 2025
07f98cc
refactor: Improved lazy evaluation of tasks passed to the _run_tasks …
Mar 27, 2025
04195bc
Merge branch 'main' into feature/streamed-operator
Mar 27, 2025
e70691a
refactor: Improved run_tasks method of IterableOperator
davidblain-infrabel Mar 28, 2025
f416744
Merge branch 'main' into feature/streamed-operator
dabla Mar 28, 2025
90ab543
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel Apr 7, 2025
97c6446
refactor: Updated IterableOperator with DeferredIterable
davidblain-infrabel Apr 7, 2025
a0f945a
Merge branch 'main' into feature/streamed-operator
dabla Apr 7, 2025
8fd3b90
refactor: Refactored IterableOperator to be Airflow 3 compliant
davidblain-infrabel May 6, 2025
89758e4
Merge branch 'main' into feature/streamed-operator
davidblain-infrabel May 7, 2025
fa58c0c
Merge branch 'main' into feature/streamed-operator
dabla May 7, 2025
87d3a94
Merge branch 'main' into feature/streamed-operator
dabla May 19, 2025
refactor: Changed operator type to BaseOperator in OperatorMethodExecutor
davidblain-infrabel committed Sep 29, 2024
commit d013e3d7a2477e5c3b3f32c4e4b623825e2f775c
4 changes: 2 additions & 2 deletions airflow/models/streamedoperator.py
@@ -19,7 +19,7 @@
     AirflowException,
     AirflowRescheduleTaskInstanceException,
 )
-from airflow.models import BaseOperator, Operator, TaskInstance
+from airflow.models import BaseOperator, TaskInstance
 from airflow.models.expandinput import (
     ExpandInput,
     DictOfListsExpandInput,
@@ -60,7 +60,7 @@ class OperatorMethodExecutor(LoggingMixin):
     def __init__(
         self,
         semaphore: Semaphore,
-        operator: Operator,
+        operator: BaseOperator,
         context: Context,
         task_instance: TaskInstance,
     ):
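
For readers skimming the hunk above, here is a minimal sketch of what narrowing the annotation from `Operator` to `BaseOperator` buys. It does not import Airflow: the classes are simplified stand-ins, `Operator` is assumed to be a union alias covering both plain and mapped operators, `threading.Semaphore` and the `run` method are hypothetical choices made only for illustration, and `task_instance` is omitted for brevity.

```python
from __future__ import annotations

from threading import Semaphore  # assumption: the real Semaphore type may differ
from typing import Any, Union


class BaseOperator:
    """Simplified stand-in for Airflow's BaseOperator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def execute(self, context: dict[str, Any]) -> str:
        return f"executed {self.task_id}"


class MappedOperator:
    """Simplified stand-in for a mapped operator; it has no execute() here."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


# Assumed shape of the alias used before the commit.
Operator = Union[BaseOperator, MappedOperator]


class OperatorMethodExecutor:
    """Sketch of the constructor after the change (task_instance omitted)."""

    def __init__(
        self,
        semaphore: Semaphore,
        operator: BaseOperator,  # narrowed: was `operator: Operator`
        context: dict[str, Any],
    ) -> None:
        self.semaphore = semaphore
        self.operator = operator
        self.context = context

    def run(self) -> str:
        # Hypothetical helper: with the narrowed type, a type checker knows
        # execute() exists, so no isinstance() check is needed here.
        with self.semaphore:
            return self.operator.execute(self.context)


executor = OperatorMethodExecutor(Semaphore(2), BaseOperator("t1"), {})
result = executor.run()
```

With the union annotation, a type checker would flag `self.operator.execute(...)` because the mapped stand-in has no such method; the narrowed annotation moves that requirement to the caller.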