Content-Length: 293065 | pFad | http://github.com/apache/airflow/pull/22685/commits/85f5440441dc3ee06f8748b5c131616c65457fd2

B4 Fix processor cleanup on DagFileProcessorManager by pcolladosoto · Pull Request #22685 · apache/airflow · GitHub
Skip to content

Fix processor cleanup on DagFileProcessorManager #22685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 6, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix processor cleanup
References to processors weren't being cleaned up after
killing them in the event of a timeout. This lead to
a crash caused by an unhandled exception when trying to
read from a closed end of a pipe.
  • Loading branch information
pcolladosoto authored and potiuk committed Apr 4, 2022
commit 85f5440441dc3ee06f8748b5c131616c65457fd2
9 changes: 9 additions & 0 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,7 @@ def prepare_file_path_queue(self):
def _kill_timed_out_processors(self):
"""Kill any file processors that timeout to defend against process hangs."""
now = timezone.utcnow()
processors_to_remove = []
for file_path, processor in self._processors.items():
duration = now - processor.start_time
if duration > self._processor_timeout:
Expand All @@ -1080,6 +1081,14 @@ def _kill_timed_out_processors(self):
Stats.incr('dag_file_processor_timeouts')
processor.kill()

# Clean up processor references
self.waitables.pop(processor.waitable_handle)
processors_to_remove.append(file_path)

# Clean up `self._processors` after iterating over it
for proc in processors_to_remove:
self._processors.pop(proc)

def max_runs_reached(self):
""":return: whether all file paths have been processed max_runs times"""
if self._max_runs == -1: # Unlimited runs.
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/airflow/pull/22685/commits/85f5440441dc3ee06f8748b5c131616c65457fd2

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy