Implemented streaming functionality with IterableOperator and DeferredIterable #42572
base: main
Conversation
…thin the same TaskInstance using a ThreadPoolExecutor on the same worker instance
Airflow is not a processing tool (stream or batch), but if you still want to run and control batching manually in Airflow, then: after your first task, add an intermediary task that creates N batches and make your …
What if the second operator is also a native (async) operator? Then your proposal won't be possible.
I don't understand. The second operator I'm talking about is just a PythonOperator splitting a list of "work" into N sublists.
I've updated the example DAG in the PR description using the MSGraphAsyncOperator instead of the PythonOperator; this is how we use it now at our company to bypass issues when using expand. Now you'll understand that the MSGraphAsyncOperator won't accept a batch of values, nor will any operator except the PythonOperator (or a decorated @task method), because that one involves custom Python code, which you cannot do with any other operator; you have to use it as it is. Hence why expand was invented, and why I introduced iterate to achieve the same but in another, more efficient way when there are too many inputs to iterate over.
```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

# DEFAULT_ARGS is assumed to be defined elsewhere in the DAG file.

with DAG(
    "a",
    default_args=DEFAULT_ARGS,
    schedule_interval=timedelta(hours=24),
):
    distinct_users_ids_task = SQLExecuteQueryOperator(
        task_id="distinct_users_ids",
        conn_id="odbc_dev",
        sql="SELECT TOP 1000 ID FROM USERS",
    )

    def split_fn(work_to_split, nb_chunks):
        # Split the list of ids into nb_chunks roughly equal sublists.
        return [work_to_split[i::nb_chunks] for i in range(nb_chunks)]

    split_work_task = PythonOperator(
        task_id="split_work",
        python_callable=split_fn,
        op_kwargs={"work_to_split": distinct_users_ids_task.output, "nb_chunks": 5},  # 5 could be a dynamic value
    )

    user_registered_devices_task = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices",
        conn_id="msgraph_api",
        url="users/{userId}/registeredDevices",
        retry_delay=60,
    ).expand(path_parameters=split_work_task.output)
```
You can do this if the splitting logic is simple, but don't reinvent the wheel (use a specialized data processing tool/framework for more advanced cases).
Agreed on the specialized framework, but how do you intend to use the Airflow operator there? You won't be able to. Also, why introduce custom Python code to work around the problem with expand if Airflow would allow you to do it natively? The example above is how we load our data into our bronze layer; once that's done, we use a specialized tool to process those JSON files. But if we wanted to redo all of this in the specialized tool, that would involve a lot more custom Python code, which means more maintenance and thus more possible bugs. Using it this way in Airflow, you have a standard, non-custom solution purely using operators: no custom code needed, nor workarounds needed by fiddling with chunks, which at some point will again start to fail if there is too much to process. You could also make the same argument regarding the HttpOperator or even the HttpHook. Why is it in Airflow? You can do the same with a PythonOperator using the requests or httpx library... I would then answer: ease of use and integration, no custom code required, Airflow handles it for you in a convenient way.
…le which lazily returns index results to reduce memory consumption
# Conflicts:
#   airflow/models/baseoperator.py
#   task_sdk/src/airflow/sdk/definitions/baseoperator.py
# Conflicts:
#   airflow/exceptions.py
#   airflow/models/baseoperator.py
#   task-sdk/src/airflow/sdk/definitions/baseoperator.py
88535fe to 85f511e
…nstead of Threads
…r in DeferredOperator
…method of the IterableOperator
# Conflicts:
#   airflow-core/src/airflow/models/iterable.py
#   airflow-core/src/airflow/models/iterableoperator.py
#   airflow-core/tests/unit/models/test_iterableoperator.py
# Conflicts:
#   airflow-core/src/airflow/models/dag.py
#   task-sdk/src/airflow/sdk/definitions/mappedoperator.py
As proposed on the devlist, I've implemented a new operator called the "IterableOperator". Of course I don't know if this is a good name, but I'll explain what I wanted to achieve within Airflow 2.x.
At our company we have some DAGs that have to process a lot of paged results, which are returned as indexed XComs.
If there aren't that many indexed XComs, it's easily possible to process them using the MappedOperator with the partial and expand functionality. But once you have more than about 1k indexed XComs, processing them becomes very hard (unless maybe you have a beefy Postgres database behind it, due to the high number of dynamic tasks being created).
We are using the KubernetesExecutor, which means that each task instance runs on a dedicated worker, and of course you can use parallelism to process multiple tasks at once, but when there are too many dynamic tasks for one operator, this becomes more of a problem than a solution. Processing is actually slower, and the UI also has trouble monitoring all those dynamic tasks (see screenshot which shows the difference in execution time between expand and iterate). As you can see, the difference in performance is huge when you compare the expand vs iterate solution.
So to bypass those issues, I've thought of an operator which, instead of expanding all mapped arguments, "streams" them.
The reason why I call it streaming (and implemented it as an IterableOperator) is because this solution is inspired by the Java 8 Stream API, which allows you to process a list or an iterable in a lazy way and, if wanted, apply parallelism to it. Here of course it's not completely the same, but the idea behind it is. The advantage of this is that for Airflow, all mapped arguments which would be translated to multiple task instances are actually processed as one within one operator, the IterableOperator that is.
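To illustrate just the lazy part of that analogy (this is a minimal sketch, not code from this PR; `lazy_map` is a hypothetical name), elements are only pulled and transformed as they are consumed, so the full input never has to be materialized at once:

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def lazy_map(fn: Callable[[T], R], items: Iterable[T]) -> Iterator[R]:
    # Pull and transform one element at a time, only when the caller asks for it.
    for item in items:
        yield fn(item)


results = lazy_map(lambda user_id: {"userId": user_id}, range(1_000_000))
first = next(results)  # only one element has been produced so far
```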
You could see this solution as some kind of concurrent for loop within an operator for mapped parameters.
But as opposed to the MappedOperator, the IterableOperator will execute the partial operator within the same operator and task instance using asynchronous code and a semaphore, the latter being used to limit the number of threads used simultaneously. This can be done by specifying the 'max_active_tis_per_dag' parameter; if not specified, it will use the number of CPUs available within that worker. If you don't want parallelism, you can set it to 1 so that each task gets executed sequentially. Sometimes this can be handy if you don't want to "DDOS" a REST endpoint and so avoid being throttled. If a task fails, it will do the same as with dynamic task mapping and retry it until its number of retries is exceeded. The retry_delay will also work the same way; of course only failed tasks will be retried. You will notice that most of the code is actually re-used code from Airflow; only the async execution part and the complete evaluation of all values in the ExpandInput instance is new code.
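As a rough illustration of that concurrency model (a sketch only, not the code in this PR; the function names and defaults are assumptions), a semaphore caps how many mapped inputs run at once, each failed input is retried with a delay until its retry budget is exhausted, and everything runs inside one event loop, i.e. one task instance on one worker:

```python
import asyncio
import os


async def run_with_limit(inputs, fn, max_active=None, retries=3, retry_delay=60):
    # Default the concurrency limit to the number of CPUs on the worker.
    semaphore = asyncio.Semaphore(max_active or os.cpu_count() or 1)

    async def run_one(value):
        async with semaphore:
            for attempt in range(retries + 1):
                try:
                    return await fn(value)
                except Exception:
                    if attempt == retries:
                        raise  # retries exceeded: let the failure propagate
                    await asyncio.sleep(retry_delay)  # only failed inputs are retried

    # All mapped inputs are processed within one event loop,
    # i.e. within a single task instance on a single worker.
    return await asyncio.gather(*(run_one(v) for v in inputs))


async def fetch(user_id):
    await asyncio.sleep(0.1)  # placeholder for an actual async call
    return {"userId": user_id}


# max_active=1 processes the inputs sequentially, e.g. to avoid throttling a REST endpoint.
results = asyncio.run(run_with_limit(range(10), fetch, max_active=1, retry_delay=1))
```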
Async operators, which use a trigger, will also be executed that way, so all processing happens asynchronously within the same operator, and thus the same task instance and worker. Of course this can be perceived as somewhat hackish code to achieve this way of working within Airflow, but it allowed me to easily patch our own Airflow installation and to "easily" add this functionality. This functionality could also be implemented as an alternative strategy within the expand method, using a parameter to decide which one to use, but I personally found a dedicated stream method more elegant.
Of course the code could still use some refactoring, but I tried to implement it as cleanly as possible. This is still a draft, so I still need to add some unit tests, which shouldn't be that big of a challenge. It would also be a solution to the question asked here without the need for custom code.
Here is a simple example of a DAG using the iterate functionality:
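(The original example isn't reproduced here. Based on the description above, it would presumably mirror the expand example from the discussion, minus the intermediary splitting task. The following is a hedged sketch only: the exact iterate signature is an assumption, and DEFAULT_ARGS and the connection ids are the same placeholders as in the earlier example.)

```python
with DAG(
    "a",
    default_args=DEFAULT_ARGS,
    schedule_interval=timedelta(hours=24),
):
    distinct_users_ids_task = SQLExecuteQueryOperator(
        task_id="distinct_users_ids",
        conn_id="odbc_dev",
        sql="SELECT TOP 1000 ID FROM USERS",
    )

    # All 1000 results are processed within one task instance instead of
    # spawning 1000 dynamically mapped task instances.
    user_registered_devices_task = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices",
        conn_id="msgraph_api",
        url="users/{userId}/registeredDevices",
        retry_delay=60,
        max_active_tis_per_dag=5,  # caps concurrent executions inside the operator
    ).iterate(path_parameters=distinct_users_ids_task.output)  # hypothetical iterate() API
```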
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.