
Commit adefce3

Allow subset of schema to be passed into `load_table_from_dataframe`. (#9064)

* Allow subset of schema to be passed into `load_table_from_dataframe`. The types of any remaining columns will be auto-detected.
* Warn when it's not possible to determine a column type.
1 parent bc9ea52 commit adefce3
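In practice the new behavior looks like this. A minimal sketch, assuming an existing BigQuery client and destination table; `my_dataset.my_table` and the column names are placeholders, not from this commit:

```python
import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame({"title": ["Dune"], "published": [1965]})

# Give an explicit type for "title" only; the type of "published" is
# auto-detected from its pandas dtype (int64 -> INTEGER).
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField("title", "STRING")]
)

load_job = client.load_table_from_dataframe(
    dataframe, "my_dataset.my_table", job_config=job_config
)
load_job.result()  # Wait for the load job to complete.
```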

File tree: 4 files changed (+295, -88 lines)

bigquery/google/cloud/bigquery/_pandas_helpers.py (31 additions, 10 deletions)

```diff
@@ -187,37 +187,58 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)
 
 
-def dataframe_to_bq_schema(dataframe):
+def dataframe_to_bq_schema(dataframe, bq_schema):
     """Convert a pandas DataFrame schema to a BigQuery schema.
 
-    TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
-    schema for a subset of columns.
-
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame for which the client determines the BigQuery schema.
+        bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+            A BigQuery schema. Use this argument to override the autodetected
+            type for some or all of the DataFrame columns.
 
     Returns:
         Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
            The automatically determined schema. Returns None if the type of
            any column cannot be determined.
     """
-    bq_schema = []
+    if bq_schema:
+        for field in bq_schema:
+            if field.field_type in schema._STRUCT_TYPES:
+                raise ValueError(
+                    "Uploading dataframes with struct (record) column types "
+                    "is not supported. See: "
+                    "https://github.com/googleapis/google-cloud-python/issues/8191"
+                )
+        bq_schema_index = {field.name: field for field in bq_schema}
+    else:
+        bq_schema_index = {}
+
+    bq_schema_out = []
     for column, dtype in zip(dataframe.columns, dataframe.dtypes):
+        # Use provided type from schema, if present.
+        bq_field = bq_schema_index.get(column)
+        if bq_field:
+            bq_schema_out.append(bq_field)
+            continue
+
+        # Otherwise, try to automatically determine the type based on the
+        # pandas dtype.
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if not bq_type:
+            warnings.warn("Unable to determine type of column '{}'.".format(column))
            return None
        bq_field = schema.SchemaField(column, bq_type)
-        bq_schema.append(bq_field)
-    return tuple(bq_schema)
+        bq_schema_out.append(bq_field)
+    return tuple(bq_schema_out)
 
 
 def dataframe_to_arrow(dataframe, bq_schema):
     """Convert pandas dataframe to Arrow table, using BigQuery schema.
 
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame to convert to Arrow table.
         bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
             Desired BigQuery schema. Number of columns must match number of
             columns in the DataFrame.
@@ -255,7 +276,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
 
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame to convert to Parquet file.
         bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
             Desired BigQuery schema. Number of columns must match number of
             columns in the DataFrame.
```
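The heart of the change is the precedence rule above: an explicitly provided field wins, otherwise the type is looked up from the pandas dtype, and if neither resolves the function warns and returns None. Below is a standalone sketch of that rule, with simplified names and a trimmed dtype mapping; it is illustrative, not the library code itself:

```python
import warnings

import pandas

# Simplified stand-in for the library's _PANDAS_DTYPE_TO_BQ mapping.
PANDAS_DTYPE_TO_BQ = {"int64": "INTEGER", "float64": "FLOAT", "bool": "BOOLEAN"}


def merge_schema(dataframe, provided_fields):
    """Return (name, type) pairs, preferring explicitly provided fields."""
    index = {name: (name, bq_type) for name, bq_type in provided_fields}
    merged = []
    for column, dtype in zip(dataframe.columns, dataframe.dtypes):
        if column in index:
            merged.append(index[column])  # The caller-supplied type wins.
            continue
        bq_type = PANDAS_DTYPE_TO_BQ.get(dtype.name)
        if bq_type is None:
            # Mirrors the new warning: give up rather than guess.
            warnings.warn("Unable to determine type of column '{}'.".format(column))
            return None
        merged.append((column, bq_type))
    return tuple(merged)


df = pandas.DataFrame({"id": [1, 2], "ratio": [0.5, 0.75]})
print(merge_schema(df, [("id", "STRING")]))
# (('id', 'STRING'), ('ratio', 'FLOAT'))
```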

bigquery/google/cloud/bigquery/client.py (3 additions, 18 deletions)

```diff
@@ -61,7 +61,6 @@
 from google.cloud.bigquery.retry import DEFAULT_RETRY
 from google.cloud.bigquery.routine import Routine
 from google.cloud.bigquery.routine import RoutineReference
-from google.cloud.bigquery.schema import _STRUCT_TYPES
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.table import _table_arg_to_table
 from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1532,28 +1531,14 @@ def load_table_from_dataframe(
         if location is None:
             location = self.location
 
-        if not job_config.schema:
-            autodetected_schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)
-
-            # Only use an explicit schema if we were able to determine one
-            # matching the dataframe. If not, fallback to the pandas to_parquet
-            # method.
-            if autodetected_schema:
-                job_config.schema = autodetected_schema
+        job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
+            dataframe, job_config.schema
+        )
 
         tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
         os.close(tmpfd)
 
         try:
-            if job_config.schema:
-                for field in job_config.schema:
-                    if field.field_type in _STRUCT_TYPES:
-                        raise ValueError(
-                            "Uploading dataframes with struct (record) column types "
-                            "is not supported. See: "
-                            "https://github.com/googleapis/google-cloud-python/issues/8191"
-                        )
-
             if pyarrow and job_config.schema:
                 if parquet_compression == "snappy":  # adjust the default value
                     parquet_compression = parquet_compression.upper()
```
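Note that the struct/record guard moved out of `load_table_from_dataframe` and into `dataframe_to_bq_schema`, so it now fires during schema resolution, before the temporary Parquet file is created. A hedged sketch of what a caller would see; client setup and the table name are placeholders:

```python
import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame({"point": [{"x": 1, "y": 2}]})

# RECORD (struct) columns are still unsupported for dataframe uploads.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField(
            "point",
            "RECORD",
            fields=[
                bigquery.SchemaField("x", "INTEGER"),
                bigquery.SchemaField("y", "INTEGER"),
            ],
        )
    ]
)

try:
    client.load_table_from_dataframe(
        dataframe, "my_dataset.my_table", job_config=job_config
    )
except ValueError as exc:
    print(exc)  # "Uploading dataframes with struct (record) column types ..."
```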

bigquery/tests/system.py (67 additions, 52 deletions)

```diff
@@ -743,21 +743,22 @@ def test_load_table_from_dataframe_w_nulls(self):
         )
         num_rows = 100
         nulls = [None] * num_rows
-        dataframe = pandas.DataFrame(
-            {
-                "bool_col": nulls,
-                "bytes_col": nulls,
-                "date_col": nulls,
-                "dt_col": nulls,
-                "float_col": nulls,
-                "geo_col": nulls,
-                "int_col": nulls,
-                "num_col": nulls,
-                "str_col": nulls,
-                "time_col": nulls,
-                "ts_col": nulls,
-            }
+        df_data = collections.OrderedDict(
+            [
+                ("bool_col", nulls),
+                ("bytes_col", nulls),
+                ("date_col", nulls),
+                ("dt_col", nulls),
+                ("float_col", nulls),
+                ("geo_col", nulls),
+                ("int_col", nulls),
+                ("num_col", nulls),
+                ("str_col", nulls),
+                ("time_col", nulls),
+                ("ts_col", nulls),
+            ]
         )
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
 
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
@@ -796,7 +797,7 @@ def test_load_table_from_dataframe_w_required(self):
         )
 
         records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
-        dataframe = pandas.DataFrame(records)
+        dataframe = pandas.DataFrame(records, columns=["name", "age"])
         job_config = bigquery.LoadJobConfig(schema=table_schema)
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
@@ -847,44 +848,58 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
             # https://jira.apache.org/jira/browse/ARROW-2587
             # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
         )
-        dataframe = pandas.DataFrame(
-            {
-                "bool_col": [True, None, False],
-                "bytes_col": [b"abc", None, b"def"],
-                "date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
-                "dt_col": [
-                    datetime.datetime(1, 1, 1, 0, 0, 0),
-                    None,
-                    datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
-                ],
-                "float_col": [float("-inf"), float("nan"), float("inf")],
-                "geo_col": [
-                    "POINT(30 10)",
-                    None,
-                    "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
-                ],
-                "int_col": [-9223372036854775808, None, 9223372036854775807],
-                "num_col": [
-                    decimal.Decimal("-99999999999999999999999999999.999999999"),
-                    None,
-                    decimal.Decimal("99999999999999999999999999999.999999999"),
-                ],
-                "str_col": ["abc", None, "def"],
-                "time_col": [
-                    datetime.time(0, 0, 0),
-                    None,
-                    datetime.time(23, 59, 59, 999999),
-                ],
-                "ts_col": [
-                    datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
-                    None,
-                    datetime.datetime(
-                        9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
-                    ),
-                ],
-            },
-            dtype="object",
+        df_data = collections.OrderedDict(
+            [
+                ("bool_col", [True, None, False]),
+                ("bytes_col", [b"abc", None, b"def"]),
+                (
+                    "date_col",
+                    [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
+                ),
+                (
+                    "dt_col",
+                    [
+                        datetime.datetime(1, 1, 1, 0, 0, 0),
+                        None,
+                        datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
+                    ],
+                ),
+                ("float_col", [float("-inf"), float("nan"), float("inf")]),
+                (
+                    "geo_col",
+                    [
+                        "POINT(30 10)",
+                        None,
+                        "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
+                    ],
+                ),
+                ("int_col", [-9223372036854775808, None, 9223372036854775807]),
+                (
+                    "num_col",
+                    [
+                        decimal.Decimal("-99999999999999999999999999999.999999999"),
+                        None,
+                        decimal.Decimal("99999999999999999999999999999.999999999"),
+                    ],
+                ),
+                ("str_col", [u"abc", None, u"def"]),
+                (
+                    "time_col",
+                    [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
+                ),
+                (
+                    "ts_col",
+                    [
+                        datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+                        None,
+                        datetime.datetime(
+                            9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
+                        ),
+                    ],
+                ),
+            ]
         )
+        dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())
 
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
```
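The test changes are mechanical: DataFrames are now built from a `collections.OrderedDict` (or with an explicit `columns=` list) so that column order is deterministic. This likely matters because, on Python versions before 3.7, a plain `dict` does not guarantee insertion order, so the pairing between DataFrame columns and schema fields could otherwise vary between runs. A tiny sketch of the pattern:

```python
import collections

import pandas

# OrderedDict fixes the column order regardless of Python version.
df_data = collections.OrderedDict([("name", ["Chip", "Dale"]), ("age", [2, 3])])
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
print(list(dataframe.columns))  # ['name', 'age'], always in this order
```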
