Content-Length: 305096 | pFad | http://github.com/apache/airflow/pull/50641/commits/f207e92cd46889ef4ef5c1c71ba0173d8ed5eaec

0F fix(task_instances): handle upstream_mapped_index when xcom access is needed by Lee-W · Pull Request #50641 · apache/airflow · GitHub
Skip to content

fix(task_instances): handle upstream_mapped_index when xcom access is needed #50641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 22, 2025
Next Next commit
fix(task_instances): handle upstream_mapped_index when xcom access is…
… needed
  • Loading branch information
Lee-W committed May 22, 2025
commit f207e92cd46889ef4ef5c1c71ba0173d8ed5eaec
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.trigger import Trigger
from airflow.models.xcom import XComModel
from airflow.sdk.definitions._internal.expandinput import NotFullyPopulated
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState, TerminalTIState
Expand Down Expand Up @@ -244,7 +245,9 @@ def ti_run(
)

if dag := dag_bag.get_dag(ti.dag_id):
upstream_map_indexes = dict(_get_upstream_map_indexes(dag.get_task(ti.task_id), ti.map_index))
upstream_map_indexes = dict(
_get_upstream_map_indexes(dag.get_task(ti.task_id), ti.map_index, ti.run_id, session)
)
else:
upstream_map_indexes = None

Expand Down Expand Up @@ -274,7 +277,7 @@ def ti_run(


def _get_upstream_map_indexes(
task: Operator, ti_map_index: int
task: Operator, ti_map_index: int, run_id: str, session: SessionDep
) -> Iterator[tuple[str, int | list[int] | None]]:
for upstream_task in task.upstream_list:
map_indexes: int | list[int] | None
Expand All @@ -287,8 +290,17 @@ def _get_upstream_map_indexes(
map_indexes = ti_map_index
else:
# tasks not in the same mapped task group
# the upstream mapped task group should combine the xcom as a list and return it
mapped_ti_count: int = upstream_task.task_group.get_parse_time_mapped_ti_count()
# the upstream mapped task group should combine the return xcom as a list and return it
mapped_ti_count: int
upstream_mapped_group = upstream_task.task_group
try:
# for cases that does not need to resolve xcom
mapped_ti_count = upstream_mapped_group.get_parse_time_mapped_ti_count()
except NotFullyPopulated:
# for cases that needs to resolve xcom to get the correct count
mapped_ti_count = upstream_mapped_group._expand_input.get_total_map_length(
run_id, session=session
)
map_indexes = list(range(mapped_ti_count)) if mapped_ti_count is not None else None

yield upstream_task.task_id, map_indexes
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/airflow/pull/50641/commits/f207e92cd46889ef4ef5c1c71ba0173d8ed5eaec

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy