Content-Length: 361010 | pFad | https://github.com/apache/airflow/commit/47b05a87f004dc273a4757ba49f03808a86f77e7

DD Improve handling of job_id in BigQuery operators (#11287) · apache/airflow@47b05a8 · GitHub
Skip to content

Commit 47b05a8

Browse files
authored
Improve handling of job_id in BigQuery operators (#11287)
Make autogenerated job_id more unique by using microseconds and hash of configuration. Replace dots in job_id. Closes: #11280
1 parent 18dcac8 commit 47b05a8

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

airflow/providers/google/cloud/hooks/bigquery.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
This module contains a BigQuery Hook, as well as a very basic PEP 249
2121
implementation for BigQuery.
2222
"""
23+
import hashlib
24+
import json
2325
import logging
2426
import time
2527
import warnings
2628
from copy import deepcopy
29+
from datetime import timedelta, datetime
2730
from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Sequence, Tuple, Type, Union
2831

2932
from google.api_core.retry import Retry
@@ -1443,6 +1446,15 @@ def get_job(
14431446
job = client.get_job(job_id=job_id, project=project_id, location=location)
14441447
return job
14451448

1449+
@staticmethod
1450+
def _custom_job_id(configuration: Dict[str, Any]) -> str:
1451+
hash_base = json.dumps(configuration, sort_keys=True)
1452+
uniqueness_suffix = hashlib.md5(hash_base.encode()).hexdigest()
1453+
microseconds_from_epoch = int(
1454+
(datetime.now() - datetime.fromtimestamp(0)) / timedelta(microseconds=1)
1455+
)
1456+
return f"airflow_{microseconds_from_epoch}_{uniqueness_suffix}"
1457+
14461458
@GoogleBaseHook.fallback_to_default_project_id
14471459
def insert_job(
14481460
self,
@@ -1472,7 +1484,7 @@ def insert_job(
14721484
:type location: str
14731485
"""
14741486
location = location or self.location
1475-
job_id = job_id or f"airflow_{int(time.time())}"
1487+
job_id = job_id or self._custom_job_id(configuration)
14761488

14771489
client = self.get_client(project_id=project_id, location=location)
14781490
job_data = {

airflow/providers/google/cloud/operators/bigquery.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2068,8 +2068,9 @@ def _job_id(self, context):
20682068
if self.job_id:
20692069
return f"{self.job_id}_{uniqueness_suffix}"
20702070

2071-
exec_date = re.sub(r"\:|-|\+", "_", context['execution_date'].isoformat())
2072-
return f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"
2071+
exec_date = context['execution_date'].isoformat()
2072+
job_id = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"
2073+
return re.sub(r"\:|-|\+\.", "_", job_id)
20732074

20742075
def execute(self, context: Any):
20752076
hook = BigQueryHook(

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/47b05a87f004dc273a4757ba49f03808a86f77e7

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy