File tree 2 files changed +15
-18
lines changed
2 files changed +15
-18
lines changed Original file line number Diff line number Diff line change 28
28
from logging import Logger
29
29
30
30
from airflow .models .dag import DAG
31
+ from airflow .models .operator import Operator
31
32
from airflow .utils .edgemodifier import EdgeModifier
32
33
from airflow .utils .task_group import TaskGroup
33
34
@@ -236,14 +237,14 @@ def set_upstream(
236
237
self ._set_relatives (task_or_task_list , upstream = True , edge_modifier = edge_modifier )
237
238
238
239
@property
239
- def downstream_list (self ) -> Iterable ["DAGNode " ]:
240
+ def downstream_list (self ) -> Iterable ["Operator " ]:
240
241
"""List of nodes directly downstream"""
241
242
if not self .dag :
242
243
raise AirflowException (f'Operator { self } has not been assigned to a DAG yet' )
243
244
return [self .dag .get_task (tid ) for tid in self .downstream_task_ids ]
244
245
245
246
@property
246
- def upstream_list (self ) -> Iterable ["DAGNode " ]:
247
+ def upstream_list (self ) -> Iterable ["Operator " ]:
247
248
"""List of nodes directly upstream"""
248
249
if not self .dag :
249
250
raise AirflowException (f'Operator { self } has not been assigned to a DAG yet' )
Original file line number Diff line number Diff line change @@ -515,7 +515,7 @@ def get_value_from_path(key_path, content):
515
515
return elem
516
516
517
517
518
- def dag_edges (dag ):
518
+ def dag_edges (dag : DAG ):
519
519
"""
520
520
Create the list of edges needed to construct the Graph view.
521
521
@@ -596,21 +596,17 @@ def collect_edges(task_group):
596
596
# Collect all the edges between individual tasks
597
597
edges = set ()
598
598
599
- def get_downstream (task ):
600
- tasks_to_trace = {(task , frozenset (task .downstream_list ))}
601
- while tasks_to_trace :
602
- tasks_to_trace_next : Set [tuple ] = set ()
603
- for task , children in tasks_to_trace :
604
- for child in children :
605
- edge = (task .task_id , child .task_id )
606
- if edge in edges :
607
- continue
608
- edges .add (edge )
609
- tasks_to_trace_next .add ((child , frozenset (child .downstream_list )))
610
- tasks_to_trace = tasks_to_trace_next
611
-
612
- for root in dag .roots :
613
- get_downstream (root )
599
+ tasks_to_trace : List [Operator ] = dag .roots
600
+ while tasks_to_trace :
601
+ tasks_to_trace_next : List [Operator ] = []
602
+ for task in tasks_to_trace :
603
+ for child in task .downstream_list :
604
+ edge = (task .task_id , child .task_id )
605
+ if edge in edges :
606
+ continue
607
+ edges .add (edge )
608
+ tasks_to_trace_next .append (child )
609
+ tasks_to_trace = tasks_to_trace_next
614
610
615
611
result = []
616
612
# Build result dicts with the two ends of the edge, plus any extra metadata
You can’t perform that action at this time.
0 commit comments