feat: implement read_rows #762
Conversation
Co-authored-by: Mariatta Wijaya <[email protected]>
made changes based on PR feedback, but held off on adjusting tests until we finish the discussions
google/cloud/bigtable/_read_rows.py
Outdated
- request: the request dict to send to the Bigtable API
- client: the Bigtable client to use to make the request
- operation_timeout: the timeout to use for the entire operation, in seconds
- buffer_size: the size of the buffer to use for caching rows from the network
Ok, if we still don’t have clear consensus on the buffer, maybe we should err towards simplicity and remove it then?
I do worry it could impact the perceived throughput if the gapic call is always blocked on the consumer. But we can wait for user feedback first, and could always add a buffer back later if that ends up being an issue.
google/cloud/bigtable/_read_rows.py
Outdated
buffer_task = asyncio.create_task(
    self._generator_to_buffer(buffer, new_gapic_stream)
)
buffered_stream = self._buffer_to_generator(buffer)
The only background task created should be the buffer_task, which is cleaned up in the finally block below.
We could add more tests/checks around this to be safe, but I'll hold off on any until we resolve https://github.com/googleapis/python-bigtable/pull/762/files#r1201158174
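For context on the cleanup being discussed, here is a minimal sketch (not the PR's actual code) of the pattern: a single background task pumps the gapic stream into an asyncio.Queue, a generator drains it, and the task is cancelled in a finally block. The helper names and the StopAsyncIteration sentinel are illustrative assumptions.

```python
import asyncio
from typing import Any, AsyncGenerator, AsyncIterable

# Illustrative sketch only: helper names and the sentinel are assumptions,
# not the actual implementation in this PR.

async def _generator_to_buffer(buffer: asyncio.Queue, stream: AsyncIterable[Any]) -> None:
    """Producer: pull items off the network stream and park them in the buffer."""
    async for item in stream:
        await buffer.put(item)
    await buffer.put(StopAsyncIteration)  # sentinel marking end of stream

async def _buffer_to_generator(buffer: asyncio.Queue) -> AsyncGenerator[Any, None]:
    """Consumer: yield items from the buffer until the sentinel appears."""
    while True:
        item = await buffer.get()
        if item is StopAsyncIteration:
            return
        yield item

async def read_with_buffer(gapic_stream: AsyncIterable[Any], buffer_size: int = 10):
    buffer: asyncio.Queue = asyncio.Queue(maxsize=buffer_size)
    buffer_task = asyncio.create_task(_generator_to_buffer(buffer, gapic_stream))
    try:
        async for item in _buffer_to_generator(buffer):
            yield item
    finally:
        # the producer is the only background task; cancel it so nothing leaks
        buffer_task.cancel()
```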
google/cloud/bigtable/_read_rows.py
Outdated
self._emit_count += 1
self._last_emitted_row_key = new_item.row_key
if total_row_limit and self._emit_count >= total_row_limit:
    return
It will be closed when the buffer_task is closed in the finally block.
google/cloud/bigtable/_read_rows.py
Outdated
self,
request: dict[str, Any],
client: BigtableAsyncClient,
operation_timeout: float = 600.0,
Should operation_timeout be after the *?
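For reference, parameters placed after a bare * in a Python signature become keyword-only; a small sketch of the difference, using placeholder names rather than the PR's actual signature:

```python
from __future__ import annotations
from typing import Any

class _ReadRowsSketch:
    def __init__(
        self,
        request: dict[str, Any],
        client: Any,  # stands in for BigtableAsyncClient
        *,
        operation_timeout: float = 600.0,  # keyword-only once it follows the bare *
    ):
        self.request = request
        self.client = client
        self.operation_timeout = operation_timeout

# callers must then pass the timeout by name:
# _ReadRowsSketch(request, client, operation_timeout=30.0)
```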
google/cloud/bigtable/_read_rows.py
Outdated
# revise next request's row limit based on number emitted
if total_row_limit:
    new_limit = total_row_limit - self._emit_count
    if new_limit <= 0:
I think we might have to raise an error if the count goes negative. The situation would imply that there is a bug in the client or server, and the results can't be trusted.
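A sketch of the suggested behaviour, pulled out into a standalone helper; the exception type is a placeholder rather than whatever the client would actually raise:

```python
from __future__ import annotations

def revise_row_limit(total_row_limit: int, emit_count: int) -> int | None:
    """Return the row limit for the next retry attempt, or None if no limit was set."""
    if not total_row_limit:
        return None
    new_limit = total_row_limit - emit_count
    if new_limit < 0:
        # emitting more rows than the limit allows implies a client or server bug,
        # so the results can no longer be trusted
        raise RuntimeError(
            f"emit count {emit_count} exceeds row limit {total_row_limit}"
        )
    # a result of 0 means the limit is already satisfied and no retry is needed
    return new_limit
```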
google/cloud/bigtable/_read_rows.py
Outdated
params_str = f'table_name={self._request.get("table_name", "")}'
if self._request.get("app_profile_id", None):
    params_str = (
        f'{params_str},app_profile_id={self._request.get("app_profile_id", "")}'
Nit: is the default arg for get necessary here? It seems like L183 guarantees that the key is present.
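As a toy illustration of the nit: when earlier validation guarantees the key is present, plain indexing states that invariant more directly than .get with a default (the example request values below are made up):

```python
request = {
    "table_name": "projects/p/instances/i/tables/t",  # made-up value
    "app_profile_id": "profile-1",
}

# .get with a default silently tolerates a missing key ...
params_str = f'table_name={request.get("table_name", "")}'

# ... whereas plain indexing fails loudly if the guarantee is ever broken
params_str = f'table_name={request["table_name"]}'
if request.get("app_profile_id"):
    params_str += f',app_profile_id={request["app_profile_id"]}'
```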
google/cloud/bigtable/_read_rows.py
Outdated
buffer_task = asyncio.create_task(
    self._generator_to_buffer(buffer, new_gapic_stream)
)
buffered_stream = self._buffer_to_generator(buffer)
ping on this
* feat: add new v3.0.0 API skeleton (#745)
* feat: improve rows filters (#751)
* feat: read rows query model class (#752)
* feat: implement row and cell model classes (#753)
* feat: add pooled grpc transport (#748)
* feat: implement read_rows (#762)
* feat: implement mutate rows (#769)
* feat: literal value filter (#767)
* feat: row_exists and read_row (#778)
* feat: read_modify_write and check_and_mutate_row (#780)
* feat: sharded read rows (#766)
* feat: ping and warm with metadata (#810)
* feat: mutate rows batching (#770)
* chore: restructure module paths (#816)
* feat: improve timeout structure (#819)
* fix: api errors apply to all bulk mutations
* chore: reduce public api surface (#820)
* feat: improve error group tracebacks on < py11 (#825)
* feat: optimize read_rows (#852)
* chore: add user agent suffix (#842)
* feat: optimize retries (#854)
* feat: add test proxy (#836)
* chore(tests): add conformance tests to CI for v3 (#870)
* chore(tests): turn off fast fail for conformance tets (#882)
* feat: add TABLE_DEFAULTS enum for table method arguments (#880)
* fix: pass None for retry in gapic calls (#881)
* feat: replace internal dictionaries with protos in gapic calls (#875)
* chore: optimize gapic calls (#863)
* feat: expose retryable error codes to users (#879)
* chore: update api_core submodule (#897)
* chore: merge main into experimental_v3 (#900)
* chore: pin conformance tests to v0.0.2 (#903)
* fix: bulk mutation eventual success (#909)
---------
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
This PR implements the read_rows RPC call, along with related features like the row merging state machine and smart retries.
Most of the logic is implemented in private classes in _read_rows:
- ReadRowsOperation is the highest level class, providing an interface for asynchronous merging with or without retries
- StateMachine is used internally to track the state of the merge, including the current row and the keys of the rows that have been processed. It processes a stream of chunks, and will raise InvalidChunk if it reaches an invalid state.
- State classes track the current state of the StateMachine, and define what to do on the next chunk.
- RowBuilder is used by the StateMachine to build a Row object.

I also added a ReadRowsIterator class, which is what users will interact with when reading from a stream. It adds idle_timeouts and can provide request_stats (though this isn't fully implemented yet).

The changes in this PR have been tested with the read_rows conformance tests using my test proxy from #747
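To make the layering concrete, here is a heavily condensed, hypothetical sketch of how the pieces described above could fit together; the class names, chunk attributes (row_key, commit_row), and methods are simplified stand-ins rather than the PR's actual API:

```python
from __future__ import annotations
from typing import Any, AsyncGenerator, AsyncIterable

class InvalidChunk(Exception):
    """Raised when the chunk stream reaches an inconsistent state."""

class _RowBuilder:
    """Accumulates chunks for the row currently being merged."""
    def __init__(self) -> None:
        self.row_key: bytes | None = None
        self.chunks: list[Any] = []

    def start_row(self, key: bytes) -> None:
        self.row_key, self.chunks = key, []

    def finish_row(self) -> tuple[bytes, list[Any]]:
        if self.row_key is None:
            raise InvalidChunk("commit before any row was started")
        return self.row_key, self.chunks

class _StateMachine:
    """Tracks merge state across chunks; emits a row each time one is committed."""
    def __init__(self) -> None:
        self.builder = _RowBuilder()
        self.last_emitted_key: bytes | None = None

    def handle_chunk(self, chunk: Any) -> tuple[bytes, list[Any]] | None:
        if chunk.row_key:
            if self.last_emitted_key is not None and chunk.row_key <= self.last_emitted_key:
                raise InvalidChunk("row keys must be strictly increasing")
            self.builder.start_row(chunk.row_key)
        self.builder.chunks.append(chunk)
        if chunk.commit_row:
            self.last_emitted_key = self.builder.row_key
            return self.builder.finish_row()
        return None  # row still in progress

async def merge_rows(chunks: AsyncIterable[Any]) -> AsyncGenerator[tuple[bytes, list[Any]], None]:
    """User-facing iterator: feed each chunk through the state machine."""
    state_machine = _StateMachine()
    async for chunk in chunks:
        row = state_machine.handle_chunk(chunk)
        if row is not None:
            yield row
```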