Commit 58486e5

BUG: Add support to replace partitions in date-partitioned tables (googleapis#47)
1 parent 79c9067 commit 58486e5

3 files changed: +123 -28 lines

docs/source/changelog.rst

+2
@@ -14,6 +14,8 @@ Changelog
 - Use the `google-auth <https://google-auth.readthedocs.io/en/latest/>`__ library for authentication because ``oauth2client`` is deprecated. (:issue:`39`)
 - :func:`read_gbq` now has a ``auth_local_webserver`` boolean argument for controlling whether to use web server or console flow when getting user credentials. Replaces `--noauth_local_webserver` command line argument. (:issue:`35`)
 - :func:`read_gbq` now displays the BigQuery Job ID and standard price in verbose output. (:issue:`70` and :issue:`71`)
+- ``read_gbq`` now displays the BigQuery Job ID and standard price in verbose output. (:issue:`70` and :issue:`71`)
+- Add support to replace partitions in `date-partitioned tables <https://cloud.google.com/bigquery/docs/partitioned-tables>`__. Partition must be specified with a partition decorator separator (``$``). (:issue:`47`)
 
 0.1.6 / 2017-05-03
 ------------------
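
The new changelog entry implies the calling pattern: append a ``$YYYYMMDD`` decorator to the destination table name to address a single day's partition. A minimal sketch mirroring the tests in this commit (project, dataset, and table names are placeholders, and the base table must already exist):

    import pandas as pd
    from pandas_gbq import gbq

    df = pd.DataFrame({'col1': [1, 2, 3]})
    # Replace only the 2017-01-01 partition of an existing table.
    gbq.to_gbq(df, 'my_dataset.my_table$20170101', 'my-project-id',
               if_exists='replace')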

pandas_gbq/gbq.py

+30 -28
@@ -769,26 +769,6 @@ def schema_is_subset(self, dataset_id, table_id, schema):
 
         return all(field in fields_remote for field in fields_local)
 
-    def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
-        delay = 0
-
-        # Changes to table schema may take up to 2 minutes as of May 2015 See
-        # `Issue 191
-        # <https://code.google.com/p/google-bigquery/issues/detail?id=191>`__
-        # Compare previous schema with new schema to determine if there should
-        # be a 120 second delay
-
-        if not self.verify_schema(dataset_id, table_id, table_schema):
-            self._print('The existing table has a different schema. Please '
-                        'wait 2 minutes. See Google BigQuery issue #191')
-            delay = 120
-
-        table = _Table(self.project_id, dataset_id,
-                       private_key=self.private_key)
-        table.delete(table_id)
-        table.create(table_id, table_schema)
-        sleep(delay)
-
 
 def _parse_data(schema, rows):
     # see:
# see:
@@ -1053,17 +1033,32 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
                                      "already exists. "
                                      "Change the if_exists parameter to "
                                      "append or replace data.")
-        elif if_exists == 'replace':
-            connector.delete_and_recreate_table(
-                dataset_id, table_id, table_schema)
-        elif if_exists == 'append':
+        else:
+            delay = 0
             if not connector.schema_is_subset(dataset_id,
                                               table_id,
                                               table_schema):
-                raise InvalidSchema("Please verify that the structure and "
-                                    "data types in the DataFrame match the "
-                                    "schema of the destination table.")
+                if if_exists == 'append' \
+                        or table.partition_decorator in table_id:
+                    raise InvalidSchema("Please verify that the structure "
+                                        "and data types in the DataFrame "
+                                        "match the schema of the destination "
+                                        "table.")
+                elif if_exists == 'replace':
+                    table._print('The existing table has a different schema. '
+                                 'Please wait 2 minutes. See Google BigQuery '
+                                 'issue #191')
+                    delay = 120
+            if if_exists == 'replace':
+                table.delete(table_id)
+                if table.partition_decorator not in table_id:
+                    table.create(table_id, table_schema)
+                sleep(delay)
+
     else:
+        if table.partition_decorator in table_id:
+            raise TableCreationError("Cannot create a partition without the "
+                                     "main table.")
         table.create(table_id, table_schema)
 
     connector.load_data(dataframe, dataset_id, table_id, chunksize)
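
The rewritten branch keys off whether ``table_id`` names a whole table or a single partition, using the ``partition_decorator`` attribute added to ``_Table`` below. A toy illustration of that check (values hypothetical):

    partition_decorator = '$'  # mirrors the new _Table class attribute

    # A decorated id addresses one partition: 'replace' deletes its
    # contents but must not recreate the table around it.
    print(partition_decorator in 'dpt_test$20170101')  # True
    # A plain id addresses the whole table, which is deleted and recreated.
    print(partition_decorator in 'new_test')           # False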
@@ -1108,6 +1103,8 @@ def _generate_bq_schema(df, default_type='STRING'):
 
 class _Table(GbqConnector):
 
+    partition_decorator = '$'
+
     def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
                  private_key=None):
         try:
@@ -1144,7 +1141,7 @@ def exists(self, table_id):
            else:
                self.process_http_error(ex)
 
-    def create(self, table_id, schema):
+    def create(self, table_id, schema, date_partitioned=False):
         """ Create a table in Google BigQuery given a table and schema
 
         Parameters
@@ -1154,6 +1151,8 @@ def create(self, table_id, schema):
         schema : str
             Use the generate_bq_schema to generate your table schema from a
             dataframe.
+        date_partitioned: boolean
+            Whether table is to be created as a date partitioned table.
         """
 
         if self.exists(table_id):
@@ -1173,6 +1172,9 @@ def create(self, table_id, schema):
                'datasetId': self.dataset_id
            }
        }
+        if date_partitioned:
+            # The only type supported is DAY
+            body.update({'timePartitioning': {'type': 'DAY'}})
 
         try:
             self.service.tables().insert(
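
With ``date_partitioned=True``, ``create`` extends the ``tables().insert`` request body with BigQuery's ``timePartitioning`` option. A sketch of the body that results, with hypothetical schema and identifiers (field names follow the BigQuery v2 API):

    body = {
        'schema': {'fields': [{'name': 'col1', 'type': 'STRING'}]},
        'tableReference': {
            'tableId': 'dpt_test',         # hypothetical table
            'projectId': 'my-project-id',  # hypothetical project
            'datasetId': 'my_dataset',     # hypothetical dataset
        },
        # Added by the new branch; DAY is the only supported type.
        'timePartitioning': {'type': 'DAY'},
    }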

pandas_gbq/tests/test_gbq.py

+91
@@ -20,6 +20,7 @@
 
 
 TABLE_ID = 'new_test'
+DPT_TABLE_ID = 'dpt_test'
 
 
 def _skip_if_no_project_id():
@@ -946,6 +947,8 @@ def setup_method(self, method):
                                    private_key=_get_private_key_path())
         self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
                                                      TABLE_ID)
+        self.destination_dpt = "{0}{1}.{2}".format(self.dataset_prefix, "1",
+                                                   DPT_TABLE_ID)
         self.dataset.create(self.dataset_prefix + "1")
 
     @classmethod
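
For orientation, ``destination_dpt`` is a plain ``dataset.table`` string; the tests below append the decorator to it to address one partition. A worked example with a hypothetical prefix:

    destination_dpt = "{0}{1}.{2}".format('pandas_gbq_', "1", 'dpt_test')
    print(destination_dpt)                     # pandas_gbq_1.dpt_test
    print(destination_dpt + '$' + '20170101')  # pandas_gbq_1.dpt_test$20170101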
@@ -1080,6 +1083,94 @@ def test_upload_data_if_table_exists_replace(self):
                              private_key=_get_private_key_path())
         assert result['num_rows'][0] == 5
 
+    def test_upload_data_if_table_exists_replace_dpt_partition(self):
+        # Issue #47; tests that 'replace' is done by the subsequent call
+        test_dpt_suffix = "20170101"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        df_different_schema = tm.makeMixedDataFrame()
+
+        dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
+        self.table.create(DPT_TABLE_ID, gbq._generate_bq_schema(df))
+        gbq.to_gbq(df, dpt_partition, _get_project_id(),
+                   private_key=_get_private_key_path())
+
+        gbq.to_gbq(df_different_schema, dpt_partition,
+                   _get_project_id(), if_exists='replace',
+                   private_key=_get_private_key_path())
+
+        sleep(30)
+
+        # Test whole table
+        result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(self.destination_dpt),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result0['num_rows'][0] == 5
+
+        # Test destination partition
+        result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result1['num_rows'][0] == 5
+        self.table.delete(DPT_TABLE_ID)
+
+    def test_upload_data_if_table_exists_append_dpt_partition(self):
+        # Issue #47; tests that 'append' appends to an existing partition
+        test_dpt_suffix = "20170101"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+
+        dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
+        self.table.create(DPT_TABLE_ID, gbq._generate_bq_schema(df))
+
+        result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result0['num_rows'][0] == 5
+
+        gbq.to_gbq(df, dpt_partition,
+                   _get_project_id(), if_exists='append',
+                   private_key=_get_private_key_path())
+
+        result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+
+        assert result1['num_rows'][0] == 15
+
+        sleep(30)
+
+        # Test whole table
+        result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(self.destination_dpt),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result0['num_rows'][0] == 5
+
+        # Test destination partition
+        result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result1['num_rows'][0] == 10
+        self.table.delete(DPT_TABLE_ID)
+
+    def test_table_creation_error_raised_when_dpt_does_not_exist(self):
+        test_dpt_suffix = "20170101"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        dpt_partition = 'dataset.foobar' + '$' + test_dpt_suffix
+
+        with pytest.raises(gbq.TableCreationError):
+            gbq.to_gbq(df,
+                       dpt_partition,
+                       project_id=_get_project_id(),
+                       private_key=_get_private_key_path())
+
     def test_upload_data_if_table_exists_raises_value_error(self):
         test_id = "4"
         test_size = 10
