BUG: Fix uploading of dataframes containing int64 and float64 columns #117

Merged: 7 commits, Feb 12, 2018
2 changes: 1 addition & 1 deletion ci/requirements-3.5-0.18.1.pip
@@ -1,4 +1,4 @@
 google-auth==1.0.0
 google-auth-oauthlib==0.0.1
 mock
-google-cloud-bigquery==0.28.0
+google-cloud-bigquery==0.29.0
3 changes: 2 additions & 1 deletion docs/source/changelog.rst
@@ -7,7 +7,8 @@ Changelog

 - Fix an issue where Unicode couldn't be uploaded in Python 2 (:issue:`93`)
 - Add support for a passed schema in :func:`to_gbq` instead of inferring the schema from the passed ``DataFrame`` with ``DataFrame.dtypes`` (:issue:`46`)
+- Fix an issue where a dataframe containing both integer and floating point columns could not be uploaded with ``to_gbq`` (:issue:`116`)
+- ``to_gbq`` now uses ``to_csv`` to avoid manually looping over rows in a dataframe (should result in faster table uploads) (:issue:`96`)

0.3.0 / 2018-01-03
------------------
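For context, a minimal sketch of the case this PR fixes (issue #116): uploading a dataframe that mixes int64 and float64 columns, which could not be uploaded via the old row-by-row JSON path per the changelog entry above. Names are placeholders, and the snippet assumes the top-level ``to_gbq`` re-export:

import pandas as pd

from pandas_gbq import to_gbq

# A dataframe with mixed integer and float columns, the shape of data
# described in issue #116.
df = pd.DataFrame({'ints': [1, 2, 3], 'floats': [1.5, 2.5, 3.5]})

# 'my-project' and 'my_dataset.new_table' are placeholders. chunksize=None
# (the new default) loads the dataframe in a single CSV load job.
to_gbq(df, 'my_dataset.new_table', 'my-project', if_exists='append')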
74 changes: 74 additions & 0 deletions pandas_gbq/_load.py
@@ -0,0 +1,74 @@
"""Helper methods for loading data into BigQuery"""

from google.cloud import bigquery
import six

from pandas_gbq import _schema


def encode_chunk(dataframe):
"""Return a file-like object of CSV-encoded rows.

Args:
dataframe (pandas.DataFrame): A chunk of a dataframe to encode
"""
csv_buffer = six.StringIO()
dataframe.to_csv(
csv_buffer, index=False, header=False, encoding='utf-8',
date_format='%Y-%m-%d %H:%M')

# Convert to a BytesIO buffer so that unicode text is properly handled.
# See: https://github.com/pydata/pandas-gbq/issues/106
body = csv_buffer.getvalue()
if isinstance(body, bytes):
body = body.decode('utf-8')
body = body.encode('utf-8')
return six.BytesIO(body)


def encode_chunks(dataframe, chunksize=None):
dataframe = dataframe.reset_index(drop=True)
if chunksize is None:
yield 0, encode_chunk(dataframe)
return

remaining_rows = len(dataframe)
total_rows = remaining_rows
start_index = 0
while start_index < total_rows:
end_index = start_index + chunksize
chunk_buffer = encode_chunk(dataframe[start_index:end_index])
start_index += chunksize
remaining_rows = max(0, remaining_rows - chunksize)
yield remaining_rows, chunk_buffer


def load_chunks(
client, dataframe, dataset_id, table_id, chunksize=None, schema=None):
destination_table = client.dataset(dataset_id).table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = 'WRITE_APPEND'
job_config.source_format = 'CSV'

if schema is None:
schema = _schema.generate_bq_schema(dataframe)

# Manually create the schema objects, adding NULLABLE mode
# as a workaround for
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4456
for field in schema['fields']:
if 'mode' not in field:
field['mode'] = 'NULLABLE'

job_config.schema = [
bigquery.SchemaField.from_api_repr(field)
for field in schema['fields']
]

chunks = encode_chunks(dataframe, chunksize=chunksize)
for remaining_rows, chunk_buffer in chunks:
yield remaining_rows
client.load_table_from_file(
chunk_buffer,
destination_table,
job_config=job_config).result()
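As a usage sketch (not part of the diff): the generator yields the remaining row count before each chunk uploads, which is what GbqConnector.load_data in gbq.py uses for its progress display. 'my-project', 'my_dataset' and 'my_table' are placeholders, and the client is assumed to be already authenticated:

import pandas
from google.cloud import bigquery

from pandas_gbq import _load

client = bigquery.Client(project='my-project')  # placeholder project
df = pandas.DataFrame({'ints': [1, 2, 3], 'floats': [1.5, 2.5, 3.5]})

total_rows = len(df)
# Each iteration uploads one CSV-encoded chunk as a separate load job.
for remaining_rows in _load.load_chunks(
        client, df, 'my_dataset', 'my_table', chunksize=2):
    print("Load is {0}% complete".format(
        ((total_rows - remaining_rows) * 100) // total_rows))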
29 changes: 29 additions & 0 deletions pandas_gbq/_schema.py
@@ -0,0 +1,29 @@
"""Helper methods for BigQuery schemas"""


def generate_bq_schema(dataframe, default_type='STRING'):
"""Given a passed dataframe, generate the associated Google BigQuery schema.

Arguments:
dataframe (pandas.DataFrame): D
default_type : string
The default big query type in case the type of the column
does not exist in the schema.
"""

type_mapping = {
'i': 'INTEGER',
'b': 'BOOLEAN',
'f': 'FLOAT',
'O': 'STRING',
'S': 'STRING',
'U': 'STRING',
'M': 'TIMESTAMP'
}

fields = []
for column_name, dtype in dataframe.dtypes.iteritems():
fields.append({'name': column_name,
'type': type_mapping.get(dtype.kind, default_type)})

return {'fields': fields}
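A quick sketch of the dtype-kind mapping in action, which is what gives a frame mixing int64 and float64 columns a correct schema (field order in the output may vary with pandas version):

import pandas

from pandas_gbq import _schema

df = pandas.DataFrame({'int_col': [1, 2], 'float_col': [0.5, 1.5]})
print(_schema.generate_bq_schema(df))
# Something like (field order may differ):
# {'fields': [{'name': 'float_col', 'type': 'FLOAT'},
#             {'name': 'int_col', 'type': 'INTEGER'}]}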
95 changes: 32 additions & 63 deletions pandas_gbq/gbq.py
@@ -556,45 +556,22 @@ def run_query(self, query, **kwargs):

         return schema, result_rows

-    def load_data(self, dataframe, dataset_id, table_id, chunksize):
-        from google.cloud.bigquery import LoadJobConfig
-        from six import BytesIO
-
-        destination_table = self.client.dataset(dataset_id).table(table_id)
-        job_config = LoadJobConfig()
-        job_config.write_disposition = 'WRITE_APPEND'
-        job_config.source_format = 'NEWLINE_DELIMITED_JSON'
-        rows = []
-        remaining_rows = len(dataframe)
-
-        total_rows = remaining_rows
-        self._print("\n\n")
-
-        for index, row in dataframe.reset_index(drop=True).iterrows():
-            row_json = row.to_json(
-                force_ascii=False, date_unit='s', date_format='iso')
-            rows.append(row_json)
-            remaining_rows -= 1
-
-            if (len(rows) % chunksize == 0) or (remaining_rows == 0):
-                self._print("\rLoad is {0}% Complete".format(
-                    ((total_rows - remaining_rows) * 100) / total_rows))
-
-                body = '{}\n'.format('\n'.join(rows))
-                if isinstance(body, bytes):
-                    body = body.decode('utf-8')
-                body = body.encode('utf-8')
-                body = BytesIO(body)
-
-                try:
-                    self.client.load_table_from_file(
-                        body,
-                        destination_table,
-                        job_config=job_config).result()
-                except self.http_error as ex:
-                    self.process_http_error(ex)
-
-                rows = []
+    def load_data(
+            self, dataframe, dataset_id, table_id, chunksize=None,
+            schema=None):
+        from pandas_gbq import _load
+
+        total_rows = len(dataframe)
+        self._print("\n\n")
+
+        try:
+            for remaining_rows in _load.load_chunks(
+                    self.client, dataframe, dataset_id, table_id,
+                    chunksize=chunksize, schema=schema):
+                self._print("\rLoad is {0}% Complete".format(
+                    ((total_rows - remaining_rows) * 100) / total_rows))
+        except self.http_error as ex:
+            self.process_http_error(ex)

         self._print("\n")
@@ -888,7 +865,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
     return final_df


-def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
+def to_gbq(dataframe, destination_table, project_id, chunksize=None,
            verbose=True, reauth=False, if_exists='fail', private_key=None,
            auth_local_webserver=False, table_schema=None):
     """Write a DataFrame to a Google BigQuery table.
@@ -922,8 +899,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
         Name of table to be written, in the form 'dataset.tablename'
     project_id : str
         Google BigQuery Account project ID.
-    chunksize : int (default 10000)
-        Number of rows to be inserted in each chunk from the dataframe.
+    chunksize : int (default None)
+        Number of rows to be inserted in each chunk from the dataframe. Use
+        ``None`` to load the dataframe in a single chunk.
     verbose : boolean (default True)
         Show percentage complete
     reauth : boolean (default False)
@@ -985,7 +963,7 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
             raise TableCreationError("Could not create the table because it "
                                      "already exists. "
                                      "Change the if_exists parameter to "
-                                     "append or replace data.")
+                                     "'append' or 'replace' data.")
         elif if_exists == 'replace':
             connector.delete_and_recreate_table(
                 dataset_id, table_id, table_schema)
@@ -999,19 +977,14 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
     else:
         table.create(table_id, table_schema)

-    connector.load_data(dataframe, dataset_id, table_id, chunksize)
+    connector.load_data(
+        dataframe, dataset_id, table_id, chunksize=chunksize,
+        schema=table_schema)


-def generate_bq_schema(df, default_type='STRING'):
-    # deprecation TimeSeries, #11121
-    warnings.warn("generate_bq_schema is deprecated and will be removed in "
-                  "a future version", FutureWarning, stacklevel=2)
-
-    return _generate_bq_schema(df, default_type=default_type)
-
-
-def _generate_bq_schema(df, default_type='STRING'):
-    """ Given a passed df, generate the associated Google BigQuery schema.
+def generate_bq_schema(df, default_type='STRING'):
+    """DEPRECATED: Given a passed df, generate the associated Google BigQuery
+    schema.

     Parameters
     ----------
@@ -1020,23 +993,16 @@ def _generate_bq_schema(df, default_type='STRING'):
     df : DataFrame
     default_type : string
         The default big query type in case the type of the column
         does not exist in the schema.
     """
+    # deprecation TimeSeries, #11121
+    warnings.warn("generate_bq_schema is deprecated and will be removed in "
+                  "a future version", FutureWarning, stacklevel=2)
+    return _generate_bq_schema(df, default_type=default_type)

-    type_mapping = {
-        'i': 'INTEGER',
-        'b': 'BOOLEAN',
-        'f': 'FLOAT',
-        'O': 'STRING',
-        'S': 'STRING',
-        'U': 'STRING',
-        'M': 'TIMESTAMP'
-    }
-
-    fields = []
-    for column_name, dtype in df.dtypes.iteritems():
-        fields.append({'name': column_name,
-                       'type': type_mapping.get(dtype.kind, default_type)})
-
-    return {'fields': fields}
+def _generate_bq_schema(df, default_type='STRING'):
+    from pandas_gbq import _schema
+    return _schema.generate_bq_schema(df, default_type=default_type)
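A short sketch of the deprecation behaviour this refactor preserves: the public generate_bq_schema still emits a FutureWarning and then delegates to pandas_gbq._schema:

import warnings

import pandas

from pandas_gbq import gbq

df = pandas.DataFrame({'col1': [1, 2]})
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    schema = gbq.generate_bq_schema(df)

# The deprecated wrapper warns, but the inferred schema is unchanged.
assert any(issubclass(w.category, FutureWarning) for w in caught)
print(schema)  # {'fields': [{'name': 'col1', 'type': 'INTEGER'}]}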


class _Table(GbqConnector):
@@ -1096,6 +1062,9 @@ def create(self, table_id, schema):
         table_ref = self.client.dataset(self.dataset_id).table(table_id)
         table = Table(table_ref)

+        # Manually create the schema objects, adding NULLABLE mode
+        # as a workaround for
+        # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4456
         for field in schema['fields']:
             if 'mode' not in field:
                 field['mode'] = 'NULLABLE'
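To see why the NULLABLE default matters, a minimal sketch of the workaround, assuming google-cloud-bigquery 0.29.0, where SchemaField.from_api_repr expects a 'mode' key (the linked google-cloud-python issue #4456):

from google.cloud import bigquery

field = {'name': 'col1', 'type': 'INTEGER'}
# Workaround: from_api_repr fails without a 'mode' key in affected versions.
field.setdefault('mode', 'NULLABLE')
schema_field = bigquery.SchemaField.from_api_repr(field)
print(schema_field)  # e.g. SchemaField('col1', 'INTEGER', 'NULLABLE', ...)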
40 changes: 40 additions & 0 deletions pandas_gbq/tests/test__load.py
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-

Review comment (Contributor): FYI this file name currently has two underscores

Reply (Collaborator, author): Thanks. I'm aware. I'm following the convention that the filename should be test_ + the filename of the file under test.

import numpy
import pandas


def test_encode_chunk_with_unicode():
    """Test that a dataframe containing unicode can be encoded as a file.

    See: https://github.com/pydata/pandas-gbq/issues/106
    """
    from pandas_gbq._load import encode_chunk

    df = pandas.DataFrame(
        numpy.random.randn(6, 4), index=range(6), columns=list('ABCD'))
    df['s'] = u'信用卡'
    csv_buffer = encode_chunk(df)
    csv_bytes = csv_buffer.read()
    csv_string = csv_bytes.decode('utf-8')
    assert u'信用卡' in csv_string


def test_encode_chunks_splits_dataframe():
    from pandas_gbq._load import encode_chunks

    df = pandas.DataFrame(numpy.random.randn(6, 4), index=range(6))
    chunks = list(encode_chunks(df, chunksize=2))
    assert len(chunks) == 3
    remaining, buffer = chunks[0]
    assert remaining == 4
    assert len(buffer.readlines()) == 2


def test_encode_chunks_with_chunksize_none():
    from pandas_gbq._load import encode_chunks

    df = pandas.DataFrame(numpy.random.randn(6, 4), index=range(6))
    chunks = list(encode_chunks(df))
    assert len(chunks) == 1
    remaining, buffer = chunks[0]
    assert remaining == 0
    assert len(buffer.readlines()) == 6
55 changes: 55 additions & 0 deletions pandas_gbq/tests/test__schema.py
@@ -0,0 +1,55 @@

import datetime

import pandas
import pytest

from pandas_gbq import _schema


@pytest.mark.parametrize(
    'dataframe,expected_schema',
    [
        (
            pandas.DataFrame(data={'col1': [1, 2, 3]}),
            {'fields': [{'name': 'col1', 'type': 'INTEGER'}]},
        ),
        (
            pandas.DataFrame(data={'col1': [True, False]}),
            {'fields': [{'name': 'col1', 'type': 'BOOLEAN'}]},
        ),
        (
            pandas.DataFrame(data={'col1': [1.0, 3.14]}),
            {'fields': [{'name': 'col1', 'type': 'FLOAT'}]},
        ),
        (
            pandas.DataFrame(data={'col1': [u'hello', u'world']}),
            {'fields': [{'name': 'col1', 'type': 'STRING'}]},
        ),
        (
            pandas.DataFrame(data={'col1': [datetime.datetime.now()]}),
            {'fields': [{'name': 'col1', 'type': 'TIMESTAMP'}]},
        ),
        (
            pandas.DataFrame(
                data={
                    'col1': [datetime.datetime.now()],
                    'col2': [u'hello'],
                    'col3': [3.14],
                    'col4': [True],
                    'col5': [4],
                }),
            {
                'fields': [
                    {'name': 'col1', 'type': 'TIMESTAMP'},
                    {'name': 'col2', 'type': 'STRING'},
                    {'name': 'col3', 'type': 'FLOAT'},
                    {'name': 'col4', 'type': 'BOOLEAN'},
                    {'name': 'col5', 'type': 'INTEGER'},
                ],
            },
        ),
    ])
def test_generate_bq_schema(dataframe, expected_schema):
    schema = _schema.generate_bq_schema(dataframe)
    assert schema == expected_schema