Commit 1ca6b51 · apache/airflow
Avoid double json.dumps for json data export in PostgresToGCSOperator.
1 parent: 3a2eb96 · commit: 1ca6b51
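The gist of the fix, as a minimal sketch (standard library only; the row values are invented for illustration): before this commit, the Postgres transfer stringified dict columns with json.dumps inside convert_type, and the NDJSON writer then dumped the whole row again, so JSON columns landed in GCS as escaped strings. With dict stringification disabled on the JSON export path, each dict is serialized exactly once.

import json

row = {"some_json": {"a": None, "b": "something"}, "some_num": 42}

# Old behaviour: dict values were dumped first, then the whole row was
# dumped again, so the JSON column arrived as an escaped string.
old = {k: json.dumps(v) if isinstance(v, dict) else v for k, v in row.items()}
print(json.dumps(old, sort_keys=True))
# {"some_json": "{\"a\": null, \"b\": \"something\"}", "some_num": 42}

# New behaviour: the dict passes through untouched and is dumped once,
# producing a proper nested JSON object.
print(json.dumps(row, sort_keys=True))
# {"some_json": {"a": null, "b": "something"}, "some_num": 42}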

8 files changed: +37 additions, −22 deletions

airflow/providers/google/cloud/transfers/mssql_to_gcs.py (1 addition, 1 deletion)

@@ -80,7 +80,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
         }

     @classmethod
-    def convert_type(cls, value, schema_type):
+    def convert_type(cls, value, schema_type, **kwargs):
         """
         Takes a value from MSSQL, and converts it to a value that's safe for
         JSON/Google Cloud Storage/BigQuery.
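The same signature change repeats in the MySQL, Oracle, Presto, and Trino transfers below. A minimal sketch of why each subclass grows **kwargs (the class names here are simplified stand-ins, not the real operators): the base class now forwards a stringify_dict keyword to every convert_type, so subclasses that never see dict values must still accept it.

class BaseLike:
    # Simplified stand-in for BaseSQLToGCSOperator.convert_types.
    def convert_types(self, schema, col_type_dict, row, stringify_dict=False):
        return [
            self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
            for name, value in zip(schema, row)
        ]

class MsSqlLike(BaseLike):
    # **kwargs silently absorbs stringify_dict, which this dialect ignores.
    def convert_type(self, value, schema_type, **kwargs):
        return value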

airflow/providers/google/cloud/transfers/mysql_to_gcs.py (1 addition, 1 deletion)

@@ -92,7 +92,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
             'mode': field_mode,
         }

-    def convert_type(self, value, schema_type: str):
+    def convert_type(self, value, schema_type: str, **kwargs):
         """
         Takes a value from MySQLdb, and converts it to a value that's safe for
         JSON/Google Cloud Storage/BigQuery.

airflow/providers/google/cloud/transfers/oracle_to_gcs.py (1 addition, 1 deletion)

@@ -85,7 +85,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
             'mode': field_mode,
         }

-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, **kwargs):
         """
         Takes a value from Oracle db, and converts it to a value that's safe for
         JSON/Google Cloud Storage/BigQuery.

airflow/providers/google/cloud/transfers/postgres_to_gcs.py (6 additions, 2 deletions)

@@ -128,13 +128,17 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
             'mode': 'REPEATED' if field[1] in (1009, 1005, 1007, 1016) else 'NULLABLE',
         }

-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, stringify_dict=True):
         """
         Takes a value from Postgres, and converts it to a value that's safe for
         JSON/Google Cloud Storage/BigQuery.
         Timezone aware Datetime are converted to UTC seconds.
         Unaware Datetime, Date and Time are converted to ISO formatted strings.
         Decimals are converted to floats.
+
+        :param value: Postgres column value.
+        :param schema_type: BigQuery data type.
+        :param stringify_dict: Specify whether to convert dict to string.
         """
         if isinstance(value, datetime.datetime):
             iso_format_value = value.isoformat()
@@ -149,7 +153,7 @@ def convert_type(self, value, schema_type):
                 hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
             )
             return str(time_delta)
-        if isinstance(value, dict):
+        if stringify_dict and isinstance(value, dict):
             return json.dumps(value)
         if isinstance(value, Decimal):
             return float(value)
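A sketch of how the new flag behaves on an instantiated operator (the constructor arguments here are placeholders, and the return values are inferred from the diff above):

from decimal import Decimal

from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator

op = PostgresToGCSOperator(
    task_id="pg_to_gcs", sql="SELECT 1", bucket="my-bucket", filename="out.ndjson"
)

op.convert_type({"a": 1}, "STRING")                        # '{"a": 1}' — CSV/Parquet path
op.convert_type({"a": 1}, "STRING", stringify_dict=False)  # {'a': 1}   — NDJSON path
op.convert_type(Decimal("1.5"), "FLOAT")                   # 1.5, unaffected by the flag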

airflow/providers/google/cloud/transfers/presto_to_gcs.py (1 addition, 1 deletion)

@@ -195,7 +195,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:

         return {"name": field[0], "type": new_field_type}

-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, **kwargs):
         """
         Do nothing. Presto uses JSON on the transport layer, so types are simple.

airflow/providers/google/cloud/transfers/sql_to_gcs.py (9 additions, 7 deletions)

@@ -164,9 +164,12 @@ def execute(self, context: 'Context'):
                 file_to_upload['file_handle'].close()
                 counter += 1

-    def convert_types(self, schema, col_type_dict, row) -> list:
+    def convert_types(self, schema, col_type_dict, row, stringify_dict=False) -> list:
         """Convert values from DBAPI to output-friendly formats."""
-        return [self.convert_type(value, col_type_dict.get(name)) for name, value in zip(schema, row)]
+        return [
+            self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
+            for name, value in zip(schema, row)
+        ]

     def _write_local_data_files(self, cursor):
         """
@@ -200,21 +203,20 @@ def _write_local_data_files(self, cursor):
             parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema)

         for row in cursor:
-            # Convert datetime objects to utc seconds, and decimals to floats.
-            # Convert binary type object to string encoded with base64.
-            row = self.convert_types(schema, col_type_dict, row)
-
             if self.export_format == 'csv':
+                row = self.convert_types(schema, col_type_dict, row)
                 if self.null_marker is not None:
                     row = [value if value is not None else self.null_marker for value in row]
                 csv_writer.writerow(row)
             elif self.export_format == 'parquet':
+                row = self.convert_types(schema, col_type_dict, row)
                 if self.null_marker is not None:
                     row = [value if value is not None else self.null_marker for value in row]
                 row_pydic = {col: [value] for col, value in zip(schema, row)}
                 tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
                 parquet_writer.write_table(tbl)
             else:
+                row = self.convert_types(schema, col_type_dict, row, stringify_dict=False)
                 row_dict = dict(zip(schema, row))

                 tmp_file_handle.write(
@@ -287,7 +289,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
         """Convert a DBAPI field to BigQuery schema format."""

     @abc.abstractmethod
-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, **kwargs):
         """Convert a value from DBAPI to output-friendly formats."""

     def _get_col_type_dict(self):
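Why the convert_types call moved into each branch: the CSV and Parquet writers still need dicts stringified, while the JSON branch passes stringify_dict=False (explicitly, though it is also the default) so that the single json.dumps over the row dict does all the serialization. A minimal reproduction of the JSON branch (sort_keys mirrors the key ordering the test fixtures imply; the exact write call in the real file may differ):

import json

schema = ["some_str", "some_num", "some_json"]
row = ["mock_row_content_2", 43, {}]  # dict survives convert_types untouched

row_dict = dict(zip(schema, row))
line = json.dumps(row_dict, sort_keys=True).encode("utf-8") + b"\n"
assert line == b'{"some_json": {}, "some_num": 43, "some_str": "mock_row_content_2"}\n'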

airflow/providers/google/cloud/transfers/trino_to_gcs.py (1 addition, 1 deletion)

@@ -195,7 +195,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:

         return {"name": field[0], "type": new_field_type}

-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, **kwargs):
         """
         Do nothing. Trino uses JSON on the transport layer, so types are simple.

tests/providers/google/cloud/transfers/test_postgres_to_gcs.py (17 additions, 8 deletions)

@@ -35,14 +35,15 @@
 FILENAME = 'test_{}.ndjson'

 NDJSON_LINES = [
-    b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
-    b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
-    b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
+    b'{"some_json": {"firtname": "John", "lastname": "Smith", "nested_dict": {"a": null, "b": "something"}}, "some_num": 42, "some_str": "mock_row_content_1"}\n',  # noqa
+    b'{"some_json": {}, "some_num": 43, "some_str": "mock_row_content_2"}\n',
+    b'{"some_json": {}, "some_num": 44, "some_str": "mock_row_content_3"}\n',
 ]
 SCHEMA_FILENAME = 'schema_test.json'
 SCHEMA_JSON = (
     b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, '
-    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]'
+    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, '
+    b'{"mode": "NULLABLE", "name": "some_json", "type": "STRING"}]'
 )


@@ -55,16 +56,24 @@ def setUpClass(cls):
         with conn.cursor() as cur:
             for table in TABLES:
                 cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")
-                cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer);")
+                cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer, some_json json);")

             cur.execute(
-                "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_1', 42)
+                "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+                (
+                    'mock_row_content_1',
+                    42,
+                    '{"lastname": "Smith", "firtname": "John", \
+                    "nested_dict": {"a": null, "b": "something"}}',
+                ),
             )
             cur.execute(
-                "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_2', 43)
+                "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+                ('mock_row_content_2', 43, '{}'),
             )
             cur.execute(
-                "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_3', 44)
+                "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+                ('mock_row_content_3', 44, '{}'),
             )

     @classmethod
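A quick way to see what the updated fixtures assert: parsing a fixture line now yields a nested dict for some_json, where the old double json.dumps would have produced an escaped string.

import json

line = b'{"some_json": {}, "some_num": 43, "some_str": "mock_row_content_2"}\n'
record = json.loads(line)
assert isinstance(record["some_json"], dict)  # under the old double-dump this was a str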
