@@ -154,11 +154,13 @@ def _bq_to_arrow_batch_objects(bq_blocks, arrow_schema):
154
154
arrays .append (
155
155
pyarrow .array (
156
156
(row [name ] for row in block ),
157
- type = arrow_schema .field_by_name (name ).type ,
157
+ type = arrow_schema .field (name ).type ,
158
158
size = len (block ),
159
159
)
160
160
)
161
- arrow_batches .append (pyarrow .RecordBatch .from_arrays (arrays , arrow_schema ))
161
+ arrow_batches .append (
162
+ pyarrow .RecordBatch .from_arrays (arrays , schema = arrow_schema )
163
+ )
162
164
return arrow_batches
163
165
164
166
@@ -173,6 +175,22 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
173
175
return arrow_batches
174
176
175
177
178
+ def _pages_w_nonresumable_internal_error (avro_blocks ):
179
+ for block in avro_blocks :
180
+ yield block
181
+ raise google .api_core .exceptions .InternalServerError (
182
+ "INTERNAL: Got a nonresumable error."
183
+ )
184
+
185
+
186
+ def _pages_w_resumable_internal_error (avro_blocks ):
187
+ for block in avro_blocks :
188
+ yield block
189
+ raise google .api_core .exceptions .InternalServerError (
190
+ "INTERNAL: Received RST_STREAM with error code 2."
191
+ )
192
+
193
+
176
194
def _pages_w_unavailable (pages ):
177
195
for page in pages :
178
196
yield page
@@ -363,6 +381,29 @@ def test_rows_w_timeout(class_under_test, mock_client):
363
381
mock_client .read_rows .assert_not_called ()
364
382
365
383
384
+ def test_rows_w_nonresumable_internal_error (class_under_test , mock_client ):
385
+ bq_columns = [{"name" : "int_col" , "type" : "int64" }]
386
+ avro_schema = _bq_to_avro_schema (bq_columns )
387
+ read_session = _generate_avro_read_session (avro_schema )
388
+ bq_blocks = [[{"int_col" : 1024 }, {"int_col" : 512 }], [{"int_col" : 256 }]]
389
+ avro_blocks = _pages_w_nonresumable_internal_error (
390
+ _bq_to_avro_blocks (bq_blocks , avro_schema )
391
+ )
392
+
393
+ stream_position = bigquery_storage_v1beta1 .types .StreamPosition (
394
+ stream = {"name" : "test" }
395
+ )
396
+
397
+ reader = class_under_test (avro_blocks , mock_client , stream_position , {})
398
+
399
+ with pytest .raises (
400
+ google .api_core .exceptions .InternalServerError , match = "nonresumable error"
401
+ ):
402
+ list (reader .rows (read_session ))
403
+
404
+ mock_client .read_rows .assert_not_called ()
405
+
406
+
366
407
def test_rows_w_reconnect (class_under_test , mock_client ):
367
408
bq_columns = [{"name" : "int_col" , "type" : "int64" }]
368
409
avro_schema = _bq_to_avro_schema (bq_columns )
@@ -372,13 +413,18 @@ def test_rows_w_reconnect(class_under_test, mock_client):
372
413
[{"int_col" : 345 }, {"int_col" : 456 }],
373
414
]
374
415
avro_blocks_1 = _pages_w_unavailable (_bq_to_avro_blocks (bq_blocks_1 , avro_schema ))
375
- bq_blocks_2 = [[{"int_col" : 567 }, {"int_col" : 789 }], [{"int_col" : 890 }]]
416
+ bq_blocks_2 = [[{"int_col" : 1024 }, {"int_col" : 512 }], [{"int_col" : 256 }]]
376
417
avro_blocks_2 = _bq_to_avro_blocks (bq_blocks_2 , avro_schema )
418
+ avro_blocks_2 = _pages_w_resumable_internal_error (
419
+ _bq_to_avro_blocks (bq_blocks_2 , avro_schema )
420
+ )
421
+ bq_blocks_3 = [[{"int_col" : 567 }, {"int_col" : 789 }], [{"int_col" : 890 }]]
422
+ avro_blocks_3 = _bq_to_avro_blocks (bq_blocks_3 , avro_schema )
377
423
378
- for block in avro_blocks_2 :
424
+ for block in avro_blocks_3 :
379
425
block .status .estimated_row_count = 7
380
426
381
- mock_client .read_rows .return_value = avro_blocks_2
427
+ mock_client .read_rows .side_effect = ( avro_blocks_2 , avro_blocks_3 )
382
428
stream_position = bigquery_storage_v1beta1 .types .StreamPosition (
383
429
stream = {"name" : "test" }
384
430
)
@@ -395,17 +441,24 @@ def test_rows_w_reconnect(class_under_test, mock_client):
395
441
itertools .chain (
396
442
itertools .chain .from_iterable (bq_blocks_1 ),
397
443
itertools .chain .from_iterable (bq_blocks_2 ),
444
+ itertools .chain .from_iterable (bq_blocks_3 ),
398
445
)
399
446
)
400
447
401
448
assert tuple (got ) == expected
402
449
assert got .total_rows == 7
403
- mock_client .read_rows .assert_called_once_with (
450
+ mock_client .read_rows .assert_any_call (
404
451
bigquery_storage_v1beta1 .types .StreamPosition (
405
452
stream = {"name" : "test" }, offset = 4
406
453
),
407
454
metadata = {"test-key" : "test-value" },
408
455
)
456
+ mock_client .read_rows .assert_called_with (
457
+ bigquery_storage_v1beta1 .types .StreamPosition (
458
+ stream = {"name" : "test" }, offset = 7
459
+ ),
460
+ metadata = {"test-key" : "test-value" },
461
+ )
409
462
410
463
411
464
def test_rows_w_reconnect_by_page (class_under_test , mock_client ):
0 commit comments