Skip to content

Commit a3ab9ef

Browse files
authored
feat: make QueryJob.done() method more performant (#544)
1 parent 3ce826e commit a3ab9ef

File tree

2 files changed

+45
-100
lines changed

2 files changed

+45
-100
lines changed

google/cloud/bigquery/job/query.py

+35-56
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import re
2020

2121
from google.api_core import exceptions
22+
from google.api_core.future import polling as polling_future
2223
import requests
2324

2425
from google.cloud.bigquery.dataset import Dataset
@@ -42,7 +43,6 @@
4243
from google.cloud.bigquery._tqdm_helpers import wait_for_query
4344

4445
from google.cloud.bigquery.job.base import _AsyncJob
45-
from google.cloud.bigquery.job.base import _DONE_STATE
4646
from google.cloud.bigquery.job.base import _JobConfig
4747
from google.cloud.bigquery.job.base import _JobReference
4848

@@ -974,61 +974,6 @@ def estimated_bytes_processed(self):
974974
result = int(result)
975975
return result
976976

977-
def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
978-
"""Refresh the job and checks if it is complete.
979-
980-
Args:
981-
retry (Optional[google.api_core.retry.Retry]):
982-
How to retry the call that retrieves query results. If the job state is
983-
``DONE``, retrying is aborted early, as the job will not change anymore.
984-
timeout (Optional[float]):
985-
The number of seconds to wait for the underlying HTTP transport
986-
before using ``retry``.
987-
reload (Optional[bool]):
988-
If ``True``, make an API call to refresh the job state of
989-
unfinished jobs before checking. Default ``True``.
990-
991-
Returns:
992-
bool: ``True`` if the job is complete or if fetching its status resulted in
993-
an error, ``False`` otherwise.
994-
"""
995-
# Do not refresh if the state is already done, as the job will not
996-
# change once complete.
997-
is_done = self.state == _DONE_STATE
998-
if not reload or is_done:
999-
return is_done
1000-
1001-
# If an explicit timeout is not given, fall back to the transport timeout
1002-
# stored in _blocking_poll() in the process of polling for job completion.
1003-
transport_timeout = timeout if timeout is not None else self._transport_timeout
1004-
1005-
try:
1006-
self._reload_query_results(retry=retry, timeout=transport_timeout)
1007-
except exceptions.GoogleAPIError as exc:
1008-
# Reloading also updates error details on self, thus no need for an
1009-
# explicit self.set_exception() call if reloading succeeds.
1010-
try:
1011-
self.reload(retry=retry, timeout=transport_timeout)
1012-
except exceptions.GoogleAPIError:
1013-
# Use the query results reload exception, as it generally contains
1014-
# much more useful error information.
1015-
self.set_exception(exc)
1016-
return True
1017-
else:
1018-
return self.state == _DONE_STATE
1019-
1020-
# Only reload the job once we know the query is complete.
1021-
# This will ensure that fields such as the destination table are
1022-
# correctly populated.
1023-
if self._query_results.complete:
1024-
try:
1025-
self.reload(retry=retry, timeout=transport_timeout)
1026-
except exceptions.GoogleAPIError as exc:
1027-
self.set_exception(exc)
1028-
return True
1029-
1030-
return self.state == _DONE_STATE
1031-
1032977
def _blocking_poll(self, timeout=None, **kwargs):
1033978
self._done_timeout = timeout
1034979
self._transport_timeout = timeout
@@ -1130,6 +1075,40 @@ def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None):
11301075
timeout=transport_timeout,
11311076
)
11321077

1078+
def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
1079+
"""Check if the query has finished running and raise if it's not.
1080+
1081+
If the query has finished, also reload the job itself.
1082+
"""
1083+
# If an explicit timeout is not given, fall back to the transport timeout
1084+
# stored in _blocking_poll() in the process of polling for job completion.
1085+
transport_timeout = timeout if timeout is not None else self._transport_timeout
1086+
1087+
try:
1088+
self._reload_query_results(retry=retry, timeout=transport_timeout)
1089+
except exceptions.GoogleAPIError as exc:
1090+
# Reloading also updates error details on self, thus no need for an
1091+
# explicit self.set_exception() call if reloading succeeds.
1092+
try:
1093+
self.reload(retry=retry, timeout=transport_timeout)
1094+
except exceptions.GoogleAPIError:
1095+
# Use the query results reload exception, as it generally contains
1096+
# much more useful error information.
1097+
self.set_exception(exc)
1098+
finally:
1099+
return
1100+
1101+
# Only reload the job once we know the query is complete.
1102+
# This will ensure that fields such as the destination table are
1103+
# correctly populated.
1104+
if not self._query_results.complete:
1105+
raise polling_future._OperationNotComplete()
1106+
else:
1107+
try:
1108+
self.reload(retry=retry, timeout=transport_timeout)
1109+
except exceptions.GoogleAPIError as exc:
1110+
self.set_exception(exc)
1111+
11331112
def result(
11341113
self,
11351114
page_size=None,

tests/unit/job/test_query.py

+10-44
Original file line numberDiff line numberDiff line change
@@ -309,24 +309,15 @@ def test_cancelled(self):
309309

310310
self.assertTrue(job.cancelled())
311311

312-
def test_done_job_complete(self):
313-
client = _make_client(project=self.PROJECT)
314-
resource = self._make_resource(ended=True)
315-
job = self._get_target_class().from_api_repr(resource, client)
316-
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
317-
{"jobComplete": True, "jobReference": resource["jobReference"]}
318-
)
319-
self.assertTrue(job.done())
320-
321-
def test_done_w_timeout(self):
312+
def test__done_or_raise_w_timeout(self):
322313
client = _make_client(project=self.PROJECT)
323314
resource = self._make_resource(ended=False)
324315
job = self._get_target_class().from_api_repr(resource, client)
325316

326317
with mock.patch.object(
327318
client, "_get_query_results"
328319
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
329-
job.done(timeout=42)
320+
job._done_or_raise(timeout=42)
330321

331322
fake_get_results.assert_called_once()
332323
call_args = fake_get_results.call_args
@@ -335,7 +326,7 @@ def test_done_w_timeout(self):
335326
call_args = fake_reload.call_args
336327
self.assertEqual(call_args.kwargs.get("timeout"), 42)
337328

338-
def test_done_w_timeout_and_longer_internal_api_timeout(self):
329+
def test__done_or_raise_w_timeout_and_longer_internal_api_timeout(self):
339330
client = _make_client(project=self.PROJECT)
340331
resource = self._make_resource(ended=False)
341332
job = self._get_target_class().from_api_repr(resource, client)
@@ -344,7 +335,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
344335
with mock.patch.object(
345336
client, "_get_query_results"
346337
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
347-
job.done(timeout=5.5)
338+
job._done_or_raise(timeout=5.5)
348339

349340
# The expected timeout used is simply the given timeout, as the latter
350341
# is shorter than the job's internal done timeout.
@@ -357,7 +348,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
357348
call_args = fake_reload.call_args
358349
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)
359350

360-
def test_done_w_query_results_error_reload_ok_job_finished(self):
351+
def test__done_or_raise_w_query_results_error_reload_ok(self):
361352
client = _make_client(project=self.PROJECT)
362353
bad_request_error = exceptions.BadRequest("Error in query")
363354
client._get_query_results = mock.Mock(side_effect=bad_request_error)
@@ -373,32 +364,11 @@ def fake_reload(self, *args, **kwargs):
373364
fake_reload_method = types.MethodType(fake_reload, job)
374365

375366
with mock.patch.object(job, "reload", new=fake_reload_method):
376-
is_done = job.done()
367+
job._done_or_raise()
377368

378-
assert is_done
379369
assert isinstance(job._exception, exceptions.BadRequest)
380370

381-
def test_done_w_query_results_error_reload_ok_job_still_running(self):
382-
client = _make_client(project=self.PROJECT)
383-
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
384-
client._get_query_results = mock.Mock(side_effect=retry_error)
385-
386-
resource = self._make_resource(ended=False)
387-
job = self._get_target_class().from_api_repr(resource, client)
388-
job._exception = None
389-
390-
def fake_reload(self, *args, **kwargs):
391-
self._properties["status"]["state"] = "RUNNING"
392-
393-
fake_reload_method = types.MethodType(fake_reload, job)
394-
395-
with mock.patch.object(job, "reload", new=fake_reload_method):
396-
is_done = job.done()
397-
398-
assert not is_done
399-
assert job._exception is None
400-
401-
def test_done_w_query_results_error_reload_error(self):
371+
def test__done_or_raise_w_query_results_error_reload_error(self):
402372
client = _make_client(project=self.PROJECT)
403373
bad_request_error = exceptions.BadRequest("Error in query")
404374
client._get_query_results = mock.Mock(side_effect=bad_request_error)
@@ -409,12 +379,11 @@ def test_done_w_query_results_error_reload_error(self):
409379
job.reload = mock.Mock(side_effect=reload_error)
410380
job._exception = None
411381

412-
is_done = job.done()
382+
job._done_or_raise()
413383

414-
assert is_done
415384
assert job._exception is bad_request_error
416385

417-
def test_done_w_job_query_results_ok_reload_error(self):
386+
def test__done_or_raise_w_job_query_results_ok_reload_error(self):
418387
client = _make_client(project=self.PROJECT)
419388
query_results = google.cloud.bigquery.query._QueryResults(
420389
properties={
@@ -430,9 +399,8 @@ def test_done_w_job_query_results_ok_reload_error(self):
430399
job.reload = mock.Mock(side_effect=retry_error)
431400
job._exception = None
432401

433-
is_done = job.done()
402+
job._done_or_raise()
434403

435-
assert is_done
436404
assert job._exception is retry_error
437405

438406
def test_query_plan(self):
@@ -1905,8 +1873,6 @@ def test_reload_w_timeout(self):
19051873
)
19061874

19071875
def test_iter(self):
1908-
import types
1909-
19101876
begun_resource = self._make_resource()
19111877
query_resource = {
19121878
"jobComplete": True,

0 commit comments

Comments
 (0)