Commit 64fba06
Implement ArtifactBundle flat file indexing (#53505)
The high-level overview is this:

- When uploading a new `ArtifactBundle`, we `get_or_create` the `ArtifactBundleFlatFileIndex`, and `update_or_create` the `FlatFileIndexState`, setting it to `NOT_INDEXED`.
- Then, using a distributed lock, we read the existing `ArtifactBundleFlatFileIndex`, merging the `ArtifactBundle` into it and updating it, finally setting the `FlatFileIndexState` to `WAS_INDEXED`.

As the final step is behind a distributed lock, we can be confident that we don’t have data races. If any error happens during this operation, the `FlatFileIndexState` indicates that indexing still needs to happen. A separate "repair job" could in the future just query all the `FlatFileIndexState` entries older than X that were not yet indexed, and add them to that index at a later time.

Co-authored-by: Riccardo Busetti <[email protected]>
Parent: ccb55e8
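To make that two-phase flow concrete, here is a minimal sketch of how the pieces added in this commit fit together (see the diff of src/sentry/debug_files/artifact_bundle_indexing.py below). The wiring is hypothetical: the real call sites live in the upload/assemble path, which is among the eight changed files but not shown on this page, and the timestamp attribute passed to `BundleMeta` is an assumption.

from sentry.debug_files.artifact_bundle_indexing import (
    BundleMeta,
    mark_bundle_for_flat_file_indexing,
    update_artifact_bundle_index,
)
from sentry.models.artifactbundle import ArtifactBundle, ArtifactBundleArchive


def index_bundle_sketch(artifact_bundle: ArtifactBundle, project_ids, release, dist):
    # Phase 1: create/refresh the `NOT_INDEXED` state rows. If anything fails
    # after this point, the rows record that indexing is still owed.
    identifiers = mark_bundle_for_flat_file_indexing(
        artifact_bundle, project_ids, release, dist
    )

    # Hypothetical: the exact timestamp field used for `BundleMeta` is not
    # visible in the diffs shown on this page.
    bundle_meta = BundleMeta(id=artifact_bundle.id, timestamp=artifact_bundle.date_added)
    archive = ArtifactBundleArchive(artifact_bundle.file.getfile())

    # Phase 2: under a per-identifier distributed lock, merge the bundle into
    # each flat file index and compare-and-set the state to `WAS_INDEXED`.
    for identifier in identifiers:
        update_artifact_bundle_index(bundle_meta, archive, identifier)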

8 files changed: +883 −41 lines

Diff for: src/sentry/conf/server.py

+2 lines

@@ -1679,6 +1679,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
     "projects:alert-filters": True,
     # Enable functionality to specify custom inbound filters on events.
     "projects:custom-inbound-filters": False,
+    # Enable the new flat file indexing system for sourcemaps.
+    "organizations:sourcemaps-bundle-flat-file-indexing": False,
     # Enable data forwarding functionality for projects.
     "projects:data-forwarding": True,
     # Enable functionality to discard groups.
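The flag ships disabled by default in `SENTRY_FEATURES`. As a hedged sketch of opting in from a test, assuming Sentry's existing `with_feature` test helper and `TestCase` base class, neither of which is part of this diff:

from sentry.testutils import TestCase
from sentry.testutils.helpers import with_feature


class FlatFileIndexingTest(TestCase):
    @with_feature("organizations:sourcemaps-bundle-flat-file-indexing")
    def test_indexing_enabled(self):
        # Within this test, feature checks for the flag return True, so the
        # upload path would take the new flat file indexing branch.
        ...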

Diff for: src/sentry/debug_files/artifact_bundle_indexing.py

+285 lines

@@ -0,0 +1,285 @@
+import hashlib
+import logging
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any, Dict, List, NamedTuple, Optional, Tuple, TypeVar
+
+import sentry_sdk
+from django.db import router
+from django.utils import timezone
+
+from sentry.locks import locks
+from sentry.models.artifactbundle import (
+    NULL_STRING,
+    ArtifactBundle,
+    ArtifactBundleArchive,
+    ArtifactBundleFlatFileIndex,
+    ArtifactBundleIndexingState,
+    FlatFileIndexState,
+)
+from sentry.utils import json, metrics
+from sentry.utils.db import atomic_transaction
+from sentry.utils.locking.lock import Lock
+from sentry.utils.retries import TimedRetryPolicy
+
+logger = logging.getLogger(__name__)
+
+
+# We want to keep the bundle as being indexed for 600 seconds = 10 minutes. We might need to revise this number and
+# optimize it based on the time taken to perform the indexing (on average).
+FLAT_FILE_INDEXING_CACHE_TIMEOUT = 600
+
+
+@sentry_sdk.tracing.trace
+def mark_bundle_for_flat_file_indexing(
+    artifact_bundle: ArtifactBundle,
+    project_ids: List[int],
+    release: Optional[str],
+    dist: Optional[str],
+):
+    identifiers = []
+
+    for project_id in project_ids:
+        if release:
+            identifiers.append(
+                FlatFileIdentifier(project_id, release=release, dist=dist or NULL_STRING)
+            )
+
+        identifiers.append(FlatFileIdentifier.for_debug_id(project_id))
+
+    # Create / Update the indexing state in the database
+    with atomic_transaction(
+        using=(
+            router.db_for_write(ArtifactBundleFlatFileIndex),
+            router.db_for_write(FlatFileIndexState),
+        )
+    ):
+        for identifier in identifiers:
+            flat_file_index, _created = ArtifactBundleFlatFileIndex.objects.get_or_create(
+                project_id=identifier.project_id,
+                release_name=identifier.release,
+                dist_name=identifier.dist,
+            )
+            FlatFileIndexState.objects.update_or_create(
+                flat_file_index=flat_file_index,
+                artifact_bundle=artifact_bundle,
+                defaults={
+                    "indexing_state": ArtifactBundleIndexingState.NOT_INDEXED.value,
+                    "date_added": timezone.now(),
+                },
+            )
+
+    return identifiers
+
+
+class FlatFileIdentifier(NamedTuple):
+    project_id: int
+    release: str
+    dist: str
+
+    @staticmethod
+    def for_debug_id(project_id: int) -> "FlatFileIdentifier":
+        return FlatFileIdentifier(project_id, release=NULL_STRING, dist=NULL_STRING)
+
+    def is_indexing_by_release(self) -> bool:
+        # An identifier is indexing by release if release is set.
+        return bool(self.release)
+
+    def _key_hash(self) -> str:
+        key = f"{self.project_id}|{self.release}|{self.dist}"
+        return hashlib.sha1(key.encode()).hexdigest()
+
+    def get_lock(self) -> Lock:
+        key_hash = self._key_hash()
+        locking_key = f"bundle_index:write:{key_hash}"
+        return locks.get(locking_key, duration=60 * 10, name="bundle_index")
+
+
+@sentry_sdk.tracing.trace
+def update_artifact_bundle_index(
+    bundle_meta: "BundleMeta", bundle_archive: ArtifactBundleArchive, identifier: FlatFileIdentifier
+):
+    """
+    This will merge the `ArtifactBundle` given via `bundle_meta` and `bundle_archive`
+    into the index identified via `identifier`.
+
+    If this function fails for any reason, it can be, and *has to be* retried at a later point.
+    """
+    # TODO: maybe query `FlatFileIndexState` to avoid double-indexing?
+
+    lock = identifier.get_lock()
+    with TimedRetryPolicy(60)(lock.acquire):
+        flat_file_index = ArtifactBundleFlatFileIndex.objects.select_related("flat_file_index").get(
+            project_id=identifier.project_id,
+            release_name=identifier.release,
+            dist_name=identifier.dist,
+        )
+
+        index = FlatFileIndex()
+        # Load the index from the file if it exists
+        if existing_index := flat_file_index.load_flat_file_index():
+            index.from_json(existing_index)
+
+        # Before merging new data into the index, we will clear any existing
+        # data from the index related to this bundle.
+        # This is related to an edge-case in which the same `bundle_id` could be
+        # re-used but with different file contents.
+        index.remove(bundle_meta.id)
+
+        # We merge the index based on the identifier type.
+        if identifier.is_indexing_by_release():
+            index.merge_urls(bundle_meta, bundle_archive)
+        else:
+            index.merge_debug_ids(bundle_meta, bundle_archive)
+
+        # Store the updated index file
+        new_json_index = index.to_json()
+        flat_file_index.update_flat_file_index(new_json_index)
+
+        # And then mark the bundle as indexed
+        was_updated = FlatFileIndexState.compare_state_and_set(
+            flat_file_index.id,
+            bundle_meta.id,
+            ArtifactBundleIndexingState.NOT_INDEXED,
+            ArtifactBundleIndexingState.WAS_INDEXED,
+        )
+        if not was_updated:
+            metrics.incr("artifact_bundle_flat_file_indexing.duplicated_indexing")
+            logger.error("`ArtifactBundle` %r was already indexed into %r", bundle_meta, identifier)
+
+
+@dataclass(frozen=True)
+class BundleMeta:
+    id: int
+    timestamp: datetime
+
+
+Bundles = List[BundleMeta]
+FilesByUrl = Dict[str, List[int]]
+FilesByDebugID = Dict[str, List[int]]
+
+
+T = TypeVar("T")
+
+
+class FlatFileIndex:
+    def __init__(self):
+        # By default, a flat file index is empty.
+        self._bundles: Bundles = []
+        self._files_by_url: FilesByUrl = {}
+        self._files_by_debug_id: FilesByDebugID = {}
+
+    def from_json(self, json_str: str) -> None:
+        json_idx = json.loads(json_str)
+
+        bundles = json_idx.get("bundles", [])
+        self._bundles = [
+            BundleMeta(
+                int(bundle["bundle_id"].split("/")[1]),
+                datetime.fromisoformat(bundle["timestamp"]),
+            )
+            for bundle in bundles
+        ]
+        self._files_by_url = json_idx.get("files_by_url", {})
+        self._files_by_debug_id = json_idx.get("files_by_debug_id", {})
+
+    def to_json(self) -> str:
+        bundles = [
+            {
+                # NOTE: Symbolicator is using the `bundle_id` as the `?download=...`
+                # parameter it passes to the artifact-lookup API to download the
+                # linked bundle from, so this has to match whatever download_id
+                # the artifact-lookup API accepts.
+                "bundle_id": f"artifact_bundle/{bundle.id}",
+                "timestamp": datetime.isoformat(bundle.timestamp),
+            }
+            for bundle in self._bundles
+        ]
+        json_idx: Dict[str, Any] = {
+            "bundles": bundles,
+            "files_by_url": self._files_by_url,
+            "files_by_debug_id": self._files_by_debug_id,
+        }
+
+        return json.dumps(json_idx)
+
+    def merge_urls(self, bundle_meta: BundleMeta, bundle_archive: ArtifactBundleArchive):
+        bundle_index = self._add_or_update_bundle(bundle_meta)
+        if bundle_index is None:
+            return
+
+        for url in bundle_archive.get_all_urls():
+            self._add_sorted_entry(self._files_by_url, url, bundle_index)
+
+    def merge_debug_ids(self, bundle_meta: BundleMeta, bundle_archive: ArtifactBundleArchive):
+        bundle_index = self._add_or_update_bundle(bundle_meta)
+        if bundle_index is None:
+            return
+
+        for debug_id, _ in bundle_archive.get_all_debug_ids():
+            self._add_sorted_entry(self._files_by_debug_id, debug_id, bundle_index)
+
+    def _add_or_update_bundle(self, bundle_meta: BundleMeta) -> Optional[int]:
+        index_and_bundle_meta = self._index_and_bundle_meta_for_id(bundle_meta.id)
+        if index_and_bundle_meta is None:
+            self._bundles.append(bundle_meta)
+            return len(self._bundles) - 1
+
+        found_bundle_index, found_bundle_meta = index_and_bundle_meta
+        # In case the new bundle is exactly the same, we will not update, since it's unnecessary.
+        if found_bundle_meta == bundle_meta:
+            return None
+        else:
+            # TODO: it might be possible to optimize updating and re-sorting
+            # an existing bundle
+            self._bundles[found_bundle_index] = bundle_meta
+            return found_bundle_index
+
+    def _add_sorted_entry(self, collection: Dict[T, List[int]], key: T, bundle_index: int):
+        entries = collection.get(key, [])
+        entries.append(bundle_index)
+        # Remove duplicates by doing a roundtrip through `set`.
+        entries = list(set(entries))
+        # Symbolicator will consider the newest element the last element of the list.
+        entries.sort(key=lambda index: (self._bundles[index].timestamp, self._bundles[index].id))
+        collection[key] = entries
+
+    def remove(self, artifact_bundle_id: int) -> bool:
+        index_and_bundle_meta = self._index_and_bundle_meta_for_id(artifact_bundle_id)
+        if index_and_bundle_meta is None:
+            return False
+
+        found_bundle_index, _ = index_and_bundle_meta
+        self._files_by_url = self._update_bundle_references(self._files_by_url, found_bundle_index)
+        self._files_by_debug_id = self._update_bundle_references(
+            self._files_by_debug_id, found_bundle_index
+        )
+        self._bundles.pop(found_bundle_index)
+
+        return True
+
+    @staticmethod
+    def _update_bundle_references(collection: Dict[T, List[int]], removed_bundle_index: int):
+        updated_collection: Dict[T, List[int]] = {}
+
+        for key, indexes in collection.items():
+            updated_indexes = [
+                index if index < removed_bundle_index else index - 1
+                for index in indexes
+                if index != removed_bundle_index
+            ]
+
+            # Only if we have some indexes we want to keep the key.
+            if len(updated_indexes) > 0:
+                updated_collection[key] = updated_indexes
+
+        return updated_collection
+
+    def _index_and_bundle_meta_for_id(
+        self, artifact_bundle_id: int
+    ) -> Optional[Tuple[int, BundleMeta]]:
+        for index, bundle in enumerate(self._bundles):
+            if bundle.id == artifact_bundle_id:
+                return index, bundle
+
+        return None
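For reference, this is approximately the JSON document that `FlatFileIndex.to_json` produces, reconstructed from the code above with invented ids, URLs, and debug-ids. The integers in `files_by_url` and `files_by_debug_id` are positions into the `bundles` array; `_add_sorted_entry` keeps each list sorted by (timestamp, id) so Symbolicator can treat the last position as the newest bundle, and `remove` / `_update_bundle_references` shift positions down when a bundle is dropped from the array.

# Illustrative index contents; all ids, URLs, and debug-ids are invented.
example_index = {
    "bundles": [
        # `bundle_id` doubles as the `?download=...` parameter that
        # Symbolicator passes to the artifact-lookup API.
        {"bundle_id": "artifact_bundle/1", "timestamp": "2023-07-01T10:00:00+00:00"},
        {"bundle_id": "artifact_bundle/2", "timestamp": "2023-07-02T10:00:00+00:00"},
    ],
    # Release-based index: minified URL -> positions into `bundles`.
    "files_by_url": {"~/static/js/app.min.js": [0, 1]},
    # Debug-id-based index: debug-id -> positions into `bundles`.
    "files_by_debug_id": {"4d237b36-e2e5-4296-bfea-c0ae18d5c3a0": [1]},
}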

Diff for: src/sentry/features/__init__.py

+1 line

@@ -259,6 +259,7 @@
 default_manager.add("organizations:pr-comment-bot", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("organizations:ds-org-recalibration", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("organizations:slack-use-new-lookup", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
+default_manager.add("organizations:sourcemaps-bundle-flat-file-indexing", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
 
 # Project scoped features
 default_manager.add("projects:alert-filters", ProjectFeature, FeatureHandlerStrategy.INTERNAL)

Diff for: src/sentry/models/artifactbundle.py

+15 −25 lines

@@ -132,26 +132,6 @@ class Meta:
 
         index_together = (("project_id", "release_name", "dist_name"),)
 
-    @classmethod
-    def create_flat_file_index(
-        cls, project_id: int, release: str, dist: str, file_contents: Optional[str]
-    ) -> "ArtifactBundleFlatFileIndex":
-        from sentry.models import File
-
-        with atomic_transaction(
-            using=(router.db_for_write(File), router.db_for_write(ArtifactBundleFlatFileIndex))
-        ):
-            # By default, we can create a flat index file which has not `File` object bound to it.
-            file = None
-            if file_contents:
-                file = cls._create_flat_file_index_object(file_contents)
-
-            index = ArtifactBundleFlatFileIndex.objects.create(
-                project_id=project_id, release_name=release, dist_name=dist, flat_file_index=file
-            )
-
-        return index
-
     def update_flat_file_index(self, file_contents: str):
         from sentry.models import File
 

@@ -212,8 +192,8 @@ def compare_state_and_set(
         updated_rows = FlatFileIndexState.objects.filter(
             flat_file_index_id=flat_file_index_id,
             artifact_bundle_id=artifact_bundle_id,
-            indexing_state=indexing_state,
-        ).update(indexing_state=new_indexing_state, date_added=timezone.now())
+            indexing_state=indexing_state.value,
+        ).update(indexing_state=new_indexing_state.value, date_added=timezone.now())
 
         # If we had one row being updated, it means that the cas operation succeeded.
         return updated_rows == 1

@@ -302,8 +282,12 @@ class ArtifactBundleArchive:
     def __init__(self, fileobj: IO, build_memory_map: bool = True):
         self._fileobj = fileobj
         self._zip_file = zipfile.ZipFile(self._fileobj)
+        self._entries_by_debug_id = {}
+        self._entries_by_url = {}
+
         self.manifest = self._read_manifest()
         self.artifact_count = len(self.manifest.get("files", {}))
+
         if build_memory_map:
             self._build_memory_maps()
 

@@ -342,9 +326,6 @@ def normalize_debug_id(debug_id: Optional[str]) -> Optional[str]:
         return None
 
     def _build_memory_maps(self):
-        self._entries_by_debug_id = {}
-        self._entries_by_url = {}
-
         files = self.manifest.get("files", {})
         for file_path, info in files.items():
             # Building the map for debug_id lookup.

@@ -367,6 +348,15 @@ def _build_memory_maps(self):
             # Building the map for url lookup.
             self._entries_by_url[info.get("url")] = (file_path, info)
 
+    def get_all_urls(self):
+        return self._entries_by_url.keys()
+
+    def get_all_debug_ids(self):
+        return self._entries_by_debug_id.keys()
+
+    def has_debug_ids(self):
+        return len(self._entries_by_debug_id) > 0
+
     def extract_debug_ids_from_manifest(
         self,
     ) -> Set[Tuple[SourceFileType, str]]:
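The `indexing_state=indexing_state.value` fix in the `compare_state_and_set` hunk above matters because the method is a database-level compare-and-swap: the filter must match the stored enum value for the conditional UPDATE to find the row. A usage sketch with hypothetical ids (the positional arguments mirror the call in artifact_bundle_indexing.py):

from sentry.models.artifactbundle import ArtifactBundleIndexingState, FlatFileIndexState

# The UPDATE only matches rows still in the expected state, so at most one
# concurrent worker can win the transition; everyone else sees False.
was_updated = FlatFileIndexState.compare_state_and_set(
    1,  # hypothetical flat_file_index_id
    2,  # hypothetical artifact_bundle_id
    ArtifactBundleIndexingState.NOT_INDEXED,  # expected current state
    ArtifactBundleIndexingState.WAS_INDEXED,  # new state
)
if not was_updated:
    # Another worker already indexed this bundle; the caller records a
    # `duplicated_indexing` metric instead of indexing twice.
    pass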
