Content-Length: 666813 | pFad | http://github.com/astronomer/airflow/commit/489aa26c35f9edcb84ee0981afd5e24f8d255828

E7 Port ``ti.run`` to Task SDK execution path · astronomer/airflow@489aa26 · GitHub
Skip to content

Commit 489aa26

Browse files
committed
Port ti.run to Task SDK execution path
This is the last thing to remove the parallel execution path. For simplicity, `ti.run` and `ti._run_raw_task` have been retained, but they now use the Task SDK execution path. They have been retained so we don't have to make big bang changes in tests and PR remains relatively review-able. There are opportunities for evaluating & cleanup after this PR is merged: - `get_template_context`, - `handle_failure`, - `check_and_change_state_before_execution` - and probably a lot more. The following bugs/missing features were identified and implemented: - Running `on_kill` on the Task SDK execution path - Resolving lazy_object_proxies in the Context dict when running with a VirtualEnvOperator - apache#50898
1 parent e52e8ac commit 489aa26

File tree

28 files changed

+317
-2677
lines changed

28 files changed

+317
-2677
lines changed

airflow-core/src/airflow/cli/commands/task_command.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def task_test(args, dag: DAG | None = None) -> None:
381381
)
382382
try:
383383
with redirect_stdout(RedactedIO()):
384-
_run_task(ti=ti)
384+
_run_task(ti=ti, run_triggerer=True)
385385
if ti.state == State.FAILED and args.post_mortem:
386386
debugger = _guess_debugger()
387387
debugger.set_trace()

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 19 additions & 700 deletions
Large diffs are not rendered by default.

airflow-core/tests/unit/api_fastapi/execution_api/conftest.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,35 @@ def client(request: pytest.FixtureRequest):
3232

3333
with TestClient(app, headers={"Authorization": "Bearer fake"}) as client:
3434
auth = AsyncMock(spec=JWTValidator)
35-
auth.avalidated_claims.return_value = {"sub": "edb09971-4e0e-4221-ad3f-800852d38085"}
3635

37-
# Inject our fake JWTValidator object. Can be over-ridden by tests if they want
36+
# Create a side_effect function that dynamically extracts the task instance ID from validators
37+
def smart_validated_claims(cred, validators=None):
38+
# Extract task instance ID from validators if present
39+
# This handles the JWTBearerTIPathDep case where the validator contains the task ID from the path
40+
if (
41+
validators
42+
and "sub" in validators
43+
and isinstance(validators["sub"], dict)
44+
and "value" in validators["sub"]
45+
):
46+
return {
47+
"sub": validators["sub"]["value"],
48+
"exp": 9999999999, # Far future expiration
49+
"iat": 1000000000, # Past issuance time
50+
"aud": "test-audience",
51+
}
52+
53+
# For other cases (like JWTBearerDep) where no specific validators are provided
54+
# Return a default UUID with all required claims
55+
return {
56+
"sub": "00000000-0000-0000-0000-000000000000",
57+
"exp": 9999999999, # Far future expiration
58+
"iat": 1000000000, # Past issuance time
59+
"aud": "test-audience",
60+
}
61+
62+
# Set the side_effect for avalidated_claims
63+
auth.avalidated_claims.side_effect = smart_validated_claims
3864
lifespan.registry.register_value(JWTValidator, auth)
65+
3966
yield client

airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
from __future__ import annotations
1919

20-
import operator
2120
from datetime import datetime
2221
from unittest import mock
2322
from uuid import uuid4
@@ -962,22 +961,18 @@ def test_ti_skip_downstream(self, client, session, create_task_instance, dag_mak
962961
t1 = EmptyOperator(task_id="t1")
963962
t0 >> t1
964963
dr = dag_maker.create_dagrun(run_id="run")
965-
decision = dr.task_instance_scheduling_decisions(session=session)
966-
for ti in sorted(decision.schedulable_tis, key=operator.attrgetter("task_id")):
967-
# TODO: TaskSDK #45549
968-
ti.task = dag_maker.dag.get_task(ti.task_id)
969-
ti.run(session=session)
970964

971-
t0 = dr.get_task_instance("t0")
965+
ti0 = dr.get_task_instance("t0")
966+
ti0.set_state(State.SUCCESS)
967+
972968
response = client.patch(
973-
f"/execution/task-instances/{t0.id}/skip-downstream",
969+
f"/execution/task-instances/{ti0.id}/skip-downstream",
974970
json=_json,
975971
)
976-
t1 = dr.get_task_instance("t1")
972+
ti1 = dr.get_task_instance("t1")
977973

978974
assert response.status_code == 204
979-
assert decision.schedulable_tis[0].state == State.SUCCESS
980-
assert t1.state == State.SKIPPED
975+
assert ti1.state == State.SKIPPED
981976

982977

983978
class TestTIHealthEndpoint:

airflow-core/tests/unit/listeners/test_listeners.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,13 @@ def test_listener_gets_only_subscribed_calls(create_task_instance, session=None)
120120

121121

122122
@provide_session
123-
def test_listener_suppresses_exceptions(create_task_instance, session, caplog):
123+
def test_listener_suppresses_exceptions(create_task_instance, session, cap_structlog):
124124
lm = get_listener_manager()
125125
lm.add_listener(throwing_listener)
126126

127127
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
128-
with caplog.at_level(logging.ERROR):
129-
ti._run_raw_task()
130-
assert "error calling listener" in caplog.messages
128+
ti.run()
129+
assert "error calling listener" in cap_structlog
131130

132131

133132
@provide_session
@@ -139,7 +138,7 @@ def test_listener_captures_failed_taskinstances(create_task_instance_of_operator
139138
BashOperator, dag_id=DAG_ID, logical_date=LOGICAL_DATE, task_id=TASK_ID, bash_command="exit 1"
140139
)
141140
with pytest.raises(AirflowException):
142-
ti._run_raw_task()
141+
ti.run()
143142

144143
assert full_listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.FAILED]
145144
assert len(full_listener.state) == 2
@@ -153,7 +152,7 @@ def test_listener_captures_longrunning_taskinstances(create_task_instance_of_ope
153152
ti = create_task_instance_of_operator(
154153
BashOperator, dag_id=DAG_ID, logical_date=LOGICAL_DATE, task_id=TASK_ID, bash_command="sleep 5"
155154
)
156-
ti._run_raw_task()
155+
ti.run()
157156

158157
assert full_listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS]
159158
assert len(full_listener.state) == 2
@@ -166,13 +165,9 @@ def test_class_based_listener(create_task_instance, session=None):
166165
lm.add_listener(listener)
167166

168167
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
169-
# Using ti.run() instead of ti._run_raw_task() to capture state change to RUNNING
170-
# that only happens on `check_and_change_state_before_execution()` that is called before
171-
# `run()` calls `_run_raw_task()`
172168
ti.run()
173169

174-
assert len(listener.state) == 2
175-
assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS]
170+
assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS, DagRunState.SUCCESS]
176171

177172

178173
def test_listener_logs_call(caplog, create_task_instance, session):
@@ -181,10 +176,9 @@ def test_listener_logs_call(caplog, create_task_instance, session):
181176
lm.add_listener(full_listener)
182177

183178
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
184-
ti._run_raw_task()
179+
ti.run()
185180

186181
listener_logs = [r for r in caplog.record_tuples if r[0] == "airflow.listeners.listener"]
187-
assert len(listener_logs) == 6
188182
assert all(r[:-1] == ("airflow.listeners.listener", logging.DEBUG) for r in listener_logs)
189183
assert listener_logs[0][-1].startswith("Calling 'on_task_instance_running' with {'")
190184
assert listener_logs[1][-1].startswith("Hook impls: [<HookImpl plugin")

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/astronomer/airflow/commit/489aa26c35f9edcb84ee0981afd5e24f8d255828

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy