
Fix: GCS To BigQuery source_object (#16160) · apache/airflow@99d1535 · GitHub

Commit 99d1535

Fix: GCS To BigQuery source_object (#16160)
* Fix: GCS To BigQuery source_object (#16008): accept both str and list for source_objects
* Convert source_objects to a list when it is not one already, instead of changing the URI-building logic
* Add tests
1 parent b7d1039 commit 99d1535
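
Why the commit coerces in __init__ rather than branching downstream: execute builds source_uris with a comprehension over self.source_objects (the tests below assert exactly that shape), so a bare string would be iterated character by character. A minimal standalone sketch of that failure mode and the fix (a hedged reconstruction of #16008, not code quoted from the commit):

    bucket = "test-bucket"
    source_objects = "test/objects/*"  # a bare str, as reported in #16008

    # Pre-fix: iterating a str yields its characters, one bogus URI per character.
    broken = [f"gs://{bucket}/{obj}" for obj in source_objects]
    print(broken[:3])  # ['gs://test-bucket/t', 'gs://test-bucket/e', 'gs://test-bucket/s']

    # The fix: normalize to a list once, up front, before any URI building.
    normalized = source_objects if isinstance(source_objects, list) else [source_objects]
    fixed = [f"gs://{bucket}/{obj}" for obj in normalized]
    print(fixed)  # ['gs://test-bucket/test/objects/*']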

File tree

2 files changed: +76 additions, -3 deletions

airflow/providers/google/cloud/transfers/gcs_to_bigquery.py

Lines changed: 3 additions & 3 deletions

@@ -42,9 +42,9 @@ class GCSToBigQueryOperator(BaseOperator):
 
     :param bucket: The bucket to load from. (templated)
     :type bucket: str
-    :param source_objects: List of Google Cloud Storage URIs to load from. (templated)
+    :param source_objects: String or List of Google Cloud Storage URIs to load from. (templated)
         If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
-    :type source_objects: list[str]
+    :type source_objects: str, list[str]
     :param destination_project_dataset_table: The dotted
         ``(<project>.|<project>:)<dataset>.<table>`` BigQuery table to load data into.
         If ``<project>`` is not included, project will be the project defined in

@@ -219,7 +219,7 @@ def __init__(
         if time_partitioning is None:
             time_partitioning = {}
         self.bucket = bucket
-        self.source_objects = source_objects
+        self.source_objects = source_objects if isinstance(source_objects, list) else [source_objects]
         self.schema_object = schema_object
 
         # BQ config
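
With this change, both call styles below are valid. An illustrative DAG, not taken from the commit (the DAG id, schedule, bucket, and destination table are placeholders; the operator import path and parameter names come from the tests):

    import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

    with DAG(
        dag_id="gcs_to_bq_example",  # placeholder
        start_date=datetime.datetime(2021, 6, 1),
        schedule_interval=None,
    ) as dag:
        # A single object path may now be passed as a plain string...
        load_one = GCSToBigQueryOperator(
            task_id="load_one",
            bucket="test-bucket",
            source_objects="test/objects/*",  # str: coerced to ['test/objects/*'] in __init__
            destination_project_dataset_table="my-project.my_dataset.my_table",  # placeholder
        )
        # ...and the original list form keeps working unchanged.
        load_many = GCSToBigQueryOperator(
            task_id="load_many",
            bucket="test-bucket",
            source_objects=["data/a.csv", "data/b.csv"],
            destination_project_dataset_table="my-project.my_dataset.my_table",  # placeholder
        )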

tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py

Lines changed: 73 additions & 0 deletions

@@ -26,6 +26,7 @@
 TEST_BUCKET = 'test-bucket'
 MAX_ID_KEY = 'id'
 TEST_SOURCE_OBJECTS = ['test/objects/*']
+TEST_SOURCE_OBJECTS_AS_STRING = 'test/objects/*'
 LABELS = {'k1': 'v1'}
 DESCRIPTION = "Test Description"

@@ -216,3 +217,75 @@ def test_description_external_table(self, bq_hook):
             description=DESCRIPTION,
         )
         # fmt: on
+
+    @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook')
+    def test_source_objects_as_list(self, bq_hook):
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+        )
+
+        operator.execute(None)
+
+        bq_hook.return_value.get_conn.return_value.cursor.return_value.run_load.assert_called_once_with(
+            destination_project_dataset_table=mock.ANY,
+            schema_fields=mock.ANY,
+            source_uris=[f'gs://{TEST_BUCKET}/{source_object}' for source_object in TEST_SOURCE_OBJECTS],
+            source_format=mock.ANY,
+            autodetect=mock.ANY,
+            create_disposition=mock.ANY,
+            skip_leading_rows=mock.ANY,
+            write_disposition=mock.ANY,
+            field_delimiter=mock.ANY,
+            max_bad_records=mock.ANY,
+            quote_character=mock.ANY,
+            ignore_unknown_values=mock.ANY,
+            allow_quoted_newlines=mock.ANY,
+            allow_jagged_rows=mock.ANY,
+            encoding=mock.ANY,
+            schema_update_options=mock.ANY,
+            src_fmt_configs=mock.ANY,
+            time_partitioning=mock.ANY,
+            cluster_fields=mock.ANY,
+            encryption_configuration=mock.ANY,
+            labels=mock.ANY,
+            description=mock.ANY,
+        )
+
+    @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook')
+    def test_source_objects_as_string(self, bq_hook):
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS_AS_STRING,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+        )
+
+        operator.execute(None)
+
+        bq_hook.return_value.get_conn.return_value.cursor.return_value.run_load.assert_called_once_with(
+            destination_project_dataset_table=mock.ANY,
+            schema_fields=mock.ANY,
+            source_uris=[f'gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}'],
+            source_format=mock.ANY,
+            autodetect=mock.ANY,
+            create_disposition=mock.ANY,
+            skip_leading_rows=mock.ANY,
+            write_disposition=mock.ANY,
+            field_delimiter=mock.ANY,
+            max_bad_records=mock.ANY,
+            quote_character=mock.ANY,
+            ignore_unknown_values=mock.ANY,
+            allow_quoted_newlines=mock.ANY,
+            allow_jagged_rows=mock.ANY,
+            encoding=mock.ANY,
+            schema_update_options=mock.ANY,
+            src_fmt_configs=mock.ANY,
+            time_partitioning=mock.ANY,
+            cluster_fields=mock.ANY,
+            encryption_configuration=mock.ANY,
+            labels=mock.ANY,
+            description=mock.ANY,
+        )
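
Both tests follow the same pattern: pin source_uris and wildcard every other run_load argument with mock.ANY, so the assertion exercises only the str-to-list coercion. The matching idiom in isolation (a sketch using unittest.mock, not the Airflow test harness):

    from unittest import mock

    hook = mock.MagicMock()

    # Simulate what the operator does: one run_load call on the hook's cursor.
    cursor = hook.get_conn.return_value.cursor.return_value
    cursor.run_load(source_uris=['gs://test-bucket/test/objects/*'], source_format='CSV')

    # mock.ANY compares equal to anything, so only source_uris is actually checked.
    cursor.run_load.assert_called_once_with(
        source_uris=['gs://test-bucket/test/objects/*'],
        source_format=mock.ANY,
    )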
