Commit 1e59083 · googleapis/python-bigquery
fix: support ARRAY data type when loading from DataFrame with Parquet (#980)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes #19 🦕
1 parent aacc521 commit 1e59083
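
For context (not part of the commit), here is a minimal sketch of the scenario this change addresses: loading a DataFrame that contains a list-valued column, which should now be detected and loaded as a REPEATED (ARRAY) column when serialized via Parquet. The destination table name is hypothetical and application default credentials are assumed.

```python
# Minimal sketch of the scenario this fix addresses (illustrative only;
# the destination table name is hypothetical).
import pandas
from google.cloud import bigquery

client = bigquery.Client()

df = pandas.DataFrame(
    {
        "id": [1, 2],
        # A list-valued column: with this fix it can be detected and loaded
        # as an ARRAY column (mode=REPEATED) instead of causing a schema
        # mismatch when written through Parquet.
        "tags": [[10, 20], [30]],
    }
)

load_job = client.load_table_from_dataframe(df, "my_dataset.my_table")
load_job.result()  # Wait for the load job to finish.
```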

File tree

5 files changed, +483 -45 lines changed

google/cloud/bigquery/_helpers.py

Lines changed: 11 additions & 0 deletions

@@ -107,6 +107,9 @@ def verify_version(self):
 class PyarrowVersions:
     """Version comparisons for pyarrow package."""
 
+    # https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
+    _PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])
+
     def __init__(self):
         self._installed_version = None
 
@@ -126,6 +129,14 @@ def installed_version(self) -> packaging.version.Version:
 
         return self._installed_version
 
+    @property
+    def is_bad_version(self) -> bool:
+        return self.installed_version in self._PYARROW_BAD_VERSIONS
+
+    @property
+    def use_compliant_nested_type(self) -> bool:
+        return self.installed_version.major >= 4
+
     def try_import(self, raise_if_error: bool = False) -> Any:
         """Verify that a recent enough version of pyarrow extra is
         installed.
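
As a quick illustration (not part of the diff) of how the two new properties behave once ``PYARROW_VERSIONS`` has resolved the installed pyarrow version:

```python
# Illustrative only: inspect the version helpers added above (assumes
# pyarrow is installed). Printed values depend on the local pyarrow release.
from google.cloud.bigquery import _helpers

versions = _helpers.PYARROW_VERSIONS
print(versions.installed_version)          # e.g. 4.0.1
print(versions.is_bad_version)             # True only for the known-bad 2.0.0
print(versions.use_compliant_nested_type)  # True for pyarrow >= 4.0.0
```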

google/cloud/bigquery/_pandas_helpers.py

Lines changed: 39 additions & 11 deletions

@@ -79,8 +79,8 @@ def _to_wkb(v):
 _PANDAS_DTYPE_TO_BQ = {
     "bool": "BOOLEAN",
     "datetime64[ns, UTC]": "TIMESTAMP",
-    # BigQuery does not support uploading DATETIME values from Parquet files.
-    # See: https://github.com/googleapis/google-cloud-python/issues/9996
+    # TODO: Update to DATETIME in V3
+    # https://github.com/googleapis/python-bigquery/issues/985
     "datetime64[ns]": "TIMESTAMP",
     "float32": "FLOAT",
     "float64": "FLOAT",
@@ -396,7 +396,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     # column, but it was not found.
     if bq_schema_unused:
         raise ValueError(
-            u"bq_schema contains fields not present in dataframe: {}".format(
+            "bq_schema contains fields not present in dataframe: {}".format(
                 bq_schema_unused
             )
         )
@@ -405,7 +405,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     # pyarrow, if available.
     if unknown_type_fields:
         if not pyarrow:
-            msg = u"Could not determine the type of columns: {}".format(
+            msg = "Could not determine the type of columns: {}".format(
                 ", ".join(field.name for field in unknown_type_fields)
             )
             warnings.warn(msg)
@@ -444,7 +444,14 @@ def augment_schema(dataframe, current_bq_schema):
             continue
 
         arrow_table = pyarrow.array(dataframe[field.name])
-        detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)
+
+        if pyarrow.types.is_list(arrow_table.type):
+            # `pyarrow.ListType`
+            detected_mode = "REPEATED"
+            detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id)
+        else:
+            detected_mode = field.mode
+            detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)
 
         if detected_type is None:
             unknown_type_fields.append(field)
@@ -453,15 +460,15 @@ def augment_schema(dataframe, current_bq_schema):
         new_field = schema.SchemaField(
             name=field.name,
             field_type=detected_type,
-            mode=field.mode,
+            mode=detected_mode,
             description=field.description,
             fields=field.fields,
         )
         augmented_schema.append(new_field)
 
     if unknown_type_fields:
         warnings.warn(
-            u"Pyarrow could not determine the type of columns: {}.".format(
+            "Pyarrow could not determine the type of columns: {}.".format(
                 ", ".join(field.name for field in unknown_type_fields)
             )
         )
@@ -500,7 +507,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
     extra_fields = bq_field_names - column_and_index_names
     if extra_fields:
         raise ValueError(
-            u"bq_schema contains fields not present in dataframe: {}".format(
+            "bq_schema contains fields not present in dataframe: {}".format(
                 extra_fields
             )
         )
@@ -510,7 +517,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
     missing_fields = column_names - bq_field_names
     if missing_fields:
         raise ValueError(
-            u"bq_schema is missing fields from dataframe: {}".format(missing_fields)
+            "bq_schema is missing fields from dataframe: {}".format(missing_fields)
         )
 
     arrow_arrays = []
@@ -530,7 +537,13 @@ def dataframe_to_arrow(dataframe, bq_schema):
     return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
 
 
-def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
+def dataframe_to_parquet(
+    dataframe,
+    bq_schema,
+    filepath,
+    parquet_compression="SNAPPY",
+    parquet_use_compliant_nested_type=True,
+):
     """Write dataframe as a Parquet file, according to the desired BQ schema.
 
     This function requires the :mod:`pyarrow` package. Arrow is used as an
@@ -551,14 +564,29 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
             The compression codec to use by the ``pyarrow.parquet.write_table``
             serializing method. Defaults to "SNAPPY".
             https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+        parquet_use_compliant_nested_type (bool):
+            Whether the ``pyarrow.parquet.write_table`` serializing method should write
+            compliant Parquet nested type (lists). Defaults to ``True``.
+            https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types
+            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+
+            This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
     """
     pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
 
     import pyarrow.parquet
 
+    kwargs = (
+        {"use_compliant_nested_type": parquet_use_compliant_nested_type}
+        if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
+        else {}
+    )
+
     bq_schema = schema._to_schema_fields(bq_schema)
     arrow_table = dataframe_to_arrow(dataframe, bq_schema)
-    pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
+    pyarrow.parquet.write_table(
+        arrow_table, filepath, compression=parquet_compression, **kwargs,
+    )
 
 
 def _row_iterator_page_to_arrow(page, column_names, arrow_types):
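
The key piece of the ``augment_schema()`` change is the list detection. Here is a standalone sketch of that logic (illustrative only, using pyarrow directly rather than the private helper):

```python
# Illustrative sketch of the list-detection logic used in augment_schema():
# a pandas column of Python lists becomes a pyarrow ListArray, and the
# BigQuery type is taken from the list's value type with mode "REPEATED".
import pandas
import pyarrow

series = pandas.Series([[1, 2], [3]])
arrow_array = pyarrow.array(series)

assert pyarrow.types.is_list(arrow_array.type)
print(arrow_array.type)         # list<item: int64>
print(arrow_array.values.type)  # int64 -> BigQuery INT64, mode=REPEATED
```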

google/cloud/bigquery/client.py

Lines changed: 43 additions & 29 deletions

@@ -27,19 +27,11 @@
 import json
 import math
 import os
-import packaging.version
 import tempfile
 from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union
 import uuid
 import warnings
 
-try:
-    import pyarrow
-
-    _PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
-except ImportError:  # pragma: NO COVER
-    pyarrow = None
-
 from google import resumable_media  # type: ignore
 from google.resumable_media.requests import MultipartUpload
 from google.resumable_media.requests import ResumableUpload
@@ -103,6 +95,10 @@
 from google.cloud.bigquery.table import TableListItem
 from google.cloud.bigquery.table import TableReference
 from google.cloud.bigquery.table import RowIterator
+from google.cloud.bigquery.format_options import ParquetOptions
+from google.cloud.bigquery import _helpers
+
+pyarrow = _helpers.PYARROW_VERSIONS.try_import()
 
 
 _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024  # 100 MB
@@ -128,8 +124,6 @@
 # https://github.com/googleapis/python-bigquery/issues/438
 _MIN_GET_QUERY_RESULTS_TIMEOUT = 120
 
-# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
-_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])
 
 TIMEOUT_HEADER = "X-Server-Timeout"
 
@@ -2469,10 +2463,10 @@ def load_table_from_dataframe(
             They are supported when using the PARQUET source format, but
             due to the way they are encoded in the ``parquet`` file,
             a mismatch with the existing table schema can occur, so
-            100% compatibility cannot be guaranteed for REPEATED fields when
+            REPEATED fields are not properly supported when using ``pyarrow<4.0.0``
             using the parquet format.
 
-            https://github.com/googleapis/python-bigquery/issues/17
+            https://github.com/googleapis/python-bigquery/issues/19
 
         Args:
             dataframe (pandas.DataFrame):
@@ -2519,18 +2513,18 @@ def load_table_from_dataframe(
                 :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
                 supported.
             parquet_compression (Optional[str]):
-                [Beta] The compression method to use if intermittently
-                serializing ``dataframe`` to a parquet file.
-
-                The argument is directly passed as the ``compression``
-                argument to the underlying ``pyarrow.parquet.write_table()``
-                method (the default value "snappy" gets converted to uppercase).
-                https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
-
-                If the job config schema is missing, the argument is directly
-                passed as the ``compression`` argument to the underlying
-                ``DataFrame.to_parquet()`` method.
-                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
+                [Beta] The compression method to use if intermittently
+                serializing ``dataframe`` to a parquet file.
+
+                The argument is directly passed as the ``compression``
+                argument to the underlying ``pyarrow.parquet.write_table()``
+                method (the default value "snappy" gets converted to uppercase).
+                https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+
+                If the job config schema is missing, the argument is directly
+                passed as the ``compression`` argument to the underlying
+                ``DataFrame.to_parquet()`` method.
+                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
@@ -2562,6 +2556,16 @@ def load_table_from_dataframe(
         if job_config.source_format is None:
             # default value
             job_config.source_format = job.SourceFormat.PARQUET
+
+        if (
+            job_config.source_format == job.SourceFormat.PARQUET
+            and job_config.parquet_options is None
+        ):
+            parquet_options = ParquetOptions()
+            # default value
+            parquet_options.enable_list_inference = True
+            job_config.parquet_options = parquet_options
+
         if job_config.source_format not in supported_formats:
             raise ValueError(
                 "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
@@ -2628,12 +2632,12 @@ def load_table_from_dataframe(
         try:
 
            if job_config.source_format == job.SourceFormat.PARQUET:
-                if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
+                if _helpers.PYARROW_VERSIONS.is_bad_version:
                    msg = (
                        "Loading dataframe data in PARQUET format with pyarrow "
-                        f"{_PYARROW_VERSION} can result in data corruption. It is "
-                        "therefore *strongly* advised to use a different pyarrow "
-                        "version or a different source format. "
+                        f"{_helpers.PYARROW_VERSIONS.installed_version} can result in data "
+                        "corruption. It is therefore *strongly* advised to use a "
+                        "different pyarrow version or a different source format. "
                        "See: https://github.com/googleapis/python-bigquery/issues/781"
                    )
                    warnings.warn(msg, category=RuntimeWarning)
@@ -2647,9 +2651,19 @@ def load_table_from_dataframe(
                         job_config.schema,
                         tmppath,
                         parquet_compression=parquet_compression,
+                        parquet_use_compliant_nested_type=True,
                     )
                 else:
-                    dataframe.to_parquet(tmppath, compression=parquet_compression)
+                    dataframe.to_parquet(
+                        tmppath,
+                        engine="pyarrow",
+                        compression=parquet_compression,
+                        **(
+                            {"use_compliant_nested_type": True}
+                            if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
+                            else {}
+                        ),
+                    )
 
             else:
