Commit dde9dc5
feat: use pyarrow stream compression, if available (#593)
* feat: use pyarrow stream compression, if available
* Remove unnecessary pyarrow version check. Arrow stream compression requires pyarrow>=1.0.0, but that's already guaranteed by a version pin in setup.py if the bqstorage extra is installed.
* Remove unused pyarrow version parsing in tests
* Only use arrow compression in tests if available
1 parent c8b5581 commit dde9dc5
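For context, here is a minimal sketch (not part of the diff) of what this change switches on under the hood: when the installed google-cloud-bigquery-storage exposes `ArrowSerializationOptions`, read sessions are created with LZ4-compressed Arrow streams. The project and table paths below are placeholders:

```python
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import ArrowSerializationOptions

client = bigquery_storage.BigQueryReadClient()

# Request Arrow data, then opt in to LZ4-compressed record batches:
# the same two steps the library now performs automatically.
requested_session = bigquery_storage.types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",  # placeholder
    data_format=bigquery_storage.types.DataFormat.ARROW,
)
requested_session.read_options.arrow_serialization_options.buffer_compression = (
    ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
)

session = client.create_read_session(
    parent="projects/my-project",  # placeholder
    read_session=requested_session,
    max_stream_count=1,
)
```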

File tree

5 files changed: +146, -14 lines


google/cloud/bigquery/_pandas_helpers.py (+13)

```diff
@@ -33,6 +33,14 @@
 except ImportError:  # pragma: NO COVER
     pyarrow = None
 
+try:
+    from google.cloud.bigquery_storage import ArrowSerializationOptions
+except ImportError:
+    _ARROW_COMPRESSION_SUPPORT = False
+else:
+    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
+    _ARROW_COMPRESSION_SUPPORT = True
+
 from google.cloud.bigquery import schema
 
 
@@ -631,6 +639,11 @@ def _download_table_bqstorage(
     for field in selected_fields:
         requested_session.read_options.selected_fields.append(field.name)
 
+    if _ARROW_COMPRESSION_SUPPORT:
+        requested_session.read_options.arrow_serialization_options.buffer_compression = (
+            ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+        )
+
     session = bqstorage_client.create_read_session(
         parent="projects/{}".format(project_id),
         read_session=requested_session,
```
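Nothing changes at call sites: a download that already routes through the BQ Storage API simply gets compressed streams when the flag is true. A hedged usage sketch follows; the client setup and table ID are assumed for illustration:

```python
from google.cloud import bigquery
from google.cloud import bigquery_storage

bq_client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()

# _download_table_bqstorage now adds LZ4_FRAME buffer compression to the
# read session behind this call whenever _ARROW_COMPRESSION_SUPPORT is True.
df = bq_client.list_rows("my-project.my_dataset.my_table").to_dataframe(
    bqstorage_client=bqstorage_client
)
```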

google/cloud/bigquery/dbapi/cursor.py (+14)

```diff
@@ -19,6 +19,14 @@
 import copy
 import logging
 
+try:
+    from google.cloud.bigquery_storage import ArrowSerializationOptions
+except ImportError:
+    _ARROW_COMPRESSION_SUPPORT = False
+else:
+    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
+    _ARROW_COMPRESSION_SUPPORT = True
+
 from google.cloud.bigquery import job
 from google.cloud.bigquery.dbapi import _helpers
 from google.cloud.bigquery.dbapi import exceptions
@@ -255,6 +263,12 @@ def _bqstorage_fetch(self, bqstorage_client):
             table=table_reference.to_bqstorage(),
             data_format=bigquery_storage.types.DataFormat.ARROW,
         )
+
+        if _ARROW_COMPRESSION_SUPPORT:
+            requested_session.read_options.arrow_serialization_options.buffer_compression = (
+                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+            )
+
         read_session = bqstorage_client.create_read_session(
             parent="projects/{}".format(table_reference.project),
             read_session=requested_session,
```
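The DB-API path behaves the same way. A sketch of a fetch that now benefits; the connection setup is assumed and the query is a placeholder:

```python
from google.cloud import bigquery
from google.cloud import bigquery_storage
from google.cloud.bigquery import dbapi

connection = dbapi.connect(
    client=bigquery.Client(),
    bqstorage_client=bigquery_storage.BigQueryReadClient(),
)
cursor = connection.cursor()
cursor.execute("SELECT foo, bar FROM `my_dataset.some_table`")  # placeholder query

# fetchall() routes through _bqstorage_fetch, which now requests
# LZ4-compressed Arrow streams when ArrowSerializationOptions is importable.
rows = cursor.fetchall()
```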

tests/system/test_client.py (-8)

```diff
@@ -28,7 +28,6 @@
 
 import psutil
 import pytest
-import pkg_resources
 
 from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT
 from . import helpers
@@ -116,13 +115,6 @@
     (TooManyRequests, InternalServerError, ServiceUnavailable)
 )
 
-PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0")
-
-if pyarrow:
-    PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution("pyarrow").parsed_version
-else:
-    PYARROW_INSTALLED_VERSION = None
-
 MTLS_TESTING = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true"
```
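These deletions are safe because of the dependency pin the commit message mentions: with the bqstorage extra installed, setup.py already guarantees a pyarrow new enough for stream compression. The excerpt below is illustrative only; the exact pins live in setup.py and may differ:

```python
# setup.py (illustrative sketch, not part of this diff)
extras = {
    "bqstorage": [
        "google-cloud-bigquery-storage >= 2.0.0, < 3.0.0dev",
        # Arrow stream compression needs pyarrow >= 1.0.0.
        "pyarrow >= 1.0.0",
    ],
}
```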

tests/unit/job/test_query_pandas.py (+72, -6)

```diff
@@ -41,6 +41,22 @@
 from .helpers import _make_job_resource
 
 
+@pytest.fixture
+def table_read_options_kwarg():
+    # Create a BigQuery Storage table read options object with pyarrow compression
+    # enabled if a recent-enough version of the google-cloud-bigquery-storage
+    # dependency is installed to support the compression.
+    if not hasattr(bigquery_storage, "ArrowSerializationOptions"):
+        return {}
+
+    read_options = bigquery_storage.ReadSession.TableReadOptions(
+        arrow_serialization_options=bigquery_storage.ArrowSerializationOptions(
+            buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+        )
+    )
+    return {"read_options": read_options}
+
+
 @pytest.mark.parametrize(
     "query,expected",
     (
```
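The fixture returns a dict precisely so tests can splat it into the expected `ReadSession` and stay correct on both old and new bigquery-storage releases: an empty dict makes the splat a no-op. A self-contained sketch of the pattern, using stand-in types instead of the real API:

```python
def session_kwargs(compression_supported: bool) -> dict:
    # Mirrors table_read_options_kwarg: empty, or carrying read_options.
    if not compression_supported:
        return {}
    return {"read_options": "TableReadOptions(LZ4_FRAME)"}  # stand-in value

def expected_session(table: str, **kwargs) -> dict:
    # Stand-in for bigquery_storage.ReadSession(...), to show the splat.
    return {"table": table, "data_format": "ARROW", **kwargs}

print(expected_session("projects/p/datasets/d/tables/t", **session_kwargs(True)))
print(expected_session("projects/p/datasets/d/tables/t", **session_kwargs(False)))
```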
```diff
@@ -82,7 +98,7 @@ def test__contains_order_by(query, expected):
         "SelecT name, age froM table OrdeR \n\t BY other_column;",
     ),
 )
-def test_to_dataframe_bqstorage_preserve_order(query):
+def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
     from google.cloud.bigquery.job import QueryJob as target_class
 
     job_resource = _make_job_resource(
@@ -123,8 +139,10 @@ def test_to_dataframe_bqstorage_preserve_order(query):
     destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
         **job_resource["configuration"]["query"]["destinationTable"]
     )
-    expected_session = bigquery_storage.types.ReadSession(
-        table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table,
+        data_format=bigquery_storage.DataFormat.ARROW,
+        **table_read_options_kwarg,
     )
     bqstorage_client.create_read_session.assert_called_once_with(
         parent="projects/test-project",
@@ -431,7 +449,7 @@ def test_to_dataframe_ddl_query():
 @pytest.mark.skipif(
     bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
 )
-def test_to_dataframe_bqstorage():
+def test_to_dataframe_bqstorage(table_read_options_kwarg):
     from google.cloud.bigquery.job import QueryJob as target_class
 
     resource = _make_job_resource(job_type="query", ended=True)
@@ -468,8 +486,10 @@ def test_to_dataframe_bqstorage():
     destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
         **resource["configuration"]["query"]["destinationTable"]
     )
-    expected_session = bigquery_storage.types.ReadSession(
-        table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table,
+        data_format=bigquery_storage.DataFormat.ARROW,
+        **table_read_options_kwarg,
     )
     bqstorage_client.create_read_session.assert_called_once_with(
         parent=f"projects/{client.project}",
```
```diff
@@ -478,6 +498,52 @@
     )
 
 
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(
+    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+def test_to_dataframe_bqstorage_no_pyarrow_compression():
+    from google.cloud.bigquery.job import QueryJob as target_class
+
+    resource = _make_job_resource(job_type="query", ended=True)
+    query_resource = {
+        "jobComplete": True,
+        "jobReference": resource["jobReference"],
+        "totalRows": "4",
+        "schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
+    }
+    connection = _make_connection(query_resource)
+    client = _make_client(connection=connection)
+    job = target_class.from_api_repr(resource, client)
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    session.avro_schema.schema = json.dumps(
+        {
+            "type": "record",
+            "name": "__root__",
+            "fields": [{"name": "name", "type": ["null", "string"]}],
+        }
+    )
+    bqstorage_client.create_read_session.return_value = session
+
+    with mock.patch(
+        "google.cloud.bigquery._pandas_helpers._ARROW_COMPRESSION_SUPPORT", new=False
+    ):
+        job.to_dataframe(bqstorage_client=bqstorage_client)
+
+    destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
+        **resource["configuration"]["query"]["destinationTable"]
+    )
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table, data_format=bigquery_storage.DataFormat.ARROW,
+    )
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=f"projects/{client.project}",
+        read_session=expected_session,
+        max_stream_count=0,
+    )
+
+
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 def test_to_dataframe_column_dtypes():
     from google.cloud.bigquery.job import QueryJob as target_class
```
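The new test forces the no-compression branch by patching a module-level flag instead of manipulating installed packages. A minimal, runnable sketch of that technique; the module and flag names are the real ones from this diff, the assertions are illustrative, and it requires a google-cloud-bigquery version that defines the flag:

```python
from unittest import mock

from google.cloud.bigquery import _pandas_helpers

# Temporarily force the "no compression" code path, regardless of which
# google-cloud-bigquery-storage version is installed.
with mock.patch(
    "google.cloud.bigquery._pandas_helpers._ARROW_COMPRESSION_SUPPORT", new=False
):
    assert _pandas_helpers._ARROW_COMPRESSION_SUPPORT is False

# The original value is restored when the context manager exits.
print(_pandas_helpers._ARROW_COMPRESSION_SUPPORT)
```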

tests/unit/test_dbapi_cursor.py (+47)

```diff
@@ -123,6 +123,7 @@ def _mock_job(
             schema=schema,
             num_dml_affected_rows=num_dml_affected_rows,
         )
+        mock_job.destination.project = "P"
         mock_job.destination.to_bqstorage.return_value = (
             "projects/P/datasets/DS/tables/T"
         )
```
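The added `mock_job.destination.project = "P"` line is presumably needed so the new no-compression test below can assert `parent="projects/P"`: as the cursor.py hunk above shows, `_bqstorage_fetch` derives the read session's parent from the destination table's project.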
```diff
@@ -380,6 +381,52 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
         # the default client was not used
         mock_client.list_rows.assert_not_called()
 
+    @unittest.skipIf(
+        bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_fetchall_w_bqstorage_client_no_arrow_compression(self):
+        from google.cloud.bigquery import dbapi
+        from google.cloud.bigquery import table
+
+        # Use unordered data to also test any non-deterministic key order in dicts.
+        row_data = [table.Row([1.2, 1.1], {"bar": 1, "foo": 0})]
+        bqstorage_streamed_rows = [{"bar": _to_pyarrow(1.2), "foo": _to_pyarrow(1.1)}]
+
+        mock_client = self._mock_client(rows=row_data)
+        mock_bqstorage_client = self._mock_bqstorage_client(
+            stream_count=1, rows=bqstorage_streamed_rows,
+        )
+
+        connection = dbapi.connect(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        cursor = connection.cursor()
+        cursor.execute("SELECT foo, bar FROM some_table")
+
+        with mock.patch(
+            "google.cloud.bigquery.dbapi.cursor._ARROW_COMPRESSION_SUPPORT", new=False
+        ):
+            rows = cursor.fetchall()
+
+        mock_client.list_rows.assert_not_called()  # The default client was not used.
+
+        # Check the BQ Storage session config.
+        expected_session = bigquery_storage.ReadSession(
+            table="projects/P/datasets/DS/tables/T",
+            data_format=bigquery_storage.DataFormat.ARROW,
+        )
+        mock_bqstorage_client.create_read_session.assert_called_once_with(
+            parent="projects/P", read_session=expected_session, max_stream_count=1
+        )
+
+        # Check the data returned.
+        field_value = op.itemgetter(1)
+        sorted_row_data = [sorted(row.items(), key=field_value) for row in rows]
+        expected_row_data = [[("foo", 1.1), ("bar", 1.2)]]
+
+        self.assertEqual(sorted_row_data, expected_row_data)
+
     def test_execute_custom_job_id(self):
         from google.cloud.bigquery.dbapi import connect
```

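The test above relies on a `_to_pyarrow` helper defined elsewhere in this test module. Its exact definition is not shown in this diff; a minimal version consistent with how it is used here might look like this (an assumption, not copied from the file):

```python
import pyarrow

def _to_pyarrow(value):
    # Wrap a plain Python value as a pyarrow scalar, so mocked BQ Storage
    # rows carry the same value types a real Arrow stream would yield.
    return pyarrow.array([value])[0]

print(_to_pyarrow(1.2))  # a pyarrow DoubleScalar wrapping 1.2
```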