
Commit 5a337ff

BUG: Add support to replace partitions in date-partitioned tables (#47)

1 parent 78cdf9a

3 files changed (+98 -10 lines)

docs/source/changelog.rst (+1)
@@ -8,6 +8,7 @@ Changelog
 - The dataframe passed to ```.to_gbq(...., if_exists='append')``` needs to contain only a subset of the fields in the BigQuery schema. (:issue:`24`)
 - Use the `google-auth <https://google-auth.readthedocs.io/en/latest/>`__ library for authentication because oauth2client is deprecated. (:issue:`39`)
 - ``read_gbq`` now has a ``auth_local_webserver`` boolean argument for controlling whether to use web server or console flow when getting user credentials. Replaces `--noauth_local_webserver` command line argument (:issue:`35`)
+- Add support to replace partitions in date-partitioned tables (:issue:`47`)
 
 0.1.6 / 2017-05-03
 ------------------
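As a quick usage sketch of the new changelog entry (all names below are placeholders, and `my_dataset.my_table` is assumed to already exist as a date-partitioned table):

    import pandas as pd
    from pandas_gbq import gbq

    df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})

    # Target one day's partition via the '$YYYYMMDD' decorator. With this
    # commit, if_exists='replace' replaces only that partition rather than
    # deleting and recreating the whole table.
    gbq.to_gbq(df, 'my_dataset.my_table$20170101', 'my-project',
               if_exists='replace')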

pandas_gbq/gbq.py (+21 -10)
@@ -1032,17 +1032,26 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
                                      "already exists. "
                                      "Change the if_exists parameter to "
                                      "append or replace data.")
-        elif if_exists == 'replace':
-            connector.delete_and_recreate_table(
-                dataset_id, table_id, table_schema)
-        elif if_exists == 'append':
-            if not connector.schema_is_subset(dataset_id,
-                                              table_id,
-                                              table_schema):
-                raise InvalidSchema("Please verify that the structure and "
-                                    "data types in the DataFrame match the "
-                                    "schema of the destination table.")
+        else:
+            delay = 0
+            if not connector.verify_schema(dataset_id, table_id, table_schema):
+                if if_exists == 'append' or table.partition_decorator in table_id:
+                    raise InvalidSchema("Please verify that the structure and "
+                                        "data types in the DataFrame match the "
+                                        "schema of the destination table.")
+                elif if_exists == 'replace':
+                    table._print('The existing table has a different schema. Please '
+                                 'wait 2 minutes. See Google BigQuery issue #191')
+                    delay = 120
+            if if_exists == 'replace':
+                table.delete(table_id)
+                if table.partition_decorator not in table_id:
+                    table.create(table_id, table_schema)
+                sleep(delay)
+
     else:
+        if table.partition_decorator in table_id:
+            raise TableCreationError("Cannot create a partition without the main table.")
         table.create(table_id, table_schema)
 
     connector.load_data(dataframe, dataset_id, table_id, chunksize)

@@ -1087,6 +1096,8 @@ def _generate_bq_schema(df, default_type='STRING'):
 
 class _Table(GbqConnector):
 
+    partition_decorator = '$'
+
     def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
                  private_key=None):
         try:
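Distilled from the two hunks above, the branch for an existing destination now behaves as sketched below. This is a readability paraphrase, not the committed code: `connector` and `table` stand in for the objects `to_gbq` builds, and the helper name `_handle_existing_destination` is invented here.

    from time import sleep

    from pandas_gbq.gbq import InvalidSchema

    def _handle_existing_destination(connector, table, dataset_id, table_id,
                                     table_schema, if_exists):
        delay = 0
        if not connector.verify_schema(dataset_id, table_id, table_schema):
            if if_exists == 'append' or table.partition_decorator in table_id:
                # Appending, or writing into a '$YYYYMMDD' partition, requires
                # the DataFrame schema to match the destination schema.
                raise InvalidSchema("Please verify that the structure and "
                                    "data types in the DataFrame match the "
                                    "schema of the destination table.")
            elif if_exists == 'replace':
                # Replacing a plain table with a new schema: wait out
                # BigQuery's propagation delay (Google BigQuery issue #191).
                delay = 120
        if if_exists == 'replace':
            table.delete(table_id)
            if table.partition_decorator not in table_id:
                # A deleted partition is simply empty and can be reloaded;
                # only a plain table must be recreated before loading.
                table.create(table_id, table_schema)
            sleep(delay)

Note the asymmetry the commit introduces: replacing `table$YYYYMMDD` deletes and reloads just that partition, while replacing an undecorated table name still drops and recreates the whole table.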

pandas_gbq/tests/test_gbq.py (+76)
@@ -20,6 +20,7 @@
 
 
 TABLE_ID = 'new_test'
+DPT_TABLE_ID = 'dpt_test'
 
 
 def _skip_if_no_project_id():

@@ -933,6 +934,8 @@ def setup_method(self, method):
                                    private_key=_get_private_key_path())
         self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
                                                      TABLE_ID)
+        self.destination_date_partitioned_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
+                                                                      DPT_TABLE_ID)
         self.dataset.create(self.dataset_prefix + "1")
 
     @classmethod

@@ -1056,6 +1059,79 @@ def test_upload_data_if_table_exists_replace(self):
                            private_key=_get_private_key_path())
         assert result['num_rows'][0] == 5
 
+    def test_upload_data_if_table_exists_replace_dpt_partition(self):
+        # Issue #47; tests that 'replace' is done by the subsequent call
+        test_dpt_suffix = "20170101"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        df_different_schema = tm.makeMixedDataFrame()
+
+        dpt_partition = self.destination_date_partitioned_table + '$' + test_dpt_suffix
+
+        gbq.to_gbq(df, dpt_partition, _get_project_id(),
+                   chunksize=10000, private_key=_get_private_key_path())
+
+        gbq.to_gbq(df_different_schema, dpt_partition,
+                   _get_project_id(), if_exists='replace',
+                   private_key=_get_private_key_path())
+
+        sleep(30)
+
+        # Test whole table
+        result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(self.destination_date_partitioned_table),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result0['num_rows'][0] == 5
+
+        # Test destination partition
+        result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result1['num_rows'][0] == 5
+
+    def test_upload_data_if_table_exists_append_dpt_partition(self):
+        # Issue #47; tests that 'append' appends to an existing partition
+        test_dpt_suffix = "20170101"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+
+        dpt_partition = self.destination_date_partitioned_table + '$' + test_dpt_suffix
+
+        result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result0['num_rows'][0] == 5
+
+        gbq.to_gbq(df, dpt_partition,
+                   _get_project_id(), if_exists='append',
+                   private_key=_get_private_key_path())
+
+        result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+
+        assert result1['num_rows'][0] == 15
+
+        sleep(30)
+
+        # Test whole table
+        result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(self.destination_date_partitioned_table),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result0['num_rows'][0] == 5
+
+        # Test destination partition
+        result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result1['num_rows'][0] == 10
+
     def test_upload_data_if_table_exists_raises_value_error(self):
         test_id = "4"
         test_size = 10
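The tests read a single partition back by formatting the decorated name straight into the query; the same pattern works outside the suite (project ID, dataset, and date suffix below are placeholders):

    from pandas_gbq import gbq

    # Count the rows in just the 2017-01-01 partition by querying the
    # decorated table name, mirroring the queries used in the tests above.
    result = gbq.read_gbq(
        "SELECT COUNT(*) AS num_rows FROM my_dataset.dpt_test$20170101",
        project_id='my-project')
    print(result['num_rows'][0])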
