Content-Length: 526813 | pFad | http://github.com/apache/airflow/pull/49180/commits/4769ac980c0fdd9e4d0e29da6322c0dedf010f3f

C0 OpenTelemetry traces implementation cleanup by xBis7 · Pull Request #49180 · apache/airflow · GitHub
Skip to content

OpenTelemetry traces implementation cleanup #49180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 40 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
786a7da
replace ti.key with ti.try_id in active_spans
xBis7 Apr 6, 2025
e234bbf
add otel to integrations
xBis7 Apr 6, 2025
92682be
modify pytest_plugin.py
xBis7 Apr 6, 2025
33c28f7
add redis integration to global_constants.py
xBis7 Apr 7, 2025
a3ef753
fix ci failures
xBis7 Apr 7, 2025
4fbda4b
fix test_edge_command.py
xBis7 Apr 8, 2025
9e71f2f
fix test_edge_executor.py
xBis7 Apr 8, 2025
d21af67
increase test timeout
xBis7 Apr 8, 2025
5a4c246
increase test timeout
xBis7 Apr 9, 2025
d65196f
mark flaky tests with xfail
xBis7 Apr 9, 2025
a287246
add config for disabling spans from internal operations + disable dec…
xBis7 Apr 9, 2025
9b5d168
disable all debugging traces
xBis7 Apr 11, 2025
81d9d85
keep redis env variables
xBis7 Apr 13, 2025
cadeb6b
merge with main and resolve conflicts
xBis7 Apr 13, 2025
53d3de4
replace ti.try_id with ti.id
xBis7 Apr 13, 2025
cc8d1fc
fix _end_active_spans logic bug causing tests to fail
xBis7 Apr 14, 2025
6901de8
set a timeout for each test in test_otel.py
xBis7 Apr 15, 2025
472d416
Merge remote-tracking branch 'origen/main' into otel_cleanup
xBis7 Apr 15, 2025
b3bcae7
add 'timeout' configuration option in pytest 'markers'
xBis7 Apr 15, 2025
5b355e0
Merge remote-tracking branch 'origen/main' into otel_cleanup
xBis7 Apr 15, 2025
221ab47
Merge remote-tracking branch 'origen/main' into otel_cleanup
xBis7 Apr 15, 2025
f5ec1f4
Merge remote-tracking branch 'origen/main' into otel_cleanup
xBis7 Apr 16, 2025
a78cf91
Merge remote-tracking branch 'origen/main' into otel_cleanup
xBis7 Apr 17, 2025
e08543b
merge with main and resolve conflicts
xBis7 Apr 24, 2025
6b672d6
Merge remote-tracking branch 'origen/main' into otel_cleanup
xBis7 Apr 28, 2025
35f93bc
update config target version
xBis7 Apr 28, 2025
248af6b
convert leftover ti.id to str
xBis7 Apr 28, 2025
d90d29e
simplify TaskInstance query
xBis7 Apr 28, 2025
74ebb42
replace dag_run.run_id with dag_run.id
xBis7 Apr 28, 2025
3fee0b0
remove conf.add_section from test_otel_tracer.py
xBis7 Apr 28, 2025
39abf69
increase timeout for each test in test_otel.py
xBis7 Apr 28, 2025
e511c16
replace class annotation with method annotation for a timeout in test…
xBis7 Apr 29, 2025
4769ac9
rename add_span to add_debug_span
xBis7 Apr 29, 2025
afcc997
initialize db only once at the start, in test_otel.py
xBis7 Apr 30, 2025
71280ce
Merge remote-tracking branch 'origen/main' into otel_cleanup
xBis7 May 1, 2025
e5e91d0
cleanup control_file in case of failure
xBis7 May 1, 2025
e27feb1
merge with main and resolve conflicts
xBis7 May 7, 2025
2c86006
convert log info to debug
xBis7 May 8, 2025
1a9d0d7
merge with main and resolve conflicts
xBis7 May 24, 2025
4770e4d
Merge branch 'main' into otel_cleanup
xBis7 Jun 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename add_span to add_debug_span
  • Loading branch information
xBis7 committed Apr 29, 2025
commit 4769ac980c0fdd9e4d0e29da6322c0dedf010f3f
8 changes: 4 additions & 4 deletions airflow-core/src/airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from airflow.models import Log
from airflow.stats import Stats
from airflow.traces import NO_TRACE_ID
from airflow.traces.tracer import DebugTrace, Trace, add_span, gen_context
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span, gen_context
from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -270,7 +270,7 @@ def sync(self) -> None:
Executors should override this to perform gather statuses.
"""

@add_span
@add_debug_span
def heartbeat(self) -> None:
"""Heartbeat sent to trigger new jobs."""
open_slots = self.parallelism - len(self.running)
Expand Down Expand Up @@ -373,7 +373,7 @@ def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, QueuedTa
reverse=True,
)

@add_span
@add_debug_span
def trigger_tasks(self, open_slots: int) -> None:
"""
Initiate async execution of the queued tasks, up to the number of available slots.
Expand Down Expand Up @@ -465,7 +465,7 @@ def trigger_tasks(self, open_slots: int) -> None:
elif workload_list:
self._process_workloads(workload_list) # type: ignore[attr-defined]

@add_span
@add_debug_span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
for key, command, queue, executor_config in task_tuples:
task_instance = self.queued_tasks[key][3] # TaskInstance in fourth element
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace, add_span
from airflow.traces.tracer import DebugTrace, add_debug_span
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -385,7 +385,7 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N
return ret


@add_span
@add_debug_span
def perform_heartbeat(
job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
Expand Down
8 changes: 4 additions & 4 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.traces import utils as trace_utils
from airflow.traces.tracer import DebugTrace, Trace, add_span
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
from airflow.utils import timezone
from airflow.utils.dates import datetime_to_nano
from airflow.utils.event_scheduler import EventScheduler
Expand Down Expand Up @@ -1489,7 +1489,7 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now

@add_span
@add_debug_span
def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
"""Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
# Bulk Fetch DagRuns with dag_id and logical_date same
Expand Down Expand Up @@ -1674,7 +1674,7 @@ def _should_update_dag_next_dagruns(
return False
return True

@add_span
@add_debug_span
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running state."""
# added all() to save runtime, otherwise query is executed more than once
Expand All @@ -1691,7 +1691,7 @@ def _start_queued_dagruns(self, session: Session) -> None:
)
active_runs_of_dags = Counter({(dag_id, br_id): num for dag_id, br_id, num in session.execute(query)})

@add_span
@add_debug_span
def _update_state(dag: DAG, dag_run: DagRun):
span = Trace.get_current_span()
span.set_attributes(
Expand Down
10 changes: 5 additions & 5 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace, Trace, add_span
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
from airflow.triggers import base as events
from airflow.utils import timezone
from airflow.utils.helpers import log_filename_template_renderer
Expand Down Expand Up @@ -484,14 +484,14 @@ def heartbeat(self):
def heartbeat_callback(self, session: Session | None = None) -> None:
Stats.incr("triggerer_heartbeat", 1, 1)

@add_span
@add_debug_span
def load_triggers(self):
"""Query the database for the triggers we're supposed to be running and update the runner."""
Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold)
ids = Trigger.ids_for_triggerer(self.job.id)
self.update_triggers(set(ids))

@add_span
@add_debug_span
def handle_events(self):
"""Dispatch outbound events to the Trigger model which pushes them to the relevant task instances."""
while self.events:
Expand All @@ -502,12 +502,12 @@ def handle_events(self):
# Emit stat event
Stats.incr("triggers.succeeded")

@add_span
@add_debug_span
def clean_unused(self):
"""Clean out unused or finished triggers."""
Trigger.clean_unused()

@add_span
@add_debug_span
def handle_failed_triggers(self):
"""
Handle "failed" triggers. - ones that errored or exited before they sent an event.
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/traces/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def gen_links_from_kv_list(list):
return gen_links_from_kv_list(list)


def add_span(func):
def add_debug_span(func):
"""Decorate a function with span."""
func_name = func.__name__
qual_name = func.__qualname__
Expand Down
Loading








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/airflow/pull/49180/commits/4769ac980c0fdd9e4d0e29da6322c0dedf010f3f

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy