Skip to content

Commit dc78edd

Browse files
feat: add progress bar to QueryJob.to_dataframe and to_arrow (#352)
* feat: add progress bar for to_arrow method * feat: add progress bar for to_dataframe * feat: add default progress bar and unit test * feat: nit * feat: result timout for without queryplan
1 parent b899ad1 commit dc78edd

File tree

5 files changed

+367
-40
lines changed

5 files changed

+367
-40
lines changed
+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright 2019 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Shared helper functions for tqdm progress bar."""
16+
17+
import concurrent.futures
18+
import time
19+
import warnings
20+
21+
try:
22+
import tqdm
23+
except ImportError: # pragma: NO COVER
24+
tqdm = None
25+
26+
_NO_TQDM_ERROR = (
27+
"A progress bar was requested, but there was an error loading the tqdm "
28+
"library. Please install tqdm to use the progress bar functionality."
29+
)
30+
31+
_PROGRESS_BAR_UPDATE_INTERVAL = 0.5
32+
33+
34+
def get_progress_bar(progress_bar_type, description, total, unit):
35+
"""Construct a tqdm progress bar object, if tqdm is ."""
36+
if tqdm is None:
37+
if progress_bar_type is not None:
38+
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
39+
return None
40+
41+
try:
42+
if progress_bar_type == "tqdm":
43+
return tqdm.tqdm(desc=description, total=total, unit=unit)
44+
elif progress_bar_type == "tqdm_notebook":
45+
return tqdm.tqdm_notebook(desc=description, total=total, unit=unit)
46+
elif progress_bar_type == "tqdm_gui":
47+
return tqdm.tqdm_gui(desc=description, total=total, unit=unit)
48+
except (KeyError, TypeError):
49+
# Protect ourselves from any tqdm errors. In case of
50+
# unexpected tqdm behavior, just fall back to showing
51+
# no progress bar.
52+
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
53+
return None
54+
55+
56+
def wait_for_query(query_job, progress_bar_type=None):
57+
"""Return query result and display a progress bar while the query running, if tqdm is installed."""
58+
if progress_bar_type is None:
59+
return query_job.result()
60+
61+
default_total = 1
62+
current_stage = None
63+
start_time = time.time()
64+
progress_bar = get_progress_bar(
65+
progress_bar_type, "Query is running", default_total, "query"
66+
)
67+
i = 0
68+
while True:
69+
if query_job.query_plan:
70+
default_total = len(query_job.query_plan)
71+
current_stage = query_job.query_plan[i]
72+
progress_bar.total = len(query_job.query_plan)
73+
progress_bar.set_description(
74+
"Query executing stage {} and status {} : {:0.2f}s".format(
75+
current_stage.name, current_stage.status, time.time() - start_time,
76+
),
77+
)
78+
try:
79+
query_result = query_job.result(timeout=_PROGRESS_BAR_UPDATE_INTERVAL)
80+
progress_bar.update(default_total)
81+
progress_bar.set_description(
82+
"Query complete after {:0.2f}s".format(time.time() - start_time),
83+
)
84+
break
85+
except concurrent.futures.TimeoutError:
86+
query_job.reload() # Refreshes the state via a GET request.
87+
if current_stage:
88+
if current_stage.status == "COMPLETE":
89+
if i < default_total - 1:
90+
progress_bar.update(i + 1)
91+
i += 1
92+
continue
93+
progress_bar.close()
94+
return query_result

google/cloud/bigquery/job/query.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from google.cloud.bigquery.table import _table_arg_to_table_ref
4141
from google.cloud.bigquery.table import TableReference
4242
from google.cloud.bigquery.table import TimePartitioning
43+
from google.cloud.bigquery._tqdm_helpers import wait_for_query
4344

4445
from google.cloud.bigquery.job.base import _AsyncJob
4546
from google.cloud.bigquery.job.base import _DONE_STATE
@@ -1259,7 +1260,8 @@ def to_arrow(
12591260
12601261
..versionadded:: 1.17.0
12611262
"""
1262-
return self.result().to_arrow(
1263+
query_result = wait_for_query(self, progress_bar_type)
1264+
return query_result.to_arrow(
12631265
progress_bar_type=progress_bar_type,
12641266
bqstorage_client=bqstorage_client,
12651267
create_bqstorage_client=create_bqstorage_client,
@@ -1328,7 +1330,8 @@ def to_dataframe(
13281330
Raises:
13291331
ValueError: If the `pandas` library cannot be imported.
13301332
"""
1331-
return self.result().to_dataframe(
1333+
query_result = wait_for_query(self, progress_bar_type)
1334+
return query_result.to_dataframe(
13321335
bqstorage_client=bqstorage_client,
13331336
dtypes=dtypes,
13341337
progress_bar_type=progress_bar_type,

google/cloud/bigquery/table.py

+5-36
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,6 @@
3636
except ImportError: # pragma: NO COVER
3737
pyarrow = None
3838

39-
try:
40-
import tqdm
41-
except ImportError: # pragma: NO COVER
42-
tqdm = None
43-
4439
import google.api_core.exceptions
4540
from google.api_core.page_iterator import HTTPIterator
4641

@@ -50,6 +45,7 @@
5045
from google.cloud.bigquery.schema import _build_schema_resource
5146
from google.cloud.bigquery.schema import _parse_schema_resource
5247
from google.cloud.bigquery.schema import _to_schema_fields
48+
from google.cloud.bigquery._tqdm_helpers import get_progress_bar
5349
from google.cloud.bigquery.external_config import ExternalConfig
5450
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
5551

@@ -68,10 +64,7 @@
6864
"The pyarrow library is not installed, please install "
6965
"pyarrow to use the to_arrow() function."
7066
)
71-
_NO_TQDM_ERROR = (
72-
"A progress bar was requested, but there was an error loading the tqdm "
73-
"library. Please install tqdm to use the progress bar functionality."
74-
)
67+
7568
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
7669

7770

@@ -1418,32 +1411,6 @@ def total_rows(self):
14181411
"""int: The total number of rows in the table."""
14191412
return self._total_rows
14201413

1421-
def _get_progress_bar(self, progress_bar_type):
1422-
"""Construct a tqdm progress bar object, if tqdm is installed."""
1423-
if tqdm is None:
1424-
if progress_bar_type is not None:
1425-
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
1426-
return None
1427-
1428-
description = "Downloading"
1429-
unit = "rows"
1430-
1431-
try:
1432-
if progress_bar_type == "tqdm":
1433-
return tqdm.tqdm(desc=description, total=self.total_rows, unit=unit)
1434-
elif progress_bar_type == "tqdm_notebook":
1435-
return tqdm.tqdm_notebook(
1436-
desc=description, total=self.total_rows, unit=unit
1437-
)
1438-
elif progress_bar_type == "tqdm_gui":
1439-
return tqdm.tqdm_gui(desc=description, total=self.total_rows, unit=unit)
1440-
except (KeyError, TypeError):
1441-
# Protect ourselves from any tqdm errors. In case of
1442-
# unexpected tqdm behavior, just fall back to showing
1443-
# no progress bar.
1444-
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
1445-
return None
1446-
14471414
def _to_page_iterable(
14481415
self, bqstorage_download, tabledata_list_download, bqstorage_client=None
14491416
):
@@ -1551,7 +1518,9 @@ def to_arrow(
15511518
owns_bqstorage_client = bqstorage_client is not None
15521519

15531520
try:
1554-
progress_bar = self._get_progress_bar(progress_bar_type)
1521+
progress_bar = get_progress_bar(
1522+
progress_bar_type, "Downloading", self.total_rows, "rows"
1523+
)
15551524

15561525
record_batches = []
15571526
for record_batch in self._to_arrow_iterable(

0 commit comments

Comments
 (0)