
EXECUTE_TASKS_NEW_PYTHON_INTERPRETER setting does not play nice with Elasticsearch #47420

Closed
@dstandish


Elasticsearch (when configured to output JSON to stdout) naturally requires that the logs actually be sent to stdout as JSON.

Currently, when running with EXECUTE_TASKS_NEW_PYTHON_INTERPRETER set to true, we use check_output and do nothing with the captured output, so whatever the task process prints never reaches the worker's stdout.

See the _execute_in_subprocess method in celery_executor_utils.py.
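
As a standalone illustration of why that matters (this is not Airflow code, just a minimal sketch): check_output captures the child's stdout into its return value, so nothing the child prints reaches the parent's stdout.

# Minimal sketch, not Airflow code: the json line below is captured into
# `out` and never reaches our own stdout, which is exactly what the
# json-to-stdout Elasticsearch setup relies on.
import subprocess
import sys

out = subprocess.check_output(
    [sys.executable, "-c", 'print(\'{"msg": "task log line"}\')'],
)
print("captured, not forwarded:", out)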

I first came up with a POC hack to "fix" this specifically for this use case:

def _run_and_stream(cmd, env):
    # POC: forward the child's output to the worker's real stdout/stderr
    # instead of capturing it, so json log lines reach stdout untouched.
    import subprocess
    import sys
    import time

    process = subprocess.Popen(
        cmd,
        stdout=sys.__stdout__,
        stderr=sys.__stderr__,
        text=True,
        env=env,
        close_fds=True,
    )

    # Poll until the child exits.
    while process.poll() is None:
        time.sleep(0.5)

    # Surface failures the same way check_output would, so the caller's
    # except subprocess.CalledProcessError branch actually fires.
    if process.returncode:
        raise subprocess.CalledProcessError(process.returncode, cmd)
    return process.returncode


def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    try:
        _run_and_stream(command_to_exec, env=env)
    except subprocess.CalledProcessError as e:
        log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
        # Note: output is streamed rather than captured here, so e.output is None.
        log.error(e.output)
        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)

I later found out about subprocess.run, which seems clearly better and would not require the _run_and_stream helper, though I have not tested this:

def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    try:
        # check=True is needed so a non-zero exit raises CalledProcessError,
        # matching the behaviour of the current check_output call.
        subprocess.run(
            command_to_exec,
            stdout=sys.__stdout__,
            stderr=sys.__stderr__,
            close_fds=True,
            env=env,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
        # Output is not captured in this variant, so e.output is None.
        log.error(e.output)
        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)

But testing would need to be done to ensure it doesn't have unintended consequences or break other configurations.
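
One quick way to sanity-check the pass-through behaviour (a hypothetical check, not something from the existing test suite) is to run a child that prints a json line and confirm it appears on the real stdout unmodified:

# Hypothetical sanity check: the child's json line should appear on the
# parent's real stdout exactly as printed, and check=True makes a non-zero
# exit raise CalledProcessError, as check_output does today.
import subprocess
import sys

subprocess.run(
    [sys.executable, "-c", 'print(\'{"msg": "hello from the task"}\')'],
    stdout=sys.__stdout__,
    stderr=sys.__stderr__,
    check=True,
)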

In these snippets I use sys.__stdout__ instead of sys.stdout because the latter is replaced by a logging proxy in the Celery worker context, which adds a prefix like ForkPoolWorker-1 to each line. That would make the output no longer valid ndjson, which I suspect would prevent the lines from being parsed properly.
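
As a rough illustration of that distinction (this reflects my understanding of Celery's stdout redirection; outside a worker both lines just print the same plain text stream type):

# Rough illustration, assuming Celery's usual stdout redirection is enabled:
# inside a prefork worker, sys.stdout is typically a logging proxy that
# re-emits writes through the worker's log format (hence the ForkPoolWorker-N
# prefix), while sys.__stdout__ is always the original stream the process
# started with.
import sys

print(type(sys.stdout))      # a logging proxy inside a worker
print(type(sys.__stdout__))  # the real stdout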

The important thing would be to make sure this is OK for log handlers other than Elasticsearch-with-JSON-to-stdout -- it might not be, and a different approach to forwarding stdout might be required to support all use cases cleanly.

Anyway, this is a path to pursue, if it is valuable enough to make this combination work.

Last note: there is already some similar code in the standard task runner -- look at CAN_FORK -- which is used when the OS cannot fork.
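
For reference, the shape of that code is roughly the following (paraphrased from memory of standard_task_runner.py, so treat names and details as approximate rather than verbatim):

# Paraphrased sketch, not verbatim Airflow source.
import os

CAN_FORK = hasattr(os, "fork")

class StandardTaskRunner(BaseTaskRunner):  # BaseTaskRunner is Airflow's base task runner
    def start(self):
        if CAN_FORK and not self.run_as_user:
            # Normal POSIX case: fork and run the task in the child.
            self.process = self._start_by_fork()
        else:
            # No fork available (or impersonation requested): exec a fresh
            # interpreter, much like EXECUTE_TASKS_NEW_PYTHON_INTERPRETER
            # does on the celery side.
            self.process = self._start_by_exec()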

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
