Content-Length: 576288 | pFad | https://github.com/googleapis/google-cloud-python/commit/b492bdcc2d288022b5c81e90aea993432eec078a

F2 fix(bigquerystorage): resume reader connection on `EOS` internal erro… · googleapis/google-cloud-python@b492bdc · GitHub
Skip to content

Commit b492bdc

Browse files
authored
fix(bigquerystorage): resume reader connection on EOS internal error (#9994)
* fix(bigquerystorage): resume reader connection on `EOS` internal error. It's infeasible for the backend to change the status of `EOS on DATA` internal errors, so instead we check the error message to see if it's an error that is resumable. We don't want to try to resume on *all* internal errors, so inspecting the message is the best we can do. * fix: update resumable errors to match the string from the test failure * refactor: use the more readable `any` instead of a loop
1 parent 7a64502 commit b492bdc

File tree

2 files changed

+76
-6
lines changed

2 files changed

+76
-6
lines changed

bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@
4343

4444
_STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,)
4545

46+
# The Google API endpoint can unexpectedly close long-running HTTP/2 streams.
47+
# Unfortunately, this condition is surfaced to the caller as an internal error
48+
# by gRPC. We don't want to resume on all internal errors, so instead we look
49+
# for error messages that we know are caused by problems that are safe to
50+
# reconnect.
51+
_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
52+
# See: https://github.com/googleapis/google-cloud-python/pull/9994
53+
"RST_STREAM",
54+
)
55+
4656
_FASTAVRO_REQUIRED = (
4757
"fastavro is required to parse ReadRowResponse messages with Avro bytes."
4858
)
@@ -131,6 +141,13 @@ def __iter__(self):
131141
yield message
132142

133143
return # Made it through the whole stream.
144+
except google.api_core.exceptions.InternalServerError as exc:
145+
resumable_error = any(
146+
resumable_message in exc.message
147+
for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
148+
)
149+
if not resumable_error:
150+
raise
134151
except _STREAM_RESUMPTION_EXCEPTIONS:
135152
# Transient error, so reconnect to the stream.
136153
pass

bigquery_storage/tests/unit/test_reader.py

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,13 @@ def _bq_to_arrow_batch_objects(bq_blocks, arrow_schema):
154154
arrays.append(
155155
pyarrow.array(
156156
(row[name] for row in block),
157-
type=arrow_schema.field_by_name(name).type,
157+
type=arrow_schema.field(name).type,
158158
size=len(block),
159159
)
160160
)
161-
arrow_batches.append(pyarrow.RecordBatch.from_arrays(arrays, arrow_schema))
161+
arrow_batches.append(
162+
pyarrow.RecordBatch.from_arrays(arrays, schema=arrow_schema)
163+
)
162164
return arrow_batches
163165

164166

@@ -173,6 +175,22 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
173175
return arrow_batches
174176

175177

178+
def _pages_w_nonresumable_internal_error(avro_blocks):
179+
for block in avro_blocks:
180+
yield block
181+
raise google.api_core.exceptions.InternalServerError(
182+
"INTERNAL: Got a nonresumable error."
183+
)
184+
185+
186+
def _pages_w_resumable_internal_error(avro_blocks):
187+
for block in avro_blocks:
188+
yield block
189+
raise google.api_core.exceptions.InternalServerError(
190+
"INTERNAL: Received RST_STREAM with error code 2."
191+
)
192+
193+
176194
def _pages_w_unavailable(pages):
177195
for page in pages:
178196
yield page
@@ -363,6 +381,29 @@ def test_rows_w_timeout(class_under_test, mock_client):
363381
mock_client.read_rows.assert_not_called()
364382

365383

384+
def test_rows_w_nonresumable_internal_error(class_under_test, mock_client):
385+
bq_columns = [{"name": "int_col", "type": "int64"}]
386+
avro_schema = _bq_to_avro_schema(bq_columns)
387+
read_session = _generate_avro_read_session(avro_schema)
388+
bq_blocks = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]]
389+
avro_blocks = _pages_w_nonresumable_internal_error(
390+
_bq_to_avro_blocks(bq_blocks, avro_schema)
391+
)
392+
393+
stream_position = bigquery_storage_v1beta1.types.StreamPosition(
394+
stream={"name": "test"}
395+
)
396+
397+
reader = class_under_test(avro_blocks, mock_client, stream_position, {})
398+
399+
with pytest.raises(
400+
google.api_core.exceptions.InternalServerError, match="nonresumable error"
401+
):
402+
list(reader.rows(read_session))
403+
404+
mock_client.read_rows.assert_not_called()
405+
406+
366407
def test_rows_w_reconnect(class_under_test, mock_client):
367408
bq_columns = [{"name": "int_col", "type": "int64"}]
368409
avro_schema = _bq_to_avro_schema(bq_columns)
@@ -372,13 +413,18 @@ def test_rows_w_reconnect(class_under_test, mock_client):
372413
[{"int_col": 345}, {"int_col": 456}],
373414
]
374415
avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema))
375-
bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
416+
bq_blocks_2 = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]]
376417
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
418+
avro_blocks_2 = _pages_w_resumable_internal_error(
419+
_bq_to_avro_blocks(bq_blocks_2, avro_schema)
420+
)
421+
bq_blocks_3 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
422+
avro_blocks_3 = _bq_to_avro_blocks(bq_blocks_3, avro_schema)
377423

378-
for block in avro_blocks_2:
424+
for block in avro_blocks_3:
379425
block.status.estimated_row_count = 7
380426

381-
mock_client.read_rows.return_value = avro_blocks_2
427+
mock_client.read_rows.side_effect = (avro_blocks_2, avro_blocks_3)
382428
stream_position = bigquery_storage_v1beta1.types.StreamPosition(
383429
stream={"name": "test"}
384430
)
@@ -395,17 +441,24 @@ def test_rows_w_reconnect(class_under_test, mock_client):
395441
itertools.chain(
396442
itertools.chain.from_iterable(bq_blocks_1),
397443
itertools.chain.from_iterable(bq_blocks_2),
444+
itertools.chain.from_iterable(bq_blocks_3),
398445
)
399446
)
400447

401448
assert tuple(got) == expected
402449
assert got.total_rows == 7
403-
mock_client.read_rows.assert_called_once_with(
450+
mock_client.read_rows.assert_any_call(
404451
bigquery_storage_v1beta1.types.StreamPosition(
405452
stream={"name": "test"}, offset=4
406453
),
407454
metadata={"test-key": "test-value"},
408455
)
456+
mock_client.read_rows.assert_called_with(
457+
bigquery_storage_v1beta1.types.StreamPosition(
458+
stream={"name": "test"}, offset=7
459+
),
460+
metadata={"test-key": "test-value"},
461+
)
409462

410463

411464
def test_rows_w_reconnect_by_page(class_under_test, mock_client):

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/googleapis/google-cloud-python/commit/b492bdcc2d288022b5c81e90aea993432eec078a

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy