Fix processor cleanup on DagFileProcessorManager #22685
Conversation
References to processors weren't being cleaned up after killing them in the event of a timeout. This led to a crash caused by an unhandled exception when trying to read from a closed end of a pipe.
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
When calling `_kill_process()` we're generating zombies which weren't being `wait()`ed for. This led to a process leak, which we fix by calling `waitpid()` on the appropriate PIDs.
airflow/dag_processing/processor.py (Outdated)

@@ -231,6 +231,9 @@ def _kill_process(self) -> None:
         if self._process.is_alive() and self._process.pid:
             self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
             os.kill(self._process.pid, signal.SIGKILL)
+
+            # Reap the spawned zombie
+            os.waitpid(self._process.pid, 0)
Indeed, there might be a short delay before even SIGKILL gets processed, so waiting here makes sense.
I don't know if there's anything here that could be racy, but there are some details here: https://bugs.python.org/issue42558.
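For context, a minimal sketch of the concern as I read the linked issue (the helper name `reap_if_needed` and the try/except wrapper are illustrative, not code from this PR): `multiprocessing` may reap the child on its own, for instance when its internal `poll()` runs, after which a direct `os.waitpid()` on the same PID raises `ChildProcessError`.

```python
import os


def reap_if_needed(pid: int) -> None:
    """Hypothetical helper illustrating the bpo-42558 concern."""
    try:
        # Blocks until the killed child is reaped, turning the zombie into a plain exit.
        os.waitpid(pid, 0)
    except ChildProcessError:
        # multiprocessing already reaped this child behind our back, so there is
        # nothing left to wait for; swallowing the error keeps the cleanup idempotent.
        pass
```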
Ough. Very interesting. So it seems that this one (or similar) is safer (as `waitpid` might crash for Python 3.9+) :)
Do I read it right?

    while self._process.poll() is None:
        sleep(0.001)
Yes that is how I read it too.
@pcolladosoto you wanna make that change? I don't think I had anything else that needed changing.
Hi @malthe! I just pushed the changes: I decided to import the `time` module so that I could call `time.sleep()` within the `while` loop. Hope I made the right choice... You can check out the changes on 27502be. Thanks a lot for the input! This is something that I'm sure will come in handy for future projects 😸
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
According to @potiuk's and @malthe's input, the way we were reaping the zombies could cause some racy and unwanted situations. As seen in the discussion over at `https://bugs.python.org/issue42558`, we can safely reap the spawned zombies with the changes we have introduced.
As suggested by @potiuk, explaining why we chose to actively wait in a scenario such as this one can indeed be useful for anybody taking a look at the code some time from now... Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
After accepting the changes proposed on the PR we found a small typo (we make those on a daily basis) and a trailing whitespace we thought was nice to delete. Hope we made the right choice!
We were calling `poll()` through the `_process` attribute and, as shown by the static checks triggered by GitHub, it's not defined for the `BaseProcess` class. We instead have to call `poll()` through `BaseProcess`'s `_popen` attribute.
Hi @potiuk! Is there anything else that needs to be done on my end? 🤔 Thanks for your time!
I guess fixing the failing static checks and tests, since you asked. But I think GitHub already told you so via all the notifications, and you can see it in the red status of the PR.
Some changes have been force pushed!
After reading through `multiprocessing`'s implementation we really didn't know why the static check on line `239` was failing: the process should contain a `_popen` attribute... That's when we found line `223` and discovered the trailing `# type: ignore` comment. After reading up on it we found that it instructs *MyPy* not to statically check that very line. Given we're having trouble with the exact same attribute we decided to include the same directive for the static checker. Hope we made the right call!
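Putting the pieces of the thread together, here is a minimal, self-contained sketch of the kill-and-reap approach the discussion converges on (the helper name `kill_and_reap` and the demo at the bottom are illustrative, not the merged Airflow code; it assumes a POSIX system):

```python
import multiprocessing
import os
import signal
import time


def kill_and_reap(process: multiprocessing.Process) -> None:
    # Sketch of the cleanup the PR converges on: SIGKILL the child, then actively
    # wait until it has been reaped, so no zombie (and no stale reference) is left.
    if process.is_alive() and process.pid:
        os.kill(process.pid, signal.SIGKILL)

        # `multiprocessing.BaseProcess` has no public `poll()`; the discussion above
        # settles on polling the internal `_popen` handle (hence the `# type: ignore`,
        # mirroring line 223 of processor.py) instead of calling `os.waitpid()`
        # directly, which can race with multiprocessing's own reaping (bpo-42558).
        while process._popen.poll() is None:  # type: ignore
            time.sleep(0.001)


if __name__ == "__main__":
    p = multiprocessing.Process(target=time.sleep, args=(60,))
    p.start()
    kill_and_reap(p)
    print("exitcode:", p.exitcode)  # -9 once the SIGKILLed child has been reaped
```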
We hadn't updated the tests for the method whose body we've altered. This caused the tests to fail when trying to retrieve a processor's *waitable*, a property similar to a *file descriptor* in UNIX-like systems. We have added a mock property to the `processor` and we've also updated the `manager`'s attributes so as to faithfully recreate the state of the data structures at the moment when a `processor` is to be terminated.

Please note the `assertions` at the end are meant to check we reach the `manager`'s expected state. We have chosen to check the number of processors against an explicit value because we're defining `manager._processors` explicitly within the test. On the other hand, `manager.waitables` can have a different length depending on the call to `DagFileProcessorManager`'s `__init__()`. In this test the expected initial length is `1` given we're passing `MagicMock()` as the `signal_conn` when instantiating the manager. However, if this were to be changed the tests would 'inexplicably' fail. Instead of checking `manager.waitables`' length against a hardcoded value, we decided to compare it to its initial length so as to emphasize we're interested in the change in length, not its absolute value.
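As a rough illustration of the shape such a test can take (the test name, the exact constructor keywords, and the attribute values are hypothetical, reconstructed from the commit message rather than copied from the repository):

```python
from datetime import timedelta
from unittest.mock import MagicMock

from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.utils import timezone


def test_cleanup_after_timeout():
    # Passing a MagicMock as signal_conn means manager.waitables starts out with
    # exactly one entry (the mocked connection), as described above.
    manager = DagFileProcessorManager(
        dag_directory="/tmp/dags",
        max_runs=1,
        processor_timeout=timedelta(seconds=0),
        signal_conn=MagicMock(),
        dag_ids=[],
        pickle_dags=False,
        async_mode=True,
    )
    initial_waitables = len(manager.waitables)

    # A mocked processor that has clearly exceeded the timeout; its waitable
    # property is what the manager uses as the key into manager.waitables.
    processor = MagicMock()
    processor.start_time = timezone.utcnow() - timedelta(hours=1)
    manager._processors = {"/tmp/dags/some_dag.py": processor}
    manager.waitables[processor.waitable_handle] = processor

    manager._kill_timed_out_processors()

    # The timed-out processor must be dropped from both data structures; comparing
    # against the *initial* waitables length (rather than a hard-coded value) keeps
    # the assertion valid even if __init__ ever registers more waitables.
    assert len(manager._processors) == 0
    assert len(manager.waitables) == initial_waitables
```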
One of the methods we are to mock required a rather long `@mock.patch` decorator which didn't pass the checks made by `black` on the pre-commit hooks. On top of that, we messed up the ordering of the `@mock.patch` decorators, which meant we didn't set them up properly. This manifested as a `KeyError` on the method we're currently testing. O_o
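For anyone hitting the same thing, a quick reminder of how stacked `@mock.patch` decorators map onto test arguments (generic example, unrelated to Airflow's modules): decorators are applied bottom-up, so the decorator closest to the function supplies the *first* mock argument.

```python
from unittest import mock


@mock.patch("os.remove")        # applied last  -> maps to the *second* argument
@mock.patch("os.path.exists")   # applied first -> maps to the *first* argument
def test_ordering(mock_exists, mock_remove):
    # Mixing the argument order up silently wires the wrong mock to the wrong name,
    # which is how confusing failures such as an unexpected KeyError can appear.
    mock_exists.return_value = True
    assert mock_exists("some_file") is True
    mock_remove.assert_not_called()
```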
Awesome work, congrats on your first merged pull request!
Fix processor cleanup on DagFileProcessorManager - cherrypick from Community apache/airflow#22685 Change-Id: I7e505840325b5f61bd96238424caedf9f9afe19e GitOrigin-RevId: 9e20cd0bf3bd2fa67115b1d4a81a6e2009e49936
Fix processor cleanup on DagFileProcessorManager - cherrypick from Community apache/airflow#22685 Change-Id: Iaebb1431f78b220d444810ba9f0c854186d7e07b GitOrigin-RevId: 9972c3d38fee1bbfd7216a1db97d1b17bb88e7d3
This PR fixes the cleanup procedures of the `DAGFileProcessor`s being spawned by the `DagFileProcessorManager` instance. Thus, this PR closes #22191.

We came across this bug when encountering a timeout triggering the forceful killing of these `DAGFileProcessor`s.

We have had a somewhat lengthy discussion on an issue (#22191) which we encourage anyone to read for some more insight into the cause, discovery and posterior fix. People on that thread were extremely helpful! 😸

We have not included tests because we're unsure of how to test a behaviour such as this one. If pointed in the right direction we would be more than happy to add them. We can say, however, that we applied these changes on our own production Airflow instance and we haven't encountered the issue ever since.

On top of that, we would more than welcome any suggestions to the code: for instance, when cleaning up a dictionary we're iterating over, we decided to take note of the problematic `DAGFileProcessor`s and then remove them in a second `for` loop (see the sketch after this description). Our background in programming is much stronger in some other languages, so we feel really uncomfortable pushing Python 'to the limit' in terms of relying on its implementation to make design choices. If there's anything that can be done better, by all means say so.

Another fertile topic for discussion is how to `wait()` for the processes being killed through the `SIGKILL` signal. This has been brought up by @dlesco on #22191 and we agree with him on adding an optional timeout to the operation to avoid blocking in very bizarre circumstances (which the current solution would do). However, we decided to contribute our initial approach and then iterate on solutions within the PR.

Thanks a lot for letting me contribute to a tool with the expertise and size of Airflow: it's truly an honour. Hope to make the merge as seamless as possible 😜
closes: #22191