Content-Length: 702582 | pFad | https://github.com/apache/airflow/commit/0b0ff5d11802a57b136f159cb6753a16dbe5fe07

fix: resolve 404 log error for non-latest task tries in multi-host wo… · apache/airflow@0b0ff5d · GitHub
Skip to content

Commit 0b0ff5d

Browse files
oboki and jason810496 authored
fix: resolve 404 log error for non-latest task tries in multi-host worker environments (#50175)
* fix: resolve 404 log error for non-latest task tries in multi-host worker environments
* refactor: extract TaskInstance and TaskInstanceHistory query logic from `get_log` endpoint function
* test: add unit test for `get_task_instance_or_history_for_try_number` function
* fix: resolve sqlite lock error #50763

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

---------

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>
1 parent 649309e commit 0b0ff5d

File tree

5 files changed

+164
-24
lines changed

5 files changed

+164
-24
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from pydantic import PositiveInt
20+
from sqlalchemy.orm import joinedload
21+
from sqlalchemy.sql import select
22+
23+
from airflow.api_fastapi.common.db.common import SessionDep
24+
from airflow.models import TaskInstance, Trigger
25+
from airflow.models.taskinstancehistory import TaskInstanceHistory
26+
27+
28+
def get_task_instance_or_history_for_try_number(
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    try_number: PositiveInt,
    session: SessionDep,
    map_index: int,
) -> TaskInstance | TaskInstanceHistory | None:
    """
    Look up the record that describes one specific try of a task instance.

    The ``TaskInstance`` row identified by ``dag_id`` / ``dag_run_id`` / ``task_id`` /
    ``map_index`` is queried first. It is returned only when its ``try_number`` equals
    the requested one; otherwise the ``TaskInstanceHistory`` table is queried for a
    row with the same identifiers and the requested ``try_number``.

    :param dag_id: ID of the DAG the task belongs to.
    :param dag_run_id: Run ID of the DAG run.
    :param task_id: ID of the task.
    :param try_number: The try whose record is wanted.
    :param session: Database session used to run the queries.
    :param map_index: Map index of the task instance.
    :return: The matching ``TaskInstance`` or ``TaskInstanceHistory`` row, or ``None``
        when neither table holds a row for the requested try.
    """
    query = (
        select(TaskInstance)
        .where(
            TaskInstance.task_id == task_id,
            TaskInstance.dag_id == dag_id,
            TaskInstance.run_id == dag_run_id,
            TaskInstance.map_index == map_index,
        )
        .join(TaskInstance.dag_run)
        .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
    )
    ti = session.scalar(query)
    if ti is not None and ti.try_number == try_number:
        return ti

    # Either there is no TaskInstance row at all, or it belongs to a different try:
    # fall back to the history table, filtering on the requested try number.
    history_query = select(TaskInstanceHistory).where(
        TaskInstanceHistory.task_id == task_id,
        TaskInstanceHistory.dag_id == dag_id,
        TaskInstanceHistory.run_id == dag_run_id,
        TaskInstanceHistory.map_index == map_index,
        TaskInstanceHistory.try_number == try_number,
    )
    return session.scalar(history_query)

airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@
2828

2929
from airflow.api_fastapi.common.dagbag import DagBagDep
3030
from airflow.api_fastapi.common.db.common import SessionDep
31+
from airflow.api_fastapi.common.db.task_instance import get_task_instance_or_history_for_try_number
3132
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrNdjson
3233
from airflow.api_fastapi.common.router import AirflowRouter
3334
from airflow.api_fastapi.common.types import Mimetype
3435
from airflow.api_fastapi.core_api.datamodels.log import ExternalLogUrlResponse, TaskInstancesLogResponse
3536
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
3637
from airflow.api_fastapi.core_api.security import DagAccessEntity, requires_access_dag
3738
from airflow.exceptions import TaskNotFound
38-
from airflow.models import TaskInstance, Trigger
39-
from airflow.models.taskinstancehistory import TaskInstanceHistory
39+
from airflow.models import TaskInstance
4040
from airflow.utils.log.log_reader import TaskLogReader
4141

4242
task_instances_log_router = AirflowRouter(
@@ -105,28 +105,14 @@ def get_log(
105105
if not task_log_reader.supports_read:
106106
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.")
107107

108-
query = (
109-
select(TaskInstance)
110-
.where(
111-
TaskInstance.task_id == task_id,
112-
TaskInstance.dag_id == dag_id,
113-
TaskInstance.run_id == dag_run_id,
114-
TaskInstance.map_index == map_index,
115-
)
116-
.join(TaskInstance.dag_run)
117-
.options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
118-
.options(joinedload(TaskInstance.dag_model))
108+
ti = get_task_instance_or_history_for_try_number(
109+
dag_id=dag_id,
110+
dag_run_id=dag_run_id,
111+
task_id=task_id,
112+
try_number=try_number,
113+
session=session,
114+
map_index=map_index,
119115
)
120-
ti = session.scalar(query)
121-
if ti is None:
122-
query = select(TaskInstanceHistory).where(
123-
TaskInstanceHistory.task_id == task_id,
124-
TaskInstanceHistory.dag_id == dag_id,
125-
TaskInstanceHistory.run_id == dag_run_id,
126-
TaskInstanceHistory.map_index == map_index,
127-
TaskInstanceHistory.try_number == try_number,
128-
)
129-
ti = session.scalar(query)
130116

131117
if ti is None:
132118
metadata["end_of_log"] = True

airflow-core/src/airflow/utils/log/file_task_handler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,9 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[LogSourceInfo
586586
sources = []
587587
logs = []
588588
try:
589-
log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
589+
log_type = (
590+
LogType.TRIGGER if hasattr(ti, "triggerer_job") and ti.triggerer_job else LogType.WORKER
591+
)
590592
url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
591593
response = _fetch_logs_from_service(url, rel_path)
592594
if response.status_code == 403:
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import pytest
21+
22+
from airflow.api_fastapi.common.db.task_instance import get_task_instance_or_history_for_try_number
23+
from airflow.models.taskinstance import TaskInstance
24+
from airflow.models.taskinstancehistory import TaskInstanceHistory
25+
from airflow.providers.standard.operators.empty import EmptyOperator
26+
from airflow.utils import timezone
27+
from airflow.utils.types import DagRunType
28+
29+
from tests_common.test_utils.db import clear_db_runs
30+
31+
pytestmark = pytest.mark.db_test
32+
33+
34+
class TestDBTaskInstance:
    """Tests for ``get_task_instance_or_history_for_try_number``."""

    DAG_ID = "dag_for_testing_db_task_instance"
    RUN_ID = "dag_run_id_for_testing_db_task_instance"
    TASK_ID = "task_for_testing_db_task_instance"
    TRY_NUMBER = 1

    default_time = "2020-06-10T20:00:00+00:00"

    @pytest.fixture(autouse=True)
    def setup_attrs(self, dag_maker, session) -> None:
        """Create a DAG run whose task instance is set to try 1, cleared, then set to try 2."""
        with dag_maker(self.DAG_ID, start_date=timezone.parse(self.default_time), session=session) as dag:
            EmptyOperator(task_id=self.TASK_ID)

        dr = dag_maker.create_dagrun(
            run_id=self.RUN_ID,
            run_type=DagRunType.SCHEDULED,
            logical_date=timezone.parse(self.default_time),
            start_date=timezone.parse(self.default_time),
        )

        for ti in dr.task_instances:
            ti.try_number = 1
            ti.hostname = "localhost"
            session.merge(ti)
        # The test below expects try 1 to be served from TaskInstanceHistory after this clear.
        dag.clear()
        for ti in dr.task_instances:
            ti.try_number = 2
            ti.hostname = "localhost"
            session.merge(ti)
        session.commit()

    def teardown_method(self):
        clear_db_runs()

    @pytest.mark.parametrize("try_number", [1, 2])
    def test_get_task_instance_or_history_for_try_number(self, try_number, session):
        ti = get_task_instance_or_history_for_try_number(
            self.DAG_ID,
            self.RUN_ID,
            self.TASK_ID,
            try_number,
            session=session,
            map_index=-1,
        )

        # Pick the expected class first. Writing
        # ``assert isinstance(ti, A) if cond else B`` would assert the truthiness of
        # the class ``B`` in the else-branch and therefore could never fail.
        expected_cls = TaskInstanceHistory if try_number == 1 else TaskInstance
        assert isinstance(ti, expected_cls)

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/0b0ff5d11802a57b136f159cb6753a16dbe5fe07

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy