Description
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.8.1
What happened?
I wrote a listener plugin and added it to the plugins directory. The listener includes the methods:
- on_task_instance_running
- on_task_instance_success
- on_task_instance_failed
- on_dag_run_running
- on_dag_run_success
- on_dag_run_failed
When I try to read values from Variables and Connections inside on_dag_run_running, on_dag_run_success, or on_dag_run_failed, the scheduler raises the following error as soon as a DAG run starts:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/job.py", line 393, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/job.py", line 422, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 855, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 987, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1063, in _do_scheduling
    self._start_queued_dagruns(session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1404, in _start_queued_dagruns
    dag_run.notify_dagrun_state_changed()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 862, in notify_dagrun_state_changed
    get_listener_manager().hook.on_dag_run_running(dag_run=self, msg=msg)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_hooks.py", line 493, in __call__
    return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_manager.py", line 115, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_callers.py", line 113, in _multicall
    raise exception.with_traceback(exception.__traceback__)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_callers.py", line 77, in _multicall
    res = hook_impl.function(*args)
  File "/opt/airflow/plugins/metadata/airflow_metadata.py", line 206, in on_dag_run_running
    my_connection = BaseHook.get_connection("my_connection")
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/hooks/base.py", line 82, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/connection.py", line 479, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `my_connection` isn't defined
However, the connection is explicitly defined under Connections, and the same lookups work in the task instance hooks.
How can I fix this so that the lookups also work in the DagRun hooks?
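A possible stopgap while the root cause is unknown: the traceback shows the exception from on_dag_run_running propagating through pluggy into _run_scheduler_loop, so the lookup could be wrapped so that a failure is logged instead of raised. The sketch below is only an illustration. `safe_lookup` is a hypothetical helper, not an Airflow API, and it takes any zero-argument callable; in the plugin that callable would be something like `lambda: BaseHook.get_connection("my_connection")`. It hides the symptom and does not explain why the lookup fails.

```python
import logging
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)


def safe_lookup(
    getter: Callable[[], T], what: str, default: Optional[T] = None
) -> Optional[T]:
    """Run getter() and return its result.

    Hypothetical helper (not part of Airflow). If getter raises, the
    exception is logged with its traceback and `default` is returned,
    so the error does not escape from the listener hook.
    """
    try:
        return getter()
    except Exception:
        log.exception("Listener lookup failed for %s", what)
        return default


if __name__ == "__main__":
    # Stand-in callables; in the plugin these would wrap
    # BaseHook.get_connection(...) or Variable.get(...).
    print(safe_lookup(lambda: "value", "working lookup"))
    print(safe_lookup(lambda: 1 / 0, "failing lookup", default="fallback"))
```

With this in place a hook body would check for `None` (or the chosen default) and skip its work when the lookup did not succeed.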
What you think should happen instead?
I expect Connection and Variable values to be retrieved in the DagRun hooks, just as they are in the task instance hooks.
How to reproduce
Add a Connection named "my_connection" and a Variable named "environment", then place the following plugin in the plugins directory:
from airflow.hooks.base import BaseHook
from airflow.listeners import hookimpl
from airflow.models import Variable
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.plugins_manager import AirflowPlugin


class AirflowListener:
    @hookimpl
    def on_task_instance_running(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_task_instance_success(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_task_instance_failed(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_running(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_success(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_failed(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")


class AirflowListenerPlugin(AirflowPlugin):
    name = "AirflowListener"
    listeners = [AirflowListener()]
Operating System
macOS Sonoma 14.1.2
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
apache/airflow:2.8.1-python3.9
executor: CeleryExecutor
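Because the traceback shows the DagRun hooks being called from the scheduler process, it may be worth comparing what each container in this deployment sees. The sketch below is a diagnostic idea only, under the assumption that connections, variables, or a secrets backend might be configured through environment variables (Airflow's `AIRFLOW_CONN_*`, `AIRFLOW_VAR_*`, and `AIRFLOW__SECRETS__*` naming). `describe_runtime` is a hypothetical helper, not an Airflow API, and it reports only variable names, never values.

```python
import os
import socket
from typing import Dict, Mapping, Optional

# Environment variable prefixes that can influence connection/variable lookups.
PREFIXES = ("AIRFLOW_CONN_", "AIRFLOW_VAR_", "AIRFLOW__SECRETS__")


def describe_runtime(environ: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Summarise the current process for comparison across containers.

    Hypothetical helper (not part of Airflow). Returns the hostname, the
    process id, and the sorted names of matching environment variables.
    """
    env = os.environ if environ is None else environ
    names = sorted(key for key in env if key.startswith(PREFIXES))
    return {
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
        "env_names": names,
    }


if __name__ == "__main__":
    # Logging this from a task hook and from a DagRun hook would show
    # whether the two run in differently configured environments.
    print(describe_runtime())
```

If the output differs between a task instance hook and a DagRun hook, that would point at container configuration rather than at the listener code.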
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct