diff --git a/src/sentry/backup/crypto.py b/src/sentry/backup/crypto.py index 78bcebe8e5b4dc..1e842ca6e2ed01 100644 --- a/src/sentry/backup/crypto.py +++ b/src/sentry/backup/crypto.py @@ -215,10 +215,6 @@ class Decryptor(ABC): tarball. """ - @abstractmethod - def read(self) -> bytes: - pass - @abstractmethod def decrypt_data_encryption_key(self, unwrapped: UnwrappedEncryptedExportTarball) -> bytes: pass @@ -236,9 +232,6 @@ def __init__(self, fp: IO[bytes]): def from_bytes(cls, b: bytes) -> LocalFileDecryptor: return cls(io.BytesIO(b)) - def read(self) -> bytes: - return self.__key - def decrypt_data_encryption_key(self, unwrapped: UnwrappedEncryptedExportTarball) -> bytes: """ Decrypt the encrypted data encryption key used to encrypt the actual export JSON. @@ -288,9 +281,6 @@ def __init__(self, fp: IO[bytes]): def from_bytes(cls, b: bytes) -> GCPKMSDecryptor: return cls(io.BytesIO(b)) - def read(self) -> bytes: - return self.__key - def decrypt_data_encryption_key(self, unwrapped: UnwrappedEncryptedExportTarball) -> bytes: gcp_kms_config_bytes = self.__key diff --git a/src/sentry/tasks/relocation.py b/src/sentry/tasks/relocation.py index 545ec775a1f14a..efd7a4cf886289 100644 --- a/src/sentry/tasks/relocation.py +++ b/src/sentry/tasks/relocation.py @@ -24,6 +24,7 @@ from sentry.api.helpers.slugs import validate_sentry_slug from sentry.api.serializers.rest_framework.base import camel_to_snake_case, convert_dict_key_case from sentry.backup.crypto import ( + EncryptorDecryptorPair, GCPKMSDecryptor, GCPKMSEncryptor, LocalFileEncryptor, @@ -72,6 +73,7 @@ TASK_TO_STEP, LoggingPrinter, OrderedTask, + StorageBackedCheckpointExporter, create_cloudbuild_yaml, fail_relocation, get_relocations_bucket_name, @@ -376,6 +378,16 @@ def fulfill_cross_region_export_request( encryptor=LocalFileEncryptor(BytesIO(encrypt_with_public_key)), org_filter={org_slug}, printer=LoggingPrinter(uuid), + checkpointer=StorageBackedCheckpointExporter( + crypto=EncryptorDecryptorPair( + encryptor=GCPKMSEncryptor.from_crypto_key_version(get_default_crypto_key_version()), + decryptor=GCPKMSDecryptor.from_bytes( + json.dumps(get_default_crypto_key_version()).encode("utf-8") + ), + ), + uuid=uuid, + storage=relocation_storage, + ), ) logger.info( "fulfill_cross_region_export_request: exported", diff --git a/src/sentry/utils/relocation.py b/src/sentry/utils/relocation.py index 0c9d4d2e5ec45e..4a5f5b4e12e985 100644 --- a/src/sentry/utils/relocation.py +++ b/src/sentry/utils/relocation.py @@ -5,16 +5,32 @@ from contextlib import contextmanager from enum import Enum, unique from functools import lru_cache +from io import BytesIO from string import Template from typing import Any from uuid import UUID +from django.core.files.storage import Storage from django.utils import timezone +from orjson import JSONDecodeError from sentry import options -from sentry.backup.dependencies import dependencies, get_model_name, sorted_dependencies +from sentry.backup.crypto import ( + DecryptionError, + EncryptorDecryptorPair, + create_encrypted_export_tarball, + decrypt_encrypted_tarball, +) +from sentry.backup.dependencies import ( + NormalizedModelName, + dependencies, + get_model_name, + sorted_dependencies, +) +from sentry.backup.exports import ExportCheckpointer, ExportCheckpointerError from sentry.backup.helpers import Printer from sentry.backup.scopes import RelocationScope +from sentry.backup.services.import_export.model import RpcExportOk from sentry.http import get_server_hostname from sentry.models.files.utils import get_relocation_storage from sentry.models.relocation import Relocation, RelocationFile @@ -308,9 +324,80 @@ class OrderedTask(Enum): ) -# A custom logger that roughly matches the parts of the `click.echo` interface that the -# `import_*` methods rely on. +class StorageBackedCheckpointExporter(ExportCheckpointer): + """ + An export checkpointer that uses GCP cloud storage to store encrypted checkpoints for every + model we export for a SAAS_TO_SAAS relocation. + """ + + def __init__( + self, + *, + crypto: EncryptorDecryptorPair, + uuid: UUID, + storage: Storage, + ): + self.__crypto = crypto + self.__uuid = uuid + self.__storage = storage + + def _get_path_name(self, model_name: NormalizedModelName) -> str: + return f"runs/{self.__uuid}/saas_to_saas_export/_checkpoints/{str(model_name)}.enc.tar" + + def get(self, model_name: NormalizedModelName) -> RpcExportOk | None: + logger_data: dict[str, Any] = {"uuid": str(self.__uuid), "model": str(model_name)} + path_name = self._get_path_name(model_name) + try: + with self.__storage.open(path_name, "rb") as fp: + logger_data["encrypted_contents_size"] = fp.tell() + json_data = decrypt_encrypted_tarball(fp, self.__crypto.decryptor) + parsed_json = self._parse_cached_json(json_data) + if parsed_json is None: + logger.info( + "Export checkpointer: miss", + extra=logger_data, + ) + else: + logger_data["max_pk"] = parsed_json.max_pk + logger.info( + "Export checkpointer: read", + extra=logger_data, + ) + + return parsed_json + except (FileNotFoundError, DecryptionError, JSONDecodeError, ExportCheckpointerError): + logger.info( + "Export checkpointer: miss", + extra=logger_data, + ) + return None + + def add(self, model_name: NormalizedModelName, json_export: Any) -> None: + logger_data: dict[str, Any] = {"uuid": str(self.__uuid), "model": str(model_name)} + path_name = self._get_path_name(model_name) + if not isinstance(json_export, list): + return None + + out_bytes = create_encrypted_export_tarball(json_export, self.__crypto.encryptor).getvalue() + fp = BytesIO() + fp.write(out_bytes) + fp.seek(0) + self.__storage.save(path_name, fp) + + logger_data["encrypted_contents_size"] = fp.tell() + logger_data["model_count"] = len(json_export) + logger.info( + "Export checkpointer: write", + extra=logger_data, + ) + + class LoggingPrinter(Printer): + """ + A custom logger that roughly matches the parts of the `click.echo` interface that the `import_*` + and `export_*` backup methods rely on. + """ + def __init__(self, uuid: UUID): self.uuid = uuid super().__init__() diff --git a/tests/sentry/tasks/test_relocation.py b/tests/sentry/tasks/test_relocation.py index 40b79bda06acd2..1304e4c71238a1 100644 --- a/tests/sentry/tasks/test_relocation.py +++ b/tests/sentry/tasks/test_relocation.py @@ -15,6 +15,7 @@ from google_crc32c import value as crc32c from sentry.backup.crypto import ( + EncryptorDecryptorPair, LocalFileDecryptor, LocalFileEncryptor, create_encrypted_export_tarball, @@ -94,7 +95,12 @@ from sentry.users.models.lostpasswordhash import LostPasswordHash from sentry.users.models.user import User from sentry.utils import json -from sentry.utils.relocation import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE, OrderedTask +from sentry.utils.relocation import ( + RELOCATION_BLOB_SIZE, + RELOCATION_FILE_TYPE, + OrderedTask, + StorageBackedCheckpointExporter, +) IMPORT_JSON_FILE_PATH = get_fixture_path("backup", "fresh-install.json") @@ -302,7 +308,7 @@ def test_success_saas_to_saas( assert uploading_complete_mock.call_count == 1 assert cross_region_export_timeout_check_mock.call_count == 1 assert fake_message_builder.call_count == 0 - assert fake_kms_client.get_public_key.call_count == 1 + assert fake_kms_client.get_public_key.call_count > 0 assert fake_kms_client.asymmetric_decrypt.call_count == 0 assert RelocationFile.objects.filter( @@ -345,7 +351,7 @@ def test_success_saas_to_saas_racing( assert uploading_complete_mock.call_count == 1 assert cross_region_export_timeout_check_mock.call_count == 1 assert fake_message_builder.call_count == 0 - assert fake_kms_client.get_public_key.call_count == 2 + assert fake_kms_client.get_public_key.call_count > 0 assert fake_kms_client.asymmetric_decrypt.call_count == 0 assert ( @@ -1990,6 +1996,7 @@ def setUp(self): self.relocation.step = Relocation.Step.VALIDATING.value self.relocation.latest_task = OrderedTask.VALIDATING_COMPLETE.name self.relocation.save() + self.storage = get_relocation_storage() def test_success_self_hosted( self, postprocessing_mock: Mock, fake_kms_client: FakeKeyManagementServiceClient @@ -2035,19 +2042,66 @@ def test_success_saas_to_saas( with assume_test_silo_mode(SiloMode.CONTROL): user_count = User.objects.all().count() + # Create an export checkpointer, so that we can validate that it stores checkpoints properly + # over multiple export attempts. + decryptor = LocalFileDecryptor(BytesIO(self.priv_key_pem)) + encryptor = LocalFileEncryptor(BytesIO(self.pub_key_pem)) + export_checkpointer = StorageBackedCheckpointExporter( + crypto=EncryptorDecryptorPair( + encryptor=encryptor, + decryptor=decryptor, + ), + uuid=self.relocation.uuid, + storage=self.storage, + ) + with TemporaryDirectory() as tmp_dir: + tmp_priv_key_path = Path(tmp_dir).joinpath("key") + tmp_pub_key_path = Path(tmp_dir).joinpath("key.pub") + with open(tmp_priv_key_path, "wb") as f: + f.write(self.priv_key_pem) + with open(tmp_pub_key_path, "wb") as f: + f.write(self.pub_key_pem) + # Export the existing state of the `testing` organization, so that we retain exact ids. export_contents = BytesIO() export_in_organization_scope( export_contents, org_filter=set(self.relocation.want_org_slugs), printer=Printer(), + checkpointer=export_checkpointer, + ) + + # Verify cache writes, to the checkpoint cache. + (_, num_checkpoints) = self.storage.listdir( + f"runs/{self.relocation.uuid}/saas_to_saas_export/_checkpoints/" + ) + assert len(num_checkpoints) > 0 + + # Export again, to sanity-check the export checkpointer. + reexport_contents = BytesIO() + export_in_organization_scope( + reexport_contents, + org_filter=set(self.relocation.want_org_slugs), + printer=Printer(), + checkpointer=export_checkpointer, + ) + + # Verify no cache writes, to the checkpoint cache on the second pass, then check for output + # equality. + (_, num_recheckpoints) = self.storage.listdir( + f"runs/{self.relocation.uuid}/saas_to_saas_export/_checkpoints/" ) + assert num_checkpoints == num_recheckpoints + assert export_contents.getvalue() == reexport_contents.getvalue() export_contents.seek(0) # Convert this into a `SAAS_TO_SAAS` relocation, and use the data we just exported as the # import blob. file = RelocationFile.objects.get(relocation=self.relocation).file - self.swap_relocation_file(file, export_contents) + self.tarball = create_encrypted_export_tarball( + json.load(export_contents), encryptor + ).getvalue() + file.putfile(BytesIO(self.tarball), blob_size=RELOCATION_BLOB_SIZE) self.mock_kms_client(fake_kms_client) self.relocation.provenance = Relocation.Provenance.SAAS_TO_SAAS self.relocation.save()