[AIRFLOW-2511] Fix improper failed session commit handling #4769
Conversation
@kaxil @bolkedebruin PTAL, thank you.
Codecov Report
@@            Coverage Diff             @@
##           master    #4769      +/-   ##
==========================================
+ Coverage   74.44%   74.46%   +0.01%
==========================================
  Files         450      450
  Lines       28973    28976       +3
==========================================
+ Hits        21570    21577       +7
+ Misses       7403     7399       -4
@@ -2168,147 +2168,149 @@ def _process_backfill_task_instances(self,
# or leaf to root, as otherwise tasks might be
# determined deadlocked while they are actually
# waiting for their upstream to finish
@provide_session
Does this change also mean we open and close a connection for each TI? Does that have any impact on performance?
Good point.
Not necessarily. It just means that each TI gets its own session, which may or may not be mapped to a new connection, since SQLAlchemy/Airflow does connection pooling. In fact, we use ti.refresh_from_db() a lot, and a new session object is created per call there:
airflow/airflow/models/__init__.py
Line 932 in c50a851
def refresh_from_db(self, session=None, lock_for_update=False):
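To illustrate why one session per TI does not have to mean one new connection per TI, here is a toy model using only the standard library. `ToyPool` and `ToySession` are hypothetical stand-ins invented for this sketch; they are not SQLAlchemy classes and only imitate the general idea of a session borrowing a pooled connection and returning it on close.

```python
# Toy model only: ToyPool and ToySession are hypothetical, not SQLAlchemy APIs.
class ToyPool:
    """Hands out connections, reusing returned ones before opening new ones."""

    def __init__(self):
        self._idle = []
        self.opened = 0  # number of "physical" connections ever opened

    def checkout(self):
        if self._idle:
            return self._idle.pop()
        self.opened += 1
        return "conn-%d" % self.opened

    def checkin(self, conn):
        self._idle.append(conn)


class ToySession:
    """Borrows a connection on first use and returns it on close()."""

    def __init__(self, pool):
        self._pool = pool
        self._conn = None

    def execute(self, statement):
        if self._conn is None:
            self._conn = self._pool.checkout()
        return (self._conn, statement)

    def close(self):
        if self._conn is not None:
            self._pool.checkin(self._conn)
            self._conn = None


def run_per_task_sessions(pool, task_ids):
    """Open one short-lived session per task and record the connection used."""
    used = []
    for task_id in task_ids:
        session = ToySession(pool)
        try:
            conn, _ = session.execute("update state of %s" % task_id)
            used.append(conn)
        finally:
            session.close()
    return used
```

In this model, three sequential per-task sessions all end up on the same pooled connection, because each session returns its connection before the next one starts.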
In general I think we should let SQLAlchemy do the pooling and close the sessions that we don't use anymore, instead of keeping them open and passing them around all the time.
Personally I would prefer to have a create_session, since we commit the result on the last line anyway. If we do this properly, we shouldn't have to do refresh_from_db so often.
I think SQLAlchemy does the pooling but is unopinionated about how sessions are managed. The following access pattern is recommended by https://docs.sqlalchemy.org/en/latest/orm/session_basics.html#when-do-i-construct-a-session-when-do-i-commit-it-and-when-do-i-close-it, and it is what Airflow follows:
Line 37 in c50a851
def create_session():
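A minimal sketch of the commit/rollback/close pattern being discussed, using only the standard library. `StubSession` and `session_scope` are hypothetical names for illustration; this is not the code at the referenced line, and a real session would come from SQLAlchemy rather than from a stub.

```python
from contextlib import contextmanager


class StubSession:
    """Hypothetical stand-in for a DB session; records which calls were made."""

    def __init__(self, fail_on_commit=False):
        self.fail_on_commit = fail_on_commit
        self.calls = []

    def commit(self):
        self.calls.append("commit")
        if self.fail_on_commit:
            raise RuntimeError("commit failed")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


@contextmanager
def session_scope(session_factory):
    """Commit on success, roll back on any error, and always close."""
    session = session_factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```

The point of the shape is that a failed commit never leaves the session half-finished: the error still propagates to the caller, but only after the rollback and the close have run.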
Thanks for the reference link, no further concerns from my side.
You've typo'd the Jira ticket in the commit - could you fix that please :)
@ashb thanks for the review, I replied to your comment and fixed the typo [/facepalm].
@ashb, a gentle ping, thanks.
(Sorry - inbox bankruptcy)
@fenglu-g this PR might have caused a failure on py3 on master. Could you check? (I'm on mobile right now, so it's hard to say whether it was this PR or another one.) Never mind - it's Tornado 6.0.0, which was released 4 hours ago and breaks on our version of Python.
It looks as though this is still an issue post 1.10.3:
|
Make sure you have checked all steps below.
Jira
Description
The bug is caused by improper cleanup after a failed session commit. This PR moves the per-task DB manipulations into their own session via the provide_session decorator. If a single commit fails, the decorator ensures that the session is properly flushed and closed.
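The idea in this description can be sketched roughly as follows, under stated assumptions. `StubSession`, `session_scope`, `with_own_session`, `update_task`, and `process_tasks` are all hypothetical names invented for this example; the decorator here only handles a keyword `session` argument and is a simplification, not Airflow's provide_session.

```python
from contextlib import contextmanager
from functools import wraps

COMMITTED = []  # stands in for rows that actually reached the database


class StubSession:
    """Hypothetical session: commit fails if any pending item starts with 'bad'."""

    def __init__(self):
        self.pending = []
        self.closed = False

    def add(self, item):
        self.pending.append(item)

    def commit(self):
        if any(item.startswith("bad") for item in self.pending):
            raise RuntimeError("commit failed")
        COMMITTED.extend(self.pending)
        self.pending = []

    def rollback(self):
        self.pending = []

    def close(self):
        self.closed = True


@contextmanager
def session_scope():
    """Commit on success, roll back on error, always close."""
    session = StubSession()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def with_own_session(func):
    """Inject a fresh session unless the caller passed session=... explicitly."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            return func(*args, **kwargs)
        with session_scope() as session:
            kwargs["session"] = session
            return func(*args, **kwargs)
    return wrapper


@with_own_session
def update_task(task_id, session=None):
    """Per-task DB work; runs inside its own session when none is supplied."""
    session.add(task_id)


def process_tasks(task_ids):
    """A failed commit for one task does not poison the work of the others."""
    failed = []
    for task_id in task_ids:
        try:
            update_task(task_id)
        except RuntimeError:
            failed.append(task_id)
    return failed
```

With this shape, a commit failure for one task is confined to that task's session, and the loop can carry on with the remaining tasks.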
Tests
This PR passes the tests.test_jobs unit test.
Commits
Documentation
Code Quality
flake8