Content-Length: 406425 | pFad | http://github.com/apache/airflow/commit/cfb7baafa2e9df030bcc03aad09fa7f85f2d7dff

C5 Simplify · apache/airflow@cfb7baa · GitHub
Skip to content

Commit cfb7baa

Browse files
jedcunninghamashb
authored andcommitted
Simplify
1 parent e116b48 commit cfb7baa

File tree

2 files changed

+15
-18
lines changed

2 files changed

+15
-18
lines changed

airflow/models/taskmixin.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from logging import Logger
2929

3030
from airflow.models.dag import DAG
31+
from airflow.models.operator import Operator
3132
from airflow.utils.edgemodifier import EdgeModifier
3233
from airflow.utils.task_group import TaskGroup
3334

@@ -236,14 +237,14 @@ def set_upstream(
236237
self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
237238

238239
@property
239-
def downstream_list(self) -> Iterable["DAGNode"]:
240+
def downstream_list(self) -> Iterable["Operator"]:
240241
"""List of nodes directly downstream"""
241242
if not self.dag:
242243
raise AirflowException(f'Operator {self} has not been assigned to a DAG yet')
243244
return [self.dag.get_task(tid) for tid in self.downstream_task_ids]
244245

245246
@property
246-
def upstream_list(self) -> Iterable["DAGNode"]:
247+
def upstream_list(self) -> Iterable["Operator"]:
247248
"""List of nodes directly upstream"""
248249
if not self.dag:
249250
raise AirflowException(f'Operator {self} has not been assigned to a DAG yet')

airflow/www/views.py

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ def get_value_from_path(key_path, content):
515515
return elem
516516

517517

518-
def dag_edges(dag):
518+
def dag_edges(dag: DAG):
519519
"""
520520
Create the list of edges needed to construct the Graph view.
521521
@@ -596,21 +596,17 @@ def collect_edges(task_group):
596596
# Collect all the edges between individual tasks
597597
edges = set()
598598

599-
def get_downstream(task):
600-
tasks_to_trace = {(task, frozenset(task.downstream_list))}
601-
while tasks_to_trace:
602-
tasks_to_trace_next: Set[tuple] = set()
603-
for task, children in tasks_to_trace:
604-
for child in children:
605-
edge = (task.task_id, child.task_id)
606-
if edge in edges:
607-
continue
608-
edges.add(edge)
609-
tasks_to_trace_next.add((child, frozenset(child.downstream_list)))
610-
tasks_to_trace = tasks_to_trace_next
611-
612-
for root in dag.roots:
613-
get_downstream(root)
599+
tasks_to_trace: List[Operator] = dag.roots
600+
while tasks_to_trace:
601+
tasks_to_trace_next: List[Operator] = []
602+
for task in tasks_to_trace:
603+
for child in task.downstream_list:
604+
edge = (task.task_id, child.task_id)
605+
if edge in edges:
606+
continue
607+
edges.add(edge)
608+
tasks_to_trace_next.append(child)
609+
tasks_to_trace = tasks_to_trace_next
614610

615611
result = []
616612
# Build result dicts with the two ends of the edge, plus any extra metadata

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/airflow/commit/cfb7baafa2e9df030bcc03aad09fa7f85f2d7dff

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy