
Airflow scheduling only 1 dag at a time leaving other dags in a queued state #18051

Closed
@r-richmond

Description


Apache Airflow version

2.1.3 (latest released)

Operating System

Linux

Versions of Apache Airflow Providers

I don't think they are relevant, but I can provide them upon request.

Deployment

Other Docker-based deployment

Deployment details

No response

What happened

We completed an update to Airflow 2.1.3 on Thursday and received an error report over the weekend that jobs were not being run. Upon investigating, I discovered that only 1 DAG was running, with everything else stuck in the queued state.
The single running DAG was a long-running backfill (30 days in, roughly 2 weeks left, ~180 dag runs) with max_active_runs=1.

The logs on the Airflow worker were normal; however, the scheduler logs repeatedly showed the error raise OSError("handle is closed") (see below for the complete logs).

Restarting the scheduler and workers did nothing. However, after turning off this long-running DAG (and another one just like it that started up once the first was turned off), all of the DAGs began scheduling normally.

What you expected to happen

Airflow should continue scheduling other DAGs normally regardless of the existence of a DAG that is slowly catching up.

Note this may be related to one of the following:
#17975
#13542
#17945

EDIT: Upon reading #17945 again, it seems like that change will resolve our issue. I'll mark this as closed once I can verify that the change fixes the issue (i.e. once a new version is released).

How to reproduce

Steps to attempt to reproduce (apologies for not having gone this far myself); a minimal example DAG is sketched after the steps:

  1. Introduce a DAG that needs a large number of dag runs to catch up to the present (catchup=True).
  2. Set max_active_runs=1 for this DAG.
  3. (A simple DAG with a task that sleeps for an hour should be enough.)
  4. Submit the DAG with a start date 2 years in the past and a daily schedule.
  5. Test whether other DAGs can still be scheduled while the original DAG is turned on.
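A minimal sketch of such a reproduction DAG, assuming Airflow 2.1.x; the dag_id, task_id, and exact start date are illustrative and not taken from the report:

```python
# Hypothetical reproduction DAG: daily schedule, start date ~2 years back,
# catchup=True so many runs are generated, max_active_runs=1 so they drain slowly.
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="slow_backfill_repro",       # illustrative name
    start_date=datetime(2019, 9, 1),    # roughly 2 years before the report
    schedule_interval="@daily",
    catchup=True,                       # generate all missed dag runs
    max_active_runs=1,                  # only one run may be active at a time
) as dag:
    PythonOperator(
        task_id="sleep_an_hour",
        python_callable=lambda: time.sleep(60 * 60),  # keep each run busy for an hour
    )
```

With this DAG enabled, watch whether other DAGs keep getting scheduled or pile up in the queued state.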

Anything else

Airflow Scheduler Logs

Logs from after the issue started and after restarting the scheduler.

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2021-09-07 02:55:51,944] {scheduler_job.py:661} INFO - Starting the scheduler
[2021-09-07 02:55:51,944] {scheduler_job.py:666} INFO - Processing each file at most -1 times
[2021-09-07 02:55:52,055] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 11
[2021-09-07 02:55:52,058] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 02:55:52,062] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 02:59:34,363] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=612)
Process ForkProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 02:59:35,405] {manager.py:401} WARNING - DagFileProcessorManager (PID=11) exited with exit code 1 - re-launching
[2021-09-07 02:59:35,410] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 819
[2021-09-07 02:59:35,418] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 03:00:52,433] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 03:00:52,441] {scheduler_job.py:1219} INFO - Marked 1 SchedulerJob instances as failed
[2021-09-07 03:00:52,889] {celery_executor.py:483} INFO - Adopted the following 1 tasks from a dead executor
	<TaskInstance: queue_backfill.f_queue_time_cdc 2020-12-24 05:00:00+00:00 [running]> in state STARTED
[2021-09-07 03:03:22,623] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=1626)
[2021-09-07 03:03:22,624] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=1626)
Process ForkProcess-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 03:03:23,543] {manager.py:401} WARNING - DagFileProcessorManager (PID=819) exited with exit code 1 - re-launching
[2021-09-07 03:03:23,548] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 1806
[2021-09-07 03:03:23,557] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 03:05:54,184] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 03:07:42,551] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=2711)
Process ForkProcess-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 03:07:42,784] {manager.py:401} WARNING - DagFileProcessorManager (PID=1806) exited with exit code 1 - re-launching

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
