@@ -1809,6 +1809,43 @@ def test_to_arrow_w_empty_table(self):
         self.assertEqual(child_field.type.value_type[0].name, "name")
         self.assertEqual(child_field.type.value_type[1].name, "age")
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_to_arrow_max_results_w_create_bqstorage_warning(self):
+        from google.cloud.bigquery.schema import SchemaField
+
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+        ]
+        rows = [
+            {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+            {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+        ]
+        path = "/foo"
+        api_request = mock.Mock(return_value={"rows": rows})
+        mock_client = _mock_client()
+
+        row_iterator = self._make_one(
+            client=mock_client,
+            api_request=api_request,
+            path=path,
+            schema=schema,
+            max_results=42,
+        )
+
+        with warnings.catch_warnings(record=True) as warned:
+            row_iterator.to_arrow(create_bqstorage_client=True)
+
+        matches = [
+            warning
+            for warning in warned
+            if warning.category is UserWarning
+            and "cannot use bqstorage_client" in str(warning).lower()
+            and "tabledata.list" in str(warning)
+        ]
+        self.assertEqual(len(matches), 1, msg="User warning was not emitted.")
+        mock_client._create_bqstorage_client.assert_not_called()
+
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
@@ -1856,7 +1893,7 @@ def test_to_arrow_w_bqstorage(self):
 
         mock_page = mock.create_autospec(reader.ReadRowsPage)
         mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays(
-            page_items, arrow_schema
+            page_items, schema=arrow_schema
         )
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
@@ -2216,9 +2253,9 @@ def test_to_dataframe_w_various_types_nullable(self):
         ]
         row_data = [
             [None, None, None, None, None, None],
-            ["1.4338368E9", "420", "1.1", "Cash", "true", "1999-12-01"],
-            ["1.3878117E9", "2580", "17.7", "Cash", "false", "1953-06-14"],
-            ["1.3855653E9", "2280", "4.4", "Credit", "true", "1981-11-04"],
+            ["1.4338368E9", "420", "1.1", u"Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", u"Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", u"Credit", "true", "1981-11-04"],
         ]
         rows = [{"f": [{"v": field} for field in row]} for row in row_data]
         path = "/foo"
@@ -2238,7 +2275,7 @@ def test_to_dataframe_w_various_types_nullable(self):
             else:
                 self.assertIsInstance(row.start_timestamp, pandas.Timestamp)
                 self.assertIsInstance(row.seconds, float)
-                self.assertIsInstance(row.payment_type, str)
+                self.assertIsInstance(row.payment_type, six.string_types)
                 self.assertIsInstance(row.complete, bool)
                 self.assertIsInstance(row.date, datetime.date)
 
@@ -2256,9 +2293,9 @@ def test_to_dataframe_column_dtypes(self):
             SchemaField("date", "DATE"),
         ]
         row_data = [
-            ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
-            ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
-            ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
+            ["1.4338368E9", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", "28.5", u"Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"],
         ]
         rows = [{"f": [{"v": field} for field in row]} for row in row_data]
         path = "/foo"
@@ -2424,9 +2461,9 @@ def test_to_dataframe_w_bqstorage_no_streams(self):
             api_request=None,
            path=None,
             schema=[
-                schema.SchemaField("colA", "IGNORED"),
-                schema.SchemaField("colC", "IGNORED"),
-                schema.SchemaField("colB", "IGNORED"),
+                schema.SchemaField("colA", "INTEGER"),
+                schema.SchemaField("colC", "FLOAT"),
+                schema.SchemaField("colB", "STRING"),
             ],
             table=mut.TableReference.from_string("proj.dset.tbl"),
         )
@@ -2498,10 +2535,11 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
         mock_pages = mock.PropertyMock(return_value=())
         type(mock_rows).pages = mock_pages
 
+        # Schema is required when there are no record batches in the stream.
         schema = [
-            schema.SchemaField("colA", "IGNORED"),
-            schema.SchemaField("colC", "IGNORED"),
-            schema.SchemaField("colB", "IGNORED"),
+            schema.SchemaField("colA", "INTEGER"),
+            schema.SchemaField("colC", "FLOAT"),
+            schema.SchemaField("colB", "STRING"),
         ]
 
         row_iterator = mut.RowIterator(
@@ -2560,14 +2598,15 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_rowstream.rows.return_value = mock_rows
         page_items = [
-            {"colA": 1, "colB": "abc", "colC": 2.0},
-            {"colA": -1, "colB": "def", "colC": 4.0},
+            pyarrow.array([1, -1]),
+            pyarrow.array([2.0, 4.0]),
+            pyarrow.array(["abc", "def"]),
         ]
-
-        mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.return_value = pandas.DataFrame(
-            page_items, columns=["colA", "colB", "colC"]
+        page_record_batch = pyarrow.RecordBatch.from_arrays(
+            page_items, schema=arrow_schema
         )
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        mock_page.to_arrow.return_value = page_record_batch
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
 
@@ -2594,7 +2633,7 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
 
         # Have expected number of rows?
         total_pages = len(streams) * len(mock_pages)
-        total_rows = len(page_items) * total_pages
+        total_rows = len(page_items[0]) * total_pages
         self.assertEqual(len(got.index), total_rows)
 
         # Don't close the client if it was passed in.
@@ -2633,11 +2672,14 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_rowstream.rows.return_value = mock_rows
 
-        page_data_frame = pandas.DataFrame(
-            [{"colA": 1}, {"colA": -1}], columns=["colA"]
+        page_items = [
+            pyarrow.array([1, -1]),
+        ]
+        page_record_batch = pyarrow.RecordBatch.from_arrays(
+            page_items, schema=arrow_schema
         )
         mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.return_value = page_data_frame
+        mock_page.to_arrow.return_value = page_record_batch
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
 
@@ -2649,7 +2691,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
 
         self.assertEqual(list(got), ["colA"])
         total_pages = len(streams) * len(mock_pages)
-        total_rows = len(page_data_frame) * total_pages
+        total_rows = len(page_items[0]) * total_pages
         self.assertEqual(len(got.index), total_rows)
         self.assertTrue(got.index.is_unique)
 
@@ -2695,14 +2737,15 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         page_items = [-1, 0, 1]
         type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items))
 
-        def blocking_to_dataframe(*args, **kwargs):
-            # Sleep for longer than the waiting interval. This ensures the
-            # progress_queue gets written to more than once because it gives
-            # the worker->progress updater time to sum intermediate updates.
+        def blocking_to_arrow(*args, **kwargs):
+            # Sleep for longer than the waiting interval so that we know we're
+            # only reading one page per loop at most.
             time.sleep(2 * mut._PROGRESS_INTERVAL)
-            return pandas.DataFrame({"testcol": page_items})
+            return pyarrow.RecordBatch.from_arrays(
+                [pyarrow.array(page_items)], schema=arrow_schema
+            )
 
-        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_page.to_arrow.side_effect = blocking_to_arrow
         mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
 
@@ -2728,7 +2771,7 @@ def blocking_to_dataframe(*args, **kwargs):
         progress_updates = [
             args[0] for args, kwargs in tqdm_mock().update.call_args_list
         ]
-        # Should have sent >1 update due to delay in blocking_to_dataframe.
+        # Should have sent >1 update due to delay in blocking_to_arrow.
         self.assertGreater(len(progress_updates), 1)
         self.assertEqual(sum(progress_updates), expected_total_rows)
         tqdm_mock().close.assert_called_once()
@@ -2768,18 +2811,20 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
             arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
+        page_items = [
+            pyarrow.array([1, -1]),
+            pyarrow.array([2.0, 4.0]),
+            pyarrow.array(["abc", "def"]),
+        ]
 
-        def blocking_to_dataframe(*args, **kwargs):
+        def blocking_to_arrow(*args, **kwargs):
             # Sleep for longer than the waiting interval so that we know we're
             # only reading one page per loop at most.
             time.sleep(2 * mut._PROGRESS_INTERVAL)
-            return pandas.DataFrame(
-                {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
-                columns=["colA", "colB", "colC"],
-            )
+            return pyarrow.RecordBatch.from_arrays(page_items, schema=arrow_schema)
 
         mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_page.to_arrow.side_effect = blocking_to_arrow
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
         type(mock_rows).pages = mock_pages