Skip to content

Commit cebe401

Browse files
authored
feat(backup): Add export checkpointer (#80711)
This feature mirrors what we do for importing, where we periodically "save our work", so that if we experience an ephemeral failure (timeout, pod restart, OOM, etc), we can "pick up where we left off". For imports, we do this by saving `ImportChunk`s to the database every time we import a few models, which allows us to check what we've already imported to avoid redoing work when retrying. We use a similar strategy here for exporting. For every model kind, we save a copy of the JSON of all instances of that model that we exported to some durable media in specially-named "checkpoint" files. If there is a failure midway through the export process, when we try again, we can scan for these files to quickly re-use them, rather than doing very expensive and resource-intensive database queries again. While this does assume that the model state has stayed relatively consistent between runs, this is already an assumption we make for exporting in general (we can't export a "single snapshot in time" of the database at once anyway). A follow-up PR will implement a subclass of `ExportCheckpointer` for GCP, which is what we will use to checkpoint large SaaS->SaaS relocations.
1 parent f28f827 commit cebe401

File tree

5 files changed

+478
-49
lines changed

5 files changed

+478
-49
lines changed

Diff for: src/sentry/backup/crypto.py

+20-14
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ class Encryptor(ABC):
5454
A `IO[bytes]`-wrapper that contains relevant information and methods to encrypt some an in-memory JSON-ifiable dict.
5555
"""
5656

57-
__fp: IO[bytes]
58-
5957
@abstractmethod
6058
def get_public_key_pem(self) -> bytes:
6159
pass
@@ -67,10 +65,10 @@ class LocalFileEncryptor(Encryptor):
6765
"""
6866

6967
def __init__(self, fp: IO[bytes]):
70-
self.__fp = fp
68+
self.__key = fp.read()
7169

7270
def get_public_key_pem(self) -> bytes:
73-
return self.__fp.read()
71+
return self.__key
7472

7573

7674
class GCPKMSEncryptor(Encryptor):
@@ -82,7 +80,7 @@ class GCPKMSEncryptor(Encryptor):
8280
crypto_key_version: CryptoKeyVersion | None = None
8381

8482
def __init__(self, fp: IO[bytes]):
85-
self.__fp = fp
83+
self.__key = fp.read()
8684

8785
@classmethod
8886
def from_crypto_key_version(cls, crypto_key_version: CryptoKeyVersion) -> GCPKMSEncryptor:
@@ -93,7 +91,7 @@ def from_crypto_key_version(cls, crypto_key_version: CryptoKeyVersion) -> GCPKMS
9391
def get_public_key_pem(self) -> bytes:
9492
if self.crypto_key_version is None:
9593
# Read the user supplied configuration into the proper format.
96-
gcp_kms_config_json = orjson.loads(self.__fp.read())
94+
gcp_kms_config_json = orjson.loads(self.__key)
9795
try:
9896
self.crypto_key_version = CryptoKeyVersion(**gcp_kms_config_json)
9997
except TypeError:
@@ -217,8 +215,6 @@ class Decryptor(ABC):
217215
tarball.
218216
"""
219217

220-
__fp: IO[bytes]
221-
222218
@abstractmethod
223219
def read(self) -> bytes:
224220
pass
@@ -234,22 +230,22 @@ class LocalFileDecryptor(Decryptor):
234230
"""
235231

236232
def __init__(self, fp: IO[bytes]):
237-
self.__fp = fp
233+
self.__key = fp.read()
238234

239235
@classmethod
240236
def from_bytes(cls, b: bytes) -> LocalFileDecryptor:
241237
return cls(io.BytesIO(b))
242238

243239
def read(self) -> bytes:
244-
return self.__fp.read()
240+
return self.__key
245241

246242
def decrypt_data_encryption_key(self, unwrapped: UnwrappedEncryptedExportTarball) -> bytes:
247243
"""
248244
Decrypt the encrypted data encryption key used to encrypt the actual export JSON.
249245
"""
250246

251247
# Compare the public and private key, to ensure that they are a match.
252-
private_key_pem = self.__fp.read()
248+
private_key_pem = self.__key
253249
private_key = serialization.load_pem_private_key(
254250
private_key_pem,
255251
password=None,
@@ -286,17 +282,17 @@ class GCPKMSDecryptor(Decryptor):
286282
"""
287283

288284
def __init__(self, fp: IO[bytes]):
289-
self.__fp = fp
285+
self.__key = fp.read()
290286

291287
@classmethod
292288
def from_bytes(cls, b: bytes) -> GCPKMSDecryptor:
293289
return cls(io.BytesIO(b))
294290

295291
def read(self) -> bytes:
296-
return self.__fp.read()
292+
return self.__key
297293

298294
def decrypt_data_encryption_key(self, unwrapped: UnwrappedEncryptedExportTarball) -> bytes:
299-
gcp_kms_config_bytes = self.__fp.read()
295+
gcp_kms_config_bytes = self.__key
300296

301297
# Read the user supplied configuration into the proper format.
302298
gcp_kms_config_json = orjson.loads(gcp_kms_config_bytes)
@@ -345,3 +341,13 @@ def decrypt_encrypted_tarball(tarball: IO[bytes], decryptor: Decryptor) -> bytes
345341
decrypted_dek = decryptor.decrypt_data_encryption_key(unwrapped)
346342
fernet = Fernet(decrypted_dek)
347343
return fernet.decrypt(unwrapped.encrypted_json_blob)
344+
345+
346+
class EncryptorDecryptorPair:
    """
    Bundles an `Encryptor` (public key side) with the `Decryptor` (private key side) that is meant
    to be its counterpart, so both halves of a key pair can be passed around as one object.
    """

    def __init__(self, encryptor: Encryptor, decryptor: Decryptor):
        # Both objects are stored as given; nothing here checks that the keys actually match.
        self.encryptor, self.decryptor = encryptor, decryptor

Diff for: src/sentry/backup/exports.py

+94-9
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55
# We have to use the default JSON interface to enable pretty-printing on export. When loading JSON,
66
# we still use the one from `sentry.utils`, imported as `sentry_json` below.
77
import json as builtin_json # noqa: S003
8+
from abc import ABC, abstractmethod
89
from typing import IO
910

1011
import orjson
1112

12-
from sentry.backup.crypto import Encryptor, create_encrypted_export_tarball
13+
from sentry.backup.crypto import Encryptor, EncryptorDecryptorPair, create_encrypted_export_tarball
1314
from sentry.backup.dependencies import (
15+
ImportKind,
16+
NormalizedModelName,
1417
PrimaryKeyMap,
1518
dependencies,
1619
get_model_name,
@@ -20,6 +23,7 @@
2023
from sentry.backup.scopes import ExportScope
2124
from sentry.backup.services.import_export.model import (
2225
RpcExportError,
26+
RpcExportOk,
2327
RpcExportScope,
2428
RpcFilter,
2529
RpcPrimaryKeyMap,
@@ -41,6 +45,69 @@ def __init__(self, context: RpcExportError) -> None:
4145
self.context = context
4246

4347

48+
class ExportCheckpointerError(Exception):
    """
    Raised when checkpoint data cannot be used, for example when a cached entry is improperly
    formatted.
    """
50+
51+
52+
class ExportCheckpointer(ABC):
    """
    For very large exports, the exporting environment may fall over half-way through the process:
    the thread running it may hit some timeout, or it may OOM, or fail for some other ephemeral
    reason. To help in such situations, we'd like an API for saving "checkpoints" during the export.

    This class provides per-model checkpointing support for exports. Since there is a topologically
    sorted order of models being exported, as we move through this list, we can save the exported
    JSON for each kind of model in order to some stable media (disk, GCP, etc). If there is a
    failure late in the export process, when it is retried, the exporter can check if that
    particular model already exists in the checkpointer's cache, thereby avoiding redoing the work
    of pulling the models from the database, processing them, etc. This ensures that in most retry
    situations, we can quickly "re-ingest" already-exported models in memory and pick up where we
    left off.

    Subclasses implement `get` and `add`; `_parse_cached_json` is a shared helper for turning
    previously stored JSON back into an export result.
    """

    @staticmethod
    def _entry_key(model: object) -> tuple[str, int]:
        """
        Extract the `(model name, pk)` pair from a single cached JSON entry.

        Raises:
            ExportCheckpointerError: If the entry is not a JSON object, lacks a `model` or `pk`
                key, or holds values of the wrong type for either.
        """

        # Anything other than a JSON object has no `model`/`pk` keys to read.
        if not isinstance(model, dict):
            raise ExportCheckpointerError("Improperly formatted entry")

        model_name = model.get("model")
        pk = model.get("pk")
        if model_name is None or pk is None:
            raise ExportCheckpointerError("Improperly formatted entry")

        # `bool` is an `int` subclass, so it must be rejected explicitly; the pk is later compared
        # numerically and used as a primary key map entry.
        if not isinstance(model_name, str) or isinstance(pk, bool) or not isinstance(pk, int):
            raise ExportCheckpointerError("Improperly formatted entry")

        return model_name, pk

    def _parse_cached_json(self, json_data: bytes) -> RpcExportOk | None:
        """
        Rebuild an `RpcExportOk` from the JSON bytes of a previously checkpointed model kind.

        Every entry is recorded in a fresh `PrimaryKeyMap` as mapping its pk to itself with
        `ImportKind.Inserted`, and the largest pk seen (0 if there are no entries) is reported as
        `max_pk`. The input bytes are passed through unchanged as the result's `json_data`.

        The return annotation permits `None`, but this implementation always returns a result or
        raises.

        Raises:
            ExportCheckpointerError: If the payload is not a JSON array, or any entry in it is
                improperly formatted. Errors raised by `orjson.loads` for invalid JSON propagate
                unchanged.
        """

        models = orjson.loads(json_data)
        # Checked up front so that a malformed payload fails with the checkpointer's own error
        # type rather than an incidental `AttributeError`/`TypeError` from the loop below.
        if not isinstance(models, list):
            raise ExportCheckpointerError("Improperly formatted entry")

        max_pk = 0
        pk_map = PrimaryKeyMap()
        for model in models:
            model_name, pk = self._entry_key(model)
            pk_map.insert(model_name, pk, pk, ImportKind.Inserted)
            max_pk = max(max_pk, pk)

        return RpcExportOk(
            mapped_pks=RpcPrimaryKeyMap.into_rpc(pk_map), max_pk=max_pk, json_data=json_data
        )

    @abstractmethod
    def get(self, model_name: NormalizedModelName) -> RpcExportOk | None:
        """
        Return the checkpointed export result for `model_name`, or `None` if there is none, in
        which case the exporter performs the export itself.
        """

    @abstractmethod
    def add(self, model_name: NormalizedModelName, json_data: str) -> None:
        """
        Store the exported JSON for `model_name` as a checkpoint.

        NOTE(review): `json_data` is annotated as `str`, but `_export` in this file passes the
        already-parsed result of `orjson.loads` - confirm which form implementations should expect.
        """
93+
94+
95+
class NoopExportCheckpointer(ExportCheckpointer):
    """
    A checkpointer that does nothing: `add` discards whatever it is given and `get` always reports
    a miss by returning None, so no checkpointing ever takes place.
    """

    def __init__(self, crypto: EncryptorDecryptorPair | None, printer: Printer):
        """Accept `crypto` and `printer`, but ignore both - nothing is stored on the instance."""

    def get(self, model_name: NormalizedModelName) -> RpcExportOk | None:
        # Nothing is ever saved, so every lookup is a miss.
        return None

    def add(self, model_name: NormalizedModelName, json_data: str) -> None:
        # Deliberately drops the data on the floor.
        return
109+
110+
44111
def _export(
45112
dest: IO[bytes],
46113
scope: ExportScope,
@@ -49,6 +116,7 @@ def _export(
49116
indent: int = 2,
50117
filter_by: Filter | None = None,
51118
printer: Printer,
119+
checkpointer: ExportCheckpointer | None = None,
52120
):
53121
"""
54122
Exports core data for the Sentry installation.
@@ -68,6 +136,7 @@ def _export(
68136
printer.echo(errText, err=True)
69137
raise RuntimeError(errText)
70138

139+
cache = checkpointer if checkpointer is not None else NoopExportCheckpointer(None, printer)
71140
json_export = []
72141
pk_map = PrimaryKeyMap()
73142
allowed_relocation_scopes = scope.value
@@ -119,25 +188,33 @@ def _export(
119188

120189
dep_models = {get_model_name(d) for d in model_relations.get_dependencies_for_relocation()}
121190
export_by_model = ImportExportService.get_exporter_for_model(model)
122-
result = export_by_model(
123-
export_model_name=str(model_name),
124-
scope=RpcExportScope.into_rpc(scope),
125-
from_pk=0,
126-
filter_by=[RpcFilter.into_rpc(f) for f in filters],
127-
pk_map=RpcPrimaryKeyMap.into_rpc(pk_map.partition(dep_models)),
128-
indent=indent,
191+
cached_result = cache.get(model_name)
192+
result = (
193+
cached_result
194+
if cached_result is not None
195+
else export_by_model(
196+
export_model_name=str(model_name),
197+
scope=RpcExportScope.into_rpc(scope),
198+
from_pk=0,
199+
filter_by=[RpcFilter.into_rpc(f) for f in filters],
200+
pk_map=RpcPrimaryKeyMap.into_rpc(pk_map.partition(dep_models)),
201+
indent=indent,
202+
)
129203
)
130204

131205
if isinstance(result, RpcExportError):
132206
printer.echo(result.pretty(), err=True)
133207
raise ExportingError(result)
134208

135209
pk_map.extend(result.mapped_pks.from_rpc())
210+
json_models = orjson.loads(result.json_data)
211+
if cached_result is None:
212+
cache.add(model_name, json_models)
136213

137214
# TODO(getsentry/team-ospo#190): Since the structure of this data is very predictable (an
138215
# array of serialized model objects), we could probably avoid re-ingesting the JSON string
139216
# as a future optimization.
140-
for json_model in orjson.loads(result.json_data):
217+
for json_model in json_models:
141218
json_export.append(json_model)
142219

143220
# If no `encryptor` argument was passed in, this is an unencrypted export, so we can just dump
@@ -158,6 +235,7 @@ def export_in_user_scope(
158235
user_filter: set[str] | None = None,
159236
indent: int = 2,
160237
printer: Printer,
238+
checkpointer: ExportCheckpointer | None = None,
161239
):
162240
"""
163241
Perform an export in the `User` scope, meaning that only models with `RelocationScope.User` will
@@ -174,6 +252,7 @@ def export_in_user_scope(
174252
filter_by=Filter(User, "username", user_filter) if user_filter is not None else None,
175253
indent=indent,
176254
printer=printer,
255+
checkpointer=checkpointer,
177256
)
178257

179258

@@ -184,6 +263,7 @@ def export_in_organization_scope(
184263
org_filter: set[str] | None = None,
185264
indent: int = 2,
186265
printer: Printer,
266+
checkpointer: ExportCheckpointer | None = None,
187267
):
188268
"""
189269
Perform an export in the `Organization` scope, meaning that only models with
@@ -201,6 +281,7 @@ def export_in_organization_scope(
201281
filter_by=Filter(Organization, "slug", org_filter) if org_filter is not None else None,
202282
indent=indent,
203283
printer=printer,
284+
checkpointer=checkpointer,
204285
)
205286

206287

@@ -210,6 +291,7 @@ def export_in_config_scope(
210291
encryptor: Encryptor | None = None,
211292
indent: int = 2,
212293
printer: Printer,
294+
checkpointer: ExportCheckpointer | None = None,
213295
):
214296
"""
215297
Perform an export in the `Config` scope, meaning that only models directly related to the global
@@ -226,6 +308,7 @@ def export_in_config_scope(
226308
filter_by=Filter(User, "pk", import_export_service.get_all_globally_privileged_users()),
227309
indent=indent,
228310
printer=printer,
311+
checkpointer=checkpointer,
229312
)
230313

231314

@@ -235,6 +318,7 @@ def export_in_global_scope(
235318
encryptor: Encryptor | None = None,
236319
indent: int = 2,
237320
printer: Printer,
321+
checkpointer: ExportCheckpointer | None = None,
238322
):
239323
"""
240324
Perform an export in the `Global` scope, meaning that all models will be exported from the
@@ -246,4 +330,5 @@ def export_in_global_scope(
246330
encryptor=encryptor,
247331
indent=indent,
248332
printer=printer,
333+
checkpointer=checkpointer,
249334
)

0 commit comments

Comments
 (0)