Content-Length: 587882 | pFad | https://github.com/apache/airflow/commit/5bfacf81c63668ea63e7cb48f4a708a67d0ac0a2

2C [Issue#22846] allow option to encode or not encode UUID when uploadin… · apache/airflow@5bfacf8 · GitHub
Skip to content

Commit 5bfacf8

Browse files
authored
[Issue#22846] allow option to encode or not encode UUID when uploading from Cassandra to GCS (#23766)
1 parent 8494fc7 commit 5bfacf8

File tree

2 files changed

+40
-29
lines changed

2 files changed

+40
-29
lines changed

airflow/providers/google/cloud/transfers/cassandra_to_gcs.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ class CassandraToGCSOperator(BaseOperator):
7979
:param query_timeout: (Optional) The amount of time, in seconds, used to execute the Cassandra query.
8080
If not set, the timeout value will be set in Session.execute() by Cassandra driver.
8181
If set to None, there is no timeout.
82+
:param encode_uuid: (Optional) Option to encode UUID or not when upload from Cassandra to GCS.
83+
Default is to encode UUID.
8284
"""
8385

8486
template_fields: Sequence[str] = (
@@ -105,6 +107,7 @@ def __init__(
105107
delegate_to: Optional[str] = None,
106108
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
107109
query_timeout: Union[float, None, NotSetType] = NOT_SET,
110+
encode_uuid: bool = True,
108111
**kwargs,
109112
) -> None:
110113
super().__init__(**kwargs)
@@ -120,6 +123,7 @@ def __init__(
120123
self.gzip = gzip
121124
self.impersonation_chain = impersonation_chain
122125
self.query_timeout = query_timeout
126+
self.encode_uuid = encode_uuid
123127

124128
# Default Cassandra to BigQuery type mapping
125129
CQL_TYPE_MAP = {
@@ -256,13 +260,11 @@ def _upload_to_gcs(self, file_to_upload):
256260
gzip=self.gzip,
257261
)
258262

259-
@classmethod
260-
def generate_data_dict(cls, names: Iterable[str], values: Any) -> Dict[str, Any]:
263+
def generate_data_dict(self, names: Iterable[str], values: Any) -> Dict[str, Any]:
261264
"""Generates data structure that will be stored as file in GCS."""
262-
return {n: cls.convert_value(v) for n, v in zip(names, values)}
265+
return {n: self.convert_value(v) for n, v in zip(names, values)}
263266

264-
@classmethod
265-
def convert_value(cls, value: Optional[Any]) -> Optional[Any]:
267+
def convert_value(self, value: Optional[Any]) -> Optional[Any]:
266268
"""Convert value to BQ type."""
267269
if not value:
268270
return value
@@ -271,59 +273,58 @@ def convert_value(cls, value: Optional[Any]) -> Optional[Any]:
271273
elif isinstance(value, bytes):
272274
return b64encode(value).decode('ascii')
273275
elif isinstance(value, UUID):
274-
return b64encode(value.bytes).decode('ascii')
276+
if self.encode_uuid:
277+
return b64encode(value.bytes).decode('ascii')
278+
else:
279+
return str(value)
275280
elif isinstance(value, (datetime, Date)):
276281
return str(value)
277282
elif isinstance(value, Decimal):
278283
return float(value)
279284
elif isinstance(value, Time):
280285
return str(value).split('.')[0]
281286
elif isinstance(value, (list, SortedSet)):
282-
return cls.convert_array_types(value)
287+
return self.convert_array_types(value)
283288
elif hasattr(value, '_fields'):
284-
return cls.convert_user_type(value)
289+
return self.convert_user_type(value)
285290
elif isinstance(value, tuple):
286-
return cls.convert_tuple_type(value)
291+
return self.convert_tuple_type(value)
287292
elif isinstance(value, OrderedMapSerializedKey):
288-
return cls.convert_map_type(value)
293+
return self.convert_map_type(value)
289294
else:
290295
raise AirflowException('Unexpected value: ' + str(value))
291296

292-
@classmethod
293-
def convert_array_types(cls, value: Union[List[Any], SortedSet]) -> List[Any]:
297+
def convert_array_types(self, value: Union[List[Any], SortedSet]) -> List[Any]:
294298
"""Maps convert_value over array."""
295-
return [cls.convert_value(nested_value) for nested_value in value]
299+
return [self.convert_value(nested_value) for nested_value in value]
296300

297-
@classmethod
298-
def convert_user_type(cls, value: Any) -> Dict[str, Any]:
301+
def convert_user_type(self, value: Any) -> Dict[str, Any]:
299302
"""
300303
Converts a user type to RECORD that contains n fields, where n is the
301304
number of attributes. Each element in the user type class will be converted to its
302305
corresponding data type in BQ.
303306
"""
304307
names = value._fields
305-
values = [cls.convert_value(getattr(value, name)) for name in names]
306-
return cls.generate_data_dict(names, values)
308+
values = [self.convert_value(getattr(value, name)) for name in names]
309+
return self.generate_data_dict(names, values)
307310

308-
@classmethod
309-
def convert_tuple_type(cls, values: Tuple[Any]) -> Dict[str, Any]:
311+
def convert_tuple_type(self, values: Tuple[Any]) -> Dict[str, Any]:
310312
"""
311313
Converts a tuple to RECORD that contains n fields, each will be converted
312314
to its corresponding data type in bq and will be named 'field_<index>', where
313315
index is determined by the order of the tuple elements defined in cassandra.
314316
"""
315317
names = ['field_' + str(i) for i in range(len(values))]
316-
return cls.generate_data_dict(names, values)
318+
return self.generate_data_dict(names, values)
317319

318-
@classmethod
319-
def convert_map_type(cls, value: OrderedMapSerializedKey) -> List[Dict[str, Any]]:
320+
def convert_map_type(self, value: OrderedMapSerializedKey) -> List[Dict[str, Any]]:
320321
"""
321322
Converts a map to a repeated RECORD that contains two fields: 'key' and 'value',
322323
each will be converted to its corresponding data type in BQ.
323324
"""
324325
converted_map = []
325326
for k, v in zip(value.keys(), value.values()):
326-
converted_map.append({'key': cls.convert_value(k), 'value': cls.convert_value(v)})
327+
converted_map.append({'key': self.convert_value(k), 'value': self.convert_value(v)})
327328
return converted_map
328329

329330
@classmethod

tests/providers/google/cloud/transfers/test_cassandra_to_gcs.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,28 @@
2323
from airflow.providers.google.cloud.transfers.cassandra_to_gcs import CassandraToGCSOperator
2424

2525
TMP_FILE_NAME = "temp-file"
26+
TEST_BUCKET = "test-bucket"
27+
SCHEMA = "schema.json"
28+
FILENAME = "data.json"
29+
CQL = "select * from keyspace1.table1"
30+
TASK_ID = "test-cas-to-gcs"
2631

2732

2833
class TestCassandraToGCS(unittest.TestCase):
2934
@mock.patch("airflow.providers.google.cloud.transfers.cassandra_to_gcs.NamedTemporaryFile")
3035
@mock.patch("airflow.providers.google.cloud.transfers.cassandra_to_gcs.GCSHook.upload")
3136
@mock.patch("airflow.providers.google.cloud.transfers.cassandra_to_gcs.CassandraHook")
3237
def test_execute(self, mock_hook, mock_upload, mock_tempfile):
33-
test_bucket = "test-bucket"
34-
schema = "schema.json"
35-
filename = "data.json"
38+
test_bucket = TEST_BUCKET
39+
schema = SCHEMA
40+
filename = FILENAME
3641
gzip = True
3742
query_timeout = 20
3843
mock_tempfile.return_value.name = TMP_FILE_NAME
3944

4045
operator = CassandraToGCSOperator(
41-
task_id="test-cas-to-gcs",
42-
cql="select * from keyspace1.table1",
46+
task_id=TASK_ID,
47+
cql=CQL,
4348
bucket=test_bucket,
4449
filename=filename,
4550
schema_filename=schema,
@@ -70,7 +75,10 @@ def test_execute(self, mock_hook, mock_upload, mock_tempfile):
7075
mock_upload.assert_has_calls([call_schema, call_data], any_order=True)
7176

7277
def test_convert_value(self):
73-
op = CassandraToGCSOperator
78+
op = CassandraToGCSOperator(task_id=TASK_ID, bucket=TEST_BUCKET, cql=CQL, filename=FILENAME)
79+
unencoded_uuid_op = CassandraToGCSOperator(
80+
task_id=TASK_ID, bucket=TEST_BUCKET, cql=CQL, filename=FILENAME, encode_uuid=False
81+
)
7482
assert op.convert_value(None) is None
7583
assert op.convert_value(1) == 1
7684
assert op.convert_value(1.0) == 1.0
@@ -95,6 +103,8 @@ def test_convert_value(self):
95103
test_uuid = uuid.uuid4()
96104
encoded_uuid = b64encode(test_uuid.bytes).decode("ascii")
97105
assert op.convert_value(test_uuid) == encoded_uuid
106+
unencoded_uuid = str(test_uuid)
107+
assert unencoded_uuid_op.convert_value(test_uuid) == unencoded_uuid
98108

99109
byte_str = b"abc"
100110
encoded_b = b64encode(byte_str).decode("ascii")

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/5bfacf81c63668ea63e7cb48f4a708a67d0ac0a2

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy