OpenTelemetry traces implementation cleanup #49180
…orated function spans
That's probably because I updated it using a merge instead of a rebase, to avoid rewriting the commit history.
@potiuk I've added a redis integration on the CI so that it runs my new otel integration tests. I've noticed that roughly 1 in 10 times it freezes right before the tests start and the entire step times out after 30 mins. I thought it was related to my tests, so I added a timeout to make them fail fast if that's the case, but something else seems to be causing it. Any insight or idea why this might be happening? https://github.com/apache/airflow/actions/runs/14476740125/job/40604515009?pr=49180#logs It looks random: https://github.com/xBis7/airflow/actions/runs/14497144925/job/40668243402#step:5:4279
@@ -69,7 +69,6 @@ class TaskInstance(BaseModel):

     parent_context_carrier: dict | None = None
     context_carrier: dict | None = None
-    queued_dttm: datetime | None = None
This looks like an un-intended change/a bad rebase?
No, this is intentional. I added it in the previous PR in order to use it as the start_time for the task span in base_executor.py.
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/executors/base_executor.py#L446
When workloads.TaskInstance is initialized, queue time isn't available and the field ends up None. I removed it because it's redundant.
for prefixed_key, span in self.active_spans.get_all().items():
    # Use partition to split on the first occurrence of ':'.
    prefix, sep, key = prefixed_key.partition(":")
I wonder if instead of prefixing the string we should store the keys as tuples: ("ti", str(ti.id)), ("dr", dr.id) etc?
Not much in it but maybe it makes things slightly clearer? (Though we'd have to be more careful of the type of the id we put in, because ("ti", UUID(...)) wouldn't match ("ti", "the_uuid").)
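To make the trade-off concrete, here is a minimal sketch comparing the two key schemes. The ids and the string "spans" are hypothetical stand-ins, not values from the Airflow code base; only the partition call mirrors the snippet under review.

```python
import uuid

# Hypothetical ids and placeholder "spans" (plain strings) for illustration only.
ti_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
dr_id = 42

# Scheme A: string-prefixed keys, split on the first ':' with str.partition.
prefixed = {f"ti:{ti_id}": "ti-span", f"dr:{dr_id}": "dr-span"}
parsed = {}
for prefixed_key, span in prefixed.items():
    prefix, sep, key = prefixed_key.partition(":")
    # The id always comes back as a str, even if it started out as an int.
    parsed[(prefix, key)] = span

# Scheme B: tuple keys keep the original type of the id.
by_tuple = {("ti", str(ti_id)): "ti-span", ("dr", dr_id): "dr-span"}

# A UUID object and its string form are different keys.
uuid_key_matches = ("ti", ti_id) in by_tuple
str_key_matches = ("ti", str(ti_id)) in by_tuple
```

With tuple keys the lookup only succeeds when the id has the same type that was stored, which is the caveat raised in the comment; with prefixed strings every id is flattened to text, so the integer dag run id is no longer recoverable as an integer.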
Initially, I thought of using nested dictionaries:
active_spans: {
    "ti": {
        str(ti.id): span,
        str(ti.id): span,
        str(ti.id): span,
        ...
    },
    "dr": {
        dr.id: span,
        dr.id: span,
        ...
    },
}
I went with the string prefix, but if we change the dag_run key type to an integer then the nested-dictionary approach might make more sense than the alternatives.
Though we'd have to be more careful of the type of the id we put in -- cos ("ti", UUID(...)) wouldn't match ("ti", "the_uuid")
The ti.id is always a UUID str.
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L717
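The nested-dictionary layout described in this comment could look roughly like the sketch below. The class name NestedActiveSpans and its methods are hypothetical and are not the API used in the PR; spans are represented by plain strings.

```python
from collections import defaultdict
from typing import Any


class NestedActiveSpans:
    """Hypothetical container with one inner dict per entity kind ("ti", "dr")."""

    def __init__(self) -> None:
        self._spans: dict[str, dict[Any, Any]] = defaultdict(dict)

    def set(self, kind: str, entity_id: Any, span: Any) -> None:
        self._spans[kind][entity_id] = span

    def get(self, kind: str, entity_id: Any) -> Any:
        # .get() avoids creating an empty inner dict for unknown kinds.
        return self._spans.get(kind, {}).get(entity_id)

    def delete(self, kind: str, entity_id: Any) -> None:
        self._spans.get(kind, {}).pop(entity_id, None)

    def get_all(self, kind: str) -> dict:
        # Return a copy so callers cannot mutate the internal state.
        return dict(self._spans.get(kind, {}))


spans = NestedActiveSpans()
spans.set("ti", "12345678-1234-5678-1234-567812345678", "ti-span")
spans.set("dr", 7, "dr-span")
```

Because each kind has its own inner dict, task instance ids can stay strings while dag run ids stay integers, and no key parsing is needed when iterating over one kind.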
@@ -1287,6 +1291,7 @@ def test_scheduler_exits_forcefully_in_the_middle_of_the_first_task(
         # Dag run should have succeeded. Test the spans in the output.
         check_spans_without_continuance(output=out, dag=dag, is_recreated=True, check_t1_sub_spans=False)

+    @pytest.mark.xfail(reason="Tests with a control file are flaky when running on the remote CI.")
What is the plan with these long term? Having them left as xfail adds very little and is there any point keeping them?
The problem is that they always pass locally and fairly quickly (the entire class takes around 6 mins to run), while on the CI they are flaky and I don't see any errors. They mostly freeze and time out.
We shouldn't remove them, and I would like to have them running on the CI to make sure that future changes don't break the otel implementation. I'm looking at refactoring the test class to see if I can get rid of the flakiness.
I've removed the xfail annotations. I refactored the tests a bit and I was able to run the tests 5 times in a row successfully.
https://github.com/apache/airflow/actions/runs/14759772893/job/41437793893?pr=49180
https://github.com/xBis7/airflow/actions/runs/14759759875/job/41437846694
https://github.com/xBis7/airflow/actions/runs/14760507799/job/41468986413
https://github.com/xBis7/airflow/actions/runs/14760511337/job/41453655641
https://github.com/xBis7/airflow/actions/runs/14760515145/job/41473346959
@ashb I think I fixed the flaky tests. I've run the CI multiple times without failures. The only pending item is to figure out what to do with the active_spans prefixing for TIs and DRs. I made the changes but left the dict format as it was.
The failure for the |
@ashb Can you help get this PR merged?
This is a follow-up PR for #43941. It's also related to #43789.
The patch consists of the following changes:
- The active_spans dict has been using the ti.key as a key for task instance spans. ti.key will be replaced by ti.id. The change is based on this comment: Provide an alternative OpenTelemetry implementation for traces that follows standard otel practices #43941 (comment). The comment refers to try_id, but that was removed by PR "Ensure that TI.id is unique per try." #48749 in favor of id.
- The previous PR added some integration tests which aren't running on the CI. The patch fixes that. The tests use the redis integration. I've verified that the providers tests with the redis integration don't run on this new CI step.
- We are generating spans for a bunch of Airflow's internal methods. The information that comes from these spans might be relevant to developers but not to end users. Added a config flag to turn the traces on/off.
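As a rough illustration of the last item, the sketch below gates spans for internal functions behind a flag. It is an assumption-laden stand-in: the settings dict, the start_span helper, and the traced_if_enabled decorator are all hypothetical names, and the actual Airflow config option is not shown in this description.

```python
import functools
from contextlib import contextmanager

# Hypothetical flag store; the real flag would be read from Airflow's configuration.
settings = {"internal_traces": False}

# Stand-in for an exporter: records the names of the spans that were started.
recorded_spans = []


@contextmanager
def start_span(name):
    """Stand-in for a tracer's span context manager."""
    recorded_spans.append(name)
    yield name


def traced_if_enabled(func):
    """Wrap func in a span only when the flag is on (checked at call time)."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not settings["internal_traces"]:
            return func(*args, **kwargs)
        with start_span(func.__qualname__):
            return func(*args, **kwargs)

    return wrapper


@traced_if_enabled
def heartbeat():
    # Hypothetical internal method whose span only matters to developers.
    return "ok"
```

Checking the flag inside the wrapper rather than at decoration time means the behavior follows the current setting on every call, at the cost of one dictionary lookup per call.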