Skip to content

Commit c7b22da

Browse files
committed
BUG: Add support to replace partitions in date-partitioned tables (#47)
1 parent 78cdf9a commit c7b22da

File tree

3 files changed

+102
-10
lines changed

3 files changed

+102
-10
lines changed

docs/source/changelog.rst

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Changelog
88
- The dataframe passed to ``.to_gbq(...., if_exists='append')`` needs to contain only a subset of the fields in the BigQuery schema. (:issue:`24`)
99
- Use the `google-auth <https://google-auth.readthedocs.io/en/latest/>`__ library for authentication because oauth2client is deprecated. (:issue:`39`)
1010
- ``read_gbq`` now has a ``auth_local_webserver`` boolean argument for controlling whether to use web server or console flow when getting user credentials. Replaces `--noauth_local_webserver` command line argument (:issue:`35`)
11+
- Add support to replace partitions in `date-partitioned tables <https://cloud.google.com/bigquery/docs/partitioned-tables>`__. Partition must be specified with a partition decorator separator (``$``). (:issue:`47`)
1112

1213
0.1.6 / 2017-05-03
1314
------------------

pandas_gbq/gbq.py

+25-10
Original file line numberDiff line numberDiff line change
@@ -1032,17 +1032,30 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
10321032
"already exists. "
10331033
"Change the if_exists parameter to "
10341034
"append or replace data.")
1035-
elif if_exists == 'replace':
1036-
connector.delete_and_recreate_table(
1037-
dataset_id, table_id, table_schema)
1038-
elif if_exists == 'append':
1039-
if not connector.schema_is_subset(dataset_id,
1040-
table_id,
1041-
table_schema):
1042-
raise InvalidSchema("Please verify that the structure and "
1043-
"data types in the DataFrame match the "
1044-
"schema of the destination table.")
1035+
else:
1036+
delay = 0
1037+
if not connector.verify_schema(dataset_id, table_id, table_schema):
1038+
if if_exists == 'append' \
1039+
or table.partition_decorator in table_id:
1040+
raise InvalidSchema("Please verify that the structure "
1041+
"and data types in the DataFrame "
1042+
"match the schema of the destination "
1043+
"table.")
1044+
elif if_exists == 'replace':
1045+
table._print('The existing table has a different schema. '
1046+
'Please wait 2 minutes. See Google BigQuery '
1047+
'issue #191')
1048+
delay = 120
1049+
if if_exists == 'replace':
1050+
table.delete(table_id)
1051+
if table.partition_decorator not in table_id:
1052+
table.create(table_id, table_schema)
1053+
sleep(delay)
1054+
10451055
else:
1056+
if table.partition_decorator in table_id:
1057+
raise TableCreationError("Cannot create a partition without the "
1058+
"main table.")
10461059
table.create(table_id, table_schema)
10471060

10481061
connector.load_data(dataframe, dataset_id, table_id, chunksize)
@@ -1087,6 +1100,8 @@ def _generate_bq_schema(df, default_type='STRING'):
10871100

10881101
class _Table(GbqConnector):
10891102

1103+
partition_decorator = '$'
1104+
10901105
def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
10911106
private_key=None):
10921107
try:

pandas_gbq/tests/test_gbq.py

+76
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121

2222
TABLE_ID = 'new_test'
23+
DPT_TABLE_ID = 'dpt_test'
2324

2425

2526
def _skip_if_no_project_id():
@@ -933,6 +934,8 @@ def setup_method(self, method):
933934
private_key=_get_private_key_path())
934935
self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
935936
TABLE_ID)
937+
self.destination_dpt = "{0}{1}.{2}".format(self.dataset_prefix, "1",
938+
DPT_TABLE_ID)
936939
self.dataset.create(self.dataset_prefix + "1")
937940

938941
@classmethod
@@ -1056,6 +1059,79 @@ def test_upload_data_if_table_exists_replace(self):
10561059
private_key=_get_private_key_path())
10571060
assert result['num_rows'][0] == 5
10581061

1062+
def test_upload_data_if_table_exists_replace_dpt_partition(self):
1063+
# Issue #47; tests that 'replace' is done by the subsequent call
1064+
test_dpt_suffix = "20170101"
1065+
test_size = 10
1066+
df = make_mixed_dataframe_v2(test_size)
1067+
df_different_schema = tm.makeMixedDataFrame()
1068+
1069+
dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
1070+
1071+
gbq.to_gbq(df, dpt_partition, _get_project_id(),
1072+
chunksize=10000, private_key=_get_private_key_path())
1073+
1074+
gbq.to_gbq(df_different_schema, dpt_partition,
1075+
_get_project_id(), if_exists='replace',
1076+
private_key=_get_private_key_path())
1077+
1078+
sleep(30)
1079+
1080+
# Test whole table
1081+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1082+
.format(self.destination_dpt),
1083+
project_id=_get_project_id(),
1084+
private_key=_get_private_key_path())
1085+
assert result0['num_rows'][0] == 5
1086+
1087+
# Test destination partition
1088+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1089+
.format(dpt_partition),
1090+
project_id=_get_project_id(),
1091+
private_key=_get_private_key_path())
1092+
assert result1['num_rows'][0] == 5
1093+
1094+
def test_upload_data_if_table_exists_append_dpt_partition(self):
1095+
# Issue #47; tests that 'append' appends to an existing partition
1096+
test_dpt_suffix = "20170101"
1097+
test_size = 10
1098+
df = make_mixed_dataframe_v2(test_size)
1099+
1100+
dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
1101+
1102+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1103+
.format(dpt_partition),
1104+
project_id=_get_project_id(),
1105+
private_key=_get_private_key_path())
1106+
assert result0['num_rows'][0] == 5
1107+
1108+
gbq.to_gbq(df, dpt_partition,
1109+
_get_project_id(), if_exists='append',
1110+
private_key=_get_private_key_path())
1111+
1112+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1113+
.format(dpt_partition),
1114+
project_id=_get_project_id(),
1115+
private_key=_get_private_key_path())
1116+
1117+
assert result1['num_rows'][0] == 15
1118+
1119+
sleep(30)
1120+
1121+
# Test whole table
1122+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1123+
.format(self.destination_dpt),
1124+
project_id=_get_project_id(),
1125+
private_key=_get_private_key_path())
1126+
assert result0['num_rows'][0] == 5
1127+
1128+
# Test destination partition
1129+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1130+
.format(dpt_partition),
1131+
project_id=_get_project_id(),
1132+
private_key=_get_private_key_path())
1133+
assert result1['num_rows'][0] == 10
1134+
10591135
def test_upload_data_if_table_exists_raises_value_error(self):
10601136
test_id = "4"
10611137
test_size = 10

0 commit comments

Comments (0)