Skip to content

Commit 7cec25f

Browse files
mremes
authored and
Matti
committed
BUG: Add support to replace partitions in date-partitioned tables (#47)
1 parent 964a19b commit 7cec25f

File tree

3 files changed

+117
-31
lines changed

3 files changed

+117
-31
lines changed

docs/source/changelog.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ Changelog
77
- Drop support for Python 3.4 (:issue:`40`)
88
- The dataframe passed to ```.to_gbq(..., if_exists='append')``` needs to contain only a subset of the fields in the BigQuery schema. (:issue:`24`)
99
- Use the `google-auth <https://google-auth.readthedocs.io/en/latest/>`__ library for authentication because oauth2client is deprecated. (:issue:`39`)
10-
- ``read_gbq`` now has an ``auth_local_webserver`` boolean argument for controlling whether to use web server or console flow when getting user credentials. Replaces ``--noauth_local_webserver`` command line argument. (:issue:`35`)
1110
- ``read_gbq`` now displays the BigQuery Job ID and standard price in verbose output. (:issue:`70` and :issue:`71`)
11+
- Add support to replace partitions in `date-partitioned tables <https://cloud.google.com/bigquery/docs/partitioned-tables>`__. Partition must be specified with a partition decorator separator (``$``). (:issue:`47`)
1212

1313
0.1.6 / 2017-05-03
1414
------------------

pandas_gbq/gbq.py

+27-30
Original file line numberDiff line numberDiff line change
@@ -755,27 +755,7 @@ def schema_is_subset(self, dataset_id, table_id, schema):
755755
fields_local = schema['fields']
756756

757757
return all(field in fields_remote for field in fields_local)
758-
759-
def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
760-
delay = 0
761-
762-
# Changes to table schema may take up to 2 minutes as of May 2015 See
763-
# `Issue 191
764-
# <https://code.google.com/p/google-bigquery/issues/detail?id=191>`__
765-
# Compare previous schema with new schema to determine if there should
766-
# be a 120 second delay
767-
768-
if not self.verify_schema(dataset_id, table_id, table_schema):
769-
self._print('The existing table has a different schema. Please '
770-
'wait 2 minutes. See Google BigQuery issue #191')
771-
delay = 120
772-
773-
table = _Table(self.project_id, dataset_id,
774-
private_key=self.private_key)
775-
table.delete(table_id)
776-
table.create(table_id, table_schema)
777-
sleep(delay)
778-
758+
779759

780760
def _parse_data(schema, rows):
781761
# see:
@@ -1040,17 +1020,32 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
10401020
"already exists. "
10411021
"Change the if_exists parameter to "
10421022
"append or replace data.")
1043-
elif if_exists == 'replace':
1044-
connector.delete_and_recreate_table(
1045-
dataset_id, table_id, table_schema)
1046-
elif if_exists == 'append':
1047-
if not connector.schema_is_subset(dataset_id,
1048-
table_id,
1023+
else:
1024+
delay = 0
1025+
if not connector.schema_is_subset(dataset_id,
1026+
table_id,
10491027
table_schema):
1050-
raise InvalidSchema("Please verify that the structure and "
1051-
"data types in the DataFrame match the "
1052-
"schema of the destination table.")
1028+
if if_exists == 'append' \
1029+
or table.partition_decorator in table_id:
1030+
raise InvalidSchema("Please verify that the structure "
1031+
"and data types in the DataFrame "
1032+
"match the schema of the destination "
1033+
"table.")
1034+
elif if_exists == 'replace':
1035+
table._print('The existing table has a different schema. '
1036+
'Please wait 2 minutes. See Google BigQuery '
1037+
'issue #191')
1038+
delay = 120
1039+
if if_exists == 'replace':
1040+
table.delete(table_id)
1041+
if table.partition_decorator not in table_id:
1042+
table.create(table_id, table_schema)
1043+
sleep(delay)
1044+
10531045
else:
1046+
if table.partition_decorator in table_id:
1047+
raise TableCreationError("Cannot create a partition without the "
1048+
"main table.")
10541049
table.create(table_id, table_schema)
10551050

10561051
connector.load_data(dataframe, dataset_id, table_id, chunksize)
@@ -1095,6 +1090,8 @@ def _generate_bq_schema(df, default_type='STRING'):
10951090

10961091
class _Table(GbqConnector):
10971092

1093+
partition_decorator = '$'
1094+
10981095
def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
10991096
private_key=None):
11001097
try:

pandas_gbq/tests/test_gbq.py

+89
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121

2222
TABLE_ID = 'new_test'
23+
DPT_TABLE_ID = 'dpt_test'
2324

2425

2526
def _skip_if_no_project_id():
@@ -933,6 +934,8 @@ def setup_method(self, method):
933934
private_key=_get_private_key_path())
934935
self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
935936
TABLE_ID)
937+
self.destination_dpt = "{0}{1}.{2}".format(self.dataset_prefix, "1",
938+
DPT_TABLE_ID)
936939
self.dataset.create(self.dataset_prefix + "1")
937940

938941
@classmethod
@@ -1056,6 +1059,92 @@ def test_upload_data_if_table_exists_replace(self):
10561059
private_key=_get_private_key_path())
10571060
assert result['num_rows'][0] == 5
10581061

1062+
def test_upload_data_if_table_exists_replace_dpt_partition(self):
1063+
# Issue #47; tests that 'replace' is done by the subsequent call
1064+
test_dpt_suffix = "20170101"
1065+
test_size = 10
1066+
df = make_mixed_dataframe_v2(test_size)
1067+
df_different_schema = tm.makeMixedDataFrame()
1068+
1069+
dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
1070+
self.table.create(self.destination_dpt, gbq._generate_bq_schema(df))
1071+
gbq.to_gbq(df, dpt_partition, _get_project_id(),
1072+
chunksize=10000, private_key=_get_private_key_path())
1073+
1074+
gbq.to_gbq(df_different_schema, dpt_partition,
1075+
_get_project_id(), if_exists='replace',
1076+
private_key=_get_private_key_path())
1077+
1078+
sleep(30)
1079+
1080+
# Test whole table
1081+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1082+
.format(self.destination_dpt),
1083+
project_id=_get_project_id(),
1084+
private_key=_get_private_key_path())
1085+
assert result0['num_rows'][0] == 5
1086+
1087+
# Test destination partition
1088+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1089+
.format(dpt_partition),
1090+
project_id=_get_project_id(),
1091+
private_key=_get_private_key_path())
1092+
assert result1['num_rows'][0] == 5
1093+
1094+
def test_upload_data_if_table_exists_append_dpt_partition(self):
1095+
# Issue #47; tests that 'append' appends to an existing partition
1096+
test_dpt_suffix = "20170101"
1097+
test_size = 10
1098+
df = make_mixed_dataframe_v2(test_size)
1099+
1100+
dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
1101+
1102+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1103+
.format(dpt_partition),
1104+
project_id=_get_project_id(),
1105+
private_key=_get_private_key_path())
1106+
assert result0['num_rows'][0] == 5
1107+
1108+
gbq.to_gbq(df, dpt_partition,
1109+
_get_project_id(), if_exists='append',
1110+
private_key=_get_private_key_path())
1111+
1112+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1113+
.format(dpt_partition),
1114+
project_id=_get_project_id(),
1115+
private_key=_get_private_key_path())
1116+
1117+
assert result1['num_rows'][0] == 15
1118+
1119+
sleep(30)
1120+
1121+
# Test whole table
1122+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1123+
.format(self.destination_dpt),
1124+
project_id=_get_project_id(),
1125+
private_key=_get_private_key_path())
1126+
assert result0['num_rows'][0] == 5
1127+
1128+
# Test destination partition
1129+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1130+
.format(dpt_partition),
1131+
project_id=_get_project_id(),
1132+
private_key=_get_private_key_path())
1133+
assert result1['num_rows'][0] == 10
1134+
1135+
def test_table_creation_error_raised_when_dpt_does_not_exist(self):
1136+
test_dpt_suffix = "20170101"
1137+
test_id = "18"
1138+
test_size = 10
1139+
df = make_mixed_dataframe_v2(test_size)
1140+
dpt_partition = 'dataset.foobar' + '$' + test_dpt_suffix
1141+
1142+
with pytest.raises(gbq.TableCreationError):
1143+
gbq.to_gbq(df,
1144+
dpt_partition,
1145+
project_id=_get_project_id(),
1146+
private_key=_get_private_key_path())
1147+
10591148
def test_upload_data_if_table_exists_raises_value_error(self):
10601149
test_id = "4"
10611150
test_size = 10

0 commit comments

Comments
 (0)