Description
Elasticsearch logging (when configured to write JSON to stdout) naturally requires that the logs actually be sent to stdout as JSON.
Currently, when running with `EXECUTE_TASKS_NEW_PYTHON_INTERPRETER` set to true, we use `check_output` and do nothing with the output. See `celery_executor_utils.py`, method `_execute_in_subprocess`.
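For reference, the current code is shaped roughly like this (paraphrased; exact details vary by version). The output `check_output` captures is only surfaced in the error path, so nothing reaches stdout for a log shipper to read:

```python
def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    try:
        # All task output is captured here and otherwise discarded.
        subprocess.check_output(command_to_exec, stderr=subprocess.STDOUT, close_fds=True, env=env)
    except subprocess.CalledProcessError as e:
        log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
        log.error(e.output)
        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)
```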
I first came up with a POC hack to "fix" this, specifically for this use case:
```python
import os
import subprocess
import sys
import time

# (CommandType, log, get_hostname, AirflowException come from celery_executor_utils.)


def _run_and_stream(cmd, env):
    # Hand the child the real stdout/stderr so its log lines are streamed
    # straight through instead of being captured.
    process = subprocess.Popen(
        cmd,
        stdout=sys.__stdout__,
        stderr=sys.__stderr__,
        text=True,
        env=env,
        close_fds=True,
    )
    # Poll until the child exits, then return its exit code.
    while True:
        if process.poll() is not None:
            break
        time.sleep(0.5)
    return process.poll()


def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    try:
        # Popen never raises CalledProcessError itself, so raise explicitly
        # on a non-zero exit code to keep the error path below reachable.
        retcode = _run_and_stream(command_to_exec, env=env)
        if retcode:
            raise subprocess.CalledProcessError(retcode, command_to_exec)
    except subprocess.CalledProcessError as e:
        log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
        log.error(e.output)
        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)
```
I later found out about `subprocess.run`, which seems clearly better and would not require the `_run_and_stream` function, though I have not tested this:
```python
def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    try:
        # check=True is needed for a non-zero exit code to raise CalledProcessError.
        subprocess.run(command_to_exec, stderr=sys.__stderr__, stdout=sys.__stdout__, close_fds=True, env=env, check=True)
    except subprocess.CalledProcessError as e:
        log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
        log.error(e.output)  # e.output is None here, since the output is streamed rather than captured
        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)
```
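As a quick sanity check outside Airflow (a minimal standalone sketch, not Airflow code), `subprocess.run` with the real stdout handed through streams the child's output unmodified into the parent's stdout, which is what an ndjson log shipper needs:

```python
import subprocess
import sys

# The child prints one JSON object per line; because it inherits the real
# stdout/stderr file objects, the line arrives unprefixed in the parent's output.
subprocess.run(
    [sys.executable, "-c", 'print(\'{"message": "hello from child"}\')'],
    stdout=sys.__stdout__,
    stderr=sys.__stderr__,
    check=True,
)
```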
But testing would need to be done to ensure it doesn't have unintended consequences or break other configurations.
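As a starting point, a regression test along these lines could pin down the failure path (the import path and exception message match here are assumptions; adjust to wherever `_execute_in_subprocess` actually lives):

```python
import sys

import pytest


def test_failing_command_still_raises():
    # Assumed import path -- adjust for your Airflow version/provider layout.
    from airflow.providers.celery.executors.celery_executor_utils import _execute_in_subprocess

    # A command that exits non-zero should still surface as an AirflowException.
    with pytest.raises(Exception, match="Celery command failed"):
        _execute_in_subprocess([sys.executable, "-c", "import sys; sys.exit(1)"])
```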
In these snippets I use `sys.__stdout__` instead of `sys.stdout` because the latter is replaced by a logging proxy in the Celery context, which introduces a prefix like `ForkPoolWorker-1` or something. That would make the output no longer ndjson, which I suspect would prevent the lines from being parsed properly.
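To illustrate with hypothetical log lines (the exact prefix Celery adds may differ), even a short prefix is enough to break per-line JSON parsing:

```python
import json

clean = '{"levelname": "INFO", "message": "task started"}'
prefixed = '[ForkPoolWorker-1] {"levelname": "INFO", "message": "task started"}'

json.loads(clean)  # parses fine
try:
    json.loads(prefixed)
except json.JSONDecodeError:
    print("prefixed line is no longer valid ndjson")  # this branch runs
```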
The important thing would be to make sure this is OK for log handlers other than Elasticsearch with JSON stdout -- it might not be, and a different approach to forwarding stdout might be required to support all use cases cleanly.
Anyway, this is a path to pursue, if it is valuable enough to make this combination work.
Last note: there's already some similar code in the standard task runner if you look at `CAN_FORK` -- that path is used when the OS cannot fork.
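For reference, the idea behind that fallback is roughly this (a simplified sketch, not the actual standard task runner source):

```python
import os
import subprocess

# Simplified sketch of the CAN_FORK idea: fork the current interpreter when
# the OS supports it, otherwise fall back to spawning a fresh process.
CAN_FORK = hasattr(os, "fork")


def start_task(command: list[str]) -> int:
    """Run `command` in a child process and return the child's pid."""
    if CAN_FORK:
        pid = os.fork()
        if pid == 0:  # child: replace this process with the task command
            os.execvp(command[0], command)
        return pid  # parent
    return subprocess.Popen(command).pid
```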
Committer
- I acknowledge that I am a maintainer/committer of the Apache Airflow project.