Content-Length: 480900 | pFad | https://github.com/apache/airflow/commit/79ef8bed891c22eb76adf99158288d1b44426dc0

24 Added Upload Multiple Entity Read Files to specified big query datase… · apache/airflow@79ef8be · GitHub
Skip to content

Commit 79ef8be

Browse files
michalslowikowski00michalslowikowski00
and
michalslowikowski00
authored
Added Upload Multiple Entity Read Files to specified big query dataset (#8610)
Co-authored-by: michalslowikowski00 <michal.slowikowski@polidea.com>
1 parent cbebed2 commit 79ef8be

File tree

2 files changed

+47
-13
lines changed

2 files changed

+47
-13
lines changed

airflow/providers/google/marketing_platform/example_dags/example_display_video.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
from airflow import models
2525
from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
26+
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
2627
from airflow.providers.google.marketing_platform.operators.display_video import (
2728
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
2829
GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
@@ -38,15 +39,14 @@
3839
BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
3940
ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567)
4041
OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv")
41-
PATH_TO_UPLOAD_FILE = os.environ.get(
42-
"GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt"
43-
)
44-
PATH_TO_SAVED_FILE = os.environ.get(
45-
"GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt"
46-
)
42+
PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
43+
PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")
4744
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
4845
SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
4946
BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
47+
GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123)
48+
ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem")
49+
ERF_SOURCE_OBJECT = GoogleDisplayVideo360Hook.erf_uri(GMP_PARTNER_ID, ENTITY_TYPE)
5050

5151
REPORT = {
5252
"kind": "doubleclickbidmanager#query",
@@ -68,15 +68,17 @@
6868

6969
PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
7070

71-
BODY_REQUEST: Dict = {
71+
CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: Dict = {
7272
"version": SDF_VERSION,
7373
"advertiserId": ADVERTISER_ID,
7474
"inventorySourceFilter": {"inventorySourceIds": []},
7575
}
76-
# [END howto_display_video_env_variables]
7776

78-
# download_line_items variables
79-
REQUEST_BODY = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"}
77+
DOWNLOAD_LINE_ITEMS_REQUEST: Dict = {
78+
"filterType": ADVERTISER_ID,
79+
"format": "CSV",
80+
"fileSpec": "EWF"}
81+
# [END howto_display_video_env_variables]
8082

8183
default_args = {"start_date": dates.days_ago(1)}
8284

@@ -119,10 +121,20 @@
119121
)
120122
# [END howto_google_display_video_deletequery_report_operator]
121123

124+
# [START howto_google_display_video_upload_multiple_entity_read_files_to_big_query]
125+
upload_erf_to_bq = GCSToBigQueryOperator(
126+
task_id='upload_erf_to_bq',
127+
bucket=BUCKET,
128+
source_objects=ERF_SOURCE_OBJECT,
129+
destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table",
130+
write_disposition='WRITE_TRUNCATE',
131+
dag=dag)
132+
# [END howto_google_display_video_upload_multiple_entity_read_files_to_big_query]
133+
122134
# [START howto_google_display_video_download_line_items_operator]
123135
download_line_items = GoogleDisplayVideo360DownloadLineItemsOperator(
124136
task_id="download_line_items",
125-
request_body=REQUEST_BODY,
137+
request_body=DOWNLOAD_LINE_ITEMS_REQUEST,
126138
bucket_name=BUCKET,
127139
object_name=OBJECT_NAME,
128140
gzip=False,
@@ -139,7 +151,7 @@
139151

140152
# [START howto_google_display_video_create_sdf_download_task_operator]
141153
create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
142-
task_id="create_sdf_download_task", body_request=BODY_REQUEST
154+
task_id="create_sdf_download_task", body_request=CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST
143155
)
144156
operation_name = '{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}'
145157
# [END howto_google_display_video_create_sdf_download_task_operator]

airflow/providers/google/marketing_platform/hooks/display_video.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,28 @@ def get_conn_to_display_video(self) -> Resource:
7070
)
7171
return self._conn
7272

73+
@staticmethod
74+
def erf_uri(partner_id, entity_type) -> List[str]:
75+
"""
76+
Return URI for all Entity Read Files in bucket.
77+
78+
For example, if you were generating a file name to retrieve the entity read file
79+
for partner 123 accessing the line_item table from April 2, 2013, your filename
80+
would look something like this:
81+
gdbm-123/entity/20130402.0.LineItem.json
82+
83+
More information:
84+
https://developers.google.com/bid-manager/guides/entity-read/overview
85+
86+
:param partner_id The numeric ID of your Partner.
87+
:type partner_id: int
88+
:param entity_type: The type of file Partner, Advertiser, InsertionOrder,
89+
LineItem, Creative, Pixel, InventorySource, UserList, UniversalChannel, and summary.
90+
:type entity_type: str
91+
"""
92+
93+
return [f"gdbm-{partner_id}/entity/{{{{ ds_nodash }}}}.*.{entity_type}.json"]
94+
7395
def create_query(self, query: Dict[str, Any]) -> Dict:
7496
"""
7597
Creates a query.
@@ -125,7 +147,7 @@ def list_queries(self, ) -> List[Dict]:
125147
.listqueries()
126148
.execute(num_retries=self.num_retries)
127149
)
128-
return response.get("queries", [])
150+
return response.get('queries', [])
129151

130152
def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
131153
"""

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/79ef8bed891c22eb76adf99158288d1b44426dc0

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy