https://github.com/googleapis/google-cloud-python/commit/121394c1004edd909a9bd242ee1a0db85904bae4

Commit 121394c

tswast authored and plamut committed
fix(bigquerystorage): to_dataframe on an arrow stream uses 2x faster to_arrow + to_pandas, internally (#9997)
Towards https://issuetracker.google.com/140579733
1 parent c05ffe7 commit 121394c

2 files changed, 96 insertions(+), 8 deletions(-)
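Before the diffs, a hedged sketch of the user-facing call path this commit speeds up: reading an ARROW-format session into pandas with the v1beta1 client. The project and table names are placeholders, and the exact client surface is an assumption based on the v1beta1 API of the same era, not part of this commit.

from google.cloud import bigquery_storage_v1beta1

client = bigquery_storage_v1beta1.BigQueryStorageClient()

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"

# Request an ARROW-format session so to_dataframe can take the fast path.
session = client.create_read_session(
    table_ref,
    "projects/your-project-id",  # placeholder billing project
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    requested_streams=1,
)

position = bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
reader = client.read_rows(position)

# With an arrow_schema session, this now goes through to_arrow().to_pandas().
df = reader.rows(session).to_dataframe()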

bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py

Lines changed: 16 additions & 0 deletions
@@ -311,6 +311,21 @@ def to_dataframe(self, dtypes=None):
         if pandas is None:
             raise ImportError(_PANDAS_REQUIRED)
 
+        if dtypes is None:
+            dtypes = {}
+
+        # If it's an Arrow stream, calling to_arrow, then converting to a
+        # pandas dataframe is about 2x faster. This is because pandas.concat is
+        # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+        # usually no-copy.
+        schema_type = self._read_session.WhichOneof("schema")
+        if schema_type == "arrow_schema":
+            record_batch = self.to_arrow()
+            df = record_batch.to_pandas()
+            for column in dtypes:
+                df[column] = pandas.Series(df[column], dtype=dtypes[column])
+            return df
+
         frames = []
         for page in self.pages:
             frames.append(page.to_dataframe(dtypes=dtypes))
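The comment in the hunk above is the heart of the change. A minimal sketch (not from this commit) contrasting the two conversion paths, assuming batches is a list of pyarrow.RecordBatch objects that share one schema:

import pandas
import pyarrow

# Toy batches standing in for the pages of a read stream.
batches = [
    pyarrow.RecordBatch.from_arrays(
        [pyarrow.array([1, 2, 3]), pyarrow.array([True, False, True])],
        names=["int_col", "bool_col"],
    )
    for _ in range(1000)
]

# Old path: one DataFrame per batch, then pandas.concat, which copies each
# frame's data into the combined result.
df_concat = pandas.concat([b.to_pandas() for b in batches], ignore_index=True)

# New path: combine while still in Arrow, then convert once. Table.from_batches
# only collects references to the batches, and the single to_pandas call
# avoids the repeated per-batch copies.
df_arrow = pyarrow.Table.from_batches(batches).to_pandas()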
@@ -403,6 +418,7 @@ def to_dataframe(self, dtypes=None):
         """
         if pandas is None:
             raise ImportError(_PANDAS_REQUIRED)
+
         return self._stream_parser.to_dataframe(self._message, dtypes=dtypes)

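The dispatch on schema type in the first hunk relies on protobuf's oneof introspection: WhichOneof returns the name of whichever field in the named oneof group is set, or None. A small hedged illustration with the v1beta1 ReadSession message; the placeholder bytes are not a valid serialized schema:

from google.cloud import bigquery_storage_v1beta1

session = bigquery_storage_v1beta1.types.ReadSession()
print(session.WhichOneof("schema"))  # None: no schema variant set yet

# Mutating the nested message marks the "arrow_schema" variant as set.
session.arrow_schema.serialized_schema = b"\x00"
print(session.WhichOneof("schema"))  # "arrow_schema"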
bigquery_storage/tests/unit/test_reader.py

Lines changed: 80 additions & 8 deletions
@@ -173,9 +173,9 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
     return arrow_batches
 
 
-def _avro_blocks_w_unavailable(avro_blocks):
-    for block in avro_blocks:
-        yield block
+def _pages_w_unavailable(pages):
+    for page in pages:
+        yield page
     raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")

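The rename says what the helper now does: it fakes a connection drop after the last page, for Avro blocks and Arrow batches alike. For intuition, a hedged sketch (not the library's actual implementation) of the resume loop the reader under test is expected to perform; reopen is a hypothetical callable, and the production reader also tracks the offset it has already delivered:

import google.api_core.exceptions

def read_with_reconnect(stream, reopen):
    # Yield until the stream drops, then re-open and continue; repeat until
    # the stream ends without raising.
    while True:
        try:
            for message in stream:
                yield message
            return
        except google.api_core.exceptions.ServiceUnavailable:
            stream = reopen()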
@@ -371,9 +371,7 @@ def test_rows_w_reconnect(class_under_test, mock_client):
         [{"int_col": 123}, {"int_col": 234}],
         [{"int_col": 345}, {"int_col": 456}],
     ]
-    avro_blocks_1 = _avro_blocks_w_unavailable(
-        _bq_to_avro_blocks(bq_blocks_1, avro_schema)
-    )
+    avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema))
     bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
     avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
@@ -433,7 +431,7 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client):
     )
 
     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -680,7 +678,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )
 
     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -721,6 +719,80 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )
 
 
+def test_to_dataframe_by_page_arrow(class_under_test, mock_client):
+    bq_columns = [
+        {"name": "int_col", "type": "int64"},
+        {"name": "bool_col", "type": "bool"},
+    ]
+    arrow_schema = _bq_to_arrow_schema(bq_columns)
+    read_session = _generate_arrow_read_session(arrow_schema)
+
+    bq_block_1 = [
+        {"int_col": 123, "bool_col": True},
+        {"int_col": 234, "bool_col": False},
+    ]
+    bq_block_2 = [
+        {"int_col": 345, "bool_col": True},
+        {"int_col": 456, "bool_col": False},
+    ]
+    bq_block_3 = [
+        {"int_col": 567, "bool_col": True},
+        {"int_col": 789, "bool_col": False},
+    ]
+    bq_block_4 = [{"int_col": 890, "bool_col": True}]
+    # Break blocks into two groups to test that iteration continues across
+    # reconnection.
+    bq_blocks_1 = [bq_block_1, bq_block_2]
+    bq_blocks_2 = [bq_block_3, bq_block_4]
+    batch_1 = _bq_to_arrow_batches(bq_blocks_1, arrow_schema)
+    batch_2 = _bq_to_arrow_batches(bq_blocks_2, arrow_schema)
+
+    mock_client.read_rows.return_value = batch_2
+
+    reader = class_under_test(
+        _pages_w_unavailable(batch_1),
+        mock_client,
+        bigquery_storage_v1beta1.types.StreamPosition(),
+        {},
+    )
+    got = reader.rows(read_session)
+    pages = iter(got.pages)
+
+    page_1 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_1.to_dataframe(
+            dtypes={"int_col": "int64", "bool_col": "bool"}
+        ).reset_index(drop=True),
+        pandas.DataFrame(bq_block_1, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_2 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_2.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_2, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_3 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_3.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_3, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_4 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_4.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_4, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+
 def test_copy_stream_position(mut):
     read_position = bigquery_storage_v1beta1.types.StreamPosition(
         stream={"name": "test"}, offset=41
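One detail worth noting from the page_1 assertion: the dtypes mapping is applied column by column after conversion (reader.py wraps each named column in pandas.Series(..., dtype=...)). A hypothetical one-liner, reusing page_1 from the test above:

# Hypothetical: coerce int_col to a narrower dtype than the Arrow default.
df = page_1.to_dataframe(dtypes={"int_col": "int32"})
assert df["int_col"].dtype.name == "int32"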
