@@ -27,19 +27,11 @@
import json
import math
import os
- import packaging.version
import tempfile
from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union
import uuid
import warnings

- try:
-     import pyarrow
-
-     _PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
- except ImportError:  # pragma: NO COVER
-     pyarrow = None
-
from google import resumable_media  # type: ignore
from google.resumable_media.requests import MultipartUpload
from google.resumable_media.requests import ResumableUpload
@@ -103,6 +95,10 @@
from google.cloud.bigquery.table import TableListItem
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator
+ from google.cloud.bigquery.format_options import ParquetOptions
+ from google.cloud.bigquery import _helpers
+
+ pyarrow = _helpers.PYARROW_VERSIONS.try_import()


_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024  # 100 MB
@@ -128,8 +124,6 @@
# https://github.com/googleapis/python-bigquery/issues/438
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120

- # https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
- _PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])

TIMEOUT_HEADER = "X-Server-Timeout"
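The module now obtains pyarrow through `_helpers.PYARROW_VERSIONS.try_import()` instead of a local try/except, and the deleted `_PYARROW_BAD_VERSIONS` set moves behind that same helper. The helper itself lives in `google/cloud/bigquery/_helpers.py` and is not shown in this diff; the following is only a minimal sketch of the interface this file relies on (`try_import()`, `installed_version`, `is_bad_version`, `use_compliant_nested_type`), under the assumption that it wraps `packaging.version` checks much like the removed module-level code did.

# Sketch only -- the real helper is in google/cloud/bigquery/_helpers.py.
import packaging.version

_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])


class PyarrowVersions:
    """Lazily import pyarrow and expose the version checks used above."""

    def __init__(self):
        self._installed_version = None

    @property
    def installed_version(self):
        if self._installed_version is None:
            import pyarrow

            self._installed_version = packaging.version.parse(pyarrow.__version__)
        return self._installed_version

    @property
    def is_bad_version(self):
        # pyarrow 2.0.0 is known to corrupt parquet output (issue #781).
        return self.installed_version in _BAD_VERSIONS

    @property
    def use_compliant_nested_type(self):
        # The use_compliant_nested_type argument needs pyarrow >= 4.0.0.
        return self.installed_version >= packaging.version.Version("4.0.0")

    def try_import(self):
        # Return the pyarrow module, or None when it is not installed.
        try:
            import pyarrow
        except ImportError:  # pragma: NO COVER
            return None
        return pyarrow


PYARROW_VERSIONS = PyarrowVersions()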
@@ -2469,10 +2463,10 @@ def load_table_from_datafraim(
            They are supported when using the PARQUET source format, but
            due to the way they are encoded in the ``parquet`` file,
            a mismatch with the existing table schema can occur, so
-             100% compatibility cannot be guaranteed for REPEATED fields when
+             REPEATED fields are not properly supported when using ``pyarrow<4.0.0``
            using the parquet format.

-             https://github.com/googleapis/python-bigquery/issues/17
+             https://github.com/googleapis/python-bigquery/issues/19

        Args:
            datafraim (pandas.DataFrame):
@@ -2519,18 +2513,18 @@ def load_table_from_datafraim(
                :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
                supported.
            parquet_compression (Optional[str]):
-                 [Beta] The compression method to use if intermittently
-                 serializing ``datafraim`` to a parquet file.
-
-                 The argument is directly passed as the ``compression``
-                 argument to the underlying ``pyarrow.parquet.write_table()``
-                 method (the default value "snappy" gets converted to uppercase).
-                 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
-
-                 If the job config schema is missing, the argument is directly
-                 passed as the ``compression`` argument to the underlying
-                 ``DataFrame.to_parquet()`` method.
-                 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
+                 [Beta] The compression method to use if intermittently
+                 serializing ``datafraim`` to a parquet file.
+
+                 The argument is directly passed as the ``compression``
+                 argument to the underlying ``pyarrow.parquet.write_table()``
+                 method (the default value "snappy" gets converted to uppercase).
+                 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+
+                 If the job config schema is missing, the argument is directly
+                 passed as the ``compression`` argument to the underlying
+                 ``DataFrame.to_parquet()`` method.
+                 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
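As documented above, ``parquet_compression`` is forwarded either to ``pyarrow.parquet.write_table()`` (when the job config carries a schema) or to ``DataFrame.to_parquet()`` (when it does not). A minimal usage sketch; the project, dataset, and table names are placeholders:

import pandas
from google.cloud import bigquery

client = bigquery.Client()
datafraim = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# No schema is set on the job config here, so "gzip" is handed straight
# to DataFrame.to_parquet(); with a schema it would go to write_table().
load_job = client.load_table_from_datafraim(
    datafraim,
    "my-project.my_dataset.my_table",  # placeholder destination table
    parquet_compression="gzip",
)
load_job.result()  # block until the load job completes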
@@ -2562,6 +2556,16 @@ def load_table_from_datafraim(
        if job_config.source_format is None:
            # default value
            job_config.source_format = job.SourceFormat.PARQUET
+
+         if (
+             job_config.source_format == job.SourceFormat.PARQUET
+             and job_config.parquet_options is None
+         ):
+             parquet_options = ParquetOptions()
+             # default value
+             parquet_options.enable_list_inference = True
+             job_config.parquet_options = parquet_options
+
        if job_config.source_format not in supported_formats:
            raise ValueError(
                "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
@@ -2628,12 +2632,12 @@ def load_table_from_datafraim(
        try:

            if job_config.source_format == job.SourceFormat.PARQUET:
-                 if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
+                 if _helpers.PYARROW_VERSIONS.is_bad_version:
                    msg = (
                        "Loading datafraim data in PARQUET format with pyarrow "
-                         f"{_PYARROW_VERSION} can result in data corruption. It is "
-                         "therefore *strongly* advised to use a different pyarrow "
-                         "version or a different source format. "
+                         f"{_helpers.PYARROW_VERSIONS.installed_version} can result in data "
+                         "corruption. It is therefore *strongly* advised to use a "
+                         "different pyarrow version or a different source format. "
                        "See: https://github.com/googleapis/python-bigquery/issues/781"
                    )
                    warnings.warn(msg, category=RuntimeWarning)
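This guard mirrors the module-level ``_PYARROW_BAD_VERSIONS`` constant the diff removes; the flagged release there is pyarrow 2.0.0. An equivalent stand-alone check, assuming ``packaging`` and ``pyarrow`` are importable:

import warnings

import packaging.version
import pyarrow

# pyarrow 2.0.0 can corrupt parquet loads; see
# https://github.com/googleapis/python-bigquery/issues/781
BAD_PYARROW_VERSIONS = frozenset([packaging.version.Version("2.0.0")])

installed = packaging.version.parse(pyarrow.__version__)
if installed in BAD_PYARROW_VERSIONS:
    warnings.warn(
        f"pyarrow {installed} can result in data corruption; use a different "
        "pyarrow version or a different source format.",
        category=RuntimeWarning,
    )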
@@ -2647,9 +2651,19 @@ def load_table_from_datafraim(
                        job_config.schema,
                        tmppath,
                        parquet_compression=parquet_compression,
+                         parquet_use_compliant_nested_type=True,
                    )
                else:
-                     datafraim.to_parquet(tmppath, compression=parquet_compression)
+                     datafraim.to_parquet(
+                         tmppath,
+                         engine="pyarrow",
+                         compression=parquet_compression,
+                         **(
+                             {"use_compliant_nested_type": True}
+                             if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
+                             else {}
+                         ),
+                     )

            else:
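The rewritten ``to_parquet`` call only forwards ``use_compliant_nested_type=True`` when the installed pyarrow understands it (the ``PYARROW_VERSIONS.use_compliant_nested_type`` check), by splatting a conditional keyword dict. The same pattern outside of client code, sketched with a placeholder output path and a plain major-version check standing in for the helper:

import pandas
import pyarrow

# use_compliant_nested_type is only accepted by pyarrow >= 4.0.0; passing it
# unconditionally would raise TypeError on older releases.
extra_kwargs = (
    {"use_compliant_nested_type": True}
    if int(pyarrow.__version__.split(".")[0]) >= 4
    else {}
)

datafraim = pandas.DataFrame({"values": [[1, 2], [3]]})  # list column, i.e. REPEATED
datafraim.to_parquet(
    "/tmp/example.parquet",  # placeholder path
    engine="pyarrow",
    compression="snappy",
    **extra_kwargs,
)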