OpenTelemetry traces implementation cleanup #49180

Open · xBis7 wants to merge 40 commits into main

Changes from 1 commit

Commits (40)
786a7da
replace ti.key with ti.try_id in active_spans
xBis7 Apr 6, 2025
e234bbf
add otel to integrations
xBis7 Apr 6, 2025
92682be
modify pytest_plugin.py
xBis7 Apr 6, 2025
33c28f7
add redis integration to global_constants.py
xBis7 Apr 7, 2025
a3ef753
fix ci failures
xBis7 Apr 7, 2025
4fbda4b
fix test_edge_command.py
xBis7 Apr 8, 2025
9e71f2f
fix test_edge_executor.py
xBis7 Apr 8, 2025
d21af67
increase test timeout
xBis7 Apr 8, 2025
5a4c246
increase test timeout
xBis7 Apr 9, 2025
d65196f
mark flaky tests with xfail
xBis7 Apr 9, 2025
a287246
add config for disabling spans from internal operations + disable dec…
xBis7 Apr 9, 2025
9b5d168
disable all debugging traces
xBis7 Apr 11, 2025
81d9d85
keep redis env variables
xBis7 Apr 13, 2025
cadeb6b
merge with main and resolve conflicts
xBis7 Apr 13, 2025
53d3de4
replace ti.try_id with ti.id
xBis7 Apr 13, 2025
cc8d1fc
fix _end_active_spans logic bug causing tests to fail
xBis7 Apr 14, 2025
6901de8
set a timeout for each test in test_otel.py
xBis7 Apr 15, 2025
472d416
Merge remote-tracking branch 'origin/main' into otel_cleanup
xBis7 Apr 15, 2025
b3bcae7
add 'timeout' configuration option in pytest 'markers'
xBis7 Apr 15, 2025
5b355e0
Merge remote-tracking branch 'origin/main' into otel_cleanup
xBis7 Apr 15, 2025
221ab47
Merge remote-tracking branch 'origin/main' into otel_cleanup
xBis7 Apr 15, 2025
f5ec1f4
Merge remote-tracking branch 'origin/main' into otel_cleanup
xBis7 Apr 16, 2025
a78cf91
Merge remote-tracking branch 'origin/main' into otel_cleanup
xBis7 Apr 17, 2025
e08543b
merge with main and resolve conflicts
xBis7 Apr 24, 2025
6b672d6
Merge remote-tracking branch 'origin/main' into otel_cleanup
xBis7 Apr 28, 2025
35f93bc
update config target version
xBis7 Apr 28, 2025
248af6b
convert leftover ti.id to str
xBis7 Apr 28, 2025
d90d29e
simplify TaskInstance query
xBis7 Apr 28, 2025
74ebb42
replace dag_run.run_id with dag_run.id
xBis7 Apr 28, 2025
3fee0b0
remove conf.add_section from test_otel_tracer.py
xBis7 Apr 28, 2025
39abf69
increase timeout for each test in test_otel.py
xBis7 Apr 28, 2025
e511c16
replace class annotation with method annotation for a timeout in test…
xBis7 Apr 29, 2025
4769ac9
rename add_span to add_debug_span
xBis7 Apr 29, 2025
afcc997
initialize db only once at the start, in test_otel.py
xBis7 Apr 30, 2025
71280ce
Merge remote-tracking branch 'origin/main' into otel_cleanup
xBis7 May 1, 2025
e5e91d0
cleanup control_file in case of failure
xBis7 May 1, 2025
e27feb1
merge with main and resolve conflicts
xBis7 May 7, 2025
2c86006
convert log info to debug
xBis7 May 8, 2025
1a9d0d7
merge with main and resolve conflicts
xBis7 May 24, 2025
4770e4d
Merge branch 'main' into otel_cleanup
xBis7 Jun 3, 2025
replace ti.try_id with ti.id
xBis7 committed Apr 13, 2025
commit 53d3de45139be1543c56b8dd839ae0f05e3cfd48
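
The change in this commit is the same across every hunk below: the in-memory registry of active OpenTelemetry spans stops being keyed by the per-attempt ti.try_id and is instead keyed by the task instance's primary key, ti.id (compared as a string against database values), while dag-run spans keep dag_run.run_id as their key. A minimal plain-Python sketch of that keying convention; the container mirrors the get/set/delete/get_all calls visible in the hunks, but the class itself and the sample values are illustrative stand-ins, not Airflow's implementation:

import uuid


class ActiveSpans:
    """Toy stand-in for the scheduler's active_spans registry (illustration only)."""

    def __init__(self) -> None:
        self._spans: dict[str, object] = {}

    def get(self, key):
        return self._spans.get(str(key))

    def set(self, key, span) -> None:
        self._spans[str(key)] = span

    def delete(self, key) -> None:
        self._spans.pop(str(key), None)

    def get_all(self) -> dict[str, object]:
        return dict(self._spans)


active_spans = ActiveSpans()

# After this commit, task-instance spans are keyed by the TI primary key (ti.id, a UUID),
# not by the per-attempt try_id; dag-run spans keep using dag_run.run_id as before.
ti_id = uuid.uuid4()                 # stands in for TaskInstance.id
run_id = "scheduled__2025-04-13"     # stands in for DagRun.run_id

active_spans.set(ti_id, "ti span")
active_spans.set(run_id, "dag_run span")

assert active_spans.get(ti_id) == "ti span"
active_spans.delete(ti_id)
assert active_spans.get(ti_id) is None and len(active_spans.get_all()) == 1
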
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/executors/base_executor.py
@@ -431,8 +431,8 @@ def trigger_tasks(self, open_slots: int) -> None:
if isinstance(item, workloads.ExecuteTask) and hasattr(item, "ti"):
ti = item.ti

# If it's None, then the span for the current try_id hasn't been started.
if self.active_spans is not None and self.active_spans.get(ti.try_id) is None:
# If it's None, then the span for the current id hasn't been started.
if self.active_spans is not None and self.active_spans.get(ti.id) is None:
from airflow.models.taskinstance import SimpleTaskInstance

if isinstance(ti, SimpleTaskInstance):
@@ -451,7 +451,7 @@ def trigger_tasks(self, open_slots: int) -> None:
component="task",
start_as_current=False,
)
self.active_spans.set(ti.try_id, span)
self.active_spans.set(ti.id, span)
# Inject the current context into the carrier.
carrier = Trace.inject()
ti.context_carrier = carrier
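
In the hunk above the executor starts a span only when active_spans has no entry for ti.id, then injects the trace context into a carrier stored on the task instance (ti.context_carrier) so the worker can continue the trace. A self-contained sketch of that start-if-absent-and-inject pattern, written against the OpenTelemetry Python API directly (assuming the opentelemetry-api package is installed); the tracer name, registry, and carrier handling are illustrative and not Airflow's Trace facade:

import uuid

from opentelemetry import propagate, trace

tracer = trace.get_tracer("executor.sketch")          # illustrative tracer name
active_spans: dict[str, trace.Span] = {}              # stand-in for the executor's active_spans


def start_ti_span_if_needed(ti_id: uuid.UUID) -> dict[str, str]:
    """Start a span for this task instance unless one is already registered,
    and return a carrier holding the serialized span context."""
    key = str(ti_id)
    carrier: dict[str, str] = {}
    if active_spans.get(key) is None:
        span = tracer.start_span(name=f"task:{key}")  # not started as the current span
        active_spans[key] = span
        ctx = trace.set_span_in_context(span)
        propagate.inject(carrier, context=ctx)        # adds e.g. a 'traceparent' entry when an SDK provider is configured
        # The scheduler ends the span later, once the task instance reaches a terminal state.
    return carrier


if __name__ == "__main__":
    print(start_ti_span_if_needed(uuid.uuid4()))
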
1 change: 0 additions & 1 deletion airflow-core/src/airflow/executors/workloads.py
@@ -59,7 +59,6 @@ class TaskInstance(BaseModel):
task_id: str
dag_id: str
run_id: str
try_id: uuid.UUID
try_number: int
map_index: int = -1

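
The workloads.py hunk above just drops the try_id field from the serialized TaskInstance workload; the id field that the rest of the diff keys on is already part of the model. A rough pydantic v2 sketch of the slimmed-down shape; the listed fields are the ones visible in these hunks, and the defaults and class name are assumptions:

import uuid

from pydantic import BaseModel


class TaskInstanceWorkload(BaseModel):
    """Illustrative subset of the workload model once try_id is gone."""

    id: uuid.UUID
    task_id: str
    dag_id: str
    run_id: str
    try_number: int
    map_index: int = -1


ti = TaskInstanceWorkload(
    id=uuid.uuid4(),  # the tests below use uuid7() from the uuid6 package here
    task_id="success",
    dag_id="mydag",
    run_id="run1",
    try_number=1,
)
print(ti.model_dump())
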
23 changes: 11 additions & 12 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -819,11 +819,11 @@ def process_executor_events(
ti.pid,
)

if (active_ti_span := cls.active_spans.get(ti.try_id)) is not None:
if (active_ti_span := cls.active_spans.get(ti.id)) is not None:
cls.set_ti_span_attrs(span=active_ti_span, state=state, ti=ti)
# End the span and remove it from the active_spans dict.
active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
cls.active_spans.delete(ti.try_id)
cls.active_spans.delete(ti.id)
ti.span_status = SpanStatus.ENDED
else:
if ti.span_status == SpanStatus.ACTIVE:
@@ -998,12 +998,11 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
def _end_active_spans(self, session: Session = NEW_SESSION):
# No need to do a commit for every update. The annotation will commit all of them once at the end.
for key, span in self.active_spans.get_all().items():
from uuid import UUID
ti: TaskInstance | None = session.scalars(
select(TaskInstance).where(TaskInstance.id == str(key))
).one_or_none()

if isinstance(key, UUID): # ti span.
ti: TaskInstance = session.scalars(
select(TaskInstance).where(TaskInstance.try_id == key)
).one()
if ti is not None:
if ti.state in State.finished:
self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
span.end(end_time=datetime_to_nano(ti.end_date))
@@ -1012,7 +1011,7 @@ def _end_active_spans(self, session: Session = NEW_SESSION):
span.end()
ti.span_status = SpanStatus.NEEDS_CONTINUANCE
else:
dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == key)).one()
dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == str(key))).one()
if dag_run.state in State.finished_dr_states:
dag_run.set_dagrun_span_attrs(span=span)

@@ -1060,14 +1059,14 @@ def _end_spans_of_externally_ended_ops(self, session: Session):
dag_run.span_status = SpanStatus.ENDED

for ti in tis_should_end:
active_ti_span = self.active_spans.get(ti.try_id)
active_ti_span = self.active_spans.get(ti.id)
if active_ti_span is not None:
if ti.state in State.finished:
self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti)
active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
else:
active_ti_span.end()
self.active_spans.delete(ti.try_id)
self.active_spans.delete(ti.id)
ti.span_status = SpanStatus.ENDED

def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session):
Expand Down Expand Up @@ -1115,7 +1114,7 @@ def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session
for ti in tis
# If it has started and there is a reference on the active_spans dict,
# then it was started by the current scheduler.
if ti.start_date is not None and self.active_spans.get(ti.try_id) is None
if ti.start_date is not None and self.active_spans.get(ti.id) is None
]

dr_context = Trace.extract(dag_run.context_carrier)
@@ -1135,7 +1134,7 @@ def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session
ti.span_status = SpanStatus.ENDED
else:
ti.span_status = SpanStatus.ACTIVE
self.active_spans.set(ti.try_id, ti_span)
self.active_spans.set(ti.id, ti_span)

def _run_scheduler_loop(self) -> None:
"""
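
The _end_active_spans hunk above carries the logic that makes the new keys workable when the scheduler shuts down: every key in active_spans is first looked up as a TaskInstance.id (compared as a string, with one_or_none()), and only if no task instance matches is it treated as a DagRun.run_id, which replaces the old isinstance(key, UUID) branch. A simplified, database-free sketch of that disambiguation, with plain dicts standing in for the two tables:

import uuid

# Stand-ins for the two tables, keyed the way the queries compare them.
task_instances = {str(uuid.uuid4()): {"state": "success"} for _ in range(2)}
dag_runs = {"scheduled__2025-04-13": {"state": "running"}}

active_spans = {key: f"span<{key}>" for key in [*task_instances, *dag_runs]}


def end_active_spans() -> None:
    for key, span in list(active_spans.items()):
        ti = task_instances.get(str(key))   # analogue of TaskInstance.id == str(key) with one_or_none()
        if ti is not None:
            print(f"ending TI span {span} (ti state: {ti['state']})")
        else:
            dr = dag_runs[str(key)]         # analogue of DagRun.run_id == str(key) with one()
            print(f"ending dag_run span {span} (dag_run state: {dr['state']})")
        del active_spans[key]


end_active_spans()
assert not active_spans
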
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dagrun.py
@@ -1013,7 +1013,7 @@ def start_dr_spans_if_needed(self, tis: list[TI]):
ti_carrier = Trace.inject()
ti.context_carrier = ti_carrier
ti.span_status = SpanStatus.ACTIVE
self.active_spans.set(ti.try_id, ti_span)
self.active_spans.set(ti.id, ti_span)
else:
self.log.info(
"Found span_status '%s', while updating state for dag_run '%s'",
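
The span bookkeeping in the hunks above cycles through a small set of SpanStatus values: ACTIVE while a registered span is open, SHOULD_END when the work was finished by a component that does not own the span, NEEDS_CONTINUANCE when a scheduler stops while work is still running, and ENDED once the span is closed and removed from active_spans. A small enum sketch of that lifecycle; the transition table is inferred from these hunks and is illustrative rather than Airflow's actual definition:

from enum import Enum


class SpanStatus(str, Enum):
    ACTIVE = "active"
    SHOULD_END = "should_end"
    NEEDS_CONTINUANCE = "needs_continuance"
    ENDED = "ended"


# Transitions implied by the hunks above (illustrative only):
#   ACTIVE            -> ENDED              span closed normally when the TI / dag run finishes
#   ACTIVE            -> NEEDS_CONTINUANCE  the owning scheduler exits while work is still running
#   NEEDS_CONTINUANCE -> ACTIVE             another scheduler restarts the span and re-registers it
#   ACTIVE            -> SHOULD_END         the work finished under a component that does not own the span
#   SHOULD_END        -> ENDED              the owning scheduler closes a span that finished elsewhere
ALLOWED_TRANSITIONS = {
    SpanStatus.ACTIVE: {SpanStatus.ENDED, SpanStatus.NEEDS_CONTINUANCE, SpanStatus.SHOULD_END},
    SpanStatus.NEEDS_CONTINUANCE: {SpanStatus.ACTIVE},
    SpanStatus.SHOULD_END: {SpanStatus.ENDED},
    SpanStatus.ENDED: set(),
}

assert SpanStatus.ENDED in ALLOWED_TRANSITIONS[SpanStatus.ACTIVE]
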
1 change: 0 additions & 1 deletion airflow-core/tests/unit/executors/test_local_executor.py
@@ -61,7 +61,6 @@ def _test_execute(self, mock_supervise, parallelism=1):
task_id=f"success_{i}",
dag_id="mydag",
run_id="run1",
try_id=uuid7(),
try_number=1,
state="queued",
pool_slots=1,
18 changes: 9 additions & 9 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2263,7 +2263,7 @@ def test_recreate_unhealthy_scheduler_spans_if_needed(self, ti_state, final_ti_s
assert dr.span_status == SpanStatus.ACTIVE
assert self.job_runner.active_spans.get(dr.run_id) is None

assert self.job_runner.active_spans.get(ti.try_id) is None
assert self.job_runner.active_spans.get(ti.id) is None
assert ti.state == ti_state
assert ti.span_status == SpanStatus.ACTIVE

@@ -2272,10 +2272,10 @@ def test_recreate_unhealthy_scheduler_spans_if_needed(self, ti_state, final_ti_s
assert self.job_runner.active_spans.get(dr.run_id) is not None

if final_ti_span_status == SpanStatus.ACTIVE:
assert self.job_runner.active_spans.get(ti.try_id) is not None
assert self.job_runner.active_spans.get(ti.id) is not None
assert len(self.job_runner.active_spans.get_all()) == 2
else:
assert self.job_runner.active_spans.get(ti.try_id) is None
assert self.job_runner.active_spans.get(ti.id) is None
assert len(self.job_runner.active_spans.get_all()) == 1

assert dr.span_status == SpanStatus.ACTIVE
@@ -2316,21 +2316,21 @@ def test_end_spans_of_externally_ended_ops(self, dag_maker):
ti_span = Trace.start_child_span(span_name="ti_span", start_as_current=False)

self.job_runner.active_spans.set(dr.run_id, dr_span)
self.job_runner.active_spans.set(ti.try_id, ti_span)
self.job_runner.active_spans.set(ti.id, ti_span)

assert dr.span_status == SpanStatus.SHOULD_END
assert ti.span_status == SpanStatus.SHOULD_END

assert self.job_runner.active_spans.get(dr.run_id) is not None
assert self.job_runner.active_spans.get(ti.try_id) is not None
assert self.job_runner.active_spans.get(ti.id) is not None

self.job_runner._end_spans_of_externally_ended_ops(session)

assert dr.span_status == SpanStatus.ENDED
assert ti.span_status == SpanStatus.ENDED

assert self.job_runner.active_spans.get(dr.run_id) is None
assert self.job_runner.active_spans.get(ti.try_id) is None
assert self.job_runner.active_spans.get(ti.id) is None

@pytest.mark.parametrize(
"state, final_span_status",
@@ -2373,13 +2373,13 @@ def test_end_active_spans(self, state, final_span_status, dag_maker):
ti_span = Trace.start_child_span(span_name="ti_span", start_as_current=False)

self.job_runner.active_spans.set(dr.run_id, dr_span)
self.job_runner.active_spans.set(ti.try_id, ti_span)
self.job_runner.active_spans.set(ti.id, ti_span)

assert dr.span_status == SpanStatus.ACTIVE
assert ti.span_status == SpanStatus.ACTIVE

assert self.job_runner.active_spans.get(dr.run_id) is not None
assert self.job_runner.active_spans.get(ti.try_id) is not None
assert self.job_runner.active_spans.get(ti.id) is not None
assert len(self.job_runner.active_spans.get_all()) == 2

self.job_runner._end_active_spans(session)
Expand All @@ -2388,7 +2388,7 @@ def test_end_active_spans(self, state, final_span_status, dag_maker):
assert ti.span_status == final_span_status

assert self.job_runner.active_spans.get(dr.run_id) is None
assert self.job_runner.active_spans.get(ti.try_id) is None
assert self.job_runner.active_spans.get(ti.id) is None
assert len(self.job_runner.active_spans.get_all()) == 0

def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
4 changes: 2 additions & 2 deletions airflow-core/tests/unit/models/test_dagrun.py
@@ -537,7 +537,7 @@ def test_start_dr_spans_if_needed_span_with_continuance(self, testing_dag_bundle

assert dag_run.active_spans is not None
assert dag_run.active_spans.get(dag_run.run_id) is None
assert dag_run.active_spans.get(first_ti.try_id) is None
assert dag_run.active_spans.get(first_ti.id) is None
assert dag_run.span_status == SpanStatus.NEEDS_CONTINUANCE
assert first_ti.span_status == SpanStatus.NEEDS_CONTINUANCE

@@ -546,7 +546,7 @@ def test_start_dr_spans_if_needed_span_with_continuance(self, testing_dag_bundle
assert dag_run.span_status == SpanStatus.ACTIVE
assert first_ti.span_status == SpanStatus.ACTIVE
assert dag_run.active_spans.get(dag_run.run_id) is not None
assert dag_run.active_spans.get(first_ti.try_id) is not None
assert dag_run.active_spans.get(first_ti.id) is not None

def test_end_dr_span_if_needed(self, testing_dag_bundle, dag_maker, session):
with dag_maker(
Original file line number Diff line number Diff line change
@@ -183,10 +183,10 @@ def fake_execute_workload(command):

with start_worker(app=app, logfile=sys.stdout, loglevel="info"):
ti = workloads.TaskInstance.model_construct(
id=uuid7(),
task_id="success",
dag_id="id",
run_id="abc",
try_id=uuid7(),
try_number=0,
priority_weight=1,
queue=celery_executor_utils.celery_configuration["task_default_queue"],
@@ -243,7 +243,6 @@ def fake_task():
start_date=datetime.now(),
)
ti = TaskInstance(task=task, run_id="abc")
ti.try_id = uuid7()
workload = workloads.ExecuteTask.model_construct(
ti=workloads.TaskInstance.model_validate(ti, from_attributes=True),
)
@@ -280,7 +279,6 @@ def test_retry_on_error_sending_task(self, caplog):
start_date=datetime.now(),
)
ti = TaskInstance(task=task, run_id="abc")
ti.try_id = uuid7()
workload = workloads.ExecuteTask.model_construct(
ti=workloads.TaskInstance.model_validate(ti, from_attributes=True),
)
2 changes: 0 additions & 2 deletions providers/edge/tests/unit/edge/cli/test_edge_command.py
@@ -26,7 +26,6 @@
import pytest
import time_machine
from requests import HTTPError, Response
from uuid6 import uuid7

from airflow.providers.edge.cli.dataclasses import Job
from airflow.providers.edge.cli.edge_command import _EdgeWorkerCli, _write_pid_to_pidfile
@@ -53,7 +52,6 @@
"task_id": "mock",
"dag_id": "mock",
"run_id": "mock",
"try_id": uuid7(),
"try_number": 1,
"pool_slots": 1,
"queue": "default",
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@
from unittest.mock import MagicMock, patch

import pytest
from uuid6 import uuid7

from airflow.configuration import conf
from airflow.models.taskinstancekey import TaskInstanceKey
@@ -321,7 +320,6 @@ def test_queue_workload(self):
task_id="mock",
dag_id="mock",
run_id="mock",
try_id=uuid7(),
try_number=1,
pool_slots=1,
queue="default",
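
The provider test updates all follow the same pattern: try_id disappears from constructed workloads, and ORM task instances no longer need a try_id attribute patched on before workloads.TaskInstance.model_validate(ti, from_attributes=True). A hedged pydantic v2 sketch of that attribute-based validation with a stand-in model and a plain object; the field names come from the hunks on this page, everything else is assumed:

import uuid

from pydantic import BaseModel, ConfigDict


class TIModel(BaseModel):
    """Illustrative stand-in for workloads.TaskInstance."""

    model_config = ConfigDict(from_attributes=True)

    id: uuid.UUID
    task_id: str
    dag_id: str
    run_id: str
    try_number: int


class FakeOrmTI:
    """Plain object carrying the attributes the model needs; no try_id required anymore."""

    def __init__(self) -> None:
        self.id = uuid.uuid4()
        self.task_id = "mock"
        self.dag_id = "mock"
        self.run_id = "abc"
        self.try_number = 1


validated = TIModel.model_validate(FakeOrmTI(), from_attributes=True)
print(validated.model_dump())
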