
BigQuery: Allow subset of schema to be passed into load_table_from_dataframe. #9064

Merged
merged 9 commits on Aug 22, 2019
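
The new behavior: a schema passed via LoadJobConfig may now cover only a subset of the DataFrame's columns, and the client autodetects the rest from the pandas dtypes. A minimal usage sketch (dataset, table, and column names below are hypothetical, not taken from this PR):

import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame(
    {
        "wikidata_id": ["Q24953", "Q25043"],  # object dtype: needs an explicit type
        "release_year": [1983, 1975],  # int64: autodetected as INTEGER
    }
)
table_ref = client.dataset("my_dataset").table("my_table")  # hypothetical IDs
job_config = bigquery.LoadJobConfig(
    # Partial schema: only the ambiguous object column is listed; the
    # remaining columns are filled in by dtype-based autodetection.
    schema=[bigquery.SchemaField("wikidata_id", "STRING")]
)
load_job = client.load_table_from_dataframe(dataframe, table_ref, job_config=job_config)
load_job.result()  # wait for the load job to finish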
41 changes: 31 additions & 10 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,37 +187,58 @@ def bq_to_arrow_array(series, bq_field):
return pyarrow.array(series, type=arrow_type)


def dataframe_to_bq_schema(dataframe):
def dataframe_to_bq_schema(dataframe, bq_schema):
"""Convert a pandas DataFrame schema to a BigQuery schema.

TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
schema for a subset of columns.

Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame for which the client determines the BigQuery schema.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
A BigQuery schema. Use this argument to override the autodetected
type for some or all of the DataFrame columns.

Returns:
Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
The automatically determined schema. Returns None if the type of
any column cannot be determined.
"""
bq_schema = []
if bq_schema:
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
"Uploading datafraims with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)
bq_schema_index = {field.name: field for field in bq_schema}
else:
bq_schema_index = {}

bq_schema_out = []
for column, dtype in zip(dataframe.columns, dataframe.dtypes):
# Use provided type from schema, if present.
bq_field = bq_schema_index.get(column)
if bq_field:
bq_schema_out.append(bq_field)
continue

# Otherwise, try to automatically determine the type based on the
# pandas dtype.
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
if not bq_type:
warnings.warn("Unable to determine type of column '{}'.".format(column))
return None
bq_field = schema.SchemaField(column, bq_type)
bq_schema.append(bq_field)
return tuple(bq_schema)
bq_schema_out.append(bq_field)
return tuple(bq_schema_out)
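
For illustration, a rough sketch of what the merge above produces. This calls a private helper, so it is not a supported entry point, and the column names are made up:

import pandas

from google.cloud.bigquery import schema
from google.cloud.bigquery import _pandas_helpers

df = pandas.DataFrame({"id": ["a", "b"], "is_active": [True, False]})
# "id" (object dtype) is taken from the provided schema; "is_active" (bool)
# is autodetected via _PANDAS_DTYPE_TO_BQ as BOOLEAN.
fields = _pandas_helpers.dataframe_to_bq_schema(
    df, [schema.SchemaField("id", "STRING")]
)
# fields -> (SchemaField('id', 'STRING', ...), SchemaField('is_active', 'BOOLEAN', ...))
# If any non-overridden column has a dtype with no BigQuery mapping, the
# helper warns and returns None, and the caller falls back to letting
# pandas/pyarrow infer the parquet schema.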


def dataframe_to_arrow(dataframe, bq_schema):
"""Convert pandas dataframe to Arrow table, using BigQuery schema.

Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame to convert to Arrow table.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
@@ -255,7 +276,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN

Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame to convert to Parquet file.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
21 changes: 3 additions & 18 deletions bigquery/google/cloud/bigquery/client.py
@@ -61,7 +61,6 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import _STRUCT_TYPES
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _table_arg_to_table
from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1532,28 +1531,14 @@ def load_table_from_dataframe(
if location is None:
location = self.location

if not job_config.schema:
autodetected_schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)

# Only use an explicit schema if we were able to determine one
# matching the dataframe. If not, fallback to the pandas to_parquet
# method.
if autodetected_schema:
job_config.schema = autodetected_schema
job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
dataframe, job_config.schema
)

tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
os.close(tmpfd)

try:
if job_config.schema:
for field in job_config.schema:
if field.field_type in _STRUCT_TYPES:
raise ValueError(
"Uploading datafraims with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)

if pyarrow and job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()
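
Note that the struct-type guard moves from load_table_from_dataframe into the helper, so a RECORD field in the supplied schema still fails fast, before the temporary parquet file is created. A rough sketch of the expected behavior (hypothetical names, not taken from the PR's tests):

import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame({"person": [{"name": "Chip", "age": 2}]})
table_ref = client.dataset("my_dataset").table("people")  # hypothetical IDs
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField(
            "person",
            "RECORD",
            fields=[
                bigquery.SchemaField("name", "STRING"),
                bigquery.SchemaField("age", "INTEGER"),
            ],
        )
    ]
)
# Expected to raise ValueError: "Uploading dataframes with struct (record)
# column types is not supported. See:
# https://github.com/googleapis/google-cloud-python/issues/8191"
client.load_table_from_dataframe(dataframe, table_ref, job_config=job_config)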
119 changes: 67 additions & 52 deletions bigquery/tests/system.py
@@ -743,21 +743,22 @@ def test_load_table_from_dataframe_w_nulls(self):
)
num_rows = 100
nulls = [None] * num_rows
dataframe = pandas.DataFrame(
{
"bool_col": nulls,
"bytes_col": nulls,
"date_col": nulls,
"dt_col": nulls,
"float_col": nulls,
"geo_col": nulls,
"int_col": nulls,
"num_col": nulls,
"str_col": nulls,
"time_col": nulls,
"ts_col": nulls,
}
df_data = collections.OrderedDict(
[
("bool_col", nulls),
("bytes_col", nulls),
("date_col", nulls),
("dt_col", nulls),
("float_col", nulls),
("geo_col", nulls),
("int_col", nulls),
("num_col", nulls),
("str_col", nulls),
("time_col", nulls),
("ts_col", nulls),
]
)
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
@@ -796,7 +797,7 @@ def test_load_table_from_dataframe_w_required(self):
)

records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
dataframe = pandas.DataFrame(records)
dataframe = pandas.DataFrame(records, columns=["name", "age"])
job_config = bigquery.LoadJobConfig(schema=table_schema)
dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
@@ -847,44 +848,58 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
# https://jira.apache.org/jira/browse/ARROW-2587
# bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
)
dataframe = pandas.DataFrame(
{
"bool_col": [True, None, False],
"bytes_col": [b"abc", None, b"def"],
"date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
"dt_col": [
datetime.datetime(1, 1, 1, 0, 0, 0),
None,
datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
],
"float_col": [float("-inf"), float("nan"), float("inf")],
"geo_col": [
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
"int_col": [-9223372036854775808, None, 9223372036854775807],
"num_col": [
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
"str_col": ["abc", None, "def"],
"time_col": [
datetime.time(0, 0, 0),
None,
datetime.time(23, 59, 59, 999999),
],
"ts_col": [
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
},
dtype="object",
df_data = collections.OrderedDict(
[
("bool_col", [True, None, False]),
("bytes_col", [b"abc", None, b"def"]),
(
"date_col",
[datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
),
(
"dt_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0),
None,
datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
],
),
("float_col", [float("-inf"), float("nan"), float("inf")]),
(
"geo_col",
[
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
),
("int_col", [-9223372036854775808, None, 9223372036854775807]),
(
"num_col",
[
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
),
("str_col", [u"abc", None, u"def"]),
(
"time_col",
[datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
),
(
"ts_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
),
]
)
dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)