Skip to content

feat(relocation): Add GCP-backed export checkpointer #80803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions src/sentry/backup/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,6 @@ class Decryptor(ABC):
tarball.
"""

@abstractmethod
def read(self) -> bytes:
pass

Comment on lines -218 to -221
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All changes to this file remove an unused method.

@abstractmethod
def decrypt_data_encryption_key(self, unwrapped: UnwrappedEncryptedExportTarball) -> bytes:
pass
Expand All @@ -236,9 +232,6 @@ def __init__(self, fp: IO[bytes]):
def from_bytes(cls, b: bytes) -> LocalFileDecryptor:
return cls(io.BytesIO(b))

def read(self) -> bytes:
return self.__key

def decrypt_data_encryption_key(self, unwrapped: UnwrappedEncryptedExportTarball) -> bytes:
"""
Decrypt the encrypted data encryption key used to encrypt the actual export JSON.
Expand Down Expand Up @@ -288,9 +281,6 @@ def __init__(self, fp: IO[bytes]):
def from_bytes(cls, b: bytes) -> GCPKMSDecryptor:
return cls(io.BytesIO(b))

def read(self) -> bytes:
return self.__key

def decrypt_data_encryption_key(self, unwrapped: UnwrappedEncryptedExportTarball) -> bytes:
gcp_kms_config_bytes = self.__key

Expand Down
12 changes: 12 additions & 0 deletions src/sentry/tasks/relocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from sentry.api.helpers.slugs import validate_sentry_slug
from sentry.api.serializers.rest_framework.base import camel_to_snake_case, convert_dict_key_case
from sentry.backup.crypto import (
EncryptorDecryptorPair,
GCPKMSDecryptor,
GCPKMSEncryptor,
LocalFileEncryptor,
Expand Down Expand Up @@ -72,6 +73,7 @@
TASK_TO_STEP,
LoggingPrinter,
OrderedTask,
StorageBackedCheckpointExporter,
create_cloudbuild_yaml,
fail_relocation,
get_relocations_bucket_name,
Expand Down Expand Up @@ -376,6 +378,16 @@ def fulfill_cross_region_export_request(
encryptor=LocalFileEncryptor(BytesIO(encrypt_with_public_key)),
org_filter={org_slug},
printer=LoggingPrinter(uuid),
checkpointer=StorageBackedCheckpointExporter(
crypto=EncryptorDecryptorPair(
encryptor=GCPKMSEncryptor.from_crypto_key_version(get_default_crypto_key_version()),
decryptor=GCPKMSDecryptor.from_bytes(
json.dumps(get_default_crypto_key_version()).encode("utf-8")
),
),
uuid=uuid,
storage=relocation_storage,
),
)
logger.info(
"fulfill_cross_region_export_request: exported",
Expand Down
93 changes: 90 additions & 3 deletions src/sentry/utils/relocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,32 @@
from contextlib import contextmanager
from enum import Enum, unique
from functools import lru_cache
from io import BytesIO
from string import Template
from typing import Any
from uuid import UUID

from django.core.files.storage import Storage
from django.utils import timezone
from orjson import JSONDecodeError

from sentry import options
from sentry.backup.dependencies import dependencies, get_model_name, sorted_dependencies
from sentry.backup.crypto import (
DecryptionError,
EncryptorDecryptorPair,
create_encrypted_export_tarball,
decrypt_encrypted_tarball,
)
from sentry.backup.dependencies import (
NormalizedModelName,
dependencies,
get_model_name,
sorted_dependencies,
)
from sentry.backup.exports import ExportCheckpointer, ExportCheckpointerError
from sentry.backup.helpers import Printer
from sentry.backup.scopes import RelocationScope
from sentry.backup.services.import_export.model import RpcExportOk
from sentry.http import get_server_hostname
from sentry.models.files.utils import get_relocation_storage
from sentry.models.relocation import Relocation, RelocationFile
Expand Down Expand Up @@ -308,9 +324,80 @@ class OrderedTask(Enum):
)


# Stores per-model export checkpoints in GCP cloud storage so that an interrupted
# SAAS_TO_SAAS export can resume from the last completed model instead of starting over.
class StorageBackedCheckpointExporter(ExportCheckpointer):
    """
    An export checkpointer that uses GCP cloud storage to store encrypted checkpoints for every
    model we export for a SAAS_TO_SAAS relocation.
    """

    def __init__(
        self,
        *,
        crypto: EncryptorDecryptorPair,
        uuid: UUID,
        storage: Storage,
    ):
        # Encryptor/decryptor pair: checkpoints are written encrypted and must be
        # decrypted with the matching key on read.
        self.__crypto = crypto
        self.__uuid = uuid
        self.__storage = storage

    def _get_path_name(self, model_name: NormalizedModelName) -> str:
        # Checkpoints are namespaced per relocation run (by UUID), so concurrent
        # relocations never collide with one another.
        return f"runs/{self.__uuid}/saas_to_saas_export/_checkpoints/{str(model_name)}.enc.tar"

    def get(self, model_name: NormalizedModelName) -> RpcExportOk | None:
        """
        Return the previously stored checkpoint for `model_name`, or `None` on a cache miss.

        Every failure mode (missing blob, failed decryption, malformed JSON, checkpointer
        error) is deliberately treated as a miss rather than an error, since the caller can
        always fall back to re-exporting the model from scratch.
        """
        logger_data: dict[str, Any] = {"uuid": str(self.__uuid), "model": str(model_name)}
        path_name = self._get_path_name(model_name)
        try:
            with self.__storage.open(path_name, "rb") as fp:
                # Measure the blob without consuming it: seek to the end, record the
                # offset, then rewind. Calling `fp.tell()` immediately after opening
                # would always report 0.
                fp.seek(0, 2)  # whence=2 == os.SEEK_END
                logger_data["encrypted_contents_size"] = fp.tell()
                fp.seek(0)
                json_data = decrypt_encrypted_tarball(fp, self.__crypto.decryptor)
                parsed_json = self._parse_cached_json(json_data)
                if parsed_json is None:
                    logger.info(
                        "Export checkpointer: miss",
                        extra=logger_data,
                    )
                else:
                    logger_data["max_pk"] = parsed_json.max_pk
                    logger.info(
                        "Export checkpointer: read",
                        extra=logger_data,
                    )

                return parsed_json
        except (FileNotFoundError, DecryptionError, JSONDecodeError, ExportCheckpointerError):
            logger.info(
                "Export checkpointer: miss",
                extra=logger_data,
            )
            return None

    def add(self, model_name: NormalizedModelName, json_export: Any) -> None:
        """
        Encrypt and persist a checkpoint for `model_name`.

        Only list-shaped exports are checkpointed; any other payload is silently ignored,
        which simply means the model will be re-exported on a retry.
        """
        logger_data: dict[str, Any] = {"uuid": str(self.__uuid), "model": str(model_name)}
        path_name = self._get_path_name(model_name)
        if not isinstance(json_export, list):
            return None

        out_bytes = create_encrypted_export_tarball(json_export, self.__crypto.encryptor).getvalue()
        self.__storage.save(path_name, BytesIO(out_bytes))

        # Log the payload length directly: reading `fp.tell()` after `storage.save(...)`
        # reflects however far the backend happened to read the stream, not the size of
        # the encrypted contents.
        logger_data["encrypted_contents_size"] = len(out_bytes)
        logger_data["model_count"] = len(json_export)
        logger.info(
            "Export checkpointer: write",
            extra=logger_data,
        )


class LoggingPrinter(Printer):
"""
A custom logger that roughly matches the parts of the `click.echo` interface that the `import_*`
and `export_*` backup methods rely on.
"""

def __init__(self, uuid: UUID):
self.uuid = uuid
super().__init__()
Expand Down
62 changes: 58 additions & 4 deletions tests/sentry/tasks/test_relocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from google_crc32c import value as crc32c

from sentry.backup.crypto import (
EncryptorDecryptorPair,
LocalFileDecryptor,
LocalFileEncryptor,
create_encrypted_export_tarball,
Expand Down Expand Up @@ -94,7 +95,12 @@
from sentry.users.models.lostpasswordhash import LostPasswordHash
from sentry.users.models.user import User
from sentry.utils import json
from sentry.utils.relocation import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE, OrderedTask
from sentry.utils.relocation import (
RELOCATION_BLOB_SIZE,
RELOCATION_FILE_TYPE,
OrderedTask,
StorageBackedCheckpointExporter,
)

IMPORT_JSON_FILE_PATH = get_fixture_path("backup", "fresh-install.json")

Expand Down Expand Up @@ -302,7 +308,7 @@ def test_success_saas_to_saas(
assert uploading_complete_mock.call_count == 1
assert cross_region_export_timeout_check_mock.call_count == 1
assert fake_message_builder.call_count == 0
assert fake_kms_client.get_public_key.call_count == 1
assert fake_kms_client.get_public_key.call_count > 0
assert fake_kms_client.asymmetric_decrypt.call_count == 0

assert RelocationFile.objects.filter(
Expand Down Expand Up @@ -345,7 +351,7 @@ def test_success_saas_to_saas_racing(
assert uploading_complete_mock.call_count == 1
assert cross_region_export_timeout_check_mock.call_count == 1
assert fake_message_builder.call_count == 0
assert fake_kms_client.get_public_key.call_count == 2
assert fake_kms_client.get_public_key.call_count > 0
assert fake_kms_client.asymmetric_decrypt.call_count == 0

assert (
Expand Down Expand Up @@ -1990,6 +1996,7 @@ def setUp(self):
self.relocation.step = Relocation.Step.VALIDATING.value
self.relocation.latest_task = OrderedTask.VALIDATING_COMPLETE.name
self.relocation.save()
self.storage = get_relocation_storage()

def test_success_self_hosted(
self, postprocessing_mock: Mock, fake_kms_client: FakeKeyManagementServiceClient
Expand Down Expand Up @@ -2035,19 +2042,66 @@ def test_success_saas_to_saas(
with assume_test_silo_mode(SiloMode.CONTROL):
user_count = User.objects.all().count()

# Create an export checkpointer, so that we can validate that it stores checkpoints properly
# over multiple export attempts.
decryptor = LocalFileDecryptor(BytesIO(self.priv_key_pem))
encryptor = LocalFileEncryptor(BytesIO(self.pub_key_pem))
export_checkpointer = StorageBackedCheckpointExporter(
crypto=EncryptorDecryptorPair(
encryptor=encryptor,
decryptor=decryptor,
),
uuid=self.relocation.uuid,
storage=self.storage,
)
with TemporaryDirectory() as tmp_dir:
tmp_priv_key_path = Path(tmp_dir).joinpath("key")
tmp_pub_key_path = Path(tmp_dir).joinpath("key.pub")
with open(tmp_priv_key_path, "wb") as f:
f.write(self.priv_key_pem)
with open(tmp_pub_key_path, "wb") as f:
f.write(self.pub_key_pem)

# Export the existing state of the `testing` organization, so that we retain exact ids.
export_contents = BytesIO()
export_in_organization_scope(
export_contents,
org_filter=set(self.relocation.want_org_slugs),
printer=Printer(),
checkpointer=export_checkpointer,
)

# Verify cache writes, to the checkpoint cache.
(_, num_checkpoints) = self.storage.listdir(
f"runs/{self.relocation.uuid}/saas_to_saas_export/_checkpoints/"
)
assert len(num_checkpoints) > 0

# Export again, to sanity-check the export checkpointer.
reexport_contents = BytesIO()
export_in_organization_scope(
reexport_contents,
org_filter=set(self.relocation.want_org_slugs),
printer=Printer(),
checkpointer=export_checkpointer,
)

# Verify no cache writes, to the checkpoint cache on the second pass, then check for output
# equality.
(_, num_recheckpoints) = self.storage.listdir(
f"runs/{self.relocation.uuid}/saas_to_saas_export/_checkpoints/"
)
assert num_checkpoints == num_recheckpoints
assert export_contents.getvalue() == reexport_contents.getvalue()
export_contents.seek(0)

# Convert this into a `SAAS_TO_SAAS` relocation, and use the data we just exported as the
# import blob.
file = RelocationFile.objects.get(relocation=self.relocation).file
self.swap_relocation_file(file, export_contents)
self.tarball = create_encrypted_export_tarball(
json.load(export_contents), encryptor
).getvalue()
file.putfile(BytesIO(self.tarball), blob_size=RELOCATION_BLOB_SIZE)
self.mock_kms_client(fake_kms_client)
self.relocation.provenance = Relocation.Provenance.SAAS_TO_SAAS
self.relocation.save()
Expand Down
Loading