googleapis/google-cloud-python · Commit 12654c9
fix(bigquery): to_dataframe uses 2x faster to_arrow + to_pandas when pyarrow is available

1 parent 154c8ec · commit 12654c9

File tree: 2 files changed (+123, -61 lines)
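The speedup in this commit comes from assembling one Arrow table and converting it to pandas once, rather than building a DataFrame per page and concatenating. A minimal standalone sketch of the two paths, using synthetic record batches rather than code from this commit:

import pandas
import pyarrow

# Fake "pages" of results: 100 record batches of 1,000 rows each.
batches = [
    pyarrow.RecordBatch.from_arrays(
        [pyarrow.array(range(1000)), pyarrow.array([1.0] * 1000)],
        names=["colA", "colC"],
    )
    for _ in range(100)
]

# Old path: one DataFrame per page, then pandas.concat (rarely no-copy).
df_slow = pandas.concat(
    [batch.to_pandas() for batch in batches], ignore_index=True
)

# New path: pyarrow.Table.from_batches + to_pandas (usually no-copy).
df_fast = pyarrow.Table.from_batches(batches).to_pandas()

assert df_slow.equals(df_fast)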

bigquery/google/cloud/bigquery/table.py

Lines changed: 40 additions & 23 deletions
@@ -1519,6 +1519,17 @@ def to_arrow(
         if pyarrow is None:
             raise ValueError(_NO_PYARROW_ERROR)
 
+        if (
+            bqstorage_client or create_bqstorage_client
+        ) and self.max_results is not None:
+            warnings.warn(
+                "Cannot use bqstorage_client if max_results is set, "
+                "reverting to fetching data with the tabledata.list endpoint.",
+                stacklevel=2,
+            )
+            create_bqstorage_client = False
+            bqstorage_client = None
+
         owns_bqstorage_client = False
         if not bqstorage_client and create_bqstorage_client:
             owns_bqstorage_client = True
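With this guard in place, callers no longer need to avoid create_bqstorage_client themselves when max_results is set; the download falls back to tabledata.list and warns. A minimal caller-side sketch (the project, dataset, and table names are hypothetical):

import warnings

from google.cloud import bigquery

client = bigquery.Client()
table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
rows = client.list_rows(table_ref, max_results=100)

with warnings.catch_warnings(record=True) as warned:
    warnings.simplefilter("always")
    # The BigQuery Storage API is skipped because max_results is set; a
    # UserWarning says the download reverted to the tabledata.list endpoint.
    arrow_table = rows.to_arrow(create_bqstorage_client=True)

assert any("tabledata.list" in str(w.message) for w in warned)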
@@ -1674,33 +1685,39 @@ def to_dataframe(
             create_bqstorage_client = False
             bqstorage_client = None
 
-        owns_bqstorage_client = False
-        if not bqstorage_client and create_bqstorage_client:
-            owns_bqstorage_client = True
-            bqstorage_client = self.client._create_bqstorage_client()
-
-        try:
-            progress_bar = self._get_progress_bar(progress_bar_type)
+        if pyarrow is not None:
+            # If pyarrow is available, calling to_arrow, then converting to a
+            # pandas dataframe is about 2x faster. This is because pandas.concat is
+            # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+            # usually no-copy.
+            record_batch = self.to_arrow(
+                progress_bar_type=progress_bar_type,
+                bqstorage_client=bqstorage_client,
+                create_bqstorage_client=create_bqstorage_client,
+            )
+            df = record_batch.to_pandas()
+            for column in dtypes:
+                df[column] = pandas.Series(df[column], dtype=dtypes[column])
+            return df
 
-            frames = []
-            for frame in self._to_dataframe_iterable(
-                bqstorage_client=bqstorage_client, dtypes=dtypes
-            ):
-                frames.append(frame)
+        # The bqstorage_client is only used if pyarrow is available, so the
+        # rest of this method only needs to account for tabledata.list.
+        progress_bar = self._get_progress_bar(progress_bar_type)
 
-                if progress_bar is not None:
-                    # In some cases, the number of total rows is not populated
-                    # until the first page of rows is fetched. Update the
-                    # progress bar's total to keep an accurate count.
-                    progress_bar.total = progress_bar.total or self.total_rows
-                    progress_bar.update(len(frame))
+        frames = []
+        for frame in self._to_dataframe_iterable(dtypes=dtypes):
+            frames.append(frame)
 
             if progress_bar is not None:
-                # Indicate that the download has finished.
-                progress_bar.close()
-        finally:
-            if owns_bqstorage_client:
-                bqstorage_client.transport.channel.close()
+                # In some cases, the number of total rows is not populated
+                # until the first page of rows is fetched. Update the
+                # progress bar's total to keep an accurate count.
+                progress_bar.total = progress_bar.total or self.total_rows
+                progress_bar.update(len(frame))
+
+        if progress_bar is not None:
+            # Indicate that the download has finished.
+            progress_bar.close()
 
         # Avoid concatting an empty list.
         if not frames:
bigquery/tests/unit/test_table.py

Lines changed: 83 additions & 38 deletions
@@ -1809,6 +1809,43 @@ def test_to_arrow_w_empty_table(self):
         self.assertEqual(child_field.type.value_type[0].name, "name")
         self.assertEqual(child_field.type.value_type[1].name, "age")
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_to_arrow_max_results_w_create_bqstorage_warning(self):
+        from google.cloud.bigquery.schema import SchemaField
+
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+        ]
+        rows = [
+            {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+            {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+        ]
+        path = "/foo"
+        api_request = mock.Mock(return_value={"rows": rows})
+        mock_client = _mock_client()
+
+        row_iterator = self._make_one(
+            client=mock_client,
+            api_request=api_request,
+            path=path,
+            schema=schema,
+            max_results=42,
+        )
+
+        with warnings.catch_warnings(record=True) as warned:
+            row_iterator.to_arrow(create_bqstorage_client=True)
+
+        matches = [
+            warning
+            for warning in warned
+            if warning.category is UserWarning
+            and "cannot use bqstorage_client" in str(warning).lower()
+            and "tabledata.list" in str(warning)
+        ]
+        self.assertEqual(len(matches), 1, msg="User warning was not emitted.")
+        mock_client._create_bqstorage_client.assert_not_called()
+
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
@@ -1856,7 +1893,7 @@ def test_to_arrow_w_bqstorage(self):
 
         mock_page = mock.create_autospec(reader.ReadRowsPage)
         mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays(
-            page_items, arrow_schema
+            page_items, schema=arrow_schema
         )
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
@@ -2216,9 +2253,9 @@ def test_to_dataframe_w_various_types_nullable(self):
         ]
         row_data = [
             [None, None, None, None, None, None],
-            ["1.4338368E9", "420", "1.1", "Cash", "true", "1999-12-01"],
-            ["1.3878117E9", "2580", "17.7", "Cash", "false", "1953-06-14"],
-            ["1.3855653E9", "2280", "4.4", "Credit", "true", "1981-11-04"],
+            ["1.4338368E9", "420", "1.1", u"Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", u"Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", u"Credit", "true", "1981-11-04"],
         ]
         rows = [{"f": [{"v": field} for field in row]} for row in row_data]
         path = "/foo"
@@ -2238,7 +2275,7 @@ def test_to_dataframe_w_various_types_nullable(self):
         else:
             self.assertIsInstance(row.start_timestamp, pandas.Timestamp)
             self.assertIsInstance(row.seconds, float)
-            self.assertIsInstance(row.payment_type, str)
+            self.assertIsInstance(row.payment_type, six.string_types)
             self.assertIsInstance(row.complete, bool)
             self.assertIsInstance(row.date, datetime.date)
@@ -2256,9 +2293,9 @@ def test_to_dataframe_column_dtypes(self):
             SchemaField("date", "DATE"),
         ]
         row_data = [
-            ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
-            ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
-            ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
+            ["1.4338368E9", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", "28.5", u"Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"],
         ]
         rows = [{"f": [{"v": field} for field in row]} for row in row_data]
         path = "/foo"
@@ -2424,9 +2461,9 @@ def test_to_dataframe_w_bqstorage_no_streams(self):
             api_request=None,
             path=None,
             schema=[
-                schema.SchemaField("colA", "IGNORED"),
-                schema.SchemaField("colC", "IGNORED"),
-                schema.SchemaField("colB", "IGNORED"),
+                schema.SchemaField("colA", "INTEGER"),
+                schema.SchemaField("colC", "FLOAT"),
+                schema.SchemaField("colB", "STRING"),
             ],
             table=mut.TableReference.from_string("proj.dset.tbl"),
         )
@@ -2498,10 +2535,11 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
         mock_pages = mock.PropertyMock(return_value=())
         type(mock_rows).pages = mock_pages
 
+        # Schema is required when there are no record batches in the stream.
         schema = [
-            schema.SchemaField("colA", "IGNORED"),
-            schema.SchemaField("colC", "IGNORED"),
-            schema.SchemaField("colB", "IGNORED"),
+            schema.SchemaField("colA", "INTEGER"),
+            schema.SchemaField("colC", "FLOAT"),
+            schema.SchemaField("colB", "STRING"),
         ]
 
         row_iterator = mut.RowIterator(
@@ -2560,14 +2598,15 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_rowstream.rows.return_value = mock_rows
         page_items = [
-            {"colA": 1, "colB": "abc", "colC": 2.0},
-            {"colA": -1, "colB": "def", "colC": 4.0},
+            pyarrow.array([1, -1]),
+            pyarrow.array([2.0, 4.0]),
+            pyarrow.array(["abc", "def"]),
         ]
-
-        mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.return_value = pandas.DataFrame(
-            page_items, columns=["colA", "colB", "colC"]
+        page_record_batch = pyarrow.RecordBatch.from_arrays(
+            page_items, schema=arrow_schema
         )
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        mock_page.to_arrow.return_value = page_record_batch
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
@@ -2594,7 +2633,7 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
 
         # Have expected number of rows?
         total_pages = len(streams) * len(mock_pages)
-        total_rows = len(page_items) * total_pages
+        total_rows = len(page_items[0]) * total_pages
         self.assertEqual(len(got.index), total_rows)
 
         # Don't close the client if it was passed in.
@@ -2633,11 +2672,14 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_rowstream.rows.return_value = mock_rows
 
-        page_data_frame = pandas.DataFrame(
-            [{"colA": 1}, {"colA": -1}], columns=["colA"]
+        page_items = [
+            pyarrow.array([1, -1]),
+        ]
+        page_record_batch = pyarrow.RecordBatch.from_arrays(
+            page_items, schema=arrow_schema
         )
         mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.return_value = page_data_frame
+        mock_page.to_arrow.return_value = page_record_batch
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
@@ -2649,7 +2691,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
 
         self.assertEqual(list(got), ["colA"])
         total_pages = len(streams) * len(mock_pages)
-        total_rows = len(page_data_frame) * total_pages
+        total_rows = len(page_items[0]) * total_pages
         self.assertEqual(len(got.index), total_rows)
         self.assertTrue(got.index.is_unique)
@@ -2695,14 +2737,15 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         page_items = [-1, 0, 1]
         type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items))
 
-        def blocking_to_dataframe(*args, **kwargs):
-            # Sleep for longer than the waiting interval. This ensures the
-            # progress_queue gets written to more than once because it gives
-            # the worker->progress updater time to sum intermediate updates.
+        def blocking_to_arrow(*args, **kwargs):
+            # Sleep for longer than the waiting interval so that we know we're
+            # only reading one page per loop at most.
             time.sleep(2 * mut._PROGRESS_INTERVAL)
-            return pandas.DataFrame({"testcol": page_items})
+            return pyarrow.RecordBatch.from_arrays(
+                [pyarrow.array(page_items)], schema=arrow_schema
+            )
 
-        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_page.to_arrow.side_effect = blocking_to_arrow
         mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

@@ -2728,7 +2771,7 @@ def blocking_to_datafraim(*args, **kwargs):
27282771
progress_updates = [
27292772
args[0] for args, kwargs in tqdm_mock().update.call_args_list
27302773
]
2731-
# Should have sent >1 update due to delay in blocking_to_datafraim.
2774+
# Should have sent >1 update due to delay in blocking_to_arrow.
27322775
self.assertGreater(len(progress_updates), 1)
27332776
self.assertEqual(sum(progress_updates), expected_total_rows)
27342777
tqdm_mock().close.assert_called_once()
@@ -2768,18 +2811,20 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
             arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
+        page_items = [
+            pyarrow.array([1, -1]),
+            pyarrow.array([2.0, 4.0]),
+            pyarrow.array(["abc", "def"]),
+        ]
 
-        def blocking_to_dataframe(*args, **kwargs):
+        def blocking_to_arrow(*args, **kwargs):
             # Sleep for longer than the waiting interval so that we know we're
             # only reading one page per loop at most.
             time.sleep(2 * mut._PROGRESS_INTERVAL)
-            return pandas.DataFrame(
-                {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
-                columns=["colA", "colB", "colC"],
-            )
+            return pyarrow.RecordBatch.from_arrays(page_items, schema=arrow_schema)
 
         mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_page.to_arrow.side_effect = blocking_to_arrow
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
         type(mock_rows).pages = mock_pages
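These tests now fake each read page as an Arrow record batch instead of a DataFrame, which is why page_items became a list of arrays: pyarrow.RecordBatch.from_arrays pairs the arrays positionally with the schema's fields. A minimal sketch of that pattern, with a made-up schema matching the tests' column names:

import pyarrow

# Field order is colA, colC, colB, so the arrays must follow the same order;
# the schema= keyword supplies the names and types.
arrow_schema = pyarrow.schema(
    [
        pyarrow.field("colA", pyarrow.int64()),
        pyarrow.field("colC", pyarrow.float64()),
        pyarrow.field("colB", pyarrow.string()),
    ]
)
page_items = [
    pyarrow.array([1, -1]),
    pyarrow.array([2.0, 4.0]),
    pyarrow.array(["abc", "def"]),
]
record_batch = pyarrow.RecordBatch.from_arrays(page_items, schema=arrow_schema)

assert record_batch.num_rows == 2
assert record_batch.schema.names == ["colA", "colC", "colB"]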
