@@ -79,6 +79,8 @@ class CassandraToGCSOperator(BaseOperator):
79
79
:param query_timeout: (Optional) The amount of time, in seconds, used to execute the Cassandra query.
80
80
If not set, the timeout value will be set in Session.execute() by Cassandra driver.
81
81
If set to None, there is no timeout.
82
+ :param encode_uuid: (Optional) Option to encode UUID or not when upload from Cassandra to GCS.
83
+ Default is to encode UUID.
82
84
"""
83
85
84
86
template_fields : Sequence [str ] = (
@@ -105,6 +107,7 @@ def __init__(
105
107
delegate_to : Optional [str ] = None ,
106
108
impersonation_chain : Optional [Union [str , Sequence [str ]]] = None ,
107
109
query_timeout : Union [float , None , NotSetType ] = NOT_SET ,
110
+ encode_uuid : bool = True ,
108
111
** kwargs ,
109
112
) -> None :
110
113
super ().__init__ (** kwargs )
@@ -120,6 +123,7 @@ def __init__(
120
123
self .gzip = gzip
121
124
self .impersonation_chain = impersonation_chain
122
125
self .query_timeout = query_timeout
126
+ self .encode_uuid = encode_uuid
123
127
124
128
# Default Cassandra to BigQuery type mapping
125
129
CQL_TYPE_MAP = {
@@ -256,13 +260,11 @@ def _upload_to_gcs(self, file_to_upload):
256
260
gzip = self .gzip ,
257
261
)
258
262
259
- @classmethod
260
- def generate_data_dict (cls , names : Iterable [str ], values : Any ) -> Dict [str , Any ]:
263
+ def generate_data_dict (self , names : Iterable [str ], values : Any ) -> Dict [str , Any ]:
261
264
"""Generates data structure that will be stored as file in GCS."""
262
- return {n : cls .convert_value (v ) for n , v in zip (names , values )}
265
+ return {n : self .convert_value (v ) for n , v in zip (names , values )}
263
266
264
- @classmethod
265
- def convert_value (cls , value : Optional [Any ]) -> Optional [Any ]:
267
+ def convert_value (self , value : Optional [Any ]) -> Optional [Any ]:
266
268
"""Convert value to BQ type."""
267
269
if not value :
268
270
return value
@@ -271,59 +273,58 @@ def convert_value(cls, value: Optional[Any]) -> Optional[Any]:
271
273
elif isinstance (value , bytes ):
272
274
return b64encode (value ).decode ('ascii' )
273
275
elif isinstance (value , UUID ):
274
- return b64encode (value .bytes ).decode ('ascii' )
276
+ if self .encode_uuid :
277
+ return b64encode (value .bytes ).decode ('ascii' )
278
+ else :
279
+ return str (value )
275
280
elif isinstance (value , (datetime , Date )):
276
281
return str (value )
277
282
elif isinstance (value , Decimal ):
278
283
return float (value )
279
284
elif isinstance (value , Time ):
280
285
return str (value ).split ('.' )[0 ]
281
286
elif isinstance (value , (list , SortedSet )):
282
- return cls .convert_array_types (value )
287
+ return self .convert_array_types (value )
283
288
elif hasattr (value , '_fields' ):
284
- return cls .convert_user_type (value )
289
+ return self .convert_user_type (value )
285
290
elif isinstance (value , tuple ):
286
- return cls .convert_tuple_type (value )
291
+ return self .convert_tuple_type (value )
287
292
elif isinstance (value , OrderedMapSerializedKey ):
288
- return cls .convert_map_type (value )
293
+ return self .convert_map_type (value )
289
294
else :
290
295
raise AirflowException ('Unexpected value: ' + str (value ))
291
296
292
- @classmethod
293
- def convert_array_types (cls , value : Union [List [Any ], SortedSet ]) -> List [Any ]:
297
+ def convert_array_types (self , value : Union [List [Any ], SortedSet ]) -> List [Any ]:
294
298
"""Maps convert_value over array."""
295
- return [cls .convert_value (nested_value ) for nested_value in value ]
299
+ return [self .convert_value (nested_value ) for nested_value in value ]
296
300
297
- @classmethod
298
- def convert_user_type (cls , value : Any ) -> Dict [str , Any ]:
301
+ def convert_user_type (self , value : Any ) -> Dict [str , Any ]:
299
302
"""
300
303
Converts a user type to RECORD that contains n fields, where n is the
301
304
number of attributes. Each element in the user type class will be converted to its
302
305
corresponding data type in BQ.
303
306
"""
304
307
names = value ._fields
305
- values = [cls .convert_value (getattr (value , name )) for name in names ]
306
- return cls .generate_data_dict (names , values )
308
+ values = [self .convert_value (getattr (value , name )) for name in names ]
309
+ return self .generate_data_dict (names , values )
307
310
308
- @classmethod
309
- def convert_tuple_type (cls , values : Tuple [Any ]) -> Dict [str , Any ]:
311
+ def convert_tuple_type (self , values : Tuple [Any ]) -> Dict [str , Any ]:
310
312
"""
311
313
Converts a tuple to RECORD that contains n fields, each will be converted
312
314
to its corresponding data type in bq and will be named 'field_<index>', where
313
315
index is determined by the order of the tuple elements defined in cassandra.
314
316
"""
315
317
names = ['field_' + str (i ) for i in range (len (values ))]
316
- return cls .generate_data_dict (names , values )
318
+ return self .generate_data_dict (names , values )
317
319
318
- @classmethod
319
- def convert_map_type (cls , value : OrderedMapSerializedKey ) -> List [Dict [str , Any ]]:
320
+ def convert_map_type (self , value : OrderedMapSerializedKey ) -> List [Dict [str , Any ]]:
320
321
"""
321
322
Converts a map to a repeated RECORD that contains two fields: 'key' and 'value',
322
323
each will be converted to its corresponding data type in BQ.
323
324
"""
324
325
converted_map = []
325
326
for k , v in zip (value .keys (), value .values ()):
326
- converted_map .append ({'key' : cls .convert_value (k ), 'value' : cls .convert_value (v )})
327
+ converted_map .append ({'key' : self .convert_value (k ), 'value' : self .convert_value (v )})
327
328
return converted_map
328
329
329
330
@classmethod
0 commit comments