
BUG: Add support to replace partitions in date-partitioned tables #47


Status: Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions docs/source/changelog.rst
@@ -14,6 +14,7 @@ Changelog
- Use the `google-auth <https://google-auth.readthedocs.io/en/latest/>`__ library for authentication because ``oauth2client`` is deprecated. (:issue:`39`)
- :func:`read_gbq` now has an ``auth_local_webserver`` boolean argument for controlling whether to use a web server or console flow when getting user credentials. Replaces the `--noauth_local_webserver` command line argument. (:issue:`35`)
- :func:`read_gbq` now displays the BigQuery Job ID and standard price in verbose output. (:issue:`70` and :issue:`71`)
- Add support to replace partitions in `date-partitioned tables <https://cloud.google.com/bigquery/docs/partitioned-tables>`__. The partition must be specified with the partition decorator separator (``$``). (:issue:`47`)

0.1.6 / 2017-05-03
------------------
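For reference, a minimal sketch of how the new partition-decorator syntax is used from user code. The project, dataset, and table names and the partition date below are placeholders, not part of the change:

import pandas as pd
from pandas_gbq import gbq

df = pd.DataFrame({'a': [1, 2, 3]})

# Replace only the 20180513 partition of an existing date-partitioned
# table; everything before the '$' is the usual dataset.table id.
gbq.to_gbq(df, 'my_dataset.my_table$20180513', 'my-project',
           if_exists='replace')

# Append to the same partition instead of replacing it.
gbq.to_gbq(df, 'my_dataset.my_table$20180513', 'my-project',
           if_exists='append')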
61 changes: 31 additions & 30 deletions pandas_gbq/gbq.py
@@ -769,26 +769,6 @@ def schema_is_subset(self, dataset_id, table_id, schema):

return all(field in fields_remote for field in fields_local)

def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
delay = 0

# Changes to table schema may take up to 2 minutes as of May 2015 See
# `Issue 191
# <https://code.google.com/p/google-bigquery/issues/detail?id=191>`__
# Compare previous schema with new schema to determine if there should
# be a 120 second delay

if not self.verify_schema(dataset_id, table_id, table_schema):
self._print('The existing table has a different schema. Please '
'wait 2 minutes. See Google BigQuery issue #191')
delay = 120

table = _Table(self.project_id, dataset_id,
private_key=self.private_key)
table.delete(table_id)
table.create(table_id, table_schema)
sleep(delay)


def _parse_data(schema, rows):
# see:
@@ -1053,19 +1033,33 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
"already exists. "
"Change the if_exists parameter to "
"append or replace data.")
elif if_exists == 'replace':
connector.delete_and_recreate_table(
[Review comment — Contributor] The function delete_and_recreate_table is unused in the pandas-gbq code, so the function itself can be removed as well.

dataset_id, table_id, table_schema)
elif if_exists == 'append':
else:
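# Table already exists and if_exists is 'append' or 'replace'; a
# table_id containing the partition decorator ('$') addresses a single
# partition of a date-partitioned table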
delay = 0
if not connector.schema_is_subset(dataset_id,
table_id,
table_schema):
raise InvalidSchema("Please verify that the structure and "
"data types in the DataFrame match the "
"schema of the destination table.")
else:
table.create(table_id, table_schema)
if if_exists == 'append' \
or table.partition_decorator in table_id:
raise InvalidSchema("Please verify that the structure "
"and data types in the DataFrame "
"match the schema of the destination "
"table.")
elif if_exists == 'replace':
table._print('The existing table has a different schema. '
'Please wait 2 minutes. See Google BigQuery '
'issue #191')
delay = 120
if if_exists == 'replace':
table.delete(table_id)
if table.partition_decorator not in table_id:
table.create(table_id, table_schema)
sleep(delay)

else:
is_dpt = table.partition_decorator in table_id
table.create(table_id.split('$')[0],
table_schema,
date_partitioned=is_dpt)
connector.load_data(dataframe, dataset_id, table_id, chunksize)


@@ -1108,6 +1102,8 @@ def _generate_bq_schema(df, default_type='STRING'):

class _Table(GbqConnector):

partition_decorator = '$'

def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
private_key=None):
try:
@@ -1144,7 +1140,7 @@ def exists(self, table_id):
else:
self.process_http_error(ex)

def create(self, table_id, schema):
def create(self, table_id, schema, date_partitioned=False):
""" Create a table in Google BigQuery given a table and schema

Parameters
@@ -1154,6 +1150,8 @@ def create(self, table_id, schema):
schema : str
Use the generate_bq_schema to generate your table schema from a
dataframe.
date_partitioned : boolean
Whether the table is to be created as a date-partitioned table.
"""

if self.exists(table_id):
@@ -1173,6 +1171,9 @@ def create(self, table_id, schema):
'datasetId': self.dataset_id
}
}
if date_partitioned:
# The only type supported is DAY
body.update({'timePartitioning': {'type': 'DAY'}})

try:
self.service.tables().insert(
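For context, a sketch of the request body that create() sends through tables().insert when date_partitioned=True. The names are placeholders and the schema is abbreviated; only the 'timePartitioning' entry is new in this change:

body = {
    'schema': {'fields': [{'name': 'a', 'type': 'INTEGER'}]},
    'tableReference': {
        'tableId': 'my_table',       # table name without the '$' decorator
        'projectId': 'my-project',
        'datasetId': 'my_dataset'
    },
    # DAY was the only partitioning type BigQuery supported at the time
    'timePartitioning': {'type': 'DAY'}
}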
114 changes: 113 additions & 1 deletion pandas_gbq/tests/test_gbq.py
@@ -1,7 +1,7 @@
import pytest

import re
from datetime import datetime
from datetime import datetime, timedelta
import pytz
from time import sleep
import os
@@ -20,6 +20,7 @@


TABLE_ID = 'new_test'
DPT_TABLE_ID = 'dpt_test'


def _skip_if_no_project_id():
@@ -946,6 +947,8 @@ def setup_method(self, method):
private_key=_get_private_key_path())
self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
TABLE_ID)
self.destination_dpt = "{0}{1}.{2}".format(self.dataset_prefix, "1",
DPT_TABLE_ID)
self.dataset.create(self.dataset_prefix + "1")

@classmethod
@@ -1080,6 +1083,115 @@ def test_upload_data_if_table_exists_replace(self):
private_key=_get_private_key_path())
assert result['num_rows'][0] == 5

def test_upload_data_if_table_exists_replace_dpt_partition(self):
# Issue #47; a 'replace' with a mismatched schema must fail, and the
# subsequent matching call must perform the replace
test_dpt_suffix = datetime.now().strftime('%Y%m%d')
test_size = 10
df = make_mixed_dataframe_v2(test_size)
df_different_schema = tm.makeMixedDataFrame()

dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
self.table.create(DPT_TABLE_ID,
gbq._generate_bq_schema(df),
date_partitioned=True)

with pytest.raises(gbq.InvalidSchema):
gbq.to_gbq(df_different_schema, dpt_partition,
_get_project_id(), if_exists='replace',
private_key=_get_private_key_path())

gbq.to_gbq(df, dpt_partition, _get_project_id(),
private_key=_get_private_key_path(),
if_exists='replace')

# Test partition
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
.format(dpt_partition),
project_id=_get_project_id(),
private_key=_get_private_key_path())
assert result0['num_rows'][0] == 10

# Test whole table
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
.format(self.destination_dpt),
project_id=_get_project_id(),
private_key=_get_private_key_path())
assert result1['num_rows'][0] == 10

self.table.delete(DPT_TABLE_ID)

def test_upload_data_if_table_exists_append_dpt_partition(self):
# Issue #47; tests that 'append' appends to an existing partition
test_dpt_suffix = (datetime.now() +
timedelta(days=1)).strftime('%Y%m%d')
test_size = 10
df = make_mixed_dataframe_v2(test_size)

self.table.create(DPT_TABLE_ID,
gbq._generate_bq_schema(df),
date_partitioned=True)

dpt_partition = self.destination_dpt + '$' + test_dpt_suffix

result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
.format(self.destination_dpt),
project_id=_get_project_id(),
private_key=_get_private_key_path())

assert result0['num_rows'][0] == 0

gbq.to_gbq(df, dpt_partition,
_get_project_id(), if_exists='append',
private_key=_get_private_key_path())

result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
.format(dpt_partition),
project_id=_get_project_id(),
private_key=_get_private_key_path())

assert result1['num_rows'][0] == 10

gbq.to_gbq(df.head(), dpt_partition,
_get_project_id(), if_exists='append',
private_key=_get_private_key_path())

# Test destination partition
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
.format(dpt_partition),
project_id=_get_project_id(),
private_key=_get_private_key_path())

assert result1['num_rows'][0] == 15

def test_table_creation_error_raised_when_dpt_exists(self):
test_dpt_suffix = datetime.now().strftime('%Y%m%d')
test_size = 10
df = make_mixed_dataframe_v2(test_size)
table_name = 'foobar'
self.table.create(table_name,
gbq._generate_bq_schema(df),
date_partitioned=True)
dpt_partition = table_name + '$' + test_dpt_suffix
with pytest.raises(gbq.TableCreationError):
gbq.to_gbq(df,
self.table.dataset_id + '.' + dpt_partition,
project_id=_get_project_id(),
private_key=_get_private_key_path())
self.table.delete(table_name)
sleep(30)

def test_table_created_from_dpt_suffixed_id(self):
test_dpt_suffix = datetime.now().strftime('%Y%m%d')
test_size = 10
df = make_mixed_dataframe_v2(test_size)
table_name = 'foobar'
dpt_partition = table_name + '$' + test_dpt_suffix
gbq.to_gbq(df,
self.table.dataset_id + '.' + dpt_partition,
project_id=_get_project_id(),
private_key=_get_private_key_path())
# Writing to a '$'-suffixed id should have created the base table
assert self.table.exists(table_name)
self.table.delete(table_name)

def test_upload_data_if_table_exists_raises_value_error(self):
test_id = "4"
test_size = 10