Skip to content

Commit 45b9308

Browse files
authored
fix: avoid possible job already exists error (#751)
* fix: avoid possible job already exists error. If the job create request fails, a query job might still have started successfully. This commit handles this edge case and returns such a query job if one can be found. * Catch only Conflict errors on query job create
1 parent 5deef6f commit 45b9308

File tree

2 files changed

+99
-2
lines changed

2 files changed

+99
-2
lines changed

google/cloud/bigquery/client.py

+24-2
Original file line numberDiff line numberDiff line change
@@ -3190,6 +3190,7 @@ def query(
31903190
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig`
31913191
class.
31923192
"""
3193+
job_id_given = job_id is not None
31933194
job_id = _make_job_id(job_id, job_id_prefix)
31943195

31953196
if project is None:
@@ -3221,9 +3222,30 @@ def query(
32213222

32223223
job_ref = job._JobReference(job_id, project=project, location=location)
32233224
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
3224-
query_job._begin(retry=retry, timeout=timeout)
32253225

3226-
return query_job
3226+
try:
3227+
query_job._begin(retry=retry, timeout=timeout)
3228+
except core_exceptions.Conflict as create_exc:
3229+
# The thought is if someone is providing their own job IDs and they get
3230+
# their job ID generation wrong, this could end up returning results for
3231+
# the wrong query. We thus only try to recover if job ID was not given.
3232+
if job_id_given:
3233+
raise create_exc
3234+
3235+
try:
3236+
query_job = self.get_job(
3237+
job_id,
3238+
project=project,
3239+
location=location,
3240+
retry=retry,
3241+
timeout=timeout,
3242+
)
3243+
except core_exceptions.GoogleAPIError: # (includes RetryError)
3244+
raise create_exc
3245+
else:
3246+
return query_job
3247+
else:
3248+
return query_job
32273249

32283250
def insert_rows(
32293251
self,

tests/unit/test_client.py

+75
Original file line numberDiff line numberDiff line change
@@ -4617,6 +4617,81 @@ def test_query_w_query_parameters(self):
46174617
},
46184618
)
46194619

4620+
def test_query_job_rpc_fail_w_random_error(self):
    """An ``Unknown`` error raised by ``QueryJob._begin`` escapes ``query()``.

    ``_begin`` is patched to raise ``Unknown``; the test expects that same
    exception type and message to surface from ``client.query``.
    """
    from google.api_core.exceptions import Unknown
    from google.cloud.bigquery.job import QueryJob

    credentials = _make_credentials()
    transport = object()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=transport
    )

    begin_failure = Unknown("Not sure what went wrong.")
    patched_begin = mock.patch.object(QueryJob, "_begin", side_effect=begin_failure)

    with patched_begin, pytest.raises(Unknown, match="Not sure what went wrong."):
        client.query("SELECT 1;", job_id="123")
4635+
4636+
def test_query_job_rpc_fail_w_conflict_job_id_given(self):
    """A ``Conflict`` from ``_begin`` is re-raised when the caller gave a job ID.

    ``_begin`` is patched to raise ``Conflict`` and ``query`` is called with
    an explicit ``job_id``; the test expects the ``Conflict`` to propagate.
    """
    from google.api_core.exceptions import Conflict
    from google.cloud.bigquery.job import QueryJob

    credentials = _make_credentials()
    transport = object()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=transport
    )

    begin_failure = Conflict("Job already exists.")
    patched_begin = mock.patch.object(QueryJob, "_begin", side_effect=begin_failure)

    with patched_begin, pytest.raises(Conflict, match="Job already exists."):
        client.query("SELECT 1;", job_id="123")
4651+
4652+
def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails(self):
    """The original ``Conflict`` surfaces when the follow-up ``get_job`` fails.

    ``_begin`` is patched to raise ``Conflict`` and ``client.get_job`` to
    raise ``DataLoss``. With ``job_id=None`` the test expects ``query`` to
    raise the ``Conflict`` rather than the ``DataLoss``.
    """
    from google.api_core.exceptions import Conflict
    from google.api_core.exceptions import DataLoss
    from google.cloud.bigquery.job import QueryJob

    credentials = _make_credentials()
    transport = object()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=transport
    )

    begin_failure = Conflict("Job already exists.")
    fetch_failure = DataLoss("we lost yor job, sorry")
    patched_begin = mock.patch.object(QueryJob, "_begin", side_effect=begin_failure)
    patched_get_job = mock.patch.object(client, "get_job", side_effect=fetch_failure)

    with patched_begin, patched_get_job:
        # The create error, not the fetch error, is what the caller must see.
        with pytest.raises(Conflict, match="Job already exists."):
            client.query("SELECT 1;", job_id=None)
4673+
4674+
def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_succeeds(self):
    """``query()`` returns the fetched job when ``_begin`` raises ``Conflict``.

    ``_begin`` is patched to raise ``Conflict`` and ``client.get_job`` to
    return a sentinel. With ``job_id=None`` the test expects ``query`` to
    return that sentinel.
    """
    from google.api_core.exceptions import Conflict
    from google.cloud.bigquery.job import QueryJob

    credentials = _make_credentials()
    transport = object()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=transport
    )

    begin_failure = Conflict("Job already exists.")
    patched_begin = mock.patch.object(QueryJob, "_begin", side_effect=begin_failure)
    patched_get_job = mock.patch.object(
        client, "get_job", return_value=mock.sentinel.query_job
    )

    with patched_begin, patched_get_job:
        returned_job = client.query("SELECT 1;", job_id=None)

    assert returned_job is mock.sentinel.query_job
4694+
46204695
def test_insert_rows_w_timeout(self):
46214696
from google.cloud.bigquery.schema import SchemaField
46224697
from google.cloud.bigquery.table import Table

0 commit comments

Comments
 (0)