
feat: support setting max_stream_count when fetching query result #2051


Merged (7 commits) on Nov 22, 2024
Changes from all commits
44 changes: 44 additions & 0 deletions google/cloud/bigquery/table.py
@@ -1812,6 +1812,7 @@ def to_arrow_iterable(
self,
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore
max_stream_count: Optional[int] = None,
) -> Iterator["pyarrow.RecordBatch"]:
"""[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream.

@@ -1836,6 +1837,22 @@ def to_arrow_iterable(
created by the server. If ``max_queue_size`` is :data:`None`, the queue
size is infinite.

max_stream_count (Optional[int]):
Contributor: I think it would be more consistent if we use the same docstring as here. It also mentions the effect of preserve_order (in this case self._preserve_order), which I think we should make clear here.

Contributor Author: In this case, _preserve_order is automatically set by parsing the queries, and not a user-facing API. I'll update the docstring to mention that effect.

Contributor Author: Updated
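
For reference, a minimal sketch of the stream-count resolution this thread and the docstring below describe (the helper name is hypothetical; the actual logic lives in _pandas_helpers and is exercised by the new unit tests):

from typing import Optional

def resolve_max_stream_count(max_stream_count: Optional[int], preserve_order: bool) -> int:
    # Value forwarded to create_read_session(max_stream_count=...).
    if preserve_order:
        # Ordered results (e.g. ORDER BY) are read from a single stream
        # so that row order is kept; the user-supplied value is ignored.
        return 1
    # 0 (None is treated as 0) lets the BigQuery Storage server choose
    # the number of streams.
    return max_stream_count or 0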

The maximum number of parallel download streams when
using the BigQuery Storage API. Ignored if the
BigQuery Storage API is not used.

This setting also has no effect if the query result
is deterministically ordered with ORDER BY,
in which case the number of download streams is always 1.

If set to 0 or None (the default), the number of download
streams is determined by the BigQuery server. However, this behaviour
can require a lot of memory to store temporary download results,
especially with very large queries. In that case,
setting this parameter to a value greater than 0 can help
reduce system resource consumption.

Returns:
pyarrow.RecordBatch:
A generator of :class:`~pyarrow.RecordBatch`.
Expand All @@ -1852,6 +1869,7 @@ def to_arrow_iterable(
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
@@ -1978,6 +1996,7 @@ def to_dataframe_iterable(
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
dtypes: Optional[Dict[str, Any]] = None,
max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore
max_stream_count: Optional[int] = None,
) -> "pandas.DataFrame":
"""Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2008,6 +2027,22 @@ def to_dataframe_iterable(

.. versionadded:: 2.14.0

max_stream_count (Optional[int]):

The maximum number of parallel download streams when
using the BigQuery Storage API. Ignored if the
BigQuery Storage API is not used.

This setting also has no effect if the query result
is deterministically ordered with ORDER BY,
in which case the number of download streams is always 1.

If set to 0 or None (the default), the number of download
streams is determined by the BigQuery server. However, this behaviour
can require a lot of memory to store temporary download results,
especially with very large queries. In that case,
setting this parameter to a value greater than 0 can help
reduce system resource consumption.

Returns:
pandas.DataFrame:
A generator of :class:`~pandas.DataFrame`.
@@ -2034,6 +2069,7 @@ def to_dataframe_iterable(
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_dataframe_row_iterator,
@@ -2690,6 +2726,7 @@ def to_dataframe_iterable(
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
dtypes: Optional[Dict[str, Any]] = None,
max_queue_size: Optional[int] = None,
max_stream_count: Optional[int] = None,
) -> Iterator["pandas.DataFrame"]:
"""Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2705,6 +2742,9 @@
max_queue_size:
Ignored. Added for compatibility with RowIterator.

max_stream_count:
Ignored. Added for compatibility with RowIterator.

Returns:
An iterator yielding a single empty :class:`~pandas.DataFrame`.

@@ -2719,6 +2759,7 @@ def to_arrow_iterable(
self,
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
max_queue_size: Optional[int] = None,
max_stream_count: Optional[int] = None,
) -> Iterator["pyarrow.RecordBatch"]:
"""Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2731,6 +2772,9 @@
max_queue_size:
Ignored. Added for compatibility with RowIterator.

max_stream_count:
Ignored. Added for compatibility with RowIterator.

Returns:
An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
"""
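
Usage sketch for the new parameter on RowIterator.to_arrow_iterable (the query, table, and stream-count values are illustrative; without a BigQuery Storage client the parameter is ignored and the REST API is used):

from google.cloud import bigquery
from google.cloud import bigquery_storage

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()

rows = client.query(
    "SELECT name, number FROM `bigquery-public-data.usa_names.usa_1910_2013`"
).result()

# Cap the number of parallel Storage API read streams to bound memory use.
total_rows = 0
for record_batch in rows.to_arrow_iterable(
    bqstorage_client=bqstorage_client, max_stream_count=2
):
    total_rows += record_batch.num_rows
print(total_rows)
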
70 changes: 70 additions & 0 deletions tests/unit/test_table.py
@@ -5822,3 +5822,73 @@ def test_table_reference_to_bqstorage_v1_stable(table_path):
    for klass in (mut.TableReference, mut.Table, mut.TableListItem):
        got = klass.from_string(table_path).to_bqstorage()
        assert got == expected


@pytest.mark.parametrize("preserve_order", [True, False])
def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order):
    pytest.importorskip("pandas")
    pytest.importorskip("google.cloud.bigquery_storage")
    from google.cloud.bigquery import schema
    from google.cloud.bigquery import table as mut
    from google.cloud import bigquery_storage

    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
    session = bigquery_storage.types.ReadSession()
    bqstorage_client.create_read_session.return_value = session

    row_iterator = mut.RowIterator(
        _mock_client(),
        api_request=None,
        path=None,
        schema=[
            schema.SchemaField("colA", "INTEGER"),
        ],
        table=mut.TableReference.from_string("proj.dset.tbl"),
    )
    row_iterator._preserve_order = preserve_order

    max_stream_count = 132
    result_iterable = row_iterator.to_arrow_iterable(
        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
    )
    list(result_iterable)
    bqstorage_client.create_read_session.assert_called_once_with(
        parent=mock.ANY,
        read_session=mock.ANY,
        max_stream_count=max_stream_count if not preserve_order else 1,
    )


@pytest.mark.parametrize("preserve_order", [True, False])
def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order):
    pytest.importorskip("pandas")
    pytest.importorskip("google.cloud.bigquery_storage")
    from google.cloud.bigquery import schema
    from google.cloud.bigquery import table as mut
    from google.cloud import bigquery_storage

    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
    session = bigquery_storage.types.ReadSession()
    bqstorage_client.create_read_session.return_value = session

    row_iterator = mut.RowIterator(
        _mock_client(),
        api_request=None,
        path=None,
        schema=[
            schema.SchemaField("colA", "INTEGER"),
        ],
        table=mut.TableReference.from_string("proj.dset.tbl"),
    )
    row_iterator._preserve_order = preserve_order

    max_stream_count = 132
    result_iterable = row_iterator.to_dataframe_iterable(
        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
    )
    list(result_iterable)
    bqstorage_client.create_read_session.assert_called_once_with(
        parent=mock.ANY,
        read_session=mock.ANY,
        max_stream_count=max_stream_count if not preserve_order else 1,
    )