@@ -1,11 +1,9 @@
 import logging
 import time
 import warnings
-from collections import OrderedDict
 from datetime import datetime
 
 import numpy as np
-from pandas import DataFrame
 
 from pandas_gbq.exceptions import AccessDenied
 
@@ -37,7 +35,7 @@ def _check_google_client_version():
         raise ImportError("Could not import pkg_resources (setuptools).")
 
     # https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
-    bigquery_minimum_version = pkg_resources.parse_version("0.32.0")
+    bigquery_minimum_version = pkg_resources.parse_version("1.9.0")
     BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
         "google-cloud-bigquery"
     ).parsed_version
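For orientation, pkg_resources version objects compare with ordinary operators, so the check this constant feeds into presumably reduces to something like the sketch below. The comparison itself is outside this hunk, so treat it as an assumption rather than the project's actual code.

import pkg_resources

# Hypothetical sketch of the version gate; the real comparison is not shown in this hunk.
bigquery_minimum_version = pkg_resources.parse_version("1.9.0")
installed_version = pkg_resources.get_distribution("google-cloud-bigquery").parsed_version

if installed_version < bigquery_minimum_version:
    raise ImportError(
        "pandas-gbq requires google-cloud-bigquery >= 1.9.0, "
        "version {} is installed".format(installed_version)
    )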
@@ -482,15 +480,16 @@ def run_query(self, query, **kwargs):
             rows_iter = query_reply.result()
         except self.http_error as ex:
             self.process_http_error(ex)
-        result_rows = list(rows_iter)
-        total_rows = rows_iter.total_rows
-        schema = {
-            "fields": [field.to_api_repr() for field in rows_iter.schema]
-        }
 
-        logger.debug("Got {} rows.\n".format(total_rows))
+        schema_fields = [field.to_api_repr() for field in rows_iter.schema]
+        nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
+        df = rows_iter.to_dataframe(dtypes=nullsafe_dtypes)
+
+        if df.empty:
+            df = _cast_empty_df_dtypes(schema_fields, df)
 
-        return schema, result_rows
+        logger.debug("Got {} rows.\n".format(rows_iter.total_rows))
+        return df
 
     def load_data(
         self,
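The rewritten run_query hands row materialization to google-cloud-bigquery's RowIterator.to_dataframe(), whose dtypes argument maps column names to the dtype pandas should use. A minimal sketch of that call pattern outside pandas-gbq; the project ID and query are placeholders:

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project ID
rows_iter = client.query("SELECT 1.5 AS x").result()

# Columns listed in `dtypes` get an explicit dtype; anything omitted falls
# back to the library's default inference.
df = rows_iter.to_dataframe(dtypes={"x": "float64"})
print(df.dtypes)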
@@ -638,45 +637,62 @@ def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         table.create(table_id, table_schema)
 
 
-def _parse_schema(schema_fields):
-    # see:
+def _bqschema_to_nullsafe_dtypes(schema_fields):
+    # Only specify dtype when the dtype allows nulls. Otherwise, use pandas's
+    # default dtype choice.
+    #
+    # See:
     # http://pandas.pydata.org/pandas-docs/dev/missing_data.html
     # #missing-data-casting-rules-and-indexing
     dtype_map = {
         "FLOAT": np.dtype(float),
+        # Even though TIMESTAMPs are timezone-aware in BigQuery, pandas doesn't
+        # support datetime64[ns, UTC] as dtype in DataFrame constructors. See:
+        # https://github.com/pandas-dev/pandas/issues/12513
         "TIMESTAMP": "datetime64[ns]",
         "TIME": "datetime64[ns]",
         "DATE": "datetime64[ns]",
         "DATETIME": "datetime64[ns]",
-        "BOOLEAN": bool,
-        "INTEGER": np.int64,
     }
 
+    dtypes = {}
     for field in schema_fields:
         name = str(field["name"])
         if field["mode"].upper() == "REPEATED":
-            yield name, object
-        else:
-            dtype = dtype_map.get(field["type"].upper())
-            yield name, dtype
+            continue
+
+        dtype = dtype_map.get(field["type"].upper())
+        if dtype:
+            dtypes[name] = dtype
 
+    return dtypes
 
-def _parse_data(schema, rows):
 
-    column_dtypes = OrderedDict(_parse_schema(schema["fields"]))
-    df = DataFrame(data=(iter(r) for r in rows), columns=column_dtypes.keys())
+def _cast_empty_df_dtypes(schema_fields, df):
+    """Cast any columns in an empty dataframe to correct type.
 
-    for column in df:
-        dtype = column_dtypes[column]
-        null_safe = (
-            df[column].notnull().all()
-            or dtype == float
-            or dtype == "datetime64[ns]"
+    In an empty dataframe, pandas cannot choose a dtype unless one is
+    explicitly provided. The _bqschema_to_nullsafe_dtypes() function only
+    provides dtypes when the dtype safely handles null values. This means
+    that empty int64 and boolean columns are incorrectly classified as
+    ``object``.
+    """
+    if not df.empty:
+        raise ValueError(
+            "DataFrame must be empty in order to cast non-nullsafe dtypes"
         )
-        if dtype and null_safe:
-            df[column] = df[column].astype(
-                column_dtypes[column], errors="ignore"
-            )
+
+    dtype_map = {"BOOLEAN": bool, "INTEGER": np.int64}
+
+    for field in schema_fields:
+        column = str(field["name"])
+        if field["mode"].upper() == "REPEATED":
+            continue
+
+        dtype = dtype_map.get(field["type"].upper())
+        if dtype:
+            df[column] = df[column].astype(dtype)
+
     return df
 
 
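The docstring's point about empty results can be reproduced with plain pandas: with zero rows there is nothing to infer a dtype from, so columns default to object until they are cast, which is what _cast_empty_df_dtypes() does per the BigQuery schema. A standalone sketch:

import numpy as np
import pandas as pd

# Zero rows: both columns come back as object because pandas has nothing to infer from.
empty = pd.DataFrame(data=[], columns=["int_col", "bool_col"])
print(empty.dtypes)

# Casting restores the intended types, mirroring what _cast_empty_df_dtypes()
# does for INTEGER and BOOLEAN fields.
empty["int_col"] = empty["int_col"].astype(np.int64)
empty["bool_col"] = empty["bool_col"].astype(bool)
print(empty.dtypes)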
@@ -825,8 +841,8 @@ def read_gbq(
         credentials=credentials,
         private_key=private_key,
     )
-    schema, rows = connector.run_query(query, configuration=configuration)
-    final_df = _parse_data(schema, rows)
+
+    final_df = connector.run_query(query, configuration=configuration)
 
     # Reindex the DataFrame on the provided column
     if index_col is not None:
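From the caller's side the public entry point is unchanged: read_gbq() still returns a DataFrame, now built directly by run_query(). A hedged usage example; the project ID and table name are placeholders:

import pandas_gbq

df = pandas_gbq.read_gbq(
    "SELECT my_col FROM `my_dataset.my_table` LIMIT 10",  # placeholder table
    project_id="my-project",  # placeholder project ID
    dialect="standard",
)
print(df.dtypes)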