Commit f4c6499 (parent 2897b81)

BUG: fix AttributeError with BQ Storage API to download empty results

Refactors timestamp helpers to their own file to help reduce the size of the gbq module.
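The symptom being fixed: an empty result set can come back with object-dtype columns, and pandas raises AttributeError when the .dt accessor is used on non-datetimelike data. A plausible minimal sketch of that failure mode, not code from this commit:

    import pandas

    # An empty, object-dtype column stands in for an empty query result;
    # accessing .dt on it raises the AttributeError this commit guards against.
    empty = pandas.Series([], dtype="object")
    try:
        empty.dt.tz_localize("UTC")
    except AttributeError as exc:
        print(exc)  # Can only use .dt accessor with datetimelike values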

7 files changed (+294, -132 lines)

docs/source/changelog.rst (+9)
@@ -1,6 +1,15 @@
 Changelog
 =========
 
+.. _changelog-0.13.1:
+
+0.13.1 / 2020-02-12
+-------------------
+
+- Fix ``AttributeError`` with BQ Storage API to download empty results.
+  (:issue:`299`)
+
+
 .. _changelog-0.13.0:
 
 0.13.0 / 2019-12-12

pandas_gbq/gbq.py (+7, -23)
@@ -8,12 +8,14 @@
 try:
     # The BigQuery Storage API client is an optional dependency. It is only
     # required when use_bqstorage_api=True.
-    from google.cloud import bigquery_storage
+    from google.cloud import bigquery_storage_v1beta1
 except ImportError:  # pragma: NO COVER
-    bigquery_storage = None
+    bigquery_storage_v1beta1 = None
 
 from pandas_gbq.exceptions import AccessDenied
 import pandas_gbq.schema
+import pandas_gbq.timestamp
+
 
 logger = logging.getLogger(__name__)
 
@@ -561,7 +563,7 @@ def _download_results(
         df = _cast_empty_df_dtypes(schema_fields, df)
 
         # Ensure any TIMESTAMP columns are tz-aware.
-        df = _localize_df(schema_fields, df)
+        df = pandas_gbq.timestamp.localize_df(df, schema_fields)
 
         logger.debug("Got {} rows.\n".format(rows_iter.total_rows))
         return df
@@ -781,29 +783,11 @@ def _cast_empty_df_dtypes(schema_fields, df):
     return df
 
 
-def _localize_df(schema_fields, df):
-    """Localize any TIMESTAMP columns to tz-aware type.
-
-    In pandas versions before 0.24.0, DatetimeTZDtype cannot be used as the
-    dtype in Series/DataFrame construction, so localize those columns after
-    the DataFrame is constructed.
-    """
-    for field in schema_fields:
-        column = str(field["name"])
-        if field["mode"].upper() == "REPEATED":
-            continue
-
-        if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
-            df[column] = df[column].dt.tz_localize("UTC")
-
-    return df
-
-
 def _make_bqstorage_client(use_bqstorage_api, credentials):
     if not use_bqstorage_api:
         return None
 
-    if bigquery_storage is None:
+    if bigquery_storage_v1beta1 is None:
         raise ImportError(
             "Install the google-cloud-bigquery-storage and fastavro/pyarrow "
             "packages to use the BigQuery Storage API."
@@ -815,7 +799,7 @@ def _make_bqstorage_client(use_bqstorage_api, credentials):
     client_info = google.api_core.gapic_v1.client_info.ClientInfo(
         user_agent="pandas-{}".format(pandas.__version__)
     )
-    return bigquery_storage.BigQueryStorageClient(
+    return bigquery_storage_v1beta1.BigQueryStorageClient(
         credentials=credentials, client_info=client_info
     )
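For context, a hedged sketch of the user-facing path that reaches _make_bqstorage_client(); the project ID and query are illustrative, not from this commit:

    import pandas_gbq

    # use_bqstorage_api=True routes result downloads through
    # _make_bqstorage_client(); without google-cloud-bigquery-storage
    # installed, the ImportError above is raised.
    df = pandas_gbq.read_gbq(
        "SELECT 1 AS the_answer",
        project_id="my-project",  # hypothetical project ID
        use_bqstorage_api=True,
    )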

pandas_gbq/timestamp.py (new file, +40)
@@ -0,0 +1,40 @@
+"""Helpers for working with TIMESTAMP data type.
+
+Private module.
+"""
+
+
+def localize_df(df, schema_fields):
+    """Localize any TIMESTAMP columns to tz-aware type.
+
+    In pandas versions before 0.24.0, DatetimeTZDtype cannot be used as the
+    dtype in Series/DataFrame construction, so localize those columns after
+    the DataFrame is constructed.
+
+    Parameters
+    ----------
+    df : pandas.DataFrame
+        DataFrame in which to localize TIMESTAMP columns.
+    schema_fields : sequence of dict
+        BigQuery schema in parsed JSON data format.
+
+    Returns
+    -------
+    pandas.DataFrame
+        DataFrame with localized TIMESTAMP columns.
+    """
+    if len(df.index) == 0:
+        # If there are no rows, there is nothing to do.
+        # Fix for https://github.com/pydata/pandas-gbq/issues/299
+        return df
+
+    for field in schema_fields:
+        column = str(field["name"])
+        if "mode" in field and field["mode"].upper() == "REPEATED":
+            continue
+
+        if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
+            df[column] = df[column].dt.tz_localize("UTC")
+
+    return df
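A usage sketch of the new helper, using the schema shape the docstring describes (field values are illustrative):

    import pandas

    from pandas_gbq.timestamp import localize_df

    schema_fields = [
        {"name": "created_at", "type": "TIMESTAMP", "mode": "NULLABLE"}
    ]

    # Non-empty frame: the naive TIMESTAMP column gets localized to UTC.
    df = pandas.DataFrame(
        {"created_at": pandas.to_datetime(["2020-02-12T00:00:00"])}
    )
    print(localize_df(df, schema_fields)["created_at"].dt.tz)  # UTC

    # Empty frame: the early return skips the .dt access entirely, which is
    # the fix for issue 299.
    empty = pandas.DataFrame({"created_at": pandas.Series([], dtype="object")})
    assert localize_df(empty, schema_fields) is empty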

tests/system/conftest.py (new file, +77)
@@ -0,0 +1,77 @@
+import google.oauth2.service_account
+import pytest
+
+
+@pytest.fixture(params=["env"])
+def project(request, project_id):
+    if request.param == "env":
+        return project_id
+    elif request.param == "none":
+        return None
+
+
+@pytest.fixture()
+def credentials(private_key_path):
+    return google.oauth2.service_account.Credentials.from_service_account_file(
+        private_key_path
+    )
+
+
+@pytest.fixture()
+def gbq_connector(project, credentials):
+    from pandas_gbq import gbq
+
+    return gbq.GbqConnector(project, credentials=credentials)
+
+
+@pytest.fixture()
+def random_dataset(bigquery_client, random_dataset_id):
+    from google.cloud import bigquery
+
+    dataset_ref = bigquery_client.dataset(random_dataset_id)
+    dataset = bigquery.Dataset(dataset_ref)
+    bigquery_client.create_dataset(dataset)
+    return dataset
+
+
+@pytest.fixture()
+def tokyo_dataset(bigquery_client, random_dataset_id):
+    from google.cloud import bigquery
+
+    dataset_ref = bigquery_client.dataset(random_dataset_id)
+    dataset = bigquery.Dataset(dataset_ref)
+    dataset.location = "asia-northeast1"
+    bigquery_client.create_dataset(dataset)
+    return random_dataset_id
+
+
+@pytest.fixture()
+def tokyo_table(bigquery_client, tokyo_dataset):
+    table_id = "tokyo_table"
+    # Create a random table using DDL.
+    # https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
+    bigquery_client.query(
+        """CREATE TABLE {}.{}
+        AS SELECT
+          2000 + CAST(18 * RAND() as INT64) as year,
+          IF(RAND() > 0.5,"foo","bar") as token
+        FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
+        """.format(
+            tokyo_dataset, table_id
+        ),
+        location="asia-northeast1",
+    ).result()
+    return table_id
+
+
+@pytest.fixture()
+def gbq_dataset(project, credentials):
+    from pandas_gbq import gbq
+
+    return gbq._Dataset(project, credentials=credentials)
+
+
+@pytest.fixture()
+def gbq_table(project, credentials, random_dataset_id):
+    from pandas_gbq import gbq
+
+    return gbq._Table(project, random_dataset_id, credentials=credentials)
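Because these fixtures now live in conftest.py, pytest injects them into any test module under tests/system/ without an import. A hedged sketch of a consuming test (module and test names are hypothetical):

    # tests/system/test_example.py (hypothetical module)
    def test_connector_can_run_query(gbq_connector):
        # pytest resolves gbq_connector from conftest.py by fixture lookup.
        df = gbq_connector.run_query("SELECT 1 AS the_answer")
        assert df is not None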

tests/system/test_gbq.py (-109)
@@ -1,10 +1,8 @@
 # -*- coding: utf-8 -*-
 
 import sys
-import uuid
 from datetime import datetime
 
-import google.oauth2.service_account
 import numpy as np
 import pandas
 import pandas.api.types
@@ -28,76 +26,6 @@ def test_imports():
     gbq._test_google_api_imports()
 
 
-@pytest.fixture(params=["env"])
-def project(request, project_id):
-    if request.param == "env":
-        return project_id
-    elif request.param == "none":
-        return None
-
-
-@pytest.fixture()
-def credentials(private_key_path):
-    return google.oauth2.service_account.Credentials.from_service_account_file(
-        private_key_path
-    )
-
-
-@pytest.fixture()
-def gbq_connector(project, credentials):
-    return gbq.GbqConnector(project, credentials=credentials)
-
-
-@pytest.fixture()
-def random_dataset(bigquery_client, random_dataset_id):
-    from google.cloud import bigquery
-
-    dataset_ref = bigquery_client.dataset(random_dataset_id)
-    dataset = bigquery.Dataset(dataset_ref)
-    bigquery_client.create_dataset(dataset)
-    return dataset
-
-
-@pytest.fixture()
-def tokyo_dataset(bigquery_client, random_dataset_id):
-    from google.cloud import bigquery
-
-    dataset_ref = bigquery_client.dataset(random_dataset_id)
-    dataset = bigquery.Dataset(dataset_ref)
-    dataset.location = "asia-northeast1"
-    bigquery_client.create_dataset(dataset)
-    return random_dataset_id
-
-
-@pytest.fixture()
-def tokyo_table(bigquery_client, tokyo_dataset):
-    table_id = "tokyo_table"
-    # Create a random table using DDL.
-    # https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
-    bigquery_client.query(
-        """CREATE TABLE {}.{}
-        AS SELECT
-          2000 + CAST(18 * RAND() as INT64) as year,
-          IF(RAND() > 0.5,"foo","bar") as token
-        FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
-        """.format(
-            tokyo_dataset, table_id
-        ),
-        location="asia-northeast1",
-    ).result()
-    return table_id
-
-
-@pytest.fixture()
-def gbq_dataset(project, credentials):
-    return gbq._Dataset(project, credentials=credentials)
-
-
-@pytest.fixture()
-def gbq_table(project, credentials, random_dataset_id):
-    return gbq._Table(project, random_dataset_id, credentials=credentials)
-
-
 def make_mixed_dataframe_v2(test_size):
     # create df to test for all BQ datatypes except RECORD
     bools = np.random.randint(2, size=(1, test_size)).astype(bool)
@@ -898,43 +826,6 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, project_id):
         assert df["max_year"][0] >= 2000
 
 
-@pytest.mark.slow(reason="Large query for BQ Storage API tests.")
-def test_read_gbq_w_bqstorage_api(credentials, random_dataset):
-    pytest.importorskip("google.cloud.bigquery_storage")
-    df = gbq.read_gbq(
-        """
-        SELECT
-            total_amount,
-            passenger_count,
-            trip_distance
-        FROM `bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2014`
-        -- Select non-null rows for no-copy conversion from Arrow to pandas.
-        WHERE total_amount IS NOT NULL
-            AND passenger_count IS NOT NULL
-            AND trip_distance IS NOT NULL
-        LIMIT 10000000
-        """,
-        use_bqstorage_api=True,
-        credentials=credentials,
-        configuration={
-            "query": {
-                "destinationTable": {
-                    "projectId": random_dataset.project,
-                    "datasetId": random_dataset.dataset_id,
-                    "tableId": "".join(
-                        [
-                            "test_read_gbq_w_bqstorage_api_",
-                            str(uuid.uuid4()).replace("-", "_"),
-                        ]
-                    ),
-                },
-                "writeDisposition": "WRITE_TRUNCATE",
-            }
-        },
-    )
-    assert len(df) == 10000000
-
-
 class TestToGBQIntegration(object):
     @pytest.fixture(autouse=True, scope="function")
     def setup(self, project, credentials, random_dataset_id):
