Skip to content

Commit 6e562e6

Browse files
authored
Add ResponseDemux (#6190)
* Add ResponseDemux * Addressed maffoo's comments * Address wcourtney's comments * Fix docstring error * Remove unsubscribe
1 parent e16fbf4 commit 6e562e6

File tree

2 files changed

+206
-0
lines changed

2 files changed

+206
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright 2023 The Cirq Developers
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Dict
16+
import asyncio
17+
18+
from google.api_core.exceptions import GoogleAPICallError
19+
20+
from cirq_google.cloud import quantum
21+
22+
23+
class ResponseDemux:
24+
"""A event demultiplexer for QuantumRunStreamResponses, as part of the async reactor pattern.
25+
26+
A caller can subscribe to the response matching a provided message ID. Only a single caller may
27+
subscribe to each ID.
28+
29+
Another caller can subsequently publish a response. The future held by the subscriber with
30+
the matching message ID will then be fulfilled.
31+
32+
A caller can also publish an exception to all subscribers.
33+
"""
34+
35+
def __init__(self):
36+
# [message ID] : [subscriber future]
37+
self._subscribers: Dict[str, asyncio.Future] = {}
38+
39+
def subscribe(self, message_id: str) -> asyncio.Future:
40+
"""Subscribes to the QuantumRunStreamResponse with a matching ID.
41+
42+
There should only be one subscriber per message ID.
43+
44+
Returns:
45+
A future for the response, to be fulfilled when publish is called.
46+
47+
Raises:
48+
ValueError: when trying to subscribe to a message_id which already has a subscriber.
49+
"""
50+
51+
if message_id in self._subscribers:
52+
raise ValueError(f'There is already a subscriber for the message ID {message_id}')
53+
54+
response_future: asyncio.Future = asyncio.get_running_loop().create_future()
55+
self._subscribers[message_id] = response_future
56+
return response_future
57+
58+
def publish(self, response: quantum.QuantumRunStreamResponse) -> None:
59+
"""Makes the response available to the subscriber with the matching message ID.
60+
61+
The subscriber is unsubscribed afterwards.
62+
63+
If there are no subscribers waiting for the response, nothing happens.
64+
"""
65+
future = self._subscribers.pop(response.message_id, None)
66+
if future and not future.done():
67+
future.set_result(response)
68+
69+
def publish_exception(self, exception: GoogleAPICallError) -> None:
70+
"""Publishes an exception to all outstanding futures."""
71+
for future in self._subscribers.values():
72+
if not future.done():
73+
future.set_exception(exception)
74+
self._subscribers.clear()
75+
76+
77+
class StreamManager:
78+
# TODO(verult) Implement
79+
pass
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Copyright 2023 The Cirq Developers
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import pytest
17+
import google.api_core.exceptions as google_exceptions
18+
19+
from cirq_google.engine.stream_manager import ResponseDemux
20+
from cirq_google.cloud import quantum
21+
22+
RESPONSE0 = quantum.QuantumRunStreamResponse(
23+
message_id='0', result=quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
24+
)
25+
RESPONSE1 = quantum.QuantumRunStreamResponse(
26+
message_id='1', result=quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job1')
27+
)
28+
RESPONSE1_WITH_DIFFERENT_RESULT = quantum.QuantumRunStreamResponse(
29+
message_id='1', result=quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job2')
30+
)
31+
32+
33+
class TestResponseDemux:
34+
@pytest.fixture
35+
def demux(self) -> ResponseDemux:
36+
return ResponseDemux()
37+
38+
@pytest.mark.asyncio
39+
async def test_one_subscribe_one_publish_subscriber_receives_response(self, demux):
40+
future = demux.subscribe(message_id='0')
41+
demux.publish(RESPONSE0)
42+
actual_response = await asyncio.wait_for(future, timeout=1)
43+
44+
assert actual_response == RESPONSE0
45+
46+
@pytest.mark.asyncio
47+
async def test_subscribe_twice_to_same_message_id_raises_error(self, demux):
48+
with pytest.raises(ValueError):
49+
demux.subscribe(message_id='0')
50+
demux.subscribe(message_id='0')
51+
52+
@pytest.mark.asyncio
53+
async def test_out_of_order_response_publishes_to_subscribers_subscribers_receive_responses(
54+
self, demux
55+
):
56+
future0 = demux.subscribe(message_id='0')
57+
future1 = demux.subscribe(message_id='1')
58+
demux.publish(RESPONSE1)
59+
demux.publish(RESPONSE0)
60+
actual_response0 = await asyncio.wait_for(future0, timeout=1)
61+
actual_response1 = await asyncio.wait_for(future1, timeout=1)
62+
63+
assert actual_response0 == RESPONSE0
64+
assert actual_response1 == RESPONSE1
65+
66+
@pytest.mark.asyncio
67+
async def test_message_id_does_not_exist_subscriber_never_receives_response(self, demux):
68+
future = demux.subscribe(message_id='0')
69+
demux.publish(RESPONSE1)
70+
71+
with pytest.raises(asyncio.TimeoutError):
72+
await asyncio.wait_for(future, timeout=1)
73+
74+
@pytest.mark.asyncio
75+
async def test_no_subscribers_does_not_throw(self, demux):
76+
demux.publish(RESPONSE0)
77+
78+
# expect no exceptions
79+
80+
@pytest.mark.asyncio
81+
async def test_publishes_twice_for_same_message_id_future_unchanged(self, demux):
82+
future = demux.subscribe(message_id='1')
83+
demux.publish(RESPONSE1)
84+
demux.publish(RESPONSE1_WITH_DIFFERENT_RESULT)
85+
actual_response = await asyncio.wait_for(future, timeout=1)
86+
87+
assert actual_response == RESPONSE1
88+
89+
@pytest.mark.asyncio
90+
async def test_publish_exception_publishes_to_all_subscribers(self, demux):
91+
exception = google_exceptions.Aborted('aborted')
92+
future0 = demux.subscribe(message_id='0')
93+
future1 = demux.subscribe(message_id='1')
94+
demux.publish_exception(exception)
95+
96+
with pytest.raises(google_exceptions.Aborted):
97+
await future0
98+
with pytest.raises(google_exceptions.Aborted):
99+
await future1
100+
101+
@pytest.mark.asyncio
102+
async def test_publish_response_after_publishing_exception_does_not_change_futures(self, demux):
103+
exception = google_exceptions.Aborted('aborted')
104+
future0 = demux.subscribe(message_id='0')
105+
future1 = demux.subscribe(message_id='1')
106+
demux.publish_exception(exception)
107+
demux.publish(RESPONSE0)
108+
demux.publish(RESPONSE1)
109+
110+
with pytest.raises(google_exceptions.Aborted):
111+
await future0
112+
with pytest.raises(google_exceptions.Aborted):
113+
await future1
114+
115+
@pytest.mark.asyncio
116+
async def test_publish_exception_after_publishing_response_does_not_change_futures(self, demux):
117+
exception = google_exceptions.Aborted('aborted')
118+
future0 = demux.subscribe(message_id='0')
119+
future1 = demux.subscribe(message_id='1')
120+
demux.publish(RESPONSE0)
121+
demux.publish(RESPONSE1)
122+
demux.publish_exception(exception)
123+
actual_response0 = await asyncio.wait_for(future0, timeout=1)
124+
actual_response1 = await asyncio.wait_for(future1, timeout=1)
125+
126+
assert actual_response0 == RESPONSE0
127+
assert actual_response1 == RESPONSE1

0 commit comments

Comments
 (0)