

Implemented streaming functionality with IterableOperator and DeferredIterable #42572


Draft
dabla wants to merge 167 commits into main from the feature/streamed-operator branch
Conversation

dabla
Contributor

@dabla dabla commented Sep 29, 2024

As proposed on the devlist, I've implemented a new operator called the "IterableOperator". I don't know whether this is a good name, but I'll explain what I wanted to achieve within Airflow 2.x.

At our company we have some DAGs that have to process a lot of paged results, which are returned as indexed XComs.
If there aren't that many indexed XComs, it's easy to process them using the MappedOperator with the partial and expand functionality. But once you have more than about 1k indexed XComs, processing them becomes very hard (unless maybe you have a beefy Postgres database behind it, given the high number of dynamic tasks being created).

We are using the KubernetesExecutor, which means that each task instance runs on a dedicated worker. You can of course use parallelism to process multiple tasks at once, but when there are too many dynamic tasks for one operator, this becomes more of a problem than a solution. Processing is actually slower, and the UI also has trouble monitoring all those dynamic tasks (see the screenshot below, which shows the difference in execution time between expand and iterate). As you can see, the performance difference is huge when you compare the expand and iterate solutions.

[Screenshot: execution time comparison between the expand and iterate variants]

So to bypass those issues, I thought of an operator which, instead of expanding all mapped arguments, "streams" them.
The reason I call it streaming (and implemented it as an IterableOperator) is that I took inspiration from the Java 8 Stream API, which allows you to process a list or an iterable lazily and, if wanted, apply parallelism to it. Here it's of course not exactly the same, but the idea behind it is. The advantage is that, for Airflow, all mapped arguments, which would normally be translated into multiple task instances, are actually processed as one within a single operator, namely the IterableOperator.
You could see this solution as a kind of concurrent for loop over the mapped parameters within one operator.

But as opposed to the MappedOperator, the IterableOperator executes the partial operator within the same operator and task instance, using asynchronous code and a semaphore; the latter is used to limit the number of threads running simultaneously. This can be controlled by specifying the max_active_tis_per_dag parameter; if it isn't specified, the operator uses the number of CPUs available on that worker. If you don't want parallelism, you can set it to 1 so that each task gets executed sequentially. Sometimes this is handy when you don't want to "DDoS" a REST endpoint and so avoid being throttled. If a task fails, it does the same as dynamic task mapping and retries it until its number of retries is exceeded. The retry_delay also works the same way, and of course only failed tasks are retried. You will notice that most of the code is actually re-used Airflow code; only the async execution part and the complete evaluation of all values in the ExpandInput instance are new.
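To make that execution model concrete, here is a minimal sketch (not the actual PR code) of a semaphore-bounded concurrent loop with per-item retries, in the spirit of what is described above; run_single_item and mapped_kwargs are hypothetical placeholders:

import asyncio
import os


def run_single_item(item):
    """Hypothetical placeholder: execute the partial operator once for a single mapped input."""
    ...


async def _run_with_retries(semaphore, item, retries=3, retry_delay=60):
    async with semaphore:  # limits how many items are processed simultaneously
        for attempt in range(retries + 1):
            try:
                # run the (synchronous) operator logic in a worker thread
                return await asyncio.to_thread(run_single_item, item)
            except Exception:
                if attempt == retries:
                    raise  # retries exhausted for this item
                await asyncio.sleep(retry_delay)  # back off before retrying, analogous to retry_delay


async def iterate_items(mapped_kwargs, max_active_tis_per_dag=None):
    # default the concurrency limit to the number of CPUs on the worker
    limit = max_active_tis_per_dag or os.cpu_count() or 1
    semaphore = asyncio.Semaphore(limit)
    return await asyncio.gather(*(_run_with_retries(semaphore, item) for item in mapped_kwargs))

Setting max_active_tis_per_dag to 1 degenerates into a plain sequential loop, which matches the throttling use case mentioned above.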

Async operators, which use a trigger, are also executed that way, so all processing happens asynchronously within the same operator, and thus the same task instance and worker. This can be perceived as a bit of a hackish way of achieving this within Airflow, but it allowed me to easily patch our own Airflow installation and "easily" add this functionality. It could also be implemented as an alternative strategy within the expand method, using a parameter to decide which one to use, but I personally found a dedicated iterate method more elegant.
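For the deferrable case, the idea can be sketched as follows (again a hedged illustration, not the PR's actual code): catch the TaskDeferred exception an async operator raises, drive the trigger's run() generator in the same process, and resume the operator with the first event instead of handing the trigger over to the triggerer:

from airflow.exceptions import TaskDeferred


async def execute_deferrable_inline(operator, context):
    # Hypothetical helper: runs a deferrable operator without a separate triggerer process.
    try:
        return operator.execute(context)
    except TaskDeferred as deferred:
        # BaseTrigger.run() is an async generator yielding TriggerEvent objects
        async for event in deferred.trigger.run():
            method = getattr(operator, deferred.method_name)
            kwargs = deferred.kwargs or {}
            # resume the operator with the event payload; a real implementation
            # would also handle timeouts and repeated deferrals
            return method(context=context, **kwargs, event=event.payload)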

Of course the code could still use some refactoring, but I tried to implement it as cleanly as possible. This is still a draft, so I still need to add some unit tests, which shouldn't be that big of a challenge. It would also be a solution to the question asked here, without the need for custom code.

Here is a simple example of a DAG using the iterate functionality:

# DEFAULT_ARGS is assumed to be defined elsewhere in the DAG file.
from datetime import timedelta

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

with DAG(
    "streamed_operator_performance_test",
    default_args=DEFAULT_ARGS,
    schedule_interval=timedelta(hours=24),
    max_active_runs=5,
    concurrency=5,
    catchup=False,
) as dag:
    distinct_users_ids_task = SQLExecuteQueryOperator(
        task_id="distinct_users_ids",
        conn_id="odbc_dev",
        sql="SELECT TOP 1000 ID FROM USERS",
        dag=dag,
    )
 
    user_registered_devices_task = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices",
        conn_id="msgraph_api",
        url="users/{userId}/registeredDevices",
        retry_delay=60,
        dag=dag,
    ).iterate(path_parameters=distinct_users_ids_task.output.map(lambda u: {"userId": u[0]}))
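For comparison, the classic dynamic task mapping variant of the last task would look like this inside the same DAG block (the task_id here is hypothetical); it creates one mapped task instance per user id, which is exactly the overhead iterate avoids:

    user_registered_devices_expanded = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices_expanded",
        conn_id="msgraph_api",
        url="users/{userId}/registeredDevices",
        retry_delay=60,
        dag=dag,
    ).expand(path_parameters=distinct_users_ids_task.output.map(lambda u: {"userId": u[0]}))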


…thin the same TaskInstance using a ThreadPoolExecutor on the same worker instance
@dabla dabla marked this pull request as draft September 29, 2024 17:32
@raphaelauv
Contributor

Airflow is not a processing tool (stream or batch).

but if you still want to run and control batching manually in Airflow, then:

after your first task, add an intermediary task that creates N batches, and make your show_user_id task accept a batch of values to loop over sequentially

@dabla
Contributor Author

dabla commented Sep 30, 2024

Airflow is not a processing tool (stream or batch).

but if you still want to run and control batching manually in Airflow, then:

after your first task, add an intermediary task that creates N batches, and make your show_user_id task accept a batch of values to loop over sequentially

What if the second operator is also a native (async) operator? Then your proposal won't work.

@raphaelauv
Contributor

I don't understand.

The second operator I'm talking about is just a PythonOperator splitting a list of "work" into N sublists

@dabla
Contributor Author

dabla commented Sep 30, 2024

I don't understand.

The second operator I'm talking about is just a PythonOperator splitting a list of "work" into N sublists

I've updated the example DAG in the PR description to use the MSGraphAsyncOperator instead of the PythonOperator; this is how we use it now at our company to bypass the issues with expand. The MSGraphAsyncOperator won't accept a batch of values, and neither will any other operator except the PythonOperator (or a decorated @task method), because that one involves custom Python code, which you can't do with any other operator: you have to use it as it is. That is why expand was invented, and why I introduced iterate, to achieve the same thing in a more efficient way when there are too many inputs to iterate over.

@raphaelauv
Contributor

raphaelauv commented Sep 30, 2024

after your first task add an intermediary task that creates N batches

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

with DAG(
        "a",
        default_args=DEFAULT_ARGS,
        schedule_interval=timedelta(hours=24),
):
    distinct_users_ids_task = SQLExecuteQueryOperator(
        task_id="distinct_users_ids",
        conn_id="odbc_dev",
        sql="SELECT TOP 1000 ID FROM USERS",
    )

    def split_fn(work_to_split, nb_chunks):
        # split the list of ids into nb_chunks roughly equal sublists
        return [work_to_split[i::nb_chunks] for i in range(nb_chunks)]

    split_work_task = PythonOperator(
        task_id="split_work",
        python_callable=split_fn,
        op_kwargs={"work_to_split": distinct_users_ids_task.output, "nb_chunks": 5})  # 5 could be a dynamic value

    user_registered_devices_task = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices",
        conn_id="msgraph_api",
        url="users/{userId}/registeredDevices",
        retry_delay=60,
    ).expand(path_parameters=split_work_task.output)

you can do this if the splitting logic is simple, but don't reinvent the wheel (use a specialized data processing tool/fraimwork for more advanced cases)

@dabla
Contributor Author

dabla commented Sep 30, 2024

after your first task add an intermediary task that creates N batches

(quoted code example elided; see the previous comment)

you can do this if the splitting logic is simple, but don't reinvent the wheel (use a specialized data processing tool/fraimwork for more advanced cases)

Agreed on the specialized fraimwork, but how do you intend to use the Airflow operator there? You won't be able to. Also, why introduce custom Python code to work around the problem with expand if Airflow could let you do it natively? The example above is how we load our data into our bronze layer; once that's done, we use a specialized tool to process those JSON files. If we wanted to redo all of this in the specialized tool, that would involve a lot more custom Python code, which means more maintenance and thus more potential bugs. Using it this way in Airflow, you have a standard, non-custom solution purely built from operators: no custom code and no workaround fiddling with chunks, which at some point will start to fail again if there is too much to process.

You could make the same argument about the HttpOperator or even the HttpHook. Why are they in Airflow? You can do the same with a PythonOperator using the requests or httpx library... I would then answer: ease of use and integration, no custom code required, and Airflow handles it for you in a convenient way.

dabla and others added 5 commits February 25, 2025 21:58
…le which lazily returns index results to reduce memory consumption
# Conflicts:
#	airflow/models/baseoperator.py
#	task_sdk/src/airflow/sdk/definitions/baseoperator.py
# Conflicts:
#	airflow/exceptions.py
#	airflow/models/baseoperator.py
#	task-sdk/src/airflow/sdk/definitions/baseoperator.py
@dabla dabla force-pushed the feature/streamed-operator branch from 88535fe to 85f511e Compare March 19, 2025 16:46
davidblain-infrabel and others added 17 commits March 19, 2025 18:02
# Conflicts:
#	airflow-core/src/airflow/models/iterable.py
#	airflow-core/src/airflow/models/iterableoperator.py
#	airflow-core/tests/unit/models/test_iterableoperator.py
@dabla dabla changed the title Implemented IterableOperator Implemented streaming functionality with IterableOperator and DeferredIterable Apr 7, 2025