# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
from typing import Sequence, TYPE_CHECKING
import time
from grpc import StatusCode
from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
from google.cloud.bigtable_v2.types import RowSet as RowSetPB
from google.cloud.bigtable_v2.types import RowRange as RowRangePB
from google.cloud.bigtable.data.row import Row, Cell
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data.exceptions import _ResetRow
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._metrics import tracked_retry
from google.api_core import retry as retries
from google.cloud.bigtable.data._cross_sync import CrossSync
if TYPE_CHECKING:
from google.cloud.bigtable.data._metrics import ActiveOperationMetric
if CrossSync.is_async:
from google.cloud.bigtable.data._async.client import (
_DataApiTargetAsync as TargetType,
)
else:
from google.cloud.bigtable.data._sync_autogen.client import _DataApiTarget as TargetType # type: ignore
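# CrossSync generates a synchronous counterpart of this module at the path
# named below (the _sync_autogen import above refers to that generated code)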
__CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen._read_rows"
@CrossSync.convert_class("_ReadRowsOperation")
class _ReadRowsOperationAsync:
"""
ReadRowsOperation handles the logic of merging chunks from a ReadRowsResponse stream
into a stream of Row objects.
    ReadRowsOperation.chunk_stream extracts cell chunks from the raw
    ReadRowsResponse stream, and ReadRowsOperation.merge_rows assembles those
    chunks into a stream of Row objects.
    ReadRowsOperation(query, target, ...) handles row merging logic end-to-end,
    including performing retries on stream errors.
Args:
query: The query to execute
target: The table or view to send the request to
operation_timeout: The total time to allow for the operation, in seconds
attempt_timeout: The time to allow for each individual attempt, in seconds
metric: the metric object representing the active operation
retryable_exceptions: A list of exceptions that should trigger a retry
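    Example (illustrative sketch; ``table`` stands for a table or view handle
    and ``metric`` for an ActiveOperationMetric, both of which are normally
    wired up internally by the client rather than constructed by hand):
        operation = _ReadRowsOperationAsync(
            query=ReadRowsQuery(),
            target=table,
            operation_timeout=60.0,
            attempt_timeout=20.0,
            metric=metric,
        )
        async for row in operation.start_operation():
            print(row.row_key)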
"""
__slots__ = (
"attempt_timeout_gen",
"operation_timeout",
"request",
"target",
"_predicate",
"_last_yielded_row_key",
"_remaining_count",
"_operation_metric",
)
def __init__(
self,
query: ReadRowsQuery,
target: TargetType,
operation_timeout: float,
attempt_timeout: float,
metric: ActiveOperationMetric,
retryable_exceptions: Sequence[type[Exception]] = (),
):
self.attempt_timeout_gen = _attempt_timeout_generator(
attempt_timeout, operation_timeout
)
self.operation_timeout = operation_timeout
if isinstance(query, dict):
self.request = ReadRowsRequestPB(
**query,
**target._request_path,
app_profile_id=target.app_profile_id,
)
else:
self.request = query._to_pb(target)
self.target = target
self._predicate = retries.if_exception_type(*retryable_exceptions)
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None
self._operation_metric = metric
def start_operation(self) -> CrossSync.Iterable[Row]:
"""
Start the read_rows operation, retrying on retryable errors.
Yields:
Row: The next row in the stream
"""
return tracked_retry(
retry_fn=CrossSync.retry_target_stream,
operation=self._operation_metric,
target=self._read_rows_attempt,
predicate=self._predicate,
timeout=self.operation_timeout,
)
def _read_rows_attempt(self) -> CrossSync.Iterable[Row]:
"""
Attempt a single read_rows rpc call.
This function is intended to be wrapped by retry logic,
which will call this function until it succeeds or
a non-retryable error is raised.
Yields:
Row: The next row in the stream
"""
self._operation_metric.start_attempt()
# revise request keys and ranges between attempts
if self._last_yielded_row_key is not None:
# if this is a retry, try to trim down the request to avoid ones we've already processed
try:
self.request.rows = self._revise_request_rowset(
row_set=self.request.rows,
last_seen_row_key=self._last_yielded_row_key,
)
except _RowSetComplete:
# if we've already seen all the rows, we're done
return self.merge_rows(None)
# revise the limit based on number of rows already yielded
if self._remaining_count is not None:
self.request.rows_limit = self._remaining_count
if self._remaining_count == 0:
return self.merge_rows(None)
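        # (merge_rows(None) produces an empty stream that still records
        # success on the operation metric)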
# create and return a new row merger
gapic_stream = self.target.client._gapic_client.read_rows(
self.request,
timeout=next(self.attempt_timeout_gen),
retry=None,
)
chunked_stream = self.chunk_stream(gapic_stream)
return self.merge_rows(chunked_stream)
@CrossSync.convert()
async def chunk_stream(
self, stream: CrossSync.Awaitable[CrossSync.Iterable[ReadRowsResponsePB]]
) -> CrossSync.Iterable[ReadRowsResponsePB.CellChunk]:
"""
        Process chunks out of the raw read_rows stream.
Args:
stream: the raw read_rows stream from the gapic client
Yields:
ReadRowsResponsePB.CellChunk: the next chunk in the stream
"""
async for resp in await stream:
# extract proto from proto-plus wrapper
resp = resp._pb
            # handle last_scanned_row_key packets, sent when the server has
            # scanned past rows without returning data for them (e.g. past
            # the end of a row range)
if resp.last_scanned_row_key:
if (
self._last_yielded_row_key is not None
and resp.last_scanned_row_key <= self._last_yielded_row_key
):
raise InvalidChunk("last scanned out of order")
self._last_yielded_row_key = resp.last_scanned_row_key
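                # recording the marker lets a retried request skip rows the
                # server already scanned, even though none were yielded here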
current_key = None
# process each chunk in the response
for c in resp.chunks:
if current_key is None:
current_key = c.row_key
if current_key is None:
raise InvalidChunk("first chunk is missing a row key")
elif (
self._last_yielded_row_key
and current_key <= self._last_yielded_row_key
):
raise InvalidChunk("row keys should be strictly increasing")
yield c
if c.reset_row:
current_key = None
elif c.commit_row:
# update row state after each commit
self._last_yielded_row_key = current_key
if self._remaining_count is not None:
self._remaining_count -= 1
if self._remaining_count < 0:
raise InvalidChunk("emit count exceeds row limit")
current_key = None
@CrossSync.convert(
replace_symbols={"__aiter__": "__iter__", "__anext__": "__next__"},
)
async def merge_rows(
self, chunks: CrossSync.Iterable[ReadRowsResponsePB.CellChunk] | None
) -> CrossSync.Iterable[Row]:
"""
Merge chunks into rows
Args:
chunks: the chunk stream to merge
Yields:
Row: the next row in the stream
"""
try:
if chunks is None:
self._operation_metric.end_with_success()
return
it = chunks.__aiter__()
# For each row
while True:
try:
c = await it.__anext__()
except CrossSync.StopIteration:
# stream complete
self._operation_metric.end_with_success()
return
row_key = c.row_key
if not row_key:
raise InvalidChunk("first row chunk is missing key")
cells = []
                # family and qualifier carry over from cell to cell within a
                # row until a chunk overrides them
family: str | None = None
qualifier: bytes | None = None
try:
# for each cell
while True:
if c.reset_row:
raise _ResetRow(c)
k = c.row_key
f = c.family_name.value
q = c.qualifier.value if c.HasField("qualifier") else None
if k and k != row_key:
raise InvalidChunk("unexpected new row key")
if f:
family = f
if q is not None:
qualifier = q
else:
raise InvalidChunk("new family without qualifier")
elif family is None:
raise InvalidChunk("missing family")
elif q is not None:
if family is None:
raise InvalidChunk("new qualifier without family")
qualifier = q
elif qualifier is None:
raise InvalidChunk("missing qualifier")
ts = c.timestamp_micros
labels = c.labels if c.labels else []
value = c.value
# merge split cells
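                        # (a chunk with value_size > 0 indicates the cell's
                        # value continues in subsequent chunks; the final
                        # chunk of a split cell has value_size == 0)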
if c.value_size > 0:
buffer = [value]
while c.value_size > 0:
# throws when premature end
c = await it.__anext__()
t = c.timestamp_micros
cl = c.labels
k = c.row_key
if (
c.HasField("family_name")
and c.family_name.value != family
):
raise InvalidChunk("family changed mid cell")
if (
c.HasField("qualifier")
and c.qualifier.value != qualifier
):
raise InvalidChunk("qualifier changed mid cell")
if t and t != ts:
raise InvalidChunk("timestamp changed mid cell")
if cl and cl != labels:
raise InvalidChunk("labels changed mid cell")
if k and k != row_key:
raise InvalidChunk("row key changed mid cell")
if c.reset_row:
raise _ResetRow(c)
buffer.append(c.value)
value = b"".join(buffer)
cells.append(
Cell(value, row_key, family, qualifier, ts, list(labels))
)
if c.commit_row:
block_time = time.monotonic_ns()
yield Row(row_key, cells)
# most metric operations use setters, but this one updates
# the value directly to avoid extra overhead
if self._operation_metric.active_attempt is not None:
self._operation_metric.active_attempt.application_blocking_time_ns += ( # type: ignore
time.monotonic_ns() - block_time
)
break
c = await it.__anext__()
except _ResetRow as e:
c = e.chunk
if (
c.row_key
or c.HasField("family_name")
or c.HasField("qualifier")
or c.timestamp_micros
or c.labels
or c.value
):
raise InvalidChunk("reset row with data")
continue
except CrossSync.StopIteration:
raise InvalidChunk("premature end of stream")
except GeneratorExit as close_exception:
# handle aclose()
self._operation_metric.end_with_status(StatusCode.CANCELLED)
raise close_exception
except Exception as generic_exception:
            # re-raise; these exceptions are handled by the retry wrapper
raise generic_exception
@staticmethod
def _revise_request_rowset(
row_set: RowSetPB,
last_seen_row_key: bytes,
) -> RowSetPB:
"""
Revise the rows in the request to avoid ones we've already processed.
Args:
row_set: the row set from the request
last_seen_row_key: the last row key encountered
Returns:
            RowSetPB: the new rowset after adjusting for the last seen key
Raises:
_RowSetComplete: if there are no rows left to process after the revision
"""
        # if the user is doing a full table scan, resume it just after the last seen key
if row_set is None or (not row_set.row_ranges and not row_set.row_keys):
            return RowSetPB(row_ranges=[RowRangePB(start_key_open=last_seen_row_key)])
        # remove seen keys from the user-specified key list
adjusted_keys: list[bytes] = [
k for k in row_set.row_keys if k > last_seen_row_key
]
# adjust ranges to ignore keys before last seen
adjusted_ranges: list[RowRangePB] = []
for row_range in row_set.row_ranges:
end_key = row_range.end_key_closed or row_range.end_key_open or None
if end_key is None or end_key > last_seen_row_key:
# end range is after last seen key
new_range = RowRangePB(row_range)
start_key = row_range.start_key_closed or row_range.start_key_open
if start_key is None or start_key <= last_seen_row_key:
# replace start key with last seen
new_range.start_key_open = last_seen_row_key
adjusted_ranges.append(new_range)
if len(adjusted_keys) == 0 and len(adjusted_ranges) == 0:
# if the query is empty after revision, raise an exception
# this will avoid an unwanted full table scan
raise _RowSetComplete()
return RowSetPB(row_keys=adjusted_keys, row_ranges=adjusted_ranges)