diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index 0f1d6e1b..37028922 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -14,6 +14,7 @@ Changelog
 - Use the `google-auth <https://google-auth.readthedocs.io/en/latest/>`__ library for authentication because ``oauth2client`` is deprecated. (:issue:`39`)
 - :func:`read_gbq` now has an ``auth_local_webserver`` boolean argument for controlling whether to use a web server or console flow when getting user credentials. Replaces the ``--noauth_local_webserver`` command-line argument. (:issue:`35`)
 - :func:`read_gbq` now displays the BigQuery Job ID and standard price in verbose output. (:issue:`70` and :issue:`71`)
+- Add support to replace partitions in `date-partitioned tables <https://cloud.google.com/bigquery/docs/partitioned-tables>`__. The partition must be specified with a partition decorator separator (``$``). (:issue:`47`)
 
 0.1.6 / 2017-05-03
 ------------------
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 0c538662..75f142d1 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -769,26 +769,6 @@ def schema_is_subset(self, dataset_id, table_id, schema):
 
         return all(field in fields_remote for field in fields_local)
 
-    def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
-        delay = 0
-
-        # Changes to table schema may take up to 2 minutes as of May 2015 See
-        # `Issue 191
-        # <https://code.google.com/p/google-bigquery/issues/detail?id=191>`__
-        # Compare previous schema with new schema to determine if there should
-        # be a 120 second delay
-
-        if not self.verify_schema(dataset_id, table_id, table_schema):
-            self._print('The existing table has a different schema. Please '
-                        'wait 2 minutes. See Google BigQuery issue #191')
-            delay = 120
-
-        table = _Table(self.project_id, dataset_id,
-                       private_key=self.private_key)
-        table.delete(table_id)
-        table.create(table_id, table_schema)
-        sleep(delay)
-
 
 def _parse_data(schema, rows):
     # see:
@@ -1053,19 +1033,33 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
                                      "already exists. "
                                      "Change the if_exists parameter to "
                                      "append or replace data.")
-        elif if_exists == 'replace':
-            connector.delete_and_recreate_table(
-                dataset_id, table_id, table_schema)
-        elif if_exists == 'append':
+        else:
+            delay = 0
             if not connector.schema_is_subset(dataset_id,
                                               table_id,
                                               table_schema):
-                raise InvalidSchema("Please verify that the structure and "
-                                    "data types in the DataFrame match the "
-                                    "schema of the destination table.")
-    else:
-        table.create(table_id, table_schema)
+                if if_exists == 'append' \
+                        or table.partition_decorator in table_id:
+                    raise InvalidSchema("Please verify that the structure "
+                                        "and data types in the DataFrame "
+                                        "match the schema of the destination "
+                                        "table.")
+                elif if_exists == 'replace':
+                    table._print('The existing table has a different schema. '
+                                 'Please wait 2 minutes. See Google BigQuery '
+                                 'issue #191')
+                    delay = 120
+            if if_exists == 'replace':
+                table.delete(table_id)
+                if table.partition_decorator not in table_id:
+                    table.create(table_id, table_schema)
+                sleep(delay)
+    else:
+        is_dpt = table.partition_decorator in table_id
+        table.create(table_id.split(table.partition_decorator)[0],
+                     table_schema,
+                     date_partitioned=is_dpt)
 
     connector.load_data(dataframe, dataset_id, table_id, chunksize)
 
 
@@ -1108,6 +1102,8 @@ def _generate_bq_schema(df, default_type='STRING'):
 
 class _Table(GbqConnector):
 
+    partition_decorator = '$'
+
     def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
                  private_key=None):
         try:
@@ -1144,7 +1140,7 @@ def exists(self, table_id):
         else:
             self.process_http_error(ex)
 
-    def create(self, table_id, schema):
+    def create(self, table_id, schema, date_partitioned=False):
         """ Create a table in Google BigQuery given a table and schema
 
         Parameters
@@ -1154,6 +1150,8 @@ def create(self, table_id, schema):
         schema : str
             Use the generate_bq_schema to generate your table schema from a
             dataframe.
+        date_partitioned : boolean
+            Whether the table is to be created as a date-partitioned table.
         """
 
         if self.exists(table_id):
@@ -1173,6 +1171,9 @@ def create(self, table_id, schema):
                 'datasetId': self.dataset_id
             }
         }
+        if date_partitioned:
+            # The only partitioning type currently supported is DAY
+            body.update({'timePartitioning': {'type': 'DAY'}})
 
         try:
             self.service.tables().insert(
diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py
index c9ac31dd..21c21174 100644
--- a/pandas_gbq/tests/test_gbq.py
+++ b/pandas_gbq/tests/test_gbq.py
@@ -1,7 +1,7 @@
 import pytest
 import re
 
-from datetime import datetime
+from datetime import datetime, timedelta
 import pytz
 from time import sleep
 import os
@@ -20,6 +20,7 @@
 
 TABLE_ID = 'new_test'
+DPT_TABLE_ID = 'dpt_test'
 
 
 def _skip_if_no_project_id():
    if not _get_project_id():
@@ -946,6 +947,8 @@ def setup_method(self, method):
                                 private_key=_get_private_key_path())
         self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
                                                      TABLE_ID)
+        self.destination_dpt = "{0}{1}.{2}".format(self.dataset_prefix, "1",
+                                                   DPT_TABLE_ID)
         self.dataset.create(self.dataset_prefix + "1")
 
     @classmethod
@@ -1080,6 +1083,115 @@ def test_upload_data_if_table_exists_replace(self):
             private_key=_get_private_key_path())
         assert result['num_rows'][0] == 5
 
+    def test_upload_data_if_table_exists_replace_dpt_partition(self):
+        # Issue #47; tests that a subsequent 'replace' call overwrites an
+        # existing partition
+        test_dpt_suffix = datetime.now().strftime('%Y%m%d')
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        df_different_schema = tm.makeMixedDataFrame()
+
+        dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
+        self.table.create(DPT_TABLE_ID,
+                          gbq._generate_bq_schema(df),
+                          date_partitioned=True)
+
+        with pytest.raises(gbq.InvalidSchema):
+            gbq.to_gbq(df_different_schema, dpt_partition,
+                       _get_project_id(), if_exists='replace',
+                       private_key=_get_private_key_path())
+
+        gbq.to_gbq(df, dpt_partition, _get_project_id(),
+                   private_key=_get_private_key_path(),
+                   if_exists='replace')
+
+        # Test partition
+        result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result0['num_rows'][0] == 10
+
+        # Test whole table
+        result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(self.destination_dpt),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result1['num_rows'][0] == 10
+
+        self.table.delete(DPT_TABLE_ID)
+
+    def test_upload_data_if_table_exists_append_dpt_partition(self):
+        # Issue #47; tests that 'append' appends to an existing partition
+        test_dpt_suffix = (datetime.now() +
+                           timedelta(days=1)).strftime('%Y%m%d')
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+
+        self.table.create(DPT_TABLE_ID,
+                          gbq._generate_bq_schema(df),
+                          date_partitioned=True)
+
+        dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
+
+        result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(self.destination_dpt),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result0['num_rows'][0] == 0
+
+        gbq.to_gbq(df, dpt_partition,
+                   _get_project_id(), if_exists='append',
+                   private_key=_get_private_key_path())
+
+        result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result1['num_rows'][0] == 10
+
+        gbq.to_gbq(df.head(), dpt_partition,
+                   _get_project_id(), if_exists='append',
+                   private_key=_get_private_key_path())
+
+        # Test destination partition; df.head() appends 5 more rows
+        result2 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                               .format(dpt_partition),
+                               project_id=_get_project_id(),
+                               private_key=_get_private_key_path())
+        assert result2['num_rows'][0] == 15
+
+    def test_table_creation_error_raised_when_dpt_exists(self):
+        test_dpt_suffix = datetime.now().strftime('%Y%m%d')
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        table_name = 'foobar'
+        self.table.create(table_name,
+                          gbq._generate_bq_schema(df),
+                          date_partitioned=True)
+        dpt_partition = table_name + '$' + test_dpt_suffix
+        # The default if_exists='fail' should refuse to write to an
+        # existing partitioned table
+        with pytest.raises(gbq.TableCreationError):
+            gbq.to_gbq(df,
+                       self.table.dataset_id + '.' + dpt_partition,
+                       project_id=_get_project_id(),
+                       private_key=_get_private_key_path())
+        self.table.delete(table_name)
+        sleep(30)
+
+    def test_table_created_from_dpt_suffixed_id(self):
+        test_dpt_suffix = datetime.now().strftime('%Y%m%d')
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        table_name = 'foobar'
+        dpt_partition = table_name + '$' + test_dpt_suffix
+        gbq.to_gbq(df,
+                   self.table.dataset_id + '.' + dpt_partition,
+                   project_id=_get_project_id(),
+                   private_key=_get_private_key_path())
+        assert self.table.exists(table_name)
+        self.table.delete(table_name)
+
     def test_upload_data_if_table_exists_raises_value_error(self):
         test_id = "4"
         test_size = 10
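
A minimal usage sketch of the partition-decorator behavior this patch adds, assuming this branch is installed; 'my_dataset.daily_stats' and 'my-project-id' are placeholder names, not part of the patch:

    import pandas as pd
    from pandas_gbq import gbq

    df = pd.DataFrame({'metric': [1.0, 2.0, 3.0]})

    # 'daily_stats' does not exist yet: to_gbq() strips the '$20170601'
    # decorator, creates the base table as date-partitioned (type DAY),
    # and loads the rows into the 20170601 partition.
    gbq.to_gbq(df, 'my_dataset.daily_stats$20170601', 'my-project-id')

    # With if_exists='replace' and a partition decorator in the table id,
    # only that partition is deleted and rewritten; the other partitions
    # are left untouched.
    gbq.to_gbq(df, 'my_dataset.daily_stats$20170601', 'my-project-id',
               if_exists='replace')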