Skip to content

Commit 392083b

Browse files
authored
StreamManager: retry with get result request on already exist errors (#6345)
* StreamManager: retry with get result request on already exist errors
* Fix unused import
* Clarified comment for 'program already exists' retry
1 parent 0e288a7 commit 392083b

File tree

2 files changed

+113
-22
lines changed

2 files changed

+113
-22
lines changed

cirq-google/cirq_google/engine/stream_manager.py

+12-18
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,6 @@
3030
]
3131

3232

33-
class ProgramAlreadyExistsError(Exception):
34-
def __init__(self, program_name: str):
35-
# Call the base class constructor with the parameters it needs
36-
super().__init__(f"'{program_name}' already exists")
37-
38-
3933
class StreamError(Exception):
4034
pass
4135

@@ -150,7 +144,6 @@ def submit(
150144
A future for the job result, or the job if the job has failed.
151145
152146
Raises:
153-
ProgramAlreadyExistsError: if the program already exists.
154147
StreamError: if there is a non-retryable error while executing the job.
155148
ValueError: if program name is not set.
156149
concurrent.futures.CancelledError: if the stream is stopped while a job is in flight.
@@ -298,6 +291,7 @@ async def _manage_execution(
298291
current_request,
299292
create_program_and_job_request,
300293
_to_create_job_request(create_program_and_job_request),
294+
_to_get_result_request(create_program_and_job_request),
301295
)
302296
continue
303297
else: # pragma: no cover
@@ -319,7 +313,8 @@ def _get_retry_request_or_raise(
319313
error: quantum.StreamError,
320314
current_request,
321315
create_program_and_job_request,
322-
create_job_request: quantum.QuantumRunStreamRequest,
316+
create_job_request,
317+
get_result_request: quantum.QuantumRunStreamRequest,
323318
):
324319
"""Decide whether the given stream error is retryable.
325320
@@ -330,19 +325,18 @@ def _get_retry_request_or_raise(
330325
return create_program_and_job_request
331326
elif error.code == Code.PROGRAM_ALREADY_EXISTS:
332327
if 'create_quantum_program_and_job' in current_request:
333-
raise ProgramAlreadyExistsError(
334-
current_request.create_quantum_program_and_job.quantum_program.name
335-
)
328+
# If the program already exists and was created by the stream client, the job
329+
# should also exist because they are created at the same time.
330+
# If the job is missing, the program was created outside StreamManager.
331+
# A `CreateQuantumJobRequest` will be issued after a `GetQuantumResultRequest` is
332+
# attempted.
333+
return get_result_request
336334
elif error.code == Code.JOB_DOES_NOT_EXIST:
337335
if 'get_quantum_result' in current_request:
338336
return create_job_request
339-
340-
# Code.JOB_ALREADY_EXISTS should never happen.
341-
# The first stream request is always a CreateQuantumProgramAndJobRequest, which never fails
342-
# with this error because jobs are scoped within a program.
343-
# CreateQuantumJobRequests would fail with a PROGRAM_ALREADY_EXISTS if the job already
344-
# exists because program and job creation happen atomically for a
345-
# CreateQuantumProgramAndJobRequest.
337+
elif error.code == Code.JOB_ALREADY_EXISTS:
338+
if not 'get_quantum_result' in current_request:
339+
return get_result_request
346340

347341
raise StreamError(error.message)
348342

cirq-google/cirq_google/engine/stream_manager_test.py

+101-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from cirq_google.engine.asyncio_executor import AsyncioExecutor
2525
from cirq_google.engine.stream_manager import (
2626
_get_retry_request_or_raise,
27-
ProgramAlreadyExistsError,
2827
ResponseDemux,
2928
StreamError,
3029
StreamManager,
@@ -524,9 +523,40 @@ async def test():
524523
duet.run(test)
525524

526525
@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
527-
def test_submit_program_already_exists_expects_program_already_exists_error(
526+
def test_submit_program_already_exists_expects_get_result_request(self, client_constructor):
527+
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
528+
fake_client, manager = setup(client_constructor)
529+
530+
async def test():
531+
async with duet.timeout_scope(5):
532+
actual_result_future = manager.submit(
533+
REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0
534+
)
535+
await fake_client.wait_for_requests()
536+
await fake_client.reply(
537+
quantum.QuantumRunStreamResponse(
538+
error=quantum.StreamError(
539+
code=quantum.StreamError.Code.PROGRAM_ALREADY_EXISTS
540+
)
541+
)
542+
)
543+
await fake_client.wait_for_requests()
544+
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
545+
actual_result = await actual_result_future
546+
manager.stop()
547+
548+
assert actual_result == expected_result
549+
assert len(fake_client.all_stream_requests) == 2
550+
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
551+
assert 'get_quantum_result' in fake_client.all_stream_requests[1]
552+
553+
duet.run(test)
554+
555+
@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
556+
def test_submit_program_already_exists_but_job_does_not_exist_expects_create_job_request(
528557
self, client_constructor
529558
):
559+
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
530560
fake_client, manager = setup(client_constructor)
531561

532562
async def test():
@@ -542,10 +572,75 @@ async def test():
542572
)
543573
)
544574
)
545-
with pytest.raises(ProgramAlreadyExistsError):
546-
await actual_result_future
575+
await fake_client.wait_for_requests()
576+
await fake_client.reply(
577+
quantum.QuantumRunStreamResponse(
578+
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST)
579+
)
580+
)
581+
await fake_client.wait_for_requests()
582+
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
583+
actual_result = await actual_result_future
547584
manager.stop()
548585

586+
assert actual_result == expected_result
587+
assert len(fake_client.all_stream_requests) == 3
588+
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
589+
assert 'get_quantum_result' in fake_client.all_stream_requests[1]
590+
assert 'create_quantum_job' in fake_client.all_stream_requests[2]
591+
592+
duet.run(test)
593+
594+
@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
595+
def test_submit_job_already_exist_expects_get_result_request(self, client_constructor):
596+
"""Verifies the behavior when the client receives a JOB_ALREADY_EXISTS error.
597+
598+
This error is only expected to be triggered in the following race condition:
599+
1. The client sends a CreateQuantumProgramAndJobRequest.
600+
2. The client's stream disconnects.
601+
3. The client retries with a new stream and a GetQuantumResultRequest.
602+
4. The job doesn't exist yet, and the client receives a "job not found" error.
603+
5. Scheduler creates the program and job.
604+
6. The client retries with a CreateJobRequest and fails with a "job already exists" error.
605+
606+
The JOB_ALREADY_EXISTS error from `CreateQuantumJobRequest` is only possible if the job
607+
doesn't exist yet at the last `GetQuantumResultRequest`.
608+
"""
609+
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
610+
fake_client, manager = setup(client_constructor)
611+
612+
async def test():
613+
async with duet.timeout_scope(5):
614+
actual_result_future = manager.submit(
615+
REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0
616+
)
617+
await fake_client.wait_for_requests()
618+
await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable'))
619+
await fake_client.wait_for_requests()
620+
# Trigger a retry with `CreateQuantumJobRequest`.
621+
await fake_client.reply(
622+
quantum.QuantumRunStreamResponse(
623+
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST)
624+
)
625+
)
626+
await fake_client.wait_for_requests()
627+
await fake_client.reply(
628+
quantum.QuantumRunStreamResponse(
629+
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_ALREADY_EXISTS)
630+
)
631+
)
632+
await fake_client.wait_for_requests()
633+
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
634+
actual_result = await actual_result_future
635+
manager.stop()
636+
637+
assert actual_result == expected_result
638+
assert len(fake_client.all_stream_requests) == 4
639+
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
640+
assert 'get_quantum_result' in fake_client.all_stream_requests[1]
641+
assert 'create_quantum_job' in fake_client.all_stream_requests[2]
642+
assert 'get_quantum_result' in fake_client.all_stream_requests[3]
643+
549644
duet.run(test)
550645

551646
@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
@@ -690,6 +785,7 @@ async def test():
690785
(Code.PROGRAM_ALREADY_EXISTS, 'get_quantum_result'),
691786
(Code.JOB_DOES_NOT_EXIST, 'create_quantum_program_and_job'),
692787
(Code.JOB_DOES_NOT_EXIST, 'create_quantum_job'),
788+
(Code.JOB_ALREADY_EXISTS, 'get_quantum_result'),
693789
],
694790
)
695791
def test_get_retry_request_or_raise_expects_stream_error(
@@ -720,6 +816,7 @@ def test_get_retry_request_or_raise_expects_stream_error(
720816
current_request,
721817
create_quantum_program_and_job_request,
722818
create_quantum_job_request,
819+
get_quantum_result_request,
723820
)
724821

725822
@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)

0 commit comments

Comments
 (0)