
Commit d1fe671 · https://github.com/apache/airflow/commit/d1fe67184da26fb0bca2416e26f321747fa4aa5d
Add use_legacy_sql param to BigQueryGetDataOperator (#31190)
1 parent 2453231 commit d1fe671

2 files changed, 19 additions and 15 deletions

airflow/providers/google/cloud/operators/bigquery.py
11 additions, 3 deletions

@@ -819,6 +819,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
         Defaults to 4 seconds.
     :param as_dict: if True returns the result as a list of dictionaries, otherwise as list of lists
         (default: False).
+    :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
     """
 
     template_fields: Sequence[str] = (
@@ -845,6 +846,7 @@ def __init__(
         deferrable: bool = False,
         poll_interval: float = 4.0,
         as_dict: bool = False,
+        use_legacy_sql: bool = True,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -860,14 +862,15 @@ def __init__(
         self.deferrable = deferrable
         self.poll_interval = poll_interval
         self.as_dict = as_dict
+        self.use_legacy_sql = use_legacy_sql
 
     def _submit_job(
         self,
         hook: BigQueryHook,
         job_id: str,
     ) -> BigQueryJob:
         get_query = self.generate_query()
-        configuration = {"query": {"query": get_query}}
+        configuration = {"query": {"query": get_query, "useLegacySql": self.use_legacy_sql}}
         """Submit a new job and get the job id for polling the status using Triggerer."""
         return hook.insert_job(
             configuration=configuration,
@@ -887,18 +890,23 @@ def generate_query(self) -> str:
             query += self.selected_fields
         else:
             query += "*"
-        query += f" from {self.dataset_id}.{self.table_id} limit {self.max_results}"
+        query += f" from `{self.project_id}.{self.dataset_id}.{self.table_id}` limit {self.max_results}"
         return query
 
     def execute(self, context: Context):
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
+            use_legacy_sql=self.use_legacy_sql,
         )
 
         if not self.deferrable:
             self.log.info(
-                "Fetching Data from %s.%s max results: %s", self.dataset_id, self.table_id, self.max_results
+                "Fetching Data from %s.%s.%s max results: %s",
+                self.project_id,
+                self.dataset_id,
+                self.table_id,
+                self.max_results,
             )
         if not self.selected_fields:
             schema: dict[str, list] = hook.get_schema(
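
For context, a minimal, hypothetical usage sketch of the new parameter (the project, dataset, and table names below are placeholders, not part of this commit). Note that the default stays use_legacy_sql=True, so existing DAGs keep their behavior; opting out switches the generated query to standard SQL, which expects the backtick-quoted, project-qualified table reference that generate_query() now emits:

    from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator

    # Hypothetical task definition with placeholder identifiers.
    get_data = BigQueryGetDataOperator(
        task_id="get_data",
        project_id="my-project",
        dataset_id="my_dataset",
        table_id="my_table",
        selected_fields="name,value",
        max_results=100,
        use_legacy_sql=False,  # opt out of the legacy-SQL default
    )

    # In the deferrable path, _submit_job() would now build roughly:
    #   select name,value from `my-project.my_dataset.my_table` limit 100
    # and submit it as {"query": {"query": ..., "useLegacySql": False}},
    # so the job runs under the dialect the operator was configured with.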

tests/providers/google/cloud/operators/test_bigquery.py
8 additions, 12 deletions

@@ -82,6 +82,7 @@
     "refreshIntervalMs": 2000000,
 }
 TEST_TABLE = "test-table"
+GCP_CONN_ID = "google_cloud_default"
 
 
 class TestBigQueryCreateEmptyTableOperator:
@@ -791,6 +792,7 @@ def test_execute(self, mock_hook, as_dict):
         max_results = 100
         selected_fields = "DATE"
         operator = BigQueryGetDataOperator(
+            gcp_conn_id=GCP_CONN_ID,
             task_id=TASK_ID,
             dataset_id=TEST_DATASET,
             table_id=TEST_TABLE_ID,
@@ -799,8 +801,10 @@ def test_execute(self, mock_hook, as_dict):
             selected_fields=selected_fields,
             location=TEST_DATASET_LOCATION,
             as_dict=as_dict,
+            use_legacy_sql=False,
         )
         operator.execute(None)
+        mock_hook.assert_called_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None, use_legacy_sql=False)
         mock_hook.return_value.list_rows.assert_called_once_with(
             dataset_id=TEST_DATASET,
             table_id=TEST_TABLE_ID,
@@ -818,12 +822,6 @@ def test_bigquery_get_data_operator_async_with_selected_fields(
         Asserts that a task is deferred and a BigQuerygetDataTrigger will be fired
         when the BigQueryGetDataOperator is executed with deferrable=True.
         """
-        job_id = "123456"
-        hash_ = "hash"
-        real_job_id = f"{job_id}_{hash_}"
-
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
-
         ti = create_task_instance_of_operator(
             BigQueryGetDataOperator,
             dag_id="dag_id",
@@ -833,6 +831,7 @@ def test_bigquery_get_data_operator_async_with_selected_fields(
             max_results=100,
             selected_fields="value,name",
             deferrable=True,
+            use_legacy_sql=False,
         )
 
         with pytest.raises(TaskDeferred) as exc:
@@ -851,12 +850,6 @@ def test_bigquery_get_data_operator_async_without_selected_fields(
         Asserts that a task is deferred and a BigQueryGetDataTrigger will be fired
         when the BigQueryGetDataOperator is executed with deferrable=True.
         """
-        job_id = "123456"
-        hash_ = "hash"
-        real_job_id = f"{job_id}_{hash_}"
-
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
-
         ti = create_task_instance_of_operator(
             BigQueryGetDataOperator,
             dag_id="dag_id",
@@ -866,6 +859,7 @@ def test_bigquery_get_data_operator_async_without_selected_fields(
             max_results=100,
             deferrable=True,
             as_dict=as_dict,
+            use_legacy_sql=False,
         )
 
         with pytest.raises(TaskDeferred) as exc:
@@ -886,6 +880,7 @@ def test_bigquery_get_data_operator_execute_failure(self, as_dict):
             max_results=100,
             deferrable=True,
             as_dict=as_dict,
+            use_legacy_sql=False,
         )
 
         with pytest.raises(AirflowException):
@@ -904,6 +899,7 @@ def test_bigquery_get_data_op_execute_complete_with_records(self, as_dict):
             max_results=100,
             deferrable=True,
             as_dict=as_dict,
+            use_legacy_sql=False,
         )
 
         with mock.patch.object(operator.log, "info") as mock_log_info:
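
For reference, a standalone sketch of the pattern the updated test_execute relies on: patch the hook class where the operator module imports it, run the operator, then assert the keyword arguments it forwarded, including the new use_legacy_sql flag. The patch target and the placeholder identifiers are assumptions based on the test module, not a verbatim excerpt:

    from unittest import mock

    from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator

    # Assumed patch target: the name the operator module binds for its hook.
    with mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") as mock_hook:
        operator = BigQueryGetDataOperator(
            task_id="get_data",
            dataset_id="my_dataset",  # placeholder identifiers
            table_id="my_table",
            max_results=10,
            selected_fields="name",
            use_legacy_sql=False,
        )
        operator.execute(None)

    # The operator should construct its hook with the flag it was given;
    # "google_cloud_default" is the provider's default connection id.
    mock_hook.assert_called_with(
        gcp_conn_id="google_cloud_default",
        impersonation_chain=None,
        use_legacy_sql=False,
    )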

0 commit comments







