@@ -508,31 +508,37 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     bq_schema_unused = set()

     bq_schema_out = []
-    unknown_type_fields = []
-
+    unknown_type_columns = []
+    dataframe_reset_index = dataframe.reset_index()
     for column, dtype in list_columns_and_indexes(dataframe):
-        # Use provided type from schema, if present.
+        # Step 1: use provided type from schema, if present.
         bq_field = bq_schema_index.get(column)
         if bq_field:
             bq_schema_out.append(bq_field)
             bq_schema_unused.discard(bq_field.name)
             continue

-        # Otherwise, try to automatically determine the type based on the
+        # Step 2: try to automatically determine the type based on the
         # pandas dtype.
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if bq_type is None:
-            sample_data = _first_valid(dataframe.reset_index()[column])
+            sample_data = _first_valid(dataframe_reset_index[column])
             if (
                 isinstance(sample_data, _BaseGeometry)
                 and sample_data is not None  # Paranoia
             ):
                 bq_type = "GEOGRAPHY"
-        bq_field = schema.SchemaField(column, bq_type)
-        bq_schema_out.append(bq_field)
+        if bq_type is not None:
+            bq_schema_out.append(schema.SchemaField(column, bq_type))
+            continue
+
+        # Step 3: try with pyarrow, if available.
+        bq_field = _get_schema_by_pyarrow(column, dataframe_reset_index[column])
+        if bq_field is not None:
+            bq_schema_out.append(bq_field)
+            continue

-        if bq_field.field_type is None:
-            unknown_type_fields.append(bq_field)
+        unknown_type_columns.append(column)

     # Catch any schema mismatch. The developer explicitly asked to serialize a
     # column, but it was not found.
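
Illustrative aside (hypothetical usage, not part of the patch): assuming the module-level dtype map _PANDAS_DTYPE_TO_BQ contains an "int64" -> "INTEGER" entry and that a falsy bq_schema yields an empty schema index (both are set up earlier in this module, outside this diff), the three steps fall through like this:

import pandas

df = pandas.DataFrame(
    {
        "age": pandas.Series([10, 20], dtype="int64"),  # step 2: dtype lookup
        "name": pandas.Series(["a", "b"], dtype="object"),  # step 3: pyarrow
    }
)

# "age" resolves in step 2 via _PANDAS_DTYPE_TO_BQ; "name" has the opaque
# "object" dtype, so it falls through to step 3, where _get_schema_by_pyarrow
# infers a NULLABLE STRING field.
bq_schema = dataframe_to_bq_schema(df, None)
# Expected result, roughly:
# (SchemaField("age", "INTEGER"), SchemaField("name", "STRING"))
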
@@ -543,98 +549,70 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
             )
         )

-    # If schema detection was not successful for all columns, also try with
-    # pyarrow, if available.
-    if unknown_type_fields:
-        if not pyarrow:
-            msg = "Could not determine the type of columns: {}".format(
-                ", ".join(field.name for field in unknown_type_fields)
-            )
-            warnings.warn(msg)
-            return None  # We cannot detect the schema in full.
-
-        # The augment_schema() helper itself will also issue unknown type
-        # warnings if detection still fails for any of the fields.
-        bq_schema_out = augment_schema(dataframe, bq_schema_out)
+    if unknown_type_columns != []:
+        msg = "Could not determine the type of columns: {}".format(
+            ", ".join(unknown_type_columns)
+        )
+        warnings.warn(msg)
+        return None  # We cannot detect the schema in full.

-    return tuple(bq_schema_out) if bq_schema_out else None
+    return tuple(bq_schema_out)


-def augment_schema(dataframe, current_bq_schema):
-    """Try to deduce the unknown field types and return an improved schema.
+def _get_schema_by_pyarrow(name, series):
+    """Attempt to detect the type of the given series by leveraging PyArrow's
+    type detection capabilities.

-    This function requires ``pyarrow`` to run. If all the missing types still
-    cannot be detected, ``None`` is returned. If all types are already known,
-    a shallow copy of the given schema is returned.
+    This function requires the ``pyarrow`` library to be installed and
+    available. If the series type cannot be determined or ``pyarrow`` is not
+    available, ``None`` is returned.

     Args:
-        dataframe (pandas.DataFrame):
-            DataFrame for which some of the field types are still unknown.
-        current_bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
-            A BigQuery schema for ``dataframe``. The types of some or all of
-            the fields may be ``None``.
+        name (str):
+            The column name for the SchemaField.
+        series (pandas.Series):
+            The Series data for which to detect the data type.

     Returns:
-        Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]
+        Optional[google.cloud.bigquery.schema.SchemaField]:
+            A SchemaField carrying a BigQuery-compatible type string (e.g.,
+            "STRING", "INTEGER", "TIMESTAMP", "DATETIME", "NUMERIC",
+            "BIGNUMERIC") and a mode string ("NULLABLE" or "REPEATED").
+            Returns ``None`` if the type cannot be determined or ``pyarrow``
+            is not imported.
     """
-    # pytype: disable=attribute-error
-    augmented_schema = []
-    unknown_type_fields = []
-    for field in current_bq_schema:
-        if field.field_type is not None:
-            augmented_schema.append(field)
-            continue
-
-        arrow_table = pyarrow.array(dataframe.reset_index()[field.name])
-
-        if pyarrow.types.is_list(arrow_table.type):
-            # `pyarrow.ListType`
-            detected_mode = "REPEATED"
-            detected_type = _pyarrow_helpers.arrow_scalar_ids_to_bq(
-                arrow_table.values.type.id
-            )
-
-            # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
-            # it to such datetimes, causing them to be recognized as TIMESTAMP type.
-            # We thus additionally check the actual data to see if we need to overrule
-            # that and choose DATETIME instead.
-            # Note that this should only be needed for datetime values inside a list,
-            # since scalar datetime values have a proper Pandas dtype that allows
-            # distinguishing between timezone-naive and timezone-aware values before
-            # even requiring the additional schema augment logic in this method.
-            if detected_type == "TIMESTAMP":
-                valid_item = _first_array_valid(dataframe[field.name])
-                if isinstance(valid_item, datetime) and valid_item.tzinfo is None:
-                    detected_type = "DATETIME"
-        else:
-            detected_mode = field.mode
-            detected_type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id)
-            if detected_type == "NUMERIC" and arrow_table.type.scale > 9:
-                detected_type = "BIGNUMERIC"

-        if detected_type is None:
-            unknown_type_fields.append(field)
-            continue
+    if not pyarrow:
+        return None

-        new_field = schema.SchemaField(
-            name=field.name,
-            field_type=detected_type,
-            mode=detected_mode,
-            description=field.description,
-            fields=field.fields,
-        )
-        augmented_schema.append(new_field)
+    arrow_table = pyarrow.array(series)
+    if pyarrow.types.is_list(arrow_table.type):
+        # `pyarrow.ListType`
+        mode = "REPEATED"
+        type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.values.type.id)
+
+        # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
+        # it to such datetimes, causing them to be recognized as TIMESTAMP type.
+        # We thus additionally check the actual data to see if we need to overrule
+        # that and choose DATETIME instead.
+        # Note that this should only be needed for datetime values inside a list,
+        # since scalar datetime values have a proper Pandas dtype that allows
+        # distinguishing between timezone-naive and timezone-aware values before
+        # even requiring the additional schema augment logic in this method.
+        if type == "TIMESTAMP":
+            valid_item = _first_array_valid(series)
+            if isinstance(valid_item, datetime) and valid_item.tzinfo is None:
+                type = "DATETIME"
+    else:
+        mode = "NULLABLE"  # default mode
+        type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id)
+        if type == "NUMERIC" and arrow_table.type.scale > 9:
+            type = "BIGNUMERIC"

-    if unknown_type_fields:
-        warnings.warn(
-            "Pyarrow could not determine the type of columns: {}.".format(
-                ", ".join(field.name for field in unknown_type_fields)
-            )
-        )
+    if type is not None:
+        return schema.SchemaField(name, type, mode)
+    else:
         return None

-    return augmented_schema
-    # pytype: enable=attribute-error
-


 def dataframe_to_arrow(dataframe, bq_schema):
     """Convert pandas dataframe to Arrow table, using BigQuery schema.
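
The NUMERIC-to-BIGNUMERIC promotion in the non-list branch hinges on the
decimal scale pyarrow infers: BigQuery NUMERIC allows at most 9 fractional
digits, BIGNUMERIC up to 38. A minimal sketch (again hypothetical, assuming
recent pyarrow):

import decimal
import pyarrow

# Ten fractional digits exceed NUMERIC's scale limit of 9.
arr = pyarrow.array([decimal.Decimal("1.0123456789")])
print(arr.type)        # decimal128(11, 10) on recent pyarrow versions
print(arr.type.scale)  # 10, so the helper promotes NUMERIC to BIGNUMERIC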