BUG: fix AttributeError with BQ Storage API to download empty results #310

Merged (6 commits) on Feb 13, 2020
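
Reproduction sketch (illustrative; the query and project ID are placeholders, not taken from this PR): per issue #299, a zero-row result downloaded with the BigQuery Storage API enabled raised an AttributeError, and the early return added in pandas_gbq/timestamp.py below short-circuits TIMESTAMP localization for exactly that case.

import pandas_gbq

# A query that returns a TIMESTAMP column and no rows. Before this fix, the
# resulting empty DataFrame hit the tz-localization step and raised
# AttributeError when use_bqstorage_api=True.
df = pandas_gbq.read_gbq(
    "SELECT CURRENT_TIMESTAMP() AS ts LIMIT 0",
    project_id="my-project",  # placeholder project ID
    use_bqstorage_api=True,
)
assert len(df.index) == 0
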
9 changes: 9 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,15 @@
Changelog
=========

.. _changelog-0.13.1:

0.13.1 / 2020-02-13
-------------------

- Fix ``AttributeError`` with BQ Storage API to download empty results.
(:issue:`299`)


.. _changelog-0.13.0:

0.13.0 / 2019-12-12
30 changes: 7 additions & 23 deletions pandas_gbq/gbq.py
@@ -8,12 +8,14 @@
try:
# The BigQuery Storage API client is an optional dependency. It is only
# required when use_bqstorage_api=True.
from google.cloud import bigquery_storage
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage = None
bigquery_storage_v1beta1 = None

from pandas_gbq.exceptions import AccessDenied
import pandas_gbq.schema
import pandas_gbq.timestamp


logger = logging.getLogger(__name__)

@@ -564,7 +566,7 @@ def _download_results(
df = _cast_empty_df_dtypes(schema_fields, df)

# Ensure any TIMESTAMP columns are tz-aware.
df = _localize_df(schema_fields, df)
df = pandas_gbq.timestamp.localize_df(df, schema_fields)

logger.debug("Got {} rows.\n".format(rows_iter.total_rows))
return df
@@ -784,29 +786,11 @@ def _cast_empty_df_dtypes(schema_fields, df):
return df


def _localize_df(schema_fields, df):
"""Localize any TIMESTAMP columns to tz-aware type.

In pandas versions before 0.24.0, DatetimeTZDtype cannot be used as the
dtype in Series/DataFrame construction, so localize those columns after
the DataFrame is constructed.
"""
for field in schema_fields:
column = str(field["name"])
if field["mode"].upper() == "REPEATED":
continue

if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
df[column] = df[column].dt.tz_localize("UTC")

return df


def _make_bqstorage_client(use_bqstorage_api, credentials):
if not use_bqstorage_api:
return None

if bigquery_storage is None:
if bigquery_storage_v1beta1 is None:
raise ImportError(
"Install the google-cloud-bigquery-storage and fastavro/pyarrow "
"packages to use the BigQuery Storage API."
@@ -818,7 +802,7 @@ def _make_bqstorage_client(use_bqstorage_api, credentials):
client_info = google.api_core.gapic_v1.client_info.ClientInfo(
user_agent="pandas-{}".format(pandas.__version__)
)
return bigquery_storage.BigQueryStorageClient(
return bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=credentials, client_info=client_info
)

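For reference, a consolidated sketch of the optional-dependency handling after this change, assembled from the gbq.py hunks above (the user-agent client_info argument is omitted for brevity; this is not a verbatim copy of the module):

try:
    # The BigQuery Storage API client is an optional dependency. It is only
    # required when use_bqstorage_api=True.
    from google.cloud import bigquery_storage_v1beta1
except ImportError:  # pragma: NO COVER
    bigquery_storage_v1beta1 = None


def make_bqstorage_client(use_bqstorage_api, credentials):
    # Mirrors gbq._make_bqstorage_client: do nothing unless the caller opted
    # in, and fail loudly if the optional package is missing.
    if not use_bqstorage_api:
        return None

    if bigquery_storage_v1beta1 is None:
        raise ImportError(
            "Install the google-cloud-bigquery-storage and fastavro/pyarrow "
            "packages to use the BigQuery Storage API."
        )

    return bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
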
40 changes: 40 additions & 0 deletions pandas_gbq/timestamp.py
@@ -0,0 +1,40 @@
"""Helpers for working with TIMESTAMP data type.

Private module.
"""


def localize_df(df, schema_fields):
"""Localize any TIMESTAMP columns to tz-aware type.

In pandas versions before 0.24.0, DatetimeTZDtype cannot be used as the
dtype in Series/DataFrame construction, so localize those columns after
the DataFrame is constructed.

Parameters
----------
df : pandas.DataFrame
DataFrame in which to localize TIMESTAMP columns.
schema_fields : sequence of dict
BigQuery schema in parsed JSON data format.

Returns
-------
pandas.DataFrame
DataFrame with localized TIMESTAMP columns.
"""
if len(df.index) == 0:
# If there are no rows, there is nothing to do.
# Fix for https://github.com/pydata/pandas-gbq/issues/299
return df

for field in schema_fields:
column = str(field["name"])
if "mode" in field and field["mode"].upper() == "REPEATED":
continue

if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
df[column] = df[column].dt.tz_localize("UTC")

return df
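
A minimal usage sketch of the new helper (pandas_gbq.timestamp is documented as a private module, so this import is for illustration only; the column name and schema are made up):

import pandas

from pandas_gbq import timestamp

schema_fields = [{"name": "created", "type": "TIMESTAMP", "mode": "NULLABLE"}]

# Non-empty frame: the naive datetime column is localized to UTC.
df = pandas.DataFrame({"created": pandas.to_datetime(["2020-02-13T00:00:00"])})
df = timestamp.localize_df(df, schema_fields)
assert str(df["created"].dt.tz) == "UTC"

# Zero-row frame: returned unchanged, which is the fix for issue #299. An
# empty object-dtype column would otherwise raise on the .dt accessor.
empty = pandas.DataFrame({"created": pandas.Series([], dtype="object")})
assert timestamp.localize_df(empty, schema_fields) is empty
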
78 changes: 78 additions & 0 deletions tests/system/conftest.py
@@ -0,0 +1,78 @@
import google.oauth2.service_account
import pytest


@pytest.fixture(params=["env"])
def project(request, project_id):
if request.param == "env":
return project_id
elif request.param == "none":
return None


@pytest.fixture()
def credentials(private_key_path):
return google.oauth2.service_account.Credentials.from_service_account_file(
private_key_path
)


@pytest.fixture()
def gbq_connector(project, credentials):
from pandas_gbq import gbq

return gbq.GbqConnector(project, credentials=credentials)


@pytest.fixture()
def random_dataset(bigquery_client, random_dataset_id):
from google.cloud import bigquery

dataset_ref = bigquery_client.dataset(random_dataset_id)
dataset = bigquery.Dataset(dataset_ref)
bigquery_client.create_dataset(dataset)
return dataset


@pytest.fixture()
def tokyo_dataset(bigquery_client, random_dataset_id):
from google.cloud import bigquery

dataset_ref = bigquery_client.dataset(random_dataset_id)
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "asia-northeast1"
bigquery_client.create_dataset(dataset)
return random_dataset_id


@pytest.fixture()
def tokyo_table(bigquery_client, tokyo_dataset):
table_id = "tokyo_table"
# Create a random table using DDL.
# https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
bigquery_client.query(
"""CREATE TABLE {}.{}
AS SELECT
2000 + CAST(18 * RAND() as INT64) as year,
IF(RAND() > 0.5,"foo","bar") as token
FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
""".format(
tokyo_dataset, table_id
),
location="asia-northeast1",
).result()
return table_id


@pytest.fixture()
def gbq_dataset(project, credentials):
from pandas_gbq import gbq

return gbq._Dataset(project, credentials=credentials)


@pytest.fixture()
def gbq_table(project, credentials, random_dataset_id):
from pandas_gbq import gbq

return gbq._Table(project, random_dataset_id, credentials=credentials)
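
The shared fixtures above could back a regression test for the empty-result case. An illustrative sketch (not part of this PR; it assumes GbqConnector accepts use_bqstorage_api the same way read_gbq does, and that run_query returns a DataFrame):

import pytest

from pandas_gbq import gbq


def test_zero_row_download_with_bqstorage(project, credentials):
    pytest.importorskip("google.cloud.bigquery_storage_v1beta1")
    connector = gbq.GbqConnector(
        project, credentials=credentials, use_bqstorage_api=True
    )
    # A zero-row query with a TIMESTAMP column; should not raise
    # AttributeError after this fix.
    df = connector.run_query("SELECT CURRENT_TIMESTAMP() AS ts LIMIT 0")
    assert len(df.index) == 0
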
112 changes: 0 additions & 112 deletions tests/system/test_gbq.py
@@ -1,10 +1,8 @@
# -*- coding: utf-8 -*-

import sys
import uuid
from datetime import datetime

import google.oauth2.service_account
import numpy as np
import pandas
import pandas.api.types
@@ -28,76 +26,6 @@ def test_imports():
gbq._test_google_api_imports()


@pytest.fixture(params=["env"])
def project(request, project_id):
if request.param == "env":
return project_id
elif request.param == "none":
return None


@pytest.fixture()
def credentials(private_key_path):
return google.oauth2.service_account.Credentials.from_service_account_file(
private_key_path
)


@pytest.fixture()
def gbq_connector(project, credentials):
return gbq.GbqConnector(project, credentials=credentials)


@pytest.fixture()
def random_dataset(bigquery_client, random_dataset_id):
from google.cloud import bigquery

dataset_ref = bigquery_client.dataset(random_dataset_id)
dataset = bigquery.Dataset(dataset_ref)
bigquery_client.create_dataset(dataset)
return dataset


@pytest.fixture()
def tokyo_dataset(bigquery_client, random_dataset_id):
from google.cloud import bigquery

dataset_ref = bigquery_client.dataset(random_dataset_id)
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "asia-northeast1"
bigquery_client.create_dataset(dataset)
return random_dataset_id


@pytest.fixture()
def tokyo_table(bigquery_client, tokyo_dataset):
table_id = "tokyo_table"
# Create a random table using DDL.
# https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
bigquery_client.query(
"""CREATE TABLE {}.{}
AS SELECT
2000 + CAST(18 * RAND() as INT64) as year,
IF(RAND() > 0.5,"foo","bar") as token
FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
""".format(
tokyo_dataset, table_id
),
location="asia-northeast1",
).result()
return table_id


@pytest.fixture()
def gbq_dataset(project, credentials):
return gbq._Dataset(project, credentials=credentials)


@pytest.fixture()
def gbq_table(project, credentials, random_dataset_id):
return gbq._Table(project, random_dataset_id, credentials=credentials)


def make_mixed_dataframe_v2(test_size):
# create df to test for all BQ datatypes except RECORD
bools = np.random.randint(2, size=(1, test_size)).astype(bool)
@@ -600,9 +528,6 @@ def test_zero_rows(self, project_id):
empty_columns,
columns=["name", "number", "is_hurricane", "iso_time"],
)
expected_result["iso_time"] = expected_result[
"iso_time"
].dt.tz_localize("UTC")
tm.assert_frame_equal(df, expected_result, check_index_type=False)

def test_one_row_one_column(self, project_id):
@@ -917,43 +842,6 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, project_id):
assert df["max_year"][0] >= 2000


@pytest.mark.slow(reason="Large query for BQ Storage API tests.")
def test_read_gbq_w_bqstorage_api(credentials, random_dataset):
pytest.importorskip("google.cloud.bigquery_storage")
df = gbq.read_gbq(
"""
SELECT
total_amount,
passenger_count,
trip_distance
FROM `bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2014`
-- Select non-null rows for no-copy conversion from Arrow to pandas.
WHERE total_amount IS NOT NULL
AND passenger_count IS NOT NULL
AND trip_distance IS NOT NULL
LIMIT 10000000
""",
use_bqstorage_api=True,
credentials=credentials,
configuration={
"query": {
"destinationTable": {
"projectId": random_dataset.project,
"datasetId": random_dataset.dataset_id,
"tableId": "".join(
[
"test_read_gbq_w_bqstorage_api_",
str(uuid.uuid4()).replace("-", "_"),
]
),
},
"writeDisposition": "WRITE_TRUNCATE",
}
},
)
assert len(df) == 10000000


class TestToGBQIntegration(object):
@pytest.fixture(autouse=True, scope="function")
def setup(self, project, credentials, random_dataset_id):