Skip to content

StreamManager: retry with get result request on already exist errors #6345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions cirq-google/cirq_google/engine/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@
]


class ProgramAlreadyExistsError(Exception):
def __init__(self, program_name: str):
# Call the base class constructor with the parameters it needs
super().__init__(f"'{program_name}' already exists")


class StreamError(Exception):
pass

Expand Down Expand Up @@ -150,7 +144,6 @@ def submit(
A future for the job result, or the job if the job has failed.

Raises:
ProgramAlreadyExistsError: if the program already exists.
StreamError: if there is a non-retryable error while executing the job.
ValueError: if program name is not set.
concurrent.futures.CancelledError: if the stream is stopped while a job is in flight.
Expand Down Expand Up @@ -298,6 +291,7 @@ async def _manage_execution(
current_request,
create_program_and_job_request,
_to_create_job_request(create_program_and_job_request),
_to_get_result_request(create_program_and_job_request),
)
continue
else: # pragma: no cover
Expand All @@ -319,7 +313,8 @@ def _get_retry_request_or_raise(
error: quantum.StreamError,
current_request,
create_program_and_job_request,
create_job_request: quantum.QuantumRunStreamRequest,
create_job_request,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you keep the type hint here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still there, just moved down by 1 param

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's not (typically) how type annotations work in python. They should be on each parameter

get_result_request: quantum.QuantumRunStreamRequest,
):
"""Decide whether the given stream error is retryable.

Expand All @@ -330,19 +325,17 @@ def _get_retry_request_or_raise(
return create_program_and_job_request
elif error.code == Code.PROGRAM_ALREADY_EXISTS:
if 'create_quantum_program_and_job' in current_request:
raise ProgramAlreadyExistsError(
current_request.create_quantum_program_and_job.quantum_program.name
)
# If the program already exists and is created as part of the stream client, the job
# should also exist because they are created at the same time.
# If the job is missing, a `CreateQuantumJobRequest` will be issued after a
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does the program get created w/o the job if they're created together in a CreateProgramAndJobRequest? Does a closed stream kill the server-side handling?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarified a bit in the comment - it's for the unlikely case that the program is created outside StreamManager. The logic here doesn't explicitly try to solve this case, but it just happens to do the right thing.

# `GetQuantumResultRequest` is attempted.
return get_result_request
elif error.code == Code.JOB_DOES_NOT_EXIST:
if 'get_quantum_result' in current_request:
return create_job_request

# Code.JOB_ALREADY_EXISTS should never happen.
# The first stream request is always a CreateQuantumProgramAndJobRequest, which never fails
# with this error because jobs are scoped within a program.
# CreateQuantumJobRequests would fail with a PROGRAM_ALREADY_EXISTS if the job already
# exists because program and job creation happen atomically for a
# CreateQuantumProgramAndJobRequest.
elif error.code == Code.JOB_ALREADY_EXISTS:
if not 'get_quantum_result' in current_request:
return get_result_request

raise StreamError(error.message)

Expand Down
105 changes: 101 additions & 4 deletions cirq-google/cirq_google/engine/stream_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from cirq_google.engine.asyncio_executor import AsyncioExecutor
from cirq_google.engine.stream_manager import (
_get_retry_request_or_raise,
ProgramAlreadyExistsError,
ResponseDemux,
StreamError,
StreamManager,
Expand Down Expand Up @@ -524,9 +523,40 @@ async def test():
duet.run(test)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
def test_submit_program_already_exists_expects_program_already_exists_error(
def test_submit_program_already_exists_expects_get_result_request(self, client_constructor):
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
fake_client, manager = setup(client_constructor)

async def test():
async with duet.timeout_scope(5):
actual_result_future = manager.submit(
REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0
)
await fake_client.wait_for_requests()
await fake_client.reply(
quantum.QuantumRunStreamResponse(
error=quantum.StreamError(
code=quantum.StreamError.Code.PROGRAM_ALREADY_EXISTS
)
)
)
await fake_client.wait_for_requests()
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
actual_result = await actual_result_future
manager.stop()

assert actual_result == expected_result
assert len(fake_client.all_stream_requests) == 2
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
assert 'get_quantum_result' in fake_client.all_stream_requests[1]

duet.run(test)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
def test_submit_program_already_exists_but_job_does_not_exist_expects_create_job_request(
self, client_constructor
):
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
fake_client, manager = setup(client_constructor)

async def test():
Expand All @@ -542,10 +572,75 @@ async def test():
)
)
)
with pytest.raises(ProgramAlreadyExistsError):
await actual_result_future
await fake_client.wait_for_requests()
await fake_client.reply(
quantum.QuantumRunStreamResponse(
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST)
)
)
await fake_client.wait_for_requests()
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
actual_result = await actual_result_future
manager.stop()

assert actual_result == expected_result
assert len(fake_client.all_stream_requests) == 3
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
assert 'get_quantum_result' in fake_client.all_stream_requests[1]
assert 'create_quantum_job' in fake_client.all_stream_requests[2]

duet.run(test)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
def test_submit_job_already_exist_expects_get_result_request(self, client_constructor):
"""Verifies the behavior when the client receives a JOB_ALREADY_EXISTS error.

This error is only expected to be triggered in the following race condition:
1. The client sends a CreateQuantumProgramAndJobRequest.
2. The client's stream disconnects.
3. The client retries with a new stream and a GetQuantumResultRequest.
4. The job doesn't exist yet, and the client receives a "job not found" error.
5. Scheduler creates the program and job.
6. The client retries with a CreateJobRequest and fails with a "job already exists" error.

The JOB_ALREADY_EXISTS error from `CreateQuantumJobRequest` is only possible if the job
doesn't exist yet at the last `GetQuantumResultRequest`.
"""
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
fake_client, manager = setup(client_constructor)

async def test():
async with duet.timeout_scope(5):
actual_result_future = manager.submit(
REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0
)
await fake_client.wait_for_requests()
await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable'))
await fake_client.wait_for_requests()
# Trigger a retry with `CreateQuantumJobRequest`.
await fake_client.reply(
quantum.QuantumRunStreamResponse(
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST)
)
)
await fake_client.wait_for_requests()
await fake_client.reply(
quantum.QuantumRunStreamResponse(
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_ALREADY_EXISTS)
)
)
await fake_client.wait_for_requests()
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
actual_result = await actual_result_future
manager.stop()

assert actual_result == expected_result
assert len(fake_client.all_stream_requests) == 4
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
assert 'get_quantum_result' in fake_client.all_stream_requests[1]
assert 'create_quantum_job' in fake_client.all_stream_requests[2]
assert 'get_quantum_result' in fake_client.all_stream_requests[3]

duet.run(test)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
Expand Down Expand Up @@ -690,6 +785,7 @@ async def test():
(Code.PROGRAM_ALREADY_EXISTS, 'get_quantum_result'),
(Code.JOB_DOES_NOT_EXIST, 'create_quantum_program_and_job'),
(Code.JOB_DOES_NOT_EXIST, 'create_quantum_job'),
(Code.JOB_ALREADY_EXISTS, 'get_quantum_result'),
],
)
def test_get_retry_request_or_raise_expects_stream_error(
Expand Down Expand Up @@ -720,6 +816,7 @@ def test_get_retry_request_or_raise_expects_stream_error(
current_request,
create_quantum_program_and_job_request,
create_quantum_job_request,
get_quantum_result_request,
)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
Expand Down