feat: read_gbq suggests using BigQuery DataFrames with large results #769


Merged: 8 commits, May 20, 2024
6 changes: 6 additions & 0 deletions docs/index.rst
@@ -23,6 +23,12 @@ Note: The canonical version of this documentation can always be found on the
`BigQuery sandbox <https://cloud.google.com/bigquery/docs/sandbox>`__ to
try the service for free.

Also, consider using `BigQuery DataFrames
<https://cloud.google.com/bigquery/docs/bigquery-dataframes-introduction>`__
to process large results with pandas-compatible APIs and transparent SQL
pushdown to the BigQuery engine. This provides an opportunity to save on
costs and improve performance.

While BigQuery uses standard SQL syntax, it has some important differences
from traditional databases both in functionality, API limitations (size and
quantity of queries or uploads), and how Google charges for use of the
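For readers unfamiliar with the recommended alternative, here is a minimal sketch of the BigQuery DataFrames path. It is not part of this diff; the table ID is a placeholder, and it assumes the bigframes package is installed and exposes bigframes.pandas.read_gbq as described in the linked introduction.

import bigframes.pandas as bpd

# Queries run lazily inside BigQuery; only explicitly materialized results
# (for example via to_pandas()) are downloaded into local memory.
df = bpd.read_gbq("my-project.my_dataset.large_table")  # placeholder table ID
print(df.head(5).to_pandas())  # only this small slice is brought down locally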
9 changes: 9 additions & 0 deletions noxfile.py
@@ -375,6 +375,15 @@ def cover(session):
session.install("coverage", "pytest-cov")
session.run("coverage", "report", "--show-missing", "--fail-under=96")

# Make sure there is no dead code in our test directories.
session.run(
"coverage",
"report",
"--show-missing",
"--include=tests/unit/*",
"--fail-under=100",
)

session.run("coverage", "erase")


12 changes: 12 additions & 0 deletions pandas_gbq/constants.py
@@ -0,0 +1,12 @@
# Copyright (c) 2024 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.
Comment on lines +1 to +3

Reviewer: Just double-checking that this is normal, given that we use a different license header for other Google projects.

Collaborator (author): Not normal, but expected. pandas-gbq was originally split off from pandas itself, so we can't use the Google header. It's not 100% Google's copyright since not all contributions were under the CLA.


# BigQuery uses powers of 2 in calculating data sizes. See:
# https://cloud.google.com/bigquery/pricing#data The documentation uses
# GiB rather than GB to disambiguate from the alternative base 10 units.
# https://en.wikipedia.org/wiki/Byte#Multiple-byte_units
BYTES_IN_KIB = 1024
BYTES_IN_MIB = 1024 * BYTES_IN_KIB
BYTES_IN_GIB = 1024 * BYTES_IN_MIB
BYTES_TO_RECOMMEND_BIGFRAMES = BYTES_IN_GIB
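A quick illustration of how these constants feed the size check and message formatting added to _download_results below; the byte counts are illustrative and mirror the unit tests added in this PR.

import pandas_gbq.constants as constants

num_bytes = 2 * constants.BYTES_IN_GIB
print(num_bytes > constants.BYTES_TO_RECOMMEND_BIGFRAMES)  # True: 2 GiB crosses the 1 GiB threshold
print(f"{num_bytes / constants.BYTES_IN_GIB:.1f} GiB")  # "2.0 GiB", as formatted in the warning
print(999 * constants.BYTES_IN_MIB > constants.BYTES_TO_RECOMMEND_BIGFRAMES)  # False: stays under 1 GiB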
4 changes: 4 additions & 0 deletions pandas_gbq/exceptions.py
@@ -28,6 +28,10 @@ class InvalidPrivateKeyFormat(ValueError):
"""


class LargeResultsWarning(UserWarning):
"""Raise when results are beyond that recommended for pandas DataFrame."""


class PerformanceWarning(RuntimeWarning):
"""
Raised when a performance-related feature is requested, but unsupported.
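Because LargeResultsWarning subclasses UserWarning, callers can manage it with the standard warnings machinery. A brief sketch, mirroring the warning text and the unit tests added below:

import warnings

import pandas_gbq.exceptions

# Escalate the recommendation to an error, e.g. in a strict test or pipeline.
warnings.simplefilter("error", category=pandas_gbq.exceptions.LargeResultsWarning)

# Or silence it entirely, as the warning message itself suggests.
warnings.simplefilter("ignore", category=pandas_gbq.exceptions.LargeResultsWarning)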
10 changes: 0 additions & 10 deletions pandas_gbq/features.py
@@ -9,7 +9,6 @@
BIGQUERY_QUERY_AND_WAIT_VERSION = "3.14.0"
PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0"
PANDAS_BOOLEAN_DTYPE_VERSION = "1.0.0"
PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION = "1.1.0"


class Features:
@@ -82,14 +81,5 @@ def pandas_has_boolean_dtype(self):
desired_version = packaging.version.parse(PANDAS_BOOLEAN_DTYPE_VERSION)
return self.pandas_installed_version >= desired_version

@property
def pandas_has_parquet_with_lossless_timestamp(self):
import packaging.version

desired_version = packaging.version.parse(
PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION
)
return self.pandas_installed_version >= desired_version


FEATURES = Features()
59 changes: 46 additions & 13 deletions pandas_gbq/gbq.py
@@ -19,6 +19,8 @@
if typing.TYPE_CHECKING: # pragma: NO COVER
import pandas

import pandas_gbq.constants
import pandas_gbq.exceptions
from pandas_gbq.exceptions import GenericGBQException, QueryTimeout
from pandas_gbq.features import FEATURES
import pandas_gbq.query
@@ -478,6 +480,35 @@ def _download_results(
if max_results is not None:
create_bqstorage_client = False

# If we're downloading a large table, BigQuery DataFrames might be a
# better fit. Not all code paths will populate rows_iter._table, but
# if it's not populated that means we are working with a small result
# set.
if (table_ref := getattr(rows_iter, "_table", None)) is not None:
table = self.client.get_table(table_ref)
if (
isinstance((num_bytes := table.num_bytes), int)
and num_bytes > pandas_gbq.constants.BYTES_TO_RECOMMEND_BIGFRAMES
):
num_gib = num_bytes / pandas_gbq.constants.BYTES_IN_GIB
warnings.warn(
f"Recommendation: Your results are {num_gib:.1f} GiB. "
"Consider using BigQuery DataFrames "
"(https://cloud.google.com/bigquery/docs/bigquery-dataframes-introduction) "
"to process large results with pandas compatible APIs with transparent SQL "
"pushdown to BigQuery engine. This provides an opportunity to save on costs "
"and improve performance. "
"Please reach out to [email protected] with any "
"questions or concerns. To disable this message, run "
"warnings.simplefilter('ignore', category=pandas_gbq.exceptions.LargeResultsWarning)",
category=pandas_gbq.exceptions.LargeResultsWarning,
# user's code
# -> read_gbq
# -> run_query
# -> download_results
stacklevel=4,
)

try:
schema_fields = [field.to_api_repr() for field in rows_iter.schema]
conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
@@ -663,18 +694,25 @@ def read_gbq(
*,
col_order=None,
):
r"""Load data from Google BigQuery using google-cloud-python

The main method a user calls to execute a Query in Google BigQuery
and read results into a pandas DataFrame.
r"""Read data from Google BigQuery to a pandas DataFrame.

This method uses the Google Cloud client library to make requests to
Google BigQuery, documented `here
<https://googleapis.dev/python/bigquery/latest/index.html>`__.
Run a SQL query in BigQuery or read directly from a table, using the
`Python client library for BigQuery
<https://cloud.google.com/python/docs/reference/bigquery/latest/index.html>`__
and the `BigQuery Storage
<https://cloud.google.com/python/docs/reference/bigquerystorage/latest>`__
client library to make API requests.

See the :ref:`How to authenticate with Google BigQuery <authentication>`
guide for authentication instructions.

.. note::
Consider using `BigQuery DataFrames
<https://cloud.google.com/bigquery/docs/dataframes-quickstart>`__ to
process large results with pandas-compatible APIs that run in the
BigQuery SQL query engine. This provides an opportunity to save on
costs and improve performance.

Parameters
----------
query_or_table : str
@@ -1050,12 +1088,7 @@ def to_gbq(
)

if api_method == "default":
# Avoid using parquet if pandas doesn't support lossless conversions to
# parquet timestamp. See: https://stackoverflow.com/a/69758676/101923
if FEATURES.pandas_has_parquet_with_lossless_timestamp:
api_method = "load_parquet"
else:
api_method = "load_csv"
api_method = "load_parquet"

Collaborator (author): Per the coverage report, this was dead code. Our minimum pandas version is beyond the one where this feature wasn't available.

Reviewer: Thank you, this is a helpful comment!

if chunksize is not None:
if api_method == "load_parquet":
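To show how the new recommendation surfaces to callers, here is a hedged end-to-end sketch; the query and project ID are placeholders, and the warning only fires when the result table exceeds the 1 GiB threshold defined in pandas_gbq/constants.py.

import warnings

import pandas_gbq
import pandas_gbq.exceptions

# Capture warnings so the recommendation can be inspected programmatically.
# When not captured, stacklevel=4 makes the warning point at this read_gbq
# call in user code rather than at pandas-gbq internals.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always", category=pandas_gbq.exceptions.LargeResultsWarning)
    df = pandas_gbq.read_gbq(
        "SELECT * FROM `my-project.my_dataset.large_table`",  # placeholder query
        project_id="my-project",  # placeholder project
    )

print(df.shape)  # the full result is still downloaded into pandas as usual
for warning in caught:
    print(f"{warning.category.__name__}: {warning.message}")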
99 changes: 75 additions & 24 deletions tests/unit/test_gbq.py
@@ -6,17 +6,22 @@

import copy
import datetime
import re
from unittest import mock
import warnings

import google.api_core.exceptions
import google.cloud.bigquery
import google.cloud.bigquery.table
import numpy
import packaging.version
import pandas
from pandas import DataFrame
import pytest

from pandas_gbq import gbq
import pandas_gbq.constants
import pandas_gbq.exceptions
import pandas_gbq.features
from pandas_gbq.features import FEATURES

@@ -147,6 +152,62 @@ def test__transform_read_gbq_configuration_makes_copy(original, expected):
assert did_change == should_change


def test_GbqConnector_download_results_warns_for_large_tables(default_bigquery_client):
gbq._test_google_api_imports()
connector = _make_connector()
rows_iter = mock.create_autospec(
google.cloud.bigquery.table.RowIterator, instance=True
)
table = google.cloud.bigquery.Table.from_api_repr(
{
"tableReference": {
"projectId": "my-proj",
"datasetId": "my-dset",
"tableId": "my_tbl",
},
"numBytes": 2 * pandas_gbq.constants.BYTES_IN_GIB,
},
)
rows_iter._table = table
default_bigquery_client.get_table.reset_mock(side_effect=True)
default_bigquery_client.get_table.return_value = table

with pytest.warns(
pandas_gbq.exceptions.LargeResultsWarning,
match=re.escape("Your results are 2.0 GiB. Consider using BigQuery DataFrames"),
):
connector._download_results(rows_iter)


def test_GbqConnector_download_results_doesnt_warn_for_small_tables(
default_bigquery_client,
):
gbq._test_google_api_imports()
connector = _make_connector()
rows_iter = mock.create_autospec(
google.cloud.bigquery.table.RowIterator, instance=True
)
table = google.cloud.bigquery.Table.from_api_repr(
{
"tableReference": {
"projectId": "my-proj",
"datasetId": "my-dset",
"tableId": "my_tbl",
},
"numBytes": 999 * pandas_gbq.constants.BYTES_IN_MIB,
},
)
rows_iter._table = table
default_bigquery_client.get_table.reset_mock(side_effect=True)
default_bigquery_client.get_table.return_value = table

with warnings.catch_warnings():
warnings.simplefilter(
"error", category=pandas_gbq.exceptions.LargeResultsWarning
)
connector._download_results(rows_iter)


def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
gbq._test_google_api_imports()
pytest.importorskip("google.api_core.client_info")
@@ -191,16 +252,13 @@ def test_to_gbq_with_chunksize_warns_deprecation(
api_method, warning_message, warning_type
):
with pytest.warns(warning_type, match=warning_message):
try:
gbq.to_gbq(
DataFrame([[1]]),
"dataset.tablename",
project_id="my-project",
api_method=api_method,
chunksize=100,
)
except gbq.TableCreationError:
pass
gbq.to_gbq(
DataFrame([[1]]),
"dataset.tablename",
project_id="my-project",
api_method=api_method,
chunksize=100,
)


@pytest.mark.parametrize(["verbose"], [(True,), (False,)])
@@ -211,15 +269,12 @@ def test_to_gbq_with_verbose_new_pandas_warns_deprecation(monkeypatch, verbose):
mock.PropertyMock(return_value=True),
)
with pytest.warns(FutureWarning, match="verbose is deprecated"):
try:
gbq.to_gbq(
DataFrame([[1]]),
"dataset.tablename",
project_id="my-project",
verbose=verbose,
)
except gbq.TableCreationError:
pass
gbq.to_gbq(
DataFrame([[1]]),
"dataset.tablename",
project_id="my-project",
verbose=verbose,
)


def test_to_gbq_with_private_key_raises_notimplementederror():
@@ -233,11 +288,7 @@ def test_to_gbq_with_private_key_raises_notimplementederror():


def test_to_gbq_doesnt_run_query(mock_bigquery_client):
try:
gbq.to_gbq(DataFrame([[1]]), "dataset.tablename", project_id="my-project")
except gbq.TableCreationError:
pass

gbq.to_gbq(DataFrame([[1]]), "dataset.tablename", project_id="my-project")
mock_bigquery_client.query.assert_not_called()


5 changes: 1 addition & 4 deletions tests/unit/test_to_gbq.py
@@ -8,14 +8,11 @@
import pytest

from pandas_gbq import gbq
from pandas_gbq.features import FEATURES


@pytest.fixture
def expected_load_method(mock_bigquery_client):
if FEATURES.pandas_has_parquet_with_lossless_timestamp:
return mock_bigquery_client.load_table_from_dataframe
return mock_bigquery_client.load_table_from_file
return mock_bigquery_client.load_table_from_dataframe


def test_to_gbq_create_dataset_with_location(mock_bigquery_client):