Skip to content

Commit cefce15

Browse files
committed
feat: make QueryJob.done() method more performant
1 parent 3ce826e commit cefce15

File tree

2 files changed

+129
-184
lines changed

2 files changed

+129
-184
lines changed

google/cloud/bigquery/job/query.py

Lines changed: 35 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import re
2020

2121
from google.api_core import exceptions
22+
from google.api_core.future import polling as polling_future
2223
import requests
2324

2425
from google.cloud.bigquery.dataset import Dataset
@@ -42,7 +43,6 @@
4243
from google.cloud.bigquery._tqdm_helpers import wait_for_query
4344

4445
from google.cloud.bigquery.job.base import _AsyncJob
45-
from google.cloud.bigquery.job.base import _DONE_STATE
4646
from google.cloud.bigquery.job.base import _JobConfig
4747
from google.cloud.bigquery.job.base import _JobReference
4848

@@ -974,61 +974,6 @@ def estimated_bytes_processed(self):
974974
result = int(result)
975975
return result
976976

977-
def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
978-
"""Refresh the job and checks if it is complete.
979-
980-
Args:
981-
retry (Optional[google.api_core.retry.Retry]):
982-
How to retry the call that retrieves query results. If the job state is
983-
``DONE``, retrying is aborted early, as the job will not change anymore.
984-
timeout (Optional[float]):
985-
The number of seconds to wait for the underlying HTTP transport
986-
before using ``retry``.
987-
reload (Optional[bool]):
988-
If ``True``, make an API call to refresh the job state of
989-
unfinished jobs before checking. Default ``True``.
990-
991-
Returns:
992-
bool: ``True`` if the job is complete or if fetching its status resulted in
993-
an error, ``False`` otherwise.
994-
"""
995-
# Do not refresh if the state is already done, as the job will not
996-
# change once complete.
997-
is_done = self.state == _DONE_STATE
998-
if not reload or is_done:
999-
return is_done
1000-
1001-
# If an explicit timeout is not given, fall back to the transport timeout
1002-
# stored in _blocking_poll() in the process of polling for job completion.
1003-
transport_timeout = timeout if timeout is not None else self._transport_timeout
1004-
1005-
try:
1006-
self._reload_query_results(retry=retry, timeout=transport_timeout)
1007-
except exceptions.GoogleAPIError as exc:
1008-
# Reloading also updates error details on self, thus no need for an
1009-
# explicit self.set_exception() call if reloading succeeds.
1010-
try:
1011-
self.reload(retry=retry, timeout=transport_timeout)
1012-
except exceptions.GoogleAPIError:
1013-
# Use the query results reload exception, as it generally contains
1014-
# much more useful error information.
1015-
self.set_exception(exc)
1016-
return True
1017-
else:
1018-
return self.state == _DONE_STATE
1019-
1020-
# Only reload the job once we know the query is complete.
1021-
# This will ensure that fields such as the destination table are
1022-
# correctly populated.
1023-
if self._query_results.complete:
1024-
try:
1025-
self.reload(retry=retry, timeout=transport_timeout)
1026-
except exceptions.GoogleAPIError as exc:
1027-
self.set_exception(exc)
1028-
return True
1029-
1030-
return self.state == _DONE_STATE
1031-
1032977
def _blocking_poll(self, timeout=None, **kwargs):
1033978
self._done_timeout = timeout
1034979
self._transport_timeout = timeout
@@ -1130,6 +1075,40 @@ def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None):
11301075
timeout=transport_timeout,
11311076
)
11321077

1078+
def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
    """Check whether the query has finished running and raise if it has not.

    The query results are refreshed first. Once they report completion, the
    job itself is reloaded as well.

    Args:
        retry (Optional[google.api_core.retry.Retry]):
            Passed through to the call that refreshes the query results and
            to ``reload()``.
        timeout (Optional[float]):
            Timeout passed to the query results refresh and to ``reload()``.
            If ``None``, ``self._transport_timeout`` is used instead.

    Raises:
        google.api_core.future.polling._OperationNotComplete:
            If the refreshed query results are not marked as complete.
    """
    # If an explicit timeout is not given, fall back to the transport timeout
    # stored in _blocking_poll() in the process of polling for job completion.
    transport_timeout = timeout if timeout is not None else self._transport_timeout

    try:
        self._reload_query_results(retry=retry, timeout=transport_timeout)
    except exceptions.GoogleAPIError as exc:
        # Reloading also updates error details on self, thus no need for an
        # explicit self.set_exception() call if reloading succeeds.
        try:
            self.reload(retry=retry, timeout=transport_timeout)
        except exceptions.GoogleAPIError:
            # Use the query results reload exception, as it generally contains
            # much more useful error information.
            self.set_exception(exc)
        # A plain ``return`` is used here rather than ``finally: return``: a
        # ``return`` inside ``finally`` would silently discard *any* exception
        # raised by ``reload()`` (KeyboardInterrupt, programming errors, ...),
        # not only the GoogleAPIError handled above.
        # NOTE(review): this path returns normally even if the reloaded job is
        # not in a finished state - confirm that callers expect this.
        return

    # Only reload the job once we know the query is complete.
    # This will ensure that fields such as the destination table are
    # correctly populated.
    if not self._query_results.complete:
        raise polling_future._OperationNotComplete()

    try:
        self.reload(retry=retry, timeout=transport_timeout)
    except exceptions.GoogleAPIError as exc:
        self.set_exception(exc)
1111+
11331112
def result(
11341113
self,
11351114
page_size=None,

tests/unit/job/test_query.py

Lines changed: 94 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -309,132 +309,6 @@ def test_cancelled(self):
309309

310310
self.assertTrue(job.cancelled())
311311

312-
def test_done_job_complete(self):
313-
client = _make_client(project=self.PROJECT)
314-
resource = self._make_resource(ended=True)
315-
job = self._get_target_class().from_api_repr(resource, client)
316-
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
317-
{"jobComplete": True, "jobReference": resource["jobReference"]}
318-
)
319-
self.assertTrue(job.done())
320-
321-
def test_done_w_timeout(self):
322-
client = _make_client(project=self.PROJECT)
323-
resource = self._make_resource(ended=False)
324-
job = self._get_target_class().from_api_repr(resource, client)
325-
326-
with mock.patch.object(
327-
client, "_get_query_results"
328-
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
329-
job.done(timeout=42)
330-
331-
fake_get_results.assert_called_once()
332-
call_args = fake_get_results.call_args
333-
self.assertEqual(call_args.kwargs.get("timeout"), 42)
334-
335-
call_args = fake_reload.call_args
336-
self.assertEqual(call_args.kwargs.get("timeout"), 42)
337-
338-
def test_done_w_timeout_and_longer_internal_api_timeout(self):
339-
client = _make_client(project=self.PROJECT)
340-
resource = self._make_resource(ended=False)
341-
job = self._get_target_class().from_api_repr(resource, client)
342-
job._done_timeout = 8.8
343-
344-
with mock.patch.object(
345-
client, "_get_query_results"
346-
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
347-
job.done(timeout=5.5)
348-
349-
# The expected timeout used is simply the given timeout, as the latter
350-
# is shorter than the job's internal done timeout.
351-
expected_timeout = 5.5
352-
353-
fake_get_results.assert_called_once()
354-
call_args = fake_get_results.call_args
355-
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)
356-
357-
call_args = fake_reload.call_args
358-
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)
359-
360-
def test_done_w_query_results_error_reload_ok_job_finished(self):
361-
client = _make_client(project=self.PROJECT)
362-
bad_request_error = exceptions.BadRequest("Error in query")
363-
client._get_query_results = mock.Mock(side_effect=bad_request_error)
364-
365-
resource = self._make_resource(ended=False)
366-
job = self._get_target_class().from_api_repr(resource, client)
367-
job._exception = None
368-
369-
def fake_reload(self, *args, **kwargs):
370-
self._properties["status"]["state"] = "DONE"
371-
self.set_exception(copy.copy(bad_request_error))
372-
373-
fake_reload_method = types.MethodType(fake_reload, job)
374-
375-
with mock.patch.object(job, "reload", new=fake_reload_method):
376-
is_done = job.done()
377-
378-
assert is_done
379-
assert isinstance(job._exception, exceptions.BadRequest)
380-
381-
def test_done_w_query_results_error_reload_ok_job_still_running(self):
382-
client = _make_client(project=self.PROJECT)
383-
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
384-
client._get_query_results = mock.Mock(side_effect=retry_error)
385-
386-
resource = self._make_resource(ended=False)
387-
job = self._get_target_class().from_api_repr(resource, client)
388-
job._exception = None
389-
390-
def fake_reload(self, *args, **kwargs):
391-
self._properties["status"]["state"] = "RUNNING"
392-
393-
fake_reload_method = types.MethodType(fake_reload, job)
394-
395-
with mock.patch.object(job, "reload", new=fake_reload_method):
396-
is_done = job.done()
397-
398-
assert not is_done
399-
assert job._exception is None
400-
401-
def test_done_w_query_results_error_reload_error(self):
402-
client = _make_client(project=self.PROJECT)
403-
bad_request_error = exceptions.BadRequest("Error in query")
404-
client._get_query_results = mock.Mock(side_effect=bad_request_error)
405-
406-
resource = self._make_resource(ended=False)
407-
job = self._get_target_class().from_api_repr(resource, client)
408-
reload_error = exceptions.DataLoss("Oops, sorry!")
409-
job.reload = mock.Mock(side_effect=reload_error)
410-
job._exception = None
411-
412-
is_done = job.done()
413-
414-
assert is_done
415-
assert job._exception is bad_request_error
416-
417-
def test_done_w_job_query_results_ok_reload_error(self):
418-
client = _make_client(project=self.PROJECT)
419-
query_results = google.cloud.bigquery.query._QueryResults(
420-
properties={
421-
"jobComplete": True,
422-
"jobReference": {"projectId": self.PROJECT, "jobId": "12345"},
423-
}
424-
)
425-
client._get_query_results = mock.Mock(return_value=query_results)
426-
427-
resource = self._make_resource(ended=False)
428-
job = self._get_target_class().from_api_repr(resource, client)
429-
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
430-
job.reload = mock.Mock(side_effect=retry_error)
431-
job._exception = None
432-
433-
is_done = job.done()
434-
435-
assert is_done
436-
assert job._exception is retry_error
437-
438312
def test_query_plan(self):
439313
from google.cloud._helpers import _RFC3339_MICROS
440314
from google.cloud.bigquery.job import QueryPlanEntry
@@ -1905,8 +1779,6 @@ def test_reload_w_timeout(self):
19051779
)
19061780

19071781
def test_iter(self):
1908-
import types
1909-
19101782
begun_resource = self._make_resource()
19111783
query_resource = {
19121784
"jobComplete": True,
@@ -1921,3 +1793,97 @@ def test_iter(self):
19211793
job = self._make_one(self.JOB_ID, self.QUERY, client)
19221794

19231795
self.assertIsInstance(iter(job), types.GeneratorType)
1796+
1797+
def test__done_or_raise_w_timeout(self):
    """The explicit timeout must reach both the results fetch and reload()."""
    client = _make_client(project=self.PROJECT)
    resource = self._make_resource(ended=False)
    job = self._get_target_class().from_api_repr(resource, client)

    results_patcher = mock.patch.object(client, "_get_query_results")
    reload_patcher = mock.patch.object(job, "reload")

    with results_patcher as get_results_mock:
        with reload_patcher as reload_mock:
            job._done_or_raise(timeout=42)

    # The query results must have been requested exactly once.
    get_results_mock.assert_called_once()
    results_kwargs = get_results_mock.call_args.kwargs
    self.assertEqual(results_kwargs.get("timeout"), 42)

    reload_kwargs = reload_mock.call_args.kwargs
    self.assertEqual(reload_kwargs.get("timeout"), 42)
1813+
1814+
def test__done_or_raise_w_timeout_and_longer_internal_api_timeout(self):
    """A given timeout shorter than the job's ``_done_timeout`` is used as is."""
    # The given timeout (5.5) is shorter than the job's internal done
    # timeout (8.8), so it is expected to be used unchanged.
    expected_timeout = 5.5

    client = _make_client(project=self.PROJECT)
    resource = self._make_resource(ended=False)
    job = self._get_target_class().from_api_repr(resource, client)
    job._done_timeout = 8.8

    results_patcher = mock.patch.object(client, "_get_query_results")
    reload_patcher = mock.patch.object(job, "reload")

    with results_patcher as get_results_mock:
        with reload_patcher as reload_mock:
            job._done_or_raise(timeout=5.5)

    get_results_mock.assert_called_once()
    results_kwargs = get_results_mock.call_args.kwargs
    self.assertAlmostEqual(results_kwargs.get("timeout"), expected_timeout)

    reload_kwargs = reload_mock.call_args.kwargs
    self.assertAlmostEqual(reload_kwargs.get("timeout"), expected_timeout)
1835+
1836+
def test__done_or_raise_w_query_results_error_reload_ok(self):
    """Fetching results fails, reload() succeeds and records the job error."""
    client = _make_client(project=self.PROJECT)
    bad_request_error = exceptions.BadRequest("Error in query")
    client._get_query_results = mock.Mock(side_effect=bad_request_error)

    resource = self._make_resource(ended=False)
    job = self._get_target_class().from_api_repr(resource, client)
    job._exception = None

    def reload_stub(*args, **kwargs):
        # Stand-in for reload(): mark the job finished and attach a copy of
        # the error to it.
        job._properties["status"]["state"] = "DONE"
        job.set_exception(copy.copy(bad_request_error))

    with mock.patch.object(job, "reload", new=reload_stub):
        job._done_or_raise()

    assert isinstance(job._exception, exceptions.BadRequest)
1855+
1856+
def test__done_or_raise_w_query_results_error_reload_error(self):
    """When both calls fail, the query results error is the one kept."""
    results_error = exceptions.BadRequest("Error in query")
    client = _make_client(project=self.PROJECT)
    client._get_query_results = mock.Mock(side_effect=results_error)

    resource = self._make_resource(ended=False)
    job = self._get_target_class().from_api_repr(resource, client)
    job.reload = mock.Mock(side_effect=exceptions.DataLoss("Oops, sorry!"))
    job._exception = None

    job._done_or_raise()

    # The stored exception is the very same object raised by the results call.
    assert job._exception is results_error
1870+
1871+
def test__done_or_raise_w_job_query_results_ok_reload_error(self):
    """Results report completion, but the reload() error is stored on the job."""
    client = _make_client(project=self.PROJECT)
    job_reference = {"projectId": self.PROJECT, "jobId": "12345"}
    complete_results = google.cloud.bigquery.query._QueryResults(
        properties={"jobComplete": True, "jobReference": job_reference}
    )
    client._get_query_results = mock.Mock(return_value=complete_results)

    resource = self._make_resource(ended=False)
    job = self._get_target_class().from_api_repr(resource, client)
    reload_failure = exceptions.RetryError("Too many retries", cause=TimeoutError)
    job.reload = mock.Mock(side_effect=reload_failure)
    job._exception = None

    job._done_or_raise()

    # The stored exception is the very same object raised by reload().
    assert job._exception is reload_failure

0 commit comments

Comments
 (0)