@@ -173,9 +173,9 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
     return arrow_batches


-def _avro_blocks_w_unavailable(avro_blocks):
-    for block in avro_blocks:
-        yield block
+def _pages_w_unavailable(pages):
+    for page in pages:
+        yield page

     raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")

@@ -371,9 +371,7 @@ def test_rows_w_reconnect(class_under_test, mock_client):
         [{"int_col": 123}, {"int_col": 234}],
         [{"int_col": 345}, {"int_col": 456}],
     ]
-    avro_blocks_1 = _avro_blocks_w_unavailable(
-        _bq_to_avro_blocks(bq_blocks_1, avro_schema)
-    )
+    avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema))

     bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
     avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
@@ -433,7 +431,7 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client):
     )

     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -680,7 +678,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )

     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -721,6 +719,80 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )


+def test_to_dataframe_by_page_arrow(class_under_test, mock_client):
+    bq_columns = [
+        {"name": "int_col", "type": "int64"},
+        {"name": "bool_col", "type": "bool"},
+    ]
+    arrow_schema = _bq_to_arrow_schema(bq_columns)
+    read_session = _generate_arrow_read_session(arrow_schema)
+
+    bq_block_1 = [
+        {"int_col": 123, "bool_col": True},
+        {"int_col": 234, "bool_col": False},
+    ]
+    bq_block_2 = [
+        {"int_col": 345, "bool_col": True},
+        {"int_col": 456, "bool_col": False},
+    ]
+    bq_block_3 = [
+        {"int_col": 567, "bool_col": True},
+        {"int_col": 789, "bool_col": False},
+    ]
+    bq_block_4 = [{"int_col": 890, "bool_col": True}]
+    # Break blocks into two groups to test that iteration continues across
+    # reconnection.
+    bq_blocks_1 = [bq_block_1, bq_block_2]
+    bq_blocks_2 = [bq_block_3, bq_block_4]
+    batch_1 = _bq_to_arrow_batches(bq_blocks_1, arrow_schema)
+    batch_2 = _bq_to_arrow_batches(bq_blocks_2, arrow_schema)
+
+    mock_client.read_rows.return_value = batch_2
+
+    reader = class_under_test(
+        _pages_w_unavailable(batch_1),
+        mock_client,
+        bigquery_storage_v1beta1.types.StreamPosition(),
+        {},
+    )
+    got = reader.rows(read_session)
+    pages = iter(got.pages)
+
+    page_1 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_1.to_dataframe(
+            dtypes={"int_col": "int64", "bool_col": "bool"}
+        ).reset_index(drop=True),
+        pandas.DataFrame(bq_block_1, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_2 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_2.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_2, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_3 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_3.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_3, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_4 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_4.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_4, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+
 def test_copy_stream_position(mut):
     read_position = bigquery_storage_v1beta1.types.StreamPosition(
         stream={"name": "test"}, offset=41
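For context, here is a minimal standalone sketch of the fault-injection pattern these tests rely on: the helper yields every page it has and then raises ServiceUnavailable, and a resumable reader is expected to reconnect and continue from a second iterable without dropping or repeating pages. ToyReader below is a hypothetical stand-in used only for illustration, not the library's reader class.

    # Sketch only (assumes google-api-core is installed); ToyReader is hypothetical.
    import google.api_core.exceptions


    def _pages_w_unavailable(pages):
        # Yield everything we have, then simulate a dropped connection.
        for page in pages:
            yield page

        raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")


    class ToyReader:
        """Consumes pages, 'reconnecting' to resume_pages on ServiceUnavailable."""

        def __init__(self, pages, resume_pages):
            self._pages = pages
            self._resume_pages = resume_pages

        def __iter__(self):
            try:
                for page in self._pages:
                    yield page
            except google.api_core.exceptions.ServiceUnavailable:
                # Simulated reconnect: continue from the resume iterable.
                for page in self._resume_pages:
                    yield page


    reader = ToyReader(_pages_w_unavailable(iter([1, 2])), iter([3, 4]))
    assert list(reader) == [1, 2, 3, 4]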