|
54 | 54 | from airflow.sdk.bases.operator import BaseOperator
|
55 | 55 | from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
|
56 | 56 | from airflow.sdk.definitions._internal.node import validate_key
|
57 |
| -from airflow.sdk.definitions._internal.types import NOTSET |
| 57 | +from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet |
58 | 58 | from airflow.sdk.definitions.asset import AssetAll, BaseAsset
|
59 | 59 | from airflow.sdk.definitions.context import Context
|
60 | 60 | from airflow.sdk.definitions.param import DagParam, ParamsDict
|
@@ -1014,7 +1014,7 @@ def _validate_owner_links(self, _, owner_links):
|
1014 | 1014 | def test(
|
1015 | 1015 | self,
|
1016 | 1016 | run_after: datetime | None = None,
|
1017 |
| - logical_date: datetime | None = None, |
| 1017 | + logical_date: datetime | None | ArgNotSet = NOTSET, |
1018 | 1018 | run_conf: dict[str, Any] | None = None,
|
1019 | 1019 | conn_file_path: str | None = None,
|
1020 | 1020 | variable_file_path: str | None = None,
|
@@ -1082,6 +1082,10 @@ def add_logger_if_needed(ti: TaskInstance):
|
1082 | 1082 |
|
1083 | 1083 | with exit_stack:
|
1084 | 1084 | self.validate()
|
| 1085 | + |
| 1086 | + # Allow users to explicitly pass None. If it isn't set, we default to current time. |
| 1087 | + logical_date = logical_date if not isinstance(logical_date, ArgNotSet) else timezone.utcnow() |
| 1088 | + |
1085 | 1089 | log.debug("Clearing existing task instances for logical date %s", logical_date)
|
1086 | 1090 | # TODO: Replace with calling client.dag_run.clear in Execution API at some point
|
1087 | 1091 | SchedulerDAG.clear_dags(
|
|
0 commit comments