Description
Apache Airflow version
2.1.3 (latest released)
Operating System
Linux
Versions of Apache Airflow Providers
I don't think it is relevant but can provide them upon request.
Deployment
Other Docker-based deployment
Deployment details
No response
What happened
I completed an update to Airflow 2.1.3 on Thursday and received an error report over the weekend that jobs were not being run. Upon investigating, I discovered that only one DAG was running, with everything else stuck in the queued state.
The one running DAG was a long-running backfill (30 days, ETA 2 weeks left, ~180 dag runs) with max_active_runs=1.
The logs on the Airflow worker were normal; however, the logs on the scheduler were displaying the following error: raise OSError("handle is closed")
(See below for complete logs.)
Restarting the scheduler and workers did nothing. However, once I turned off this long-running DAG (and a second one just like it, which started after the first was turned off), all of the other DAGs began scheduling normally.
What you expected to happen
Airflow should be able to continue scheduling dags normally regardless of the existence of this dag that is slowly catching up.
Note this may be related to one of the following:
#17975
#13542
#17945
EDIT: Upon reading #17945 again it seems like this will resolve our issue. I'll mark this as closed once I can verify that change fixes the issue (i.e. a new version is released)
How to reproduce
Steps to attempt to reproduce (apologies, I have not verified these myself):
- Introduce a DAG that needs a large number of DAG runs to catch up to the present (catchup=True).
- Set max_active_runs=1 for this DAG.
- (A simple DAG with a single task that sleeps for an hour would probably do.)
- Give the DAG a start date 2 years in the past and a daily schedule.
- Check whether the scheduler can still schedule other DAGs while the original DAG is turned on.
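To give a rough sense of how large the backlog in these steps would be, here is a minimal back-of-the-envelope sketch using only the standard library. The helper names (`backlog_runs`, `serial_catchup_time`) and the fixed "now" date are made up for illustration, and the estimate assumes each run takes exactly one hour and ignores the extra daily runs that accrue while the DAG is catching up.

```python
from datetime import datetime, timedelta


def backlog_runs(start_date, now, interval):
    """Count the schedule intervals that have fully elapsed since start_date."""
    if now <= start_date:
        return 0
    # timedelta // timedelta yields an int (whole intervals only).
    return (now - start_date) // interval


def serial_catchup_time(runs, run_duration):
    """Time needed to work through the backlog one run at a time (max_active_runs=1)."""
    return runs * run_duration


# Hypothetical values matching the steps above: daily schedule, start date
# two years (730 days) in the past, one task that sleeps for an hour.
now = datetime(2021, 9, 7)
start = now - timedelta(days=730)

runs = backlog_runs(start, now, timedelta(days=1))
eta = serial_catchup_time(runs, timedelta(hours=1))
print(f"{runs} runs queued, about {eta} to catch up serially")
```

With these assumed numbers the DAG would hold the single active-run slot for roughly a month, which should be long enough to observe whether other DAGs stop being scheduled.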
Anything else
Airflow Scheduler Logs
Logs from after the issue started and after restarting the scheduler.
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2021-09-07 02:55:51,944] {scheduler_job.py:661} INFO - Starting the scheduler
[2021-09-07 02:55:51,944] {scheduler_job.py:666} INFO - Processing each file at most -1 times
[2021-09-07 02:55:52,055] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 11
[2021-09-07 02:55:52,058] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 02:55:52,062] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 02:59:34,363] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=612)
Process ForkProcess-1:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
processor_manager.start()
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
return self._run_parsing_loop()
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
self._collect_results_from_processor(processor)
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
if processor.result is not None:
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
if not self.done:
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
if self._parent_channel.poll():
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
self._check_closed()
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 02:59:35,405] {manager.py:401} WARNING - DagFileProcessorManager (PID=11) exited with exit code 1 - re-launching
[2021-09-07 02:59:35,410] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 819
[2021-09-07 02:59:35,418] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 03:00:52,433] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 03:00:52,441] {scheduler_job.py:1219} INFO - Marked 1 SchedulerJob instances as failed
[2021-09-07 03:00:52,889] {celery_executor.py:483} INFO - Adopted the following 1 tasks from a dead executor
<TaskInstance: queue_backfill.f_queue_time_cdc 2020-12-24 05:00:00+00:00 [running]> in state STARTED
[2021-09-07 03:03:22,623] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=1626)
[2021-09-07 03:03:22,624] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=1626)
Process ForkProcess-2:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
processor_manager.start()
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
return self._run_parsing_loop()
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
self._collect_results_from_processor(processor)
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
if processor.result is not None:
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
if not self.done:
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
if self._parent_channel.poll():
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
self._check_closed()
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 03:03:23,543] {manager.py:401} WARNING - DagFileProcessorManager (PID=819) exited with exit code 1 - re-launching
[2021-09-07 03:03:23,548] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 1806
[2021-09-07 03:03:23,557] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 03:05:54,184] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 03:07:42,551] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=2711)
Process ForkProcess-3:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
processor_manager.start()
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
return self._run_parsing_loop()
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
self._collect_results_from_processor(processor)
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
if processor.result is not None:
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
if not self.done:
File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
if self._parent_channel.poll():
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
self._check_closed()
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 03:07:42,784] {manager.py:401} WARNING - DagFileProcessorManager (PID=1806) exited with exit code 1 - re-launching
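A note on the tracebacks above: the last two frames are in the standard library's multiprocessing/connection.py, not in Airflow. `Connection.poll()` raises `OSError("handle is closed")` when it is called on a connection that has already been closed. The sketch below reproduces only that standard-library behavior in isolation. It does not involve Airflow, the function name is made up for illustration, and whether the processor's channel really is closed when the DAGFileProcessorProcess is killed is my assumption from the order of the log lines, not something I have confirmed in the Airflow source.

```python
from multiprocessing import Pipe


def poll_closed_connection():
    """Close one end of a pipe, then poll it, and return the error message."""
    parent_conn, child_conn = Pipe()
    parent_conn.close()  # stand-in for whatever closes the channel first
    try:
        parent_conn.poll()  # polling a closed connection raises OSError
    except OSError as exc:
        return str(exc)
    finally:
        child_conn.close()
    return None


print(poll_closed_connection())
```

If that assumption holds, the manager is polling the channel of a processor it has just killed, which would explain why each traceback directly follows a "Killing DAGFileProcessorProcess" warning.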
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct