From 62fd5651bbb12f988c9c6358c83a2aa2624e02c4 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 21 Sep 2021 22:45:07 +0100 Subject: [PATCH 01/18] fix: use compliant Parquet by default --- google/cloud/bigquery/_pandas_helpers.py | 20 +++++++++- google/cloud/bigquery/client.py | 49 +++++++++++++++++------- 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 869c0215d..424a6312c 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -530,7 +530,13 @@ def dataframe_to_arrow(dataframe, bq_schema): return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names) -def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"): +def dataframe_to_parquet( + dataframe, + bq_schema, + filepath, + parquet_compression="SNAPPY", + parquet_use_compliant_nested_type=True, +): """Write dataframe as a Parquet file, according to the desired BQ schema. This function requires the :mod:`pyarrow` package. Arrow is used as an @@ -551,6 +557,11 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN The compression codec to use by the the ``pyarrow.parquet.write_table`` serializing method. Defaults to "SNAPPY". https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table + parquet_use_compliant_nested_type (bool): + Whether the ``pyarrow.parquet.write_table`` serializing method should write + compliant Parquet nested type (lists). Defaults to ``True``. + https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types + https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table """ pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True) @@ -558,7 +569,12 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN bq_schema = schema._to_schema_fields(bq_schema) arrow_table = dataframe_to_arrow(dataframe, bq_schema) - pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression) + pyarrow.parquet.write_table( + arrow_table, + filepath, + compression=parquet_compression, + use_compliant_nested_type=parquet_use_compliant_nested_type, + ) def _row_iterator_page_to_arrow(page, column_names, arrow_types): diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 47ff83c5d..a40acb7b2 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2456,6 +2456,7 @@ def load_table_from_dataframe( project: str = None, job_config: LoadJobConfig = None, parquet_compression: str = "snappy", + parquet_use_compliant_nested_type: bool = True, timeout: float = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a pandas DataFrame. @@ -2519,18 +2520,34 @@ def load_table_from_dataframe( :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are supported. parquet_compression (Optional[str]): - [Beta] The compression method to use if intermittently - serializing ``dataframe`` to a parquet file. - - The argument is directly passed as the ``compression`` - argument to the underlying ``pyarrow.parquet.write_table()`` - method (the default value "snappy" gets converted to uppercase). 
- https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table - - If the job config schema is missing, the argument is directly - passed as the ``compression`` argument to the underlying - ``DataFrame.to_parquet()`` method. - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet + [Beta] The compression method to use if intermittently + serializing ``dataframe`` to a parquet file. + + The argument is directly passed as the ``compression`` + argument to the underlying ``pyarrow.parquet.write_table()`` + method (the default value "snappy" gets converted to uppercase). + https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table + + If the job config schema is missing, the argument is directly + passed as the ``compression`` argument to the underlying + ``DataFrame.to_parquet()`` method. + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet + parquet_use_compliant_nested_type (bool): + Whether the ``pyarrow.parquet.write_table`` serializing method should write + compliant Parquet nested type (lists). Defaults to ``True``. + + The argument is directly passed as the ``use_compliant_nested_type`` + argument to the underlying ``pyarrow.parquet.write_table()`` + method. + https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table + + If the job config schema is missing, the argument is directly + passed as an additonal ``kwarg`` argument to the underlying + ``DataFrame.to_parquet()`` method. + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet + + This argument is only present to allow for backwards compatibility with + tables created using an old version of this method. timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. @@ -2647,9 +2664,15 @@ def load_table_from_dataframe( job_config.schema, tmppath, parquet_compression=parquet_compression, + parquet_use_compliant_nested_type=parquet_use_compliant_nested_type, ) else: - dataframe.to_parquet(tmppath, compression=parquet_compression) + dataframe.to_parquet( + tmppath, + engine="pyarrow", + compression=parquet_compression, + use_compliant_nested_type=parquet_use_compliant_nested_type, + ) else: From f44a6aedb84978aca2a04f7dc76a65b0d0866b6c Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 21 Sep 2021 23:40:14 +0100 Subject: [PATCH 02/18] chore: bump minimum `pyarrow` version --- setup.py | 6 +++--- testing/constraints-3.6.txt | 2 +- tests/system/test_arrow.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/setup.py b/setup.py index e7515493d..e1c6653ad 100644 --- a/setup.py +++ b/setup.py @@ -54,11 +54,11 @@ # grpc.Channel.close() method isn't added until 1.32.0. 
# https://github.com/grpc/grpc/pull/15254 "grpcio >= 1.38.1, < 2.0dev", - "pyarrow >= 3.0.0, < 6.0dev", + "pyarrow >= 4.0.0, < 6.0dev", ], "geopandas": ["geopandas>=0.9.0, <1.0dev", "Shapely>=1.6.0, <2.0dev"], - "pandas": ["pandas>=0.23.0", "pyarrow >= 3.0.0, < 6.0dev"], - "bignumeric_type": ["pyarrow >= 3.0.0, < 6.0dev"], + "pandas": ["pandas>=0.23.0", "pyarrow >= 4.0.0, < 6.0dev"], + "bignumeric_type": ["pyarrow >= 4.0.0, < 6.0dev"], "tqdm": ["tqdm >= 4.7.4, <5.0.0dev"], "opentelemetry": [ "opentelemetry-api >= 0.11b0", diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index 23d2724f7..7735f35bb 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -17,7 +17,7 @@ opentelemetry-sdk==0.11b0 pandas==0.24.2 proto-plus==1.10.0 protobuf==3.12.0 -pyarrow==3.0.0 +pyarrow==4.0.0 requests==2.18.0 Shapely==1.6.0 six==1.13.0 diff --git a/tests/system/test_arrow.py b/tests/system/test_arrow.py index 96f9dea25..7f6f39028 100644 --- a/tests/system/test_arrow.py +++ b/tests/system/test_arrow.py @@ -23,8 +23,8 @@ pyarrow = pytest.importorskip( - "pyarrow", minversion="3.0.0" -) # Needs decimal256 for BIGNUMERIC columns. + "pyarrow", minversion="4.0.0" +) # Needs `use_compliant_nested_type` for `load_table_from_dataframe`. @pytest.mark.parametrize( From 8179ded79d9934c48f8c0d2cc81a573428b41fea Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 22 Sep 2021 14:55:13 +0100 Subject: [PATCH 03/18] fix: default to `ParquetOptions.enable_list_inference is True` --- google/cloud/bigquery/client.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index a40acb7b2..17c90250d 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -103,6 +103,7 @@ from google.cloud.bigquery.table import TableListItem from google.cloud.bigquery.table import TableReference from google.cloud.bigquery.table import RowIterator +from google.cloud.bigquery.format_options import ParquetOptions _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB @@ -2579,6 +2580,13 @@ def load_table_from_dataframe( if job_config.source_format is None: # default value job_config.source_format = job.SourceFormat.PARQUET + + if job_config.parquet_options is None: + parquet_options = ParquetOptions() + # default value + parquet_options.enable_list_inference = True + job_config.parquet_options = parquet_options + if job_config.source_format not in supported_formats: raise ValueError( "Got unexpected source_format: '{}'. 
Currently, only PARQUET and CSV are supported".format( From bbdad5dbb82b911a6bb17df6bd65599f35df2d3b Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 22 Sep 2021 15:51:05 +0100 Subject: [PATCH 04/18] feat: detect `pyarrow.ListType` as `REPEATED` --- google/cloud/bigquery/_pandas_helpers.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 424a6312c..d16d79227 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -444,7 +444,14 @@ def augment_schema(dataframe, current_bq_schema): continue arrow_table = pyarrow.array(dataframe[field.name]) - detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id) + + if arrow_table.type.id == 25: + # `pyarrow.ListType` + detected_mode = "REPEATED" + detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id) + else: + detected_mode = field.mode + detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id) if detected_type is None: unknown_type_fields.append(field) @@ -453,7 +460,7 @@ def augment_schema(dataframe, current_bq_schema): new_field = schema.SchemaField( name=field.name, field_type=detected_type, - mode=field.mode, + mode=detected_mode, description=field.description, fields=field.fields, ) From 839004cad331591466e547b16960f5d2a08bf39b Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 22 Sep 2021 16:25:31 +0100 Subject: [PATCH 05/18] fix: add tests for arrays in DataFrames --- tests/unit/test_client.py | 118 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index d2a75413f..8f276720b 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -7307,6 +7307,124 @@ def test_load_table_from_dataframe_struct_fields(self): assert sent_config.source_format == job.SourceFormat.PARQUET assert sent_config.schema == schema + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_array_fields(self): + """Test that a DataFrame with array columns can be uploaded correctly. 
+ + See: https://github.com/googleapis/python-bigquery/issues/19 + """ + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + + records = [(3.14, [1, 2])] + dataframe = pandas.DataFrame( + data=records, columns=["float_column", "array_column"] + ) + + schema = [ + SchemaField("float_column", "FLOAT"), + SchemaField("array_column", "INTEGER", mode="REPEATED",), + ] + job_config = job.LoadJobConfig(schema=schema) + + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + + get_table_patch = mock.patch( + "google.cloud.bigquery.client.Client.get_table", + autospec=True, + side_effect=google.api_core.exceptions.NotFound("Table not found"), + ) + + with load_patch as load_table_from_file, get_table_patch: + client.load_table_from_dataframe( + dataframe, + self.TABLE_REF, + job_config=job_config, + location=self.LOCATION, + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + size=mock.ANY, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.source_format == job.SourceFormat.PARQUET + assert sent_config.schema == schema + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_array_fields_w_auto_schema(self): + """Test that a DataFrame with array columns can be uploaded correctly. + + See: https://github.com/googleapis/python-bigquery/issues/19 + """ + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + + records = [(3.14, [1, 2])] + dataframe = pandas.DataFrame( + data=records, columns=["float_column", "array_column"] + ) + + expected_schema = [ + SchemaField("float_column", "FLOAT"), + SchemaField("array_column", "INT64", mode="REPEATED",), + ] + + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + + get_table_patch = mock.patch( + "google.cloud.bigquery.client.Client.get_table", + autospec=True, + side_effect=google.api_core.exceptions.NotFound("Table not found"), + ) + + with load_patch as load_table_from_file, get_table_patch: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, location=self.LOCATION, + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + size=mock.ANY, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.source_format == job.SourceFormat.PARQUET + assert sent_config.schema == expected_schema + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe_w_partial_schema(self): From 3cb24390ffacf9a798b87a6519cc105a39405ad9 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 22 Sep 2021 17:06:24 +0100 Subject: [PATCH 06/18] fix: add to 
system test to test `REPEATED` schema --- tests/system/test_pandas.py | 91 +++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/tests/system/test_pandas.py b/tests/system/test_pandas.py index 93ce23481..4d29ebe79 100644 --- a/tests/system/test_pandas.py +++ b/tests/system/test_pandas.py @@ -24,6 +24,7 @@ import google.api_core.retry import pkg_resources import pytest +import numpy from google.cloud import bigquery from . import helpers @@ -84,6 +85,81 @@ def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_i ("uint8_col", pandas.Series([0, 1, 2], dtype="uint8")), ("uint16_col", pandas.Series([3, 4, 5], dtype="uint16")), ("uint32_col", pandas.Series([6, 7, 8], dtype="uint32")), + ("array_bool_col", pandas.Series([[True], [False], [True]])), + ( + "array_ts_col", + pandas.Series( + [ + [ + datetime.datetime( + 2010, 1, 2, 3, 44, 50, tzinfo=datetime.timezone.utc + ), + ], + [ + datetime.datetime( + 2011, 2, 3, 14, 50, 59, tzinfo=datetime.timezone.utc + ), + ], + [ + datetime.datetime( + 2012, 3, 14, 15, 16, tzinfo=datetime.timezone.utc + ), + ], + ], + ), + ), + ( + "array_dt_col", + pandas.Series( + [ + [datetime.datetime(2010, 1, 2, 3, 44, 50)], + [datetime.datetime(2011, 2, 3, 14, 50, 59)], + [datetime.datetime(2012, 3, 14, 15, 16)], + ], + ), + ), + ( + "array_float32_col", + pandas.Series( + [numpy.array([_], dtype="float32") for _ in [1.0, 2.0, 3.0]] + ), + ), + ( + "array_float64_col", + pandas.Series( + [numpy.array([_], dtype="float64") for _ in [4.0, 5.0, 6.0]] + ), + ), + ( + "array_int8_col", + pandas.Series( + [numpy.array([_], dtype="int8") for _ in [-12, -11, -10]] + ), + ), + ( + "array_int16_col", + pandas.Series([numpy.array([_], dtype="int16") for _ in [-9, -8, -7]]), + ), + ( + "array_int32_col", + pandas.Series([numpy.array([_], dtype="int32") for _ in [-6, -5, -4]]), + ), + ( + "array_int64_col", + pandas.Series([numpy.array([_], dtype="int64") for _ in [-3, -2, -1]]), + ), + ( + "array_uint8_col", + pandas.Series([numpy.array([_], dtype="uint8") for _ in [0, 1, 2]]), + ), + ( + "array_uint16_col", + pandas.Series([numpy.array([_], dtype="uint16") for _ in [3, 4, 5]]), + ), + ( + "array_uint32_col", + pandas.Series([numpy.array([_], dtype="uint32") for _ in [6, 7, 8]]), + ), ] ) dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) @@ -112,6 +188,21 @@ def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_i bigquery.SchemaField("uint8_col", "INTEGER"), bigquery.SchemaField("uint16_col", "INTEGER"), bigquery.SchemaField("uint32_col", "INTEGER"), + bigquery.SchemaField("array_bool_col", "BOOLEAN", mode="REPEATED"), + bigquery.SchemaField("array_ts_col", "TIMESTAMP", mode="REPEATED"), + # BigQuery does not support uploading DATETIME values from + # Parquet files. 
See: + # https://github.com/googleapis/google-cloud-python/issues/9996 + bigquery.SchemaField("array_dt_col", "TIMESTAMP", mode="REPEATED"), + bigquery.SchemaField("array_float32_col", "FLOAT", mode="REPEATED"), + bigquery.SchemaField("array_float64_col", "FLOAT", mode="REPEATED"), + bigquery.SchemaField("array_int8_col", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("array_int16_col", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("array_int32_col", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("array_int64_col", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("array_uint8_col", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("array_uint16_col", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("array_uint32_col", "INTEGER", mode="REPEATED"), ) assert table.num_rows == 3 From 16b9fc06676ad5ab7a7924436852dd15f078e129 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 22 Sep 2021 17:41:52 +0100 Subject: [PATCH 07/18] fix: only use arg when `pyarrow>=4.0.0` --- google/cloud/bigquery/_pandas_helpers.py | 13 +++++++++---- google/cloud/bigquery/client.py | 8 +++++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index d16d79227..d02cf1097 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -569,18 +569,23 @@ def dataframe_to_parquet( compliant Parquet nested type (lists). Defaults to ``True``. https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table + + This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``. """ pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True) import pyarrow.parquet + kwargs = ( + {"use_compliant_nested_type": parquet_use_compliant_nested_type} + if _helpers._PYARROW_VERSION.major >= 4 + else {} + ) + bq_schema = schema._to_schema_fields(bq_schema) arrow_table = dataframe_to_arrow(dataframe, bq_schema) pyarrow.parquet.write_table( - arrow_table, - filepath, - compression=parquet_compression, - use_compliant_nested_type=parquet_use_compliant_nested_type, + arrow_table, filepath, compression=parquet_compression, **kwargs, ) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 17c90250d..3557dd142 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2547,6 +2547,8 @@ def load_table_from_dataframe( ``DataFrame.to_parquet()`` method. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet + This argument is ignored for ``pyarrow`` versions before ``4.0.0``. + This argument is only present to allow for backwards compatibility with tables created using an old version of this method. 
timeout (Optional[float]): @@ -2679,7 +2681,11 @@ def load_table_from_dataframe( tmppath, engine="pyarrow", compression=parquet_compression, - use_compliant_nested_type=parquet_use_compliant_nested_type, + **{ + "use_compliant_nested_type": parquet_use_compliant_nested_type + } + if _PYARROW_VERSION.major >= 4 + else {}, ) else: From ba1b32142295f0027963508cfef9a833736e25c3 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 22 Sep 2021 17:42:34 +0100 Subject: [PATCH 08/18] Revert "chore: bump minimum `pyarrow` version" This reverts commit f44a6aedb84978aca2a04f7dc76a65b0d0866b6c. --- setup.py | 6 +++--- testing/constraints-3.6.txt | 2 +- tests/system/test_arrow.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/setup.py b/setup.py index e1c6653ad..e7515493d 100644 --- a/setup.py +++ b/setup.py @@ -54,11 +54,11 @@ # grpc.Channel.close() method isn't added until 1.32.0. # https://github.com/grpc/grpc/pull/15254 "grpcio >= 1.38.1, < 2.0dev", - "pyarrow >= 4.0.0, < 6.0dev", + "pyarrow >= 3.0.0, < 6.0dev", ], "geopandas": ["geopandas>=0.9.0, <1.0dev", "Shapely>=1.6.0, <2.0dev"], - "pandas": ["pandas>=0.23.0", "pyarrow >= 4.0.0, < 6.0dev"], - "bignumeric_type": ["pyarrow >= 4.0.0, < 6.0dev"], + "pandas": ["pandas>=0.23.0", "pyarrow >= 3.0.0, < 6.0dev"], + "bignumeric_type": ["pyarrow >= 3.0.0, < 6.0dev"], "tqdm": ["tqdm >= 4.7.4, <5.0.0dev"], "opentelemetry": [ "opentelemetry-api >= 0.11b0", diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index 7735f35bb..23d2724f7 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -17,7 +17,7 @@ opentelemetry-sdk==0.11b0 pandas==0.24.2 proto-plus==1.10.0 protobuf==3.12.0 -pyarrow==4.0.0 +pyarrow==3.0.0 requests==2.18.0 Shapely==1.6.0 six==1.13.0 diff --git a/tests/system/test_arrow.py b/tests/system/test_arrow.py index 7f6f39028..96f9dea25 100644 --- a/tests/system/test_arrow.py +++ b/tests/system/test_arrow.py @@ -23,8 +23,8 @@ pyarrow = pytest.importorskip( - "pyarrow", minversion="4.0.0" -) # Needs `use_compliant_nested_type` for `load_table_from_dataframe`. + "pyarrow", minversion="3.0.0" +) # Needs decimal256 for BIGNUMERIC columns. 
@pytest.mark.parametrize( From 5f915cd39a42051b3041cda70b363cffbb604414 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 22 Sep 2021 18:33:26 +0100 Subject: [PATCH 09/18] chore: tidy up use of `_helpers.PYARROW_VERSIONS` --- google/cloud/bigquery/_helpers.py | 11 +++++++++++ google/cloud/bigquery/_pandas_helpers.py | 12 ++++++------ google/cloud/bigquery/client.py | 22 ++++++++-------------- tests/unit/test_client.py | 8 ++++++-- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/google/cloud/bigquery/_helpers.py b/google/cloud/bigquery/_helpers.py index 28a76206e..d7189d322 100644 --- a/google/cloud/bigquery/_helpers.py +++ b/google/cloud/bigquery/_helpers.py @@ -107,6 +107,9 @@ def verify_version(self): class PyarrowVersions: """Version comparisons for pyarrow package.""" + # https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414 + _PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")]) + def __init__(self): self._installed_version = None @@ -126,6 +129,14 @@ def installed_version(self) -> packaging.version.Version: return self._installed_version + @property + def is_bad_version(self) -> bool: + return self.installed_version in self._PYARROW_BAD_VERSIONS + + @property + def use_compliant_nested_type(self) -> bool: + return self.installed_version.major >= 4 + def try_import(self, raise_if_error: bool = False) -> Any: """Verify that a recent enough version of pyarrow extra is installed. diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index d02cf1097..c035599d6 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -396,7 +396,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema): # column, but it was not found. if bq_schema_unused: raise ValueError( - u"bq_schema contains fields not present in dataframe: {}".format( + "bq_schema contains fields not present in dataframe: {}".format( bq_schema_unused ) ) @@ -405,7 +405,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema): # pyarrow, if available. 
if unknown_type_fields: if not pyarrow: - msg = u"Could not determine the type of columns: {}".format( + msg = "Could not determine the type of columns: {}".format( ", ".join(field.name for field in unknown_type_fields) ) warnings.warn(msg) @@ -468,7 +468,7 @@ def augment_schema(dataframe, current_bq_schema): if unknown_type_fields: warnings.warn( - u"Pyarrow could not determine the type of columns: {}.".format( + "Pyarrow could not determine the type of columns: {}.".format( ", ".join(field.name for field in unknown_type_fields) ) ) @@ -507,7 +507,7 @@ def dataframe_to_arrow(dataframe, bq_schema): extra_fields = bq_field_names - column_and_index_names if extra_fields: raise ValueError( - u"bq_schema contains fields not present in dataframe: {}".format( + "bq_schema contains fields not present in dataframe: {}".format( extra_fields ) ) @@ -517,7 +517,7 @@ def dataframe_to_arrow(dataframe, bq_schema): missing_fields = column_names - bq_field_names if missing_fields: raise ValueError( - u"bq_schema is missing fields from dataframe: {}".format(missing_fields) + "bq_schema is missing fields from dataframe: {}".format(missing_fields) ) arrow_arrays = [] @@ -578,7 +578,7 @@ def dataframe_to_parquet( kwargs = ( {"use_compliant_nested_type": parquet_use_compliant_nested_type} - if _helpers._PYARROW_VERSION.major >= 4 + if _helpers.PYARROW_VERSIONS.use_compliant_nested_type else {} ) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 3557dd142..2d3540028 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -33,13 +33,6 @@ import uuid import warnings -try: - import pyarrow - - _PYARROW_VERSION = packaging.version.parse(pyarrow.__version__) -except ImportError: # pragma: NO COVER - pyarrow = None - from google import resumable_media # type: ignore from google.resumable_media.requests import MultipartUpload from google.resumable_media.requests import ResumableUpload @@ -104,6 +97,9 @@ from google.cloud.bigquery.table import TableReference from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.format_options import ParquetOptions +from google.cloud.bigquery import _helpers + +pyarrow = _helpers.PYARROW_VERSIONS.try_import() _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB @@ -129,8 +125,6 @@ # https://github.com/googleapis/python-bigquery/issues/438 _MIN_GET_QUERY_RESULTS_TIMEOUT = 120 -# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414 -_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")]) TIMEOUT_HEADER = "X-Server-Timeout" @@ -2655,12 +2649,12 @@ def load_table_from_dataframe( try: if job_config.source_format == job.SourceFormat.PARQUET: - if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS: + if _helpers.PYARROW_VERSIONS.is_bad_version: msg = ( "Loading dataframe data in PARQUET format with pyarrow " - f"{_PYARROW_VERSION} can result in data corruption. It is " - "therefore *strongly* advised to use a different pyarrow " - "version or a different source format. " + f"{_helpers.PYARROW_VERSIONS.installed_version} can result in data " + "corruption. It is therefore *strongly* advised to use a " + "different pyarrow version or a different source format. 
" "See: https://github.com/googleapis/python-bigquery/issues/781" ) warnings.warn(msg, category=RuntimeWarning) @@ -2684,7 +2678,7 @@ def load_table_from_dataframe( **{ "use_compliant_nested_type": parquet_use_compliant_nested_type } - if _PYARROW_VERSION.major >= 4 + if _helpers.PYARROW_VERSIONS.use_compliant_nested_type else {}, ) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 8f276720b..28cf9c8da 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -7672,9 +7672,13 @@ def test_load_table_from_dataframe_w_bad_pyarrow_issues_warning(self): records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] dataframe = pandas.DataFrame(records) + _helpers_mock = mock.MagicMock() + _helpers_mock.PYARROW_VERSIONS = mock.MagicMock() + _helpers_mock.PYARROW_VERSIONS.installed_version = packaging.version.parse( + "2.0.0" + ) # A known bad version of pyarrow. pyarrow_version_patch = mock.patch( - "google.cloud.bigquery.client._PYARROW_VERSION", - packaging.version.parse("2.0.0"), # A known bad version of pyarrow. + "google.cloud.bigquery.client._helpers", _helpers_mock ) get_table_patch = mock.patch( "google.cloud.bigquery.client.Client.get_table", From 1c52bb4b152b466537a41ae2b05d5f2242a29ea8 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 24 Sep 2021 18:02:19 +0100 Subject: [PATCH 10/18] Add TODOs for move to V3 --- google/cloud/bigquery/_pandas_helpers.py | 4 ++-- tests/system/test_pandas.py | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index c035599d6..561c96de0 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -79,8 +79,8 @@ def _to_wkb(v): _PANDAS_DTYPE_TO_BQ = { "bool": "BOOLEAN", "datetime64[ns, UTC]": "TIMESTAMP", - # BigQuery does not support uploading DATETIME values from Parquet files. - # See: https://github.com/googleapis/google-cloud-python/issues/9996 + # TODO: Update to DATETIME in V3 + # https://github.com/googleapis/python-bigquery/issues/985 "datetime64[ns]": "TIMESTAMP", "float32": "FLOAT", "float64": "FLOAT", diff --git a/tests/system/test_pandas.py b/tests/system/test_pandas.py index 4d29ebe79..1f43a369a 100644 --- a/tests/system/test_pandas.py +++ b/tests/system/test_pandas.py @@ -175,9 +175,8 @@ def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_i assert tuple(table.schema) == ( bigquery.SchemaField("bool_col", "BOOLEAN"), bigquery.SchemaField("ts_col", "TIMESTAMP"), - # BigQuery does not support uploading DATETIME values from - # Parquet files. See: - # https://github.com/googleapis/google-cloud-python/issues/9996 + # TODO: Update to DATETIME in V3 + # https://github.com/googleapis/python-bigquery/issues/985 bigquery.SchemaField("dt_col", "TIMESTAMP"), bigquery.SchemaField("float32_col", "FLOAT"), bigquery.SchemaField("float64_col", "FLOAT"), @@ -190,9 +189,8 @@ def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_i bigquery.SchemaField("uint32_col", "INTEGER"), bigquery.SchemaField("array_bool_col", "BOOLEAN", mode="REPEATED"), bigquery.SchemaField("array_ts_col", "TIMESTAMP", mode="REPEATED"), - # BigQuery does not support uploading DATETIME values from - # Parquet files. 
See: - # https://github.com/googleapis/google-cloud-python/issues/9996 + # TODO: Update to DATETIME in V3 + # https://github.com/googleapis/python-bigquery/issues/985 bigquery.SchemaField("array_dt_col", "TIMESTAMP", mode="REPEATED"), bigquery.SchemaField("array_float32_col", "FLOAT", mode="REPEATED"), bigquery.SchemaField("array_float64_col", "FLOAT", mode="REPEATED"), From a452b310b831a3c9eab73cc6d76d25f9aaf36a59 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 24 Sep 2021 18:03:09 +0100 Subject: [PATCH 11/18] Use `pyarrow` type testing function --- google/cloud/bigquery/_pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 561c96de0..0cb851469 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -445,7 +445,7 @@ def augment_schema(dataframe, current_bq_schema): arrow_table = pyarrow.array(dataframe[field.name]) - if arrow_table.type.id == 25: + if pyarrow.types.is_list(arrow_table.type): # `pyarrow.ListType` detected_mode = "REPEATED" detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id) From ffb34a6b49d37a77dfe32909980fb4e44e2bfad8 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 24 Sep 2021 18:03:35 +0100 Subject: [PATCH 12/18] Add unit tests for `ParquetOptions` --- tests/unit/test_client.py | 174 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 28cf9c8da..9048c86af 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -57,6 +57,7 @@ from google.cloud import bigquery_v2 from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.retry import DEFAULT_TIMEOUT +from google.cloud.bigquery import ParquetOptions try: from google.cloud import bigquery_storage @@ -6956,6 +6957,179 @@ def test_load_table_from_dataframe_w_custom_job_config_w_source_format(self): # the original config object should not have been modified assert job_config.to_api_repr() == original_config_copy.to_api_repr() + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_parquet_options_none(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] + dataframe = pandas.DataFrame(records) + + job_config = job.LoadJobConfig( + write_disposition=job.WriteDisposition.WRITE_TRUNCATE, + source_format=job.SourceFormat.PARQUET, + ) + + get_table_patch = mock.patch( + "google.cloud.bigquery.client.Client.get_table", + autospec=True, + return_value=mock.Mock( + schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")] + ), + ) + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + with load_patch as load_table_from_file, get_table_patch as get_table: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION + ) + + # no need to fetch and inspect table schema for WRITE_TRUNCATE jobs + assert not get_table.called + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + 
num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + size=mock.ANY, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.parquet_options.enable_list_inference is True + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_list_inference_none(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] + dataframe = pandas.DataFrame(records) + + parquet_options = ParquetOptions() + + job_config = job.LoadJobConfig( + write_disposition=job.WriteDisposition.WRITE_TRUNCATE, + source_format=job.SourceFormat.PARQUET, + ) + job_config.parquet_options = parquet_options + + original_config_copy = copy.deepcopy(job_config) + + get_table_patch = mock.patch( + "google.cloud.bigquery.client.Client.get_table", + autospec=True, + return_value=mock.Mock( + schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")] + ), + ) + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + with load_patch as load_table_from_file, get_table_patch as get_table: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION + ) + + # no need to fetch and inspect table schema for WRITE_TRUNCATE jobs + assert not get_table.called + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + size=mock.ANY, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.parquet_options.enable_list_inference is None + + # the original config object should not have been modified + assert job_config.to_api_repr() == original_config_copy.to_api_repr() + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_list_inference_false(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] + dataframe = pandas.DataFrame(records) + + parquet_options = ParquetOptions() + parquet_options.enable_list_inference = False + + job_config = job.LoadJobConfig( + write_disposition=job.WriteDisposition.WRITE_TRUNCATE, + source_format=job.SourceFormat.PARQUET, + ) + job_config.parquet_options = parquet_options + + original_config_copy = copy.deepcopy(job_config) + + get_table_patch = mock.patch( + "google.cloud.bigquery.client.Client.get_table", + autospec=True, + return_value=mock.Mock( + schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")] + ), + ) + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + with load_patch as load_table_from_file, get_table_patch as get_table: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config, 
location=self.LOCATION + ) + + # no need to fetch and inspect table schema for WRITE_TRUNCATE jobs + assert not get_table.called + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + size=mock.ANY, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.parquet_options.enable_list_inference is False + + # the original config object should not have been modified + assert job_config.to_api_repr() == original_config_copy.to_api_repr() + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe_w_custom_job_config_w_wrong_source_format(self): From d3828b1fe2bfdf8029ed4af28ac7f971f4f81aed Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 24 Sep 2021 18:03:44 +0100 Subject: [PATCH 13/18] Update docstring --- google/cloud/bigquery/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 2d3540028..d07272863 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2465,10 +2465,10 @@ def load_table_from_dataframe( They are supported when using the PARQUET source format, but due to the way they are encoded in the ``parquet`` file, a mismatch with the existing table schema can occur, so - 100% compatibility cannot be guaranteed for REPEATED fields when + REPEATED fields are not properly support when using ``pyarrow<4.0.0`` using the parquet format. - https://github.com/googleapis/python-bigquery/issues/17 + https://github.com/googleapis/python-bigquery/issues/19 Args: dataframe (pandas.DataFrame): From 62137d8c819cf05c4e7ad4bd940e0943c6104046 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 24 Sep 2021 18:06:24 +0100 Subject: [PATCH 14/18] Remove unused import --- google/cloud/bigquery/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index d07272863..5981feffc 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -27,7 +27,6 @@ import json import math import os -import packaging.version import tempfile from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union import uuid From ba3f145ce62c185946a9d35c06bb5d76f59e3793 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 27 Sep 2021 22:59:41 +0100 Subject: [PATCH 15/18] Remove user facing argument --- google/cloud/bigquery/client.py | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 5981feffc..a752d39ae 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2450,7 +2450,6 @@ def load_table_from_dataframe( project: str = None, job_config: LoadJobConfig = None, parquet_compression: str = "snappy", - parquet_use_compliant_nested_type: bool = True, timeout: float = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a pandas DataFrame. 
@@ -2526,24 +2525,6 @@ def load_table_from_dataframe( passed as the ``compression`` argument to the underlying ``DataFrame.to_parquet()`` method. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet - parquet_use_compliant_nested_type (bool): - Whether the ``pyarrow.parquet.write_table`` serializing method should write - compliant Parquet nested type (lists). Defaults to ``True``. - - The argument is directly passed as the ``use_compliant_nested_type`` - argument to the underlying ``pyarrow.parquet.write_table()`` - method. - https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table - - If the job config schema is missing, the argument is directly - passed as an additonal ``kwarg`` argument to the underlying - ``DataFrame.to_parquet()`` method. - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet - - This argument is ignored for ``pyarrow`` versions before ``4.0.0``. - - This argument is only present to allow for backwards compatibility with - tables created using an old version of this method. timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. @@ -2667,16 +2648,14 @@ def load_table_from_dataframe( job_config.schema, tmppath, parquet_compression=parquet_compression, - parquet_use_compliant_nested_type=parquet_use_compliant_nested_type, + parquet_use_compliant_nested_type=True, ) else: dataframe.to_parquet( tmppath, engine="pyarrow", compression=parquet_compression, - **{ - "use_compliant_nested_type": parquet_use_compliant_nested_type - } + **{"use_compliant_nested_type": True} if _helpers.PYARROW_VERSIONS.use_compliant_nested_type else {}, ) From ea54491846668a713eb120c56e12feffd179582f Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 27 Sep 2021 22:59:50 +0100 Subject: [PATCH 16/18] Fix doctring typo --- google/cloud/bigquery/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index a752d39ae..dfe77d337 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2463,7 +2463,7 @@ def load_table_from_dataframe( They are supported when using the PARQUET source format, but due to the way they are encoded in the ``parquet`` file, a mismatch with the existing table schema can occur, so - REPEATED fields are not properly support when using ``pyarrow<4.0.0`` + REPEATED fields are not properly supported when using ``pyarrow<4.0.0`` using the parquet format. 
https://github.com/googleapis/python-bigquery/issues/19 From 9ec8c67922a14a3903bac6738cb01a03bf074a15 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Oct 2021 16:58:20 -0500 Subject: [PATCH 17/18] Update google/cloud/bigquery/client.py --- google/cloud/bigquery/client.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index dfe77d337..486f028c0 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2655,9 +2655,11 @@ def load_table_from_dataframe( tmppath, engine="pyarrow", compression=parquet_compression, - **{"use_compliant_nested_type": True} - if _helpers.PYARROW_VERSIONS.use_compliant_nested_type - else {}, + **( + {"use_compliant_nested_type": True} + if _helpers.PYARROW_VERSIONS.use_compliant_nested_type + else {} + ), ) else: From 4fa8665301d7b5a5b49decea05f31379a27be0bc Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 7 Oct 2021 10:02:18 -0500 Subject: [PATCH 18/18] Update google/cloud/bigquery/client.py --- google/cloud/bigquery/client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 486f028c0..a8a1c1e16 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2557,7 +2557,10 @@ def load_table_from_dataframe( # default value job_config.source_format = job.SourceFormat.PARQUET - if job_config.parquet_options is None: + if ( + job_config.source_format == job.SourceFormat.PARQUET + and job_config.parquet_options is None + ): parquet_options = ParquetOptions() # default value parquet_options.enable_list_inference = True
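
A minimal usage sketch of what this series enables from the caller's side: with list inference and compliant Parquet nested types on by default, list-valued DataFrame columns load as REPEATED fields. The project, dataset, and table IDs below are placeholders, and the snippet assumes pandas and pyarrow >= 4.0.0 are installed alongside this patched client.

import pandas

from google.cloud import bigquery

client = bigquery.Client()

# A DataFrame with a scalar column and a list-valued (array) column.
dataframe = pandas.DataFrame(
    {"float_column": [3.14, 2.72], "array_column": [[1, 2], [3, 4]]}
)

# An explicit schema is optional; with these changes the array column is
# also detected as an INT64 REPEATED field when the schema is omitted.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("float_column", "FLOAT"),
        bigquery.SchemaField("array_column", "INTEGER", mode="REPEATED"),
    ]
)

load_job = client.load_table_from_dataframe(
    dataframe, "my-project.my_dataset.my_table", job_config=job_config
)
load_job.result()  # Wait for the load job to complete.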