Content-Length: 776709 | pFad | http://github.com/ashb/airflow/commit/f559d43abf4e53a3a7afbfba4b3469ffddf9182c

[AIRFLOW-2511] Fix improper failed session commit handling causing deadlocks · ashb/airflow@f559d43 · GitHub
Skip to content

Commit f559d43

Browse files
fenglu-g and ashb
fenglu-g
authored and committed
[AIRFLOW-2511] Fix improper failed session commit handling causing deadlocks (apache#4769)
1 parent 0196b5f commit f559d43

File tree

1 file changed

+128
-126
lines changed

1 file changed

+128
-126
lines changed

airflow/jobs.py

Lines changed: 128 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -2179,147 +2179,149 @@ def _process_backfill_task_instances(self,
21792179
# or leaf to root, as otherwise tasks might be
21802180
# determined deadlocked while they are actually
21812181
# waiting for their upstream to finish
2182+
@provide_session
2183+
def _per_task_process(task, key, ti, session=None):
2184+
if task.task_id != ti.task_id:
2185+
return
21822186

2183-
for task in self.dag.topological_sort():
2184-
for key, ti in list(ti_status.to_run.items()):
2185-
2186-
if task.task_id != ti.task_id:
2187-
continue
2188-
2189-
ti.refresh_from_db()
2187+
ti.refresh_from_db()
21902188

2191-
task = self.dag.get_task(ti.task_id)
2192-
ti.task = task
2189+
task = self.dag.get_task(ti.task_id)
2190+
ti.task = task
21932191

2194-
ignore_depends_on_past = (
2195-
self.ignore_first_depends_on_past and
2196-
ti.execution_date == (start_date or ti.start_date))
2197-
self.log.debug(
2198-
"Task instance to run %s state %s", ti, ti.state)
2192+
ignore_depends_on_past = (
2193+
self.ignore_first_depends_on_past and
2194+
ti.execution_date == (start_date or ti.start_date))
2195+
self.log.debug(
2196+
"Task instance to run %s state %s", ti, ti.state)
2197+
2198+
# The task was already marked successful or skipped by a
2199+
# different Job. Don't rerun it.
2200+
if ti.state == State.SUCCESS:
2201+
ti_status.succeeded.add(key)
2202+
self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
2203+
ti_status.to_run.pop(key)
2204+
if key in ti_status.running:
2205+
ti_status.running.pop(key)
2206+
return
2207+
elif ti.state == State.SKIPPED:
2208+
ti_status.skipped.add(key)
2209+
self.log.debug("Task instance %s skipped. Don't rerun.", ti)
2210+
ti_status.to_run.pop(key)
2211+
if key in ti_status.running:
2212+
ti_status.running.pop(key)
2213+
return
21992214

2200-
# The task was already marked successful or skipped by a
2201-
# different Job. Don't rerun it.
2202-
if ti.state == State.SUCCESS:
2203-
ti_status.succeeded.add(key)
2204-
self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
2205-
ti_status.to_run.pop(key)
2215+
# guard against externally modified tasks instances or
2216+
# in case max concurrency has been reached at task runtime
2217+
elif ti.state == State.NONE:
2218+
self.log.warning(
2219+
"FIXME: task instance {} state was set to None "
2220+
"externally. This should not happen"
2221+
)
2222+
ti.set_state(State.SCHEDULED, session=session)
2223+
if self.rerun_failed_tasks:
2224+
# Rerun failed tasks or upstreamed failed tasks
2225+
if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
2226+
self.log.error("Task instance {ti} "
2227+
"with state {state}".format(ti=ti,
2228+
state=ti.state))
22062229
if key in ti_status.running:
22072230
ti_status.running.pop(key)
2208-
continue
2209-
elif ti.state == State.SKIPPED:
2210-
ti_status.skipped.add(key)
2211-
self.log.debug("Task instance %s skipped. Don't rerun.", ti)
2231+
# Reset the failed task in backfill to scheduled state
2232+
ti.set_state(State.SCHEDULED, session=session)
2233+
else:
2234+
# Default behaviour which works for subdag.
2235+
if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
2236+
self.log.error("Task instance {ti} "
2237+
"with {state} state".format(ti=ti,
2238+
state=ti.state))
2239+
ti_status.failed.add(key)
22122240
ti_status.to_run.pop(key)
22132241
if key in ti_status.running:
22142242
ti_status.running.pop(key)
2215-
continue
2243+
return
22162244

2217-
# guard against externally modified tasks instances or
2218-
# in case max concurrency has been reached at task runtime
2219-
elif ti.state == State.NONE:
2220-
self.log.warning(
2221-
"FIXME: task instance {} state was set to None "
2222-
"externally. This should not happen"
2223-
)
2224-
ti.set_state(State.SCHEDULED, session=session)
2225-
if self.rerun_failed_tasks:
2226-
# Rerun failed tasks or upstreamed failed tasks
2227-
if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
2228-
self.log.error("Task instance {ti} "
2229-
"with state {state}".format(ti=ti,
2230-
state=ti.state))
2231-
if key in ti_status.running:
2232-
ti_status.running.pop(key)
2233-
# Reset the failed task in backfill to scheduled state
2234-
ti.set_state(State.SCHEDULED, session=session)
2235-
else:
2236-
# Default behaviour which works for subdag.
2237-
if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
2238-
self.log.error("Task instance {ti} "
2239-
"with {state} state".format(ti=ti,
2240-
state=ti.state))
2241-
ti_status.failed.add(key)
2245+
backfill_context = DepContext(
2246+
deps=RUN_DEPS,
2247+
ignore_depends_on_past=ignore_depends_on_past,
2248+
ignore_task_deps=self.ignore_task_deps,
2249+
flag_upstream_failed=True)
2250+
2251+
# Is the task runnable? -- then run it
2252+
# the dependency checker can change states of tis
2253+
if ti.are_dependencies_met(
2254+
dep_context=backfill_context,
2255+
session=session,
2256+
verbose=self.verbose):
2257+
ti.refresh_from_db(lock_for_update=True, session=session)
2258+
if ti.state in (State.SCHEDULED, State.UP_FOR_RETRY, State.UP_FOR_RESCHEDULE):
2259+
if executor.has_task(ti):
2260+
self.log.debug(
2261+
"Task Instance %s already in executor "
2262+
"waiting for queue to clear",
2263+
ti
2264+
)
2265+
else:
2266+
self.log.debug('Sending %s to executor', ti)
2267+
# Skip scheduled state, we are executing immediately
2268+
ti.state = State.QUEUED
2269+
ti.queued_dttm = timezone.utcnow() if not ti.queued_dttm else ti.queued_dttm
2270+
session.merge(ti)
2271+
2272+
cfg_path = None
2273+
if executor.__class__ in (executors.LocalExecutor,
2274+
executors.SequentialExecutor):
2275+
cfg_path = tmp_configuration_copy()
2276+
2277+
executor.queue_task_instance(
2278+
ti,
2279+
mark_success=self.mark_success,
2280+
pickle_id=pickle_id,
2281+
ignore_task_deps=self.ignore_task_deps,
2282+
ignore_depends_on_past=ignore_depends_on_past,
2283+
pool=self.pool,
2284+
cfg_path=cfg_path)
2285+
ti_status.running[key] = ti
22422286
ti_status.to_run.pop(key)
2243-
if key in ti_status.running:
2244-
ti_status.running.pop(key)
2245-
continue
2246-
2247-
backfill_context = DepContext(
2248-
deps=RUN_DEPS,
2249-
ignore_depends_on_past=ignore_depends_on_past,
2250-
ignore_task_deps=self.ignore_task_deps,
2251-
flag_upstream_failed=True)
2252-
2253-
# Is the task runnable? -- then run it
2254-
# the dependency checker can change states of tis
2255-
if ti.are_dependencies_met(
2256-
dep_context=backfill_context,
2257-
session=session,
2258-
verbose=self.verbose):
2259-
ti.refresh_from_db(lock_for_update=True, session=session)
2260-
if ti.state in (State.SCHEDULED, State.UP_FOR_RETRY, State.UP_FOR_RESCHEDULE):
2261-
if executor.has_task(ti):
2262-
self.log.debug(
2263-
"Task Instance %s already in executor "
2264-
"waiting for queue to clear",
2265-
ti
2266-
)
2267-
else:
2268-
self.log.debug('Sending %s to executor', ti)
2269-
# Skip scheduled state, we are executing immediately
2270-
ti.state = State.QUEUED
2271-
ti.queued_dttm = timezone.utcnow() if not ti.queued_dttm else ti.queued_dttm
2272-
session.merge(ti)
2273-
2274-
cfg_path = None
2275-
if executor.__class__ in (executors.LocalExecutor,
2276-
executors.SequentialExecutor):
2277-
cfg_path = tmp_configuration_copy()
2278-
2279-
executor.queue_task_instance(
2280-
ti,
2281-
mark_success=self.mark_success,
2282-
pickle_id=pickle_id,
2283-
ignore_task_deps=self.ignore_task_deps,
2284-
ignore_depends_on_past=ignore_depends_on_past,
2285-
pool=self.pool,
2286-
cfg_path=cfg_path)
2287-
ti_status.running[key] = ti
2288-
ti_status.to_run.pop(key)
2289-
session.commit()
2290-
continue
2287+
session.commit()
2288+
return
22912289

2292-
if ti.state == State.UPSTREAM_FAILED:
2293-
self.log.error("Task instance %s upstream failed", ti)
2294-
ti_status.failed.add(key)
2295-
ti_status.to_run.pop(key)
2296-
if key in ti_status.running:
2297-
ti_status.running.pop(key)
2298-
continue
2290+
if ti.state == State.UPSTREAM_FAILED:
2291+
self.log.error("Task instance %s upstream failed", ti)
2292+
ti_status.failed.add(key)
2293+
ti_status.to_run.pop(key)
2294+
if key in ti_status.running:
2295+
ti_status.running.pop(key)
2296+
return
22992297

2300-
# special case
2301-
if ti.state == State.UP_FOR_RETRY:
2302-
self.log.debug(
2303-
"Task instance %s retry period not "
2304-
"expired yet", ti)
2305-
if key in ti_status.running:
2306-
ti_status.running.pop(key)
2307-
ti_status.to_run[key] = ti
2308-
continue
2298+
# special case
2299+
if ti.state == State.UP_FOR_RETRY:
2300+
self.log.debug(
2301+
"Task instance %s retry period not "
2302+
"expired yet", ti)
2303+
if key in ti_status.running:
2304+
ti_status.running.pop(key)
2305+
ti_status.to_run[key] = ti
2306+
return
23092307

2310-
# special case
2311-
if ti.state == State.UP_FOR_RESCHEDULE:
2312-
self.log.debug(
2313-
"Task instance %s reschedule period not "
2314-
"expired yet", ti)
2315-
if key in ti_status.running:
2316-
ti_status.running.pop(key)
2317-
ti_status.to_run[key] = ti
2318-
continue
2308+
# special case
2309+
if ti.state == State.UP_FOR_RESCHEDULE:
2310+
self.log.debug(
2311+
"Task instance %s reschedule period not "
2312+
"expired yet", ti)
2313+
if key in ti_status.running:
2314+
ti_status.running.pop(key)
2315+
ti_status.to_run[key] = ti
2316+
return
2317+
2318+
# all remaining tasks
2319+
self.log.debug('Adding %s to not_ready', ti)
2320+
ti_status.not_ready.add(key)
23192321

2320-
# all remaining tasks
2321-
self.log.debug('Adding %s to not_ready', ti)
2322-
ti_status.not_ready.add(key)
2322+
for task in self.dag.topological_sort():
2323+
for key, ti in list(ti_status.to_run.items()):
2324+
_per_task_process(task, key, ti)
23232325

23242326
# execute the tasks in the queue
23252327
self.heartbeat()

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/ashb/airflow/commit/f559d43abf4e53a3a7afbfba4b3469ffddf9182c

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy