Skip to content

Commit 4dc36d5

Browse files
authored
Integrate StreamManager with run_sweep() (#6285)
* Integrate StreamManager with run_sweep() * Added a feature flag defaulting to True. * Added logic to temporarily rewrite `processor_ids` to `processor_id` prior to full deprecation of `processor_ids` in `Engine.run_sweep()`. * [WIP] Addressed maffoo's comments * Addressed wcourtney's comments * Modified engine_processor_test to take into account the enable_streaming feature flag * Address wcourtney's comments * Rename job_response_future to job_result_future
1 parent c6f60bc commit 4dc36d5

7 files changed

+646
-128
lines changed

cirq-google/cirq_google/engine/engine.py

+47-1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ def __init__(
8888
client: 'Optional[engine_client.EngineClient]' = None,
8989
timeout: Optional[int] = None,
9090
serializer: Serializer = CIRCUIT_SERIALIZER,
91+
# TODO(#5996) Remove enable_streaming once the feature is stable.
92+
enable_streaming: bool = True,
9193
) -> None:
9294
"""Context and client for using Quantum Engine.
9395
@@ -103,6 +105,9 @@ def __init__(
103105
timeout: Timeout for polling for results, in seconds. Default is
104106
to never timeout.
105107
serializer: Used to serialize circuits when running jobs.
108+
enable_streaming: Feature gate for making Quantum Engine requests using the stream RPC.
109+
If True, the Quantum Engine streaming RPC is used for creating jobs
110+
and getting results. Otherwise, unary RPCs are used.
106111
107112
Raises:
108113
ValueError: If either `service_args` and `verbose` were supplied
@@ -115,6 +120,7 @@ def __init__(
115120
if self.proto_version == ProtoVersion.V1:
116121
raise ValueError('ProtoVersion V1 no longer supported')
117122
self.serializer = serializer
123+
self.enable_streaming = enable_streaming
118124

119125
if not client:
120126
client = engine_client.EngineClient(service_args=service_args, verbose=verbose)
@@ -306,7 +312,7 @@ async def run_sweep_async(
306312
run_name: str = "",
307313
device_config_name: str = "",
308314
) -> engine_job.EngineJob:
309-
"""Runs the supplied Circuit via Quantum Engine.Creates
315+
"""Runs the supplied Circuit via Quantum Engine.
310316
311317
In contrast to run, this runs across multiple parameter sweeps, and
312318
does not block until a result is returned.
@@ -355,6 +361,44 @@ async def run_sweep_async(
355361
ValueError: If either `run_name` and `device_config_name` are set but
356362
`processor_id` is empty.
357363
"""
364+
365+
if self.context.enable_streaming:
366+
# This logic is temporary prior to deprecating the processor_ids parameter.
367+
# TODO(#6271) Remove after deprecating processor_ids elsewhere prior to v1.4.
368+
if processor_ids:
369+
if len(processor_ids) > 1:
370+
raise ValueError("The use of multiple processors is no longer supported.")
371+
if len(processor_ids) == 1 and not processor_id:
372+
processor_id = processor_ids[0]
373+
374+
if not program_id:
375+
program_id = _make_random_id('prog-')
376+
if not job_id:
377+
job_id = _make_random_id('job-')
378+
run_context = self.context._serialize_run_context(params, repetitions)
379+
380+
job_result_future = self.context.client.run_job_over_stream(
381+
project_id=self.project_id,
382+
program_id=str(program_id),
383+
program_description=program_description,
384+
program_labels=program_labels,
385+
code=self.context._serialize_program(program),
386+
job_id=str(job_id),
387+
run_context=run_context,
388+
job_description=job_description,
389+
job_labels=job_labels,
390+
processor_id=processor_id,
391+
run_name=run_name,
392+
device_config_name=device_config_name,
393+
)
394+
return engine_job.EngineJob(
395+
self.project_id,
396+
str(program_id),
397+
str(job_id),
398+
self.context,
399+
job_result_future=job_result_future,
400+
)
401+
358402
engine_program = await self.create_program_async(
359403
program, program_id, description=program_description, labels=program_labels
360404
)
@@ -372,6 +416,7 @@ async def run_sweep_async(
372416

373417
run_sweep = duet.sync(run_sweep_async)
374418

419+
# TODO(#5996) Migrate to stream client
375420
# TODO(#6271): Deprecate and remove processor_ids before v1.4
376421
async def run_batch_async(
377422
self,
@@ -475,6 +520,7 @@ async def run_batch_async(
475520

476521
run_batch = duet.sync(run_batch_async)
477522

523+
# TODO(#5996) Migrate to stream client
478524
async def run_calibration_async(
479525
self,
480526
layers: List['cirq_google.CalibrationLayer'],

cirq-google/cirq_google/engine/engine_client.py

+96
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from cirq._compat import deprecated_parameter
4040
from cirq_google.cloud import quantum
4141
from cirq_google.engine.asyncio_executor import AsyncioExecutor
42+
from cirq_google.engine import stream_manager
4243

4344
_M = TypeVar('_M', bound=proto.Message)
4445
_R = TypeVar('_R')
@@ -106,6 +107,10 @@ async def make_client():
106107

107108
return self._executor.submit(make_client).result()
108109

110+
@cached_property
111+
def _stream_manager(self) -> stream_manager.StreamManager:
112+
return stream_manager.StreamManager(self.grpc_client)
113+
109114
async def _send_request_async(self, func: Callable[[_M], Awaitable[_R]], request: _M) -> _R:
110115
"""Sends a request by invoking an asyncio callable."""
111116
return await self._run_retry_async(func, request)
@@ -736,6 +741,97 @@ async def get_job_results_async(
736741

737742
get_job_results = duet.sync(get_job_results_async)
738743

744+
def run_job_over_stream(
745+
self,
746+
*,
747+
project_id: str,
748+
program_id: str,
749+
code: any_pb2.Any,
750+
run_context: any_pb2.Any,
751+
program_description: Optional[str] = None,
752+
program_labels: Optional[Dict[str, str]] = None,
753+
job_id: str,
754+
priority: Optional[int] = None,
755+
job_description: Optional[str] = None,
756+
job_labels: Optional[Dict[str, str]] = None,
757+
processor_id: str = "",
758+
run_name: str = "",
759+
device_config_name: str = "",
760+
) -> duet.AwaitableFuture[Union[quantum.QuantumResult, quantum.QuantumJob]]:
761+
"""Runs a job with the given program and job information over a stream.
762+
763+
Sends the request over the Quantum Engine QuantumRunStream bidirectional stream, and returns
764+
a future for the stream response. The future will be completed with a `QuantumResult` if
765+
the job is successful; otherwise, it will be completed with a QuantumJob.
766+
767+
Args:
768+
project_id: A project_id of the parent Google Cloud Project.
769+
program_id: Unique ID of the program within the parent project.
770+
code: Properly serialized program code.
771+
run_context: Properly serialized run context.
772+
program_description: An optional description to set on the program.
773+
program_labels: Optional set of labels to set on the program.
774+
job_id: Unique ID of the job within the parent program.
775+
priority: Optional priority to run at, 0-1000.
776+
job_description: Optional description to set on the job.
777+
job_labels: Optional set of labels to set on the job.
778+
processor_id: Processor id for running the program. If not set,
779+
`processor_ids` will be used.
780+
run_name: A unique identifier representing an automation run for the
781+
specified processor. An Automation Run contains a collection of
782+
device configurations for a processor. If specified, `processor_id`
783+
is required to be set.
784+
device_config_name: An identifier used to select the processor configuration
785+
utilized to run the job. A configuration identifies the set of
786+
available qubits, couplers, and supported gates in the processor.
787+
If specified, `processor_id` is required to be set.
788+
789+
Returns:
790+
A future for the job result, or the job if the job has failed.
791+
792+
Raises:
793+
ValueError: If the priority is not between 0 and 1000.
794+
ValueError: If `processor_id` is not set.
795+
ValueError: If only one of `run_name` and `device_config_name` are specified.
796+
"""
797+
# Check program to run and program parameters.
798+
if priority and not 0 <= priority < 1000:
799+
raise ValueError('priority must be between 0 and 1000')
800+
if not processor_id:
801+
raise ValueError('Must specify a processor id when creating a job.')
802+
if bool(run_name) ^ bool(device_config_name):
803+
raise ValueError('Cannot specify only one of `run_name` and `device_config_name`')
804+
805+
project_name = _project_name(project_id)
806+
807+
program_name = _program_name_from_ids(project_id, program_id)
808+
program = quantum.QuantumProgram(name=program_name, code=code)
809+
if program_description:
810+
program.description = program_description
811+
if program_labels:
812+
program.labels.update(program_labels)
813+
814+
job = quantum.QuantumJob(
815+
name=_job_name_from_ids(project_id, program_id, job_id),
816+
scheduling_config=quantum.SchedulingConfig(
817+
processor_selector=quantum.SchedulingConfig.ProcessorSelector(
818+
processor=_processor_name_from_ids(project_id, processor_id),
819+
device_config_key=quantum.DeviceConfigKey(
820+
run_name=run_name, config_alias=device_config_name
821+
),
822+
)
823+
),
824+
run_context=run_context,
825+
)
826+
if priority:
827+
job.scheduling_config.priority = priority
828+
if job_description:
829+
job.description = job_description
830+
if job_labels:
831+
job.labels.update(job_labels)
832+
833+
return self._stream_manager.submit(project_name, program, job)
834+
739835
async def list_processors_async(self, project_id: str) -> List[quantum.QuantumProcessor]:
740836
"""Returns a list of Processors that the user has visibility to in the
741837
current Engine project. The names of these processors are used to

0 commit comments

Comments
 (0)