Content-Length: 370565 | pFad | https://github.com/apache/airflow/commit/bb5e403a320e7377e5040cb180f61b4f5a9ea558

Honor schema type for MySQL to GCS data pre-process (#8090) · apache/airflow@bb5e403 · GitHub
Skip to content

Commit bb5e403

Browse files
authored
Honor schema type for MySQL to GCS data pre-process (#8090)
1 parent c9c336c commit bb5e403

File tree

2 files changed: 28 additions and 12 deletions

airflow/providers/google/cloud/operators/mysql_to_gcs.py

Lines changed: 26 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -102,10 +102,16 @@ def field_to_bigquery(self, field):
102102
def convert_type(self, value, schema_type):
103103
"""
104104
Takes a value from MySQLdb, and converts it to a value that's safe for
105-
JSON/Google Cloud Storage/BigQuery. Dates are converted to UTC seconds.
106-
Decimals are converted to floats. Binary type fields are encoded with base64,
107-
as imported BYTES data must be base64-encoded according to Bigquery SQL
108-
date type documentation: https://cloud.google.com/bigquery/data-types
105+
JSON/Google Cloud Storage/BigQuery.
106+
107+
* Datetimes are converted to UTC seconds.
108+
* Decimals are converted to floats.
109+
* Dates are converted to ISO formatted string if given schema_type is
110+
DATE, or UTC seconds otherwise.
111+
* Binary type fields are converted to integer if given schema_type is
112+
INTEGER, or encoded with base64 otherwise. Imported BYTES data must
113+
be base64-encoded according to BigQuery documentation:
114+
https://cloud.google.com/bigquery/data-types
109115
110116
:param value: MySQLdb column value
111117
:type value: Any
@@ -114,12 +120,20 @@ def convert_type(self, value, schema_type):
114120
"""
115121
if value is None:
116122
return value
117-
if isinstance(value, (datetime, date)):
118-
return calendar.timegm(value.timetuple())
119-
if isinstance(value, timedelta):
120-
return value.total_seconds()
121-
if isinstance(value, Decimal):
122-
return float(value)
123-
if isinstance(value, bytes) or schema_type == "BYTES":
124-
return base64.standard_b64encode(value).decode('ascii')
123+
if isinstance(value, datetime):
124+
value = calendar.timegm(value.timetuple())
125+
elif isinstance(value, timedelta):
126+
value = value.total_seconds()
127+
elif isinstance(value, Decimal):
128+
value = float(value)
129+
elif isinstance(value, date):
130+
if schema_type == "DATE":
131+
value = value.isoformat()
132+
else:
133+
value = calendar.timegm(value.timetuple())
134+
elif isinstance(value, bytes):
135+
if schema_type == "INTEGER":
136+
value = int.from_bytes(value, "big")
137+
else:
138+
value = base64.standard_b64encode(value).decode('ascii')
125139
return value

tests/providers/google/cloud/operators/test_mysql_to_gcs.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -92,9 +92,11 @@ def test_init(self):
9292
@parameterized.expand([
9393
("string", None, "string"),
9494
(datetime.date(1970, 1, 2), None, 86400),
95+
(datetime.date(1970, 1, 2), "DATE", "1970-01-02"),
9596
(datetime.datetime(1970, 1, 1, 1, 0), None, 3600),
9697
(decimal.Decimal(5), None, 5),
9798
(b"bytes", "BYTES", "Ynl0ZXM="),
99+
(b"\x00\x01", "INTEGER", 1),
98100
(None, "BYTES", None)
99101
])
100102
def test_convert_type(self, value, schema_type, expected):

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/bb5e403a320e7377e5040cb180f61b4f5a9ea558

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy