Skip to content

Commit bb4468c

Browse files
authored
[cirqflow] Execution Loop I/O (#4599)
Following #4584 comment. ![ExecutableGroupResult](https://user-images.githubusercontent.com/4967059/138186981-ef4c82fc-6f05-400e-a761-d9a9f0b3257d.png) - Update the constituent parts only when necessary to avoid re-writing the full `ExecutableGroupResult` when only a part has changed. Re-writing would cause a performance hit and potential corruption risk. - Add a new dataclass to keep track of filenames to make loading in the data easier; see the test.
1 parent 74ed369 commit bb4468c

8 files changed

+197
-41
lines changed

cirq-google/cirq_google/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
RuntimeInfo,
133133
ExecutableResult,
134134
ExecutableGroupResult,
135+
ExecutableGroupResultFilesystemRecord,
135136
QuantumRuntimeConfiguration,
136137
execute,
137138
)

cirq-google/cirq_google/json_resolver_cache.py

+4
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,9 @@ def _class_resolver_dictionary() -> Dict[str, ObjectFactory]:
4949
'cirq.google.RuntimeInfo': cirq_google.RuntimeInfo,
5050
'cirq.google.ExecutableResult': cirq_google.ExecutableResult,
5151
'cirq.google.ExecutableGroupResult': cirq_google.ExecutableGroupResult,
52+
# Pylint fights with the black formatter.
53+
# pylint: disable=line-too-long
54+
'cirq.google.ExecutableGroupResultFilesystemRecord': cirq_google.ExecutableGroupResultFilesystemRecord,
55+
# pylint: enable=line-too-long
5256
'cirq.google.QuantumRuntimeConfiguration': cirq_google.QuantumRuntimeConfiguration,
5357
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"cirq_type": "cirq.google.ExecutableGroupResultFilesystemRecord",
3+
"runtime_configuration_path": "RuntimeConfiguration.json.gz",
4+
"shared_runtime_info_path": "SharedRuntimeInfo.jzon.gz",
5+
"executable_result_paths": [
6+
"ExecutableResult.1.json.gz",
7+
"ExecutableResult.2.json.gz"
8+
],
9+
"run_id": "my-run-id"
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
cirq_google.ExecutableGroupResultFilesystemRecord(runtime_configuration_path='RuntimeConfiguration.json.gz', shared_runtime_info_path='SharedRuntimeInfo.jzon.gz', executable_result_paths=['ExecutableResult.1.json.gz', 'ExecutableResult.2.json.gz'], run_id='my-run-id')

cirq-google/cirq_google/json_test_data/spec.py

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
'QuantumRuntimeConfiguration',
6666
'RuntimeInfo',
6767
'SharedRuntimeInfo',
68+
'ExecutableGroupResultFilesystemRecord',
6869
]
6970
},
7071
tested_elsewhere=[

cirq-google/cirq_google/workflow/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
RuntimeInfo,
1313
ExecutableResult,
1414
ExecutableGroupResult,
15+
ExecutableGroupResultFilesystemRecord,
1516
QuantumRuntimeConfiguration,
1617
execute,
1718
)

cirq-google/cirq_google/workflow/quantum_runtime.py

+118-17
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,51 @@ def __repr__(self) -> str:
116116
return _compat.dataclass_repr(self, namespace='cirq_google')
117117

118118

119+
@dataclasses.dataclass
120+
class ExecutableGroupResultFilesystemRecord:
121+
"""Filename references to the constituent parts of a `cg.ExecutableGroupResult`.
122+
123+
Args:
124+
runtime_configuration_path: A filename pointing to the `runtime_configuration` value.
125+
shared_runtime_info_path: A filename pointing to the `shared_runtime_info` value.
126+
executable_result_paths: A list of filenames pointing to the `executable_results` values.
127+
run_id: The unique `str` identifier from this run. This is used to locate the other
128+
values on disk.
129+
"""
130+
131+
runtime_configuration_path: str
132+
shared_runtime_info_path: str
133+
executable_result_paths: List[str]
134+
135+
run_id: str
136+
137+
def load(self, *, base_data_dir: str = ".") -> ExecutableGroupResult:
138+
"""Using the filename references in this dataclass, load a `cg.ExecutableGroupResult`
139+
from its constituent parts.
140+
141+
Args:
142+
base_data_dir: The base data directory. Files should be found at
143+
{base_data_dir}/{run_id}/{this class's paths}
144+
"""
145+
data_dir = f"{base_data_dir}/{self.run_id}"
146+
return ExecutableGroupResult(
147+
runtime_configuration=cirq.read_json_gzip(
148+
f'{data_dir}/{self.runtime_configuration_path}'
149+
),
150+
shared_runtime_info=cirq.read_json_gzip(f'{data_dir}/{self.shared_runtime_info_path}'),
151+
executable_results=[
152+
cirq.read_json_gzip(f'{data_dir}/{exe_path}')
153+
for exe_path in self.executable_result_paths
154+
],
155+
)
156+
157+
def _json_dict_(self) -> Dict[str, Any]:
158+
return dataclass_json_dict(self, namespace='cirq.google')
159+
160+
def __repr__(self) -> str:
161+
return _compat.dataclass_repr(self, namespace='cirq_google')
162+
163+
119164
@dataclasses.dataclass
120165
class QuantumRuntimeConfiguration:
121166
"""User-requested configuration of how to execute a given `cg.QuantumExecutableGroup`.
@@ -138,22 +183,63 @@ def __repr__(self) -> str:
138183
return _compat.dataclass_repr(self, namespace='cirq_google')
139184

140185

186+
def _safe_to_json(obj: Any, *, part_path: str, nominal_path: str, bak_path: str):
187+
"""Safely update a json file.
188+
189+
1. The new value is written to a "part" file
190+
2. The previous file atomically replaces the previous backup file, thereby becoming the
191+
current backup file.
192+
3. The part file is atomically renamed to the desired filename.
193+
"""
194+
cirq.to_json_gzip(obj, part_path)
195+
if os.path.exists(nominal_path):
196+
os.replace(nominal_path, bak_path)
197+
os.replace(part_path, nominal_path)
198+
199+
200+
def _update_updatable_files(
201+
egr_record: ExecutableGroupResultFilesystemRecord,
202+
shared_rt_info: SharedRuntimeInfo,
203+
data_dir: str,
204+
):
205+
"""Safely update ExecutableGroupResultFilesystemRecord.json.gz and SharedRuntimeInfo.json.gz
206+
during an execution run.
207+
"""
208+
_safe_to_json(
209+
shared_rt_info,
210+
part_path=f'{data_dir}/SharedRuntimeInfo.json.gz.part',
211+
nominal_path=f'{data_dir}/SharedRuntimeInfo.json.gz',
212+
bak_path=f'{data_dir}/SharedRuntimeInfo.json.gz.bak',
213+
)
214+
_safe_to_json(
215+
egr_record,
216+
part_path=f'{data_dir}/ExecutableGroupResultFilesystemRecord.json.gz.part',
217+
nominal_path=f'{data_dir}/ExecutableGroupResultFilesystemRecord.json.gz',
218+
bak_path=f'{data_dir}/ExecutableGroupResultFilesystemRecord.json.gz.bak',
219+
)
220+
221+
141222
def execute(
142223
rt_config: QuantumRuntimeConfiguration,
143224
executable_group: QuantumExecutableGroup,
144225
base_data_dir: str = ".",
145226
) -> ExecutableGroupResult:
146227
"""Execute a `cg.QuantumExecutableGroup` according to a `cg.QuantumRuntimeConfiguration`.
147228
229+
The ExecutableGroupResult's constituent parts will be saved to disk as they become
230+
available. Within the "{base_data_dir}/{run_id}" directory we save:
231+
- The `cg.QuantumRuntimeConfiguration` at the start of the execution as a record
232+
of *how* the executable group was run.
233+
- A `cg.SharedRuntimeInfo` which is updated throughout the run.
234+
- An `cg.ExecutableResult` for each `cg.QuantumExecutable` as they become available.
235+
- A `cg.ExecutableGroupResultFilesystemRecord` which is updated throughout the run.
236+
148237
Args:
149238
rt_config: The `cg.QuantumRuntimeConfiguration` specifying how to execute
150239
`executable_group`.
151240
executable_group: The `cg.QuantumExecutableGroup` containing the executables to execute.
152-
base_data_dir: A filesystem path to write data. We write
153-
"{base_data_dir}/{run_id}/ExecutableGroupResult.json.gz"
154-
containing the `cg.ExecutableGroupResult` as well as one file
155-
"{base_data_dir}/{run_id}/ExecutableResult.{i}.json.gz" per `cg.ExecutableResult` as
156-
each executable result becomes available.
241+
base_data_dir: Each data file will be written to the "{base_data_dir}/{run_id}/" directory,
242+
which must not already exist.
157243
158244
Returns:
159245
The `cg.ExecutableGroupResult` containing all data and metadata for an execution.
@@ -174,15 +260,21 @@ def execute(
174260
# coverage: ignore
175261
raise ValueError("Please provide a non-empty `base_data_dir`.")
176262

177-
os.makedirs(f'{base_data_dir}/{run_id}', exist_ok=False)
178-
179-
# Results object that we will fill in in the main loop.
180-
exegroup_result = ExecutableGroupResult(
181-
runtime_configuration=rt_config,
182-
shared_runtime_info=SharedRuntimeInfo(run_id=run_id),
183-
executable_results=list(),
263+
# Set up data saving, save runtime configuration.
264+
data_dir = f'{base_data_dir}/{run_id}'
265+
os.makedirs(data_dir, exist_ok=False)
266+
egr_record = ExecutableGroupResultFilesystemRecord(
267+
runtime_configuration_path='QuantumRuntimeConfiguration.json.gz',
268+
shared_runtime_info_path='SharedRuntimeInfo.json.gz',
269+
executable_result_paths=[],
270+
run_id=run_id,
184271
)
185-
cirq.to_json_gzip(exegroup_result, f'{base_data_dir}/{run_id}/ExecutableGroupResult.json.gz')
272+
cirq.to_json_gzip(rt_config, f'{data_dir}/{egr_record.runtime_configuration_path}')
273+
274+
# Set up to-be-updated objects.
275+
shared_rt_info = SharedRuntimeInfo(run_id=run_id)
276+
_update_updatable_files(egr_record, shared_rt_info, data_dir)
277+
executable_results = []
186278

187279
# Loop over executables.
188280
sampler = rt_config.processor.get_sampler()
@@ -206,9 +298,18 @@ def execute(
206298
runtime_info=runtime_info,
207299
raw_data=sampler_run_result,
208300
)
209-
cirq.to_json_gzip(exe_result, f'{base_data_dir}/{run_id}/ExecutableResult.{i}.json.gz')
210-
exegroup_result.executable_results.append(exe_result)
211-
print(f'\r{i+1} / {n_executables}', end='', flush=True)
301+
# Do bookkeeping for finished ExecutableResult
302+
exe_result_path = f'ExecutableResult.{i}.json.gz'
303+
cirq.to_json_gzip(exe_result, f"{data_dir}/{exe_result_path}")
304+
executable_results.append(exe_result)
305+
egr_record.executable_result_paths.append(exe_result_path)
306+
307+
_update_updatable_files(egr_record, shared_rt_info, data_dir)
308+
print(f'\r{i + 1} / {n_executables}', end='', flush=True)
212309
print()
213310

214-
return exegroup_result
311+
return ExecutableGroupResult(
312+
runtime_configuration=rt_config,
313+
shared_runtime_info=shared_rt_info,
314+
executable_results=executable_results,
315+
)

cirq-google/cirq_google/workflow/quantum_runtime_test.py

+61-24
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515
import re
1616
import uuid
1717
from dataclasses import dataclass
18-
from typing import List
19-
20-
import numpy as np
21-
import pytest
18+
from typing import List, cast, Any
2219

2320
import cirq
2421
import cirq_google as cg
22+
import numpy as np
23+
import pytest
2524
from cirq_google.workflow._abstract_engine_processor_shim import AbstractEngineProcessorShim
2625
from cirq_google.workflow.quantum_executable_test import _get_quantum_executables, _get_example_spec
2726

@@ -69,14 +68,23 @@ def test_executable_result():
6968
cg_assert_equivalent_repr(er)
7069

7170

72-
def _cg_read_json_gzip(fn):
73-
def _testing_resolver(cirq_type: str):
74-
if cirq_type == 'cirq.google.testing._MockEngineProcessor':
75-
return _MockEngineProcessor
71+
def _testing_resolver(cirq_type: str):
72+
if cirq_type == 'cirq.google.testing._MockEngineProcessor':
73+
return _MockEngineProcessor
7674

75+
76+
def _cg_read_json_gzip(fn):
7777
return cirq.read_json_gzip(fn, resolvers=[_testing_resolver] + cirq.DEFAULT_RESOLVERS)
7878

7979

80+
@pytest.fixture
81+
def patch_cirq_default_resolvers():
82+
backup = cirq.DEFAULT_RESOLVERS.copy()
83+
cirq.DEFAULT_RESOLVERS.insert(0, _testing_resolver)
84+
yield True
85+
cirq.DEFAULT_RESOLVERS = backup
86+
87+
8088
def _assert_json_roundtrip(o, tmpdir):
8189
cirq.to_json_gzip(o, f'{tmpdir}/o.json')
8290
o2 = _cg_read_json_gzip(f'{tmpdir}/o.json')
@@ -128,28 +136,57 @@ def test_executable_group_result(tmpdir):
128136
_assert_json_roundtrip(egr, tmpdir)
129137

130138

131-
@pytest.mark.parametrize('run_id', ['unit_test_runid', None])
132-
def test_execute(tmpdir, run_id):
133-
rt_config = cg.QuantumRuntimeConfiguration(processor=_MockEngineProcessor(), run_id=run_id)
139+
def test_egr_filesystem_record_repr():
140+
egr_fs_record = cg.ExecutableGroupResultFilesystemRecord(
141+
runtime_configuration_path='RuntimeConfiguration.json.gz',
142+
shared_runtime_info_path='SharedRuntimeInfo.jzon.gz',
143+
executable_result_paths=[
144+
'ExecutableResult.1.json.gz',
145+
'ExecutableResult.2.json.gz',
146+
],
147+
run_id='my-run-id',
148+
)
149+
cg_assert_equivalent_repr(egr_fs_record)
150+
151+
152+
def _load_result_by_hand(tmpdir: str, run_id: str) -> cg.ExecutableGroupResult:
153+
"""Load `ExecutableGroupResult` "by hand" without using
154+
`ExecutableGroupResultFilesystemRecord`."""
155+
rt_config = cirq.read_json_gzip(f'{tmpdir}/{run_id}/QuantumRuntimeConfiguration.json.gz')
156+
shared_rt_info = cirq.read_json_gzip(f'{tmpdir}/{run_id}/SharedRuntimeInfo.json.gz')
157+
fns = glob.glob(f'{tmpdir}/{run_id}/ExecutableResult.*.json.gz')
158+
fns = sorted(
159+
fns,
160+
key=lambda s: int(cast(Any, re.search(r'ExecutableResult\.(\d+)\.json\.gz$', s)).group(1)),
161+
)
162+
assert len(fns) == 3
163+
exe_results: List[cg.ExecutableResult] = [cirq.read_json_gzip(fn) for fn in fns]
164+
return cg.ExecutableGroupResult(
165+
runtime_configuration=rt_config,
166+
shared_runtime_info=shared_rt_info,
167+
executable_results=exe_results,
168+
)
169+
170+
171+
@pytest.mark.parametrize('run_id_in', ['unit_test_runid', None])
172+
def test_execute(tmpdir, run_id_in, patch_cirq_default_resolvers):
173+
assert patch_cirq_default_resolvers
174+
rt_config = cg.QuantumRuntimeConfiguration(processor=_MockEngineProcessor(), run_id=run_id_in)
134175
executable_group = cg.QuantumExecutableGroup(_get_quantum_executables())
135176
returned_exegroup_result = cg.execute(
136177
rt_config=rt_config, executable_group=executable_group, base_data_dir=tmpdir
137178
)
138-
actual_run_id = returned_exegroup_result.shared_runtime_info.run_id
139-
if run_id is not None:
140-
assert run_id == actual_run_id
179+
run_id = returned_exegroup_result.shared_runtime_info.run_id
180+
if run_id_in is not None:
181+
assert run_id_in == run_id
141182
else:
142-
assert isinstance(uuid.UUID(actual_run_id), uuid.UUID)
143-
fns = glob.glob(f'{tmpdir}/{actual_run_id}/ExecutableGroupResult.json.gz')
144-
assert len(fns) == 1
145-
exegroup_result: cg.ExecutableGroupResult = _cg_read_json_gzip(fns[0])
183+
assert isinstance(uuid.UUID(run_id), uuid.UUID)
146184

147-
fns = glob.glob(f'{tmpdir}/{actual_run_id}/ExecutableResult.*.json.gz')
148-
fns = sorted(
149-
fns, key=lambda s: int(re.search(r'ExecutableResult\.(\d+)\.json\.gz$', s).group(1))
185+
manual_exegroup_result = _load_result_by_hand(tmpdir, run_id)
186+
egr_record: cg.ExecutableGroupResultFilesystemRecord = cirq.read_json_gzip(
187+
f'{tmpdir}/{run_id}/ExecutableGroupResultFilesystemRecord.json.gz'
150188
)
151-
assert len(fns) == 3
152-
exe_results: List[cg.ExecutableResult] = [_cg_read_json_gzip(fn) for fn in fns]
189+
exegroup_result: cg.ExecutableGroupResult = egr_record.load(base_data_dir=tmpdir)
153190

154-
exegroup_result.executable_results = exe_results
155191
assert returned_exegroup_result == exegroup_result
192+
assert manual_exegroup_result == exegroup_result

0 commit comments

Comments
 (0)