Skip to content

Commit 37c58ca

Browse files
Track Repository Gen. in BlobStoreRepository (#48944)
This is intended as a stop-gap solution/improvement to #38941 that prevents repo modifications without an intermittent master failover from causing inconsistent (outdated due to inconsistent listing of index-N blobs) `RepositoryData` to be written. Tracking the latest repository generation will move to the cluster state in a separate pull request. This is intended as a low-risk change to be backported as far as possible and motived by the recently increased chance of #38941 causing trouble via SLM (see #47520). Closes #47834 Closes #49048
1 parent 3ab2de1 commit 37c58ca

File tree

4 files changed

+106
-26
lines changed

4 files changed

+106
-26
lines changed

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+71-12
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import java.util.concurrent.Executor;
114114
import java.util.concurrent.LinkedBlockingQueue;
115115
import java.util.concurrent.TimeUnit;
116+
import java.util.concurrent.atomic.AtomicLong;
116117
import java.util.stream.Collectors;
117118
import java.util.stream.Stream;
118119

@@ -366,7 +367,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
366367
} else {
367368
try {
368369
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
369-
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
370+
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
370371
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
371372
// delete an index that was created by another master node after writing this index-N blob.
372373
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
@@ -377,6 +378,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
377378
}
378379
}
379380

381+
/**
382+
* Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well of the assumed generation.
383+
*
384+
* @param repositoryStateId Expected repository generation
385+
* @param rootBlobs Blobs at the repository root
386+
* @return RepositoryData
387+
*/
388+
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
389+
final long generation = latestGeneration(rootBlobs.keySet());
390+
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
391+
if (genToLoad > generation) {
392+
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
393+
// debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
394+
// snapshot delete run anyway.
395+
logger.debug("Determined repository's generation from its contents to [" + generation + "] but " +
396+
"current generation is at least [" + genToLoad + "]");
397+
}
398+
if (genToLoad != repositoryStateId) {
399+
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
400+
repositoryStateId + "], actual current generation [" + genToLoad + "]");
401+
}
402+
return getRepositoryData(genToLoad);
403+
}
404+
380405
/**
381406
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
382407
* and then has all now unreferenced blobs in it deleted.
@@ -604,14 +629,8 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
604629
if (isReadOnly()) {
605630
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
606631
}
607-
final RepositoryData repositoryData = getRepositoryData();
608-
if (repositoryData.getGenId() != repositoryStateId) {
609-
// Check that we are working on the expected repository version before gathering the data to clean up
610-
throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
611-
"expected current generation [" + repositoryStateId + "], actual current generation ["
612-
+ repositoryData.getGenId() + "]");
613-
}
614632
Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
633+
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
615634
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
616635
final Set<String> survivingIndexIds =
617636
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
@@ -897,12 +916,36 @@ public void endVerification(String seed) {
897916
}
898917
}
899918

919+
// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
920+
// and concurrent modifications.
921+
// Protected for use in MockEventuallyConsistentRepository
922+
protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
923+
900924
@Override
901925
public RepositoryData getRepositoryData() {
902-
try {
903-
return getRepositoryData(latestIndexBlobId());
904-
} catch (IOException ioe) {
905-
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
926+
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
927+
while (true) {
928+
final long generation;
929+
try {
930+
generation = latestIndexBlobId();
931+
} catch (IOException ioe) {
932+
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
933+
}
934+
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
935+
if (genToLoad > generation) {
936+
logger.info("Determined repository generation [" + generation
937+
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
938+
}
939+
try {
940+
return getRepositoryData(genToLoad);
941+
} catch (RepositoryException e) {
942+
if (genToLoad != latestKnownRepoGen.get()) {
943+
logger.warn("Failed to load repository data generation [" + genToLoad +
944+
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
945+
continue;
946+
}
947+
throw e;
948+
}
906949
}
907950
}
908951

@@ -920,6 +963,12 @@ private RepositoryData getRepositoryData(long indexGen) {
920963
return RepositoryData.snapshotsFromXContent(parser, indexGen);
921964
}
922965
} catch (IOException ioe) {
966+
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
967+
// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
968+
// operations must start from the EMPTY_REPO_GEN again
969+
if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
970+
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
971+
}
923972
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
924973
}
925974
}
@@ -945,11 +994,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
945994
"] - possibly due to simultaneous snapshot deletion requests");
946995
}
947996
final long newGen = currentGen + 1;
997+
if (latestKnownRepoGen.get() >= newGen) {
998+
throw new IllegalArgumentException(
999+
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
1000+
}
9481001
// write the index file
9491002
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
9501003
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
9511004
writeAtomic(indexBlob,
9521005
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
1006+
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
1007+
if (newGen < latestKnownGen) {
1008+
// Don't mess up the index.latest blob
1009+
throw new IllegalStateException(
1010+
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
1011+
}
9531012
// write the current generation to the index-latest file
9541013
final BytesReference genBytes;
9551014
try (BytesStreamOutput bStream = new BytesStreamOutput()) {

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ public void testConcurrentSnapshotCreateAndDelete() {
396396
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
397397

398398
continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
399-
.prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener));
399+
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener));
400400
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
401401
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
402402

@@ -1133,7 +1133,7 @@ protected void assertSnapshotOrGenericThread() {
11331133
} else {
11341134
return metaData -> {
11351135
final Repository repository = new MockEventuallyConsistentRepository(
1136-
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
1136+
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random());
11371137
repository.start();
11381138
return repository;
11391139
};

server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.HashMap;
4848
import java.util.List;
4949
import java.util.Map;
50+
import java.util.Random;
5051
import java.util.concurrent.atomic.AtomicBoolean;
5152
import java.util.concurrent.atomic.AtomicLong;
5253
import java.util.function.Function;
@@ -64,6 +65,8 @@
6465
*/
6566
public class MockEventuallyConsistentRepository extends BlobStoreRepository {
6667

68+
private final Random random;
69+
6770
private final Context context;
6871

6972
private final NamedXContentRegistry namedXContentRegistry;
@@ -72,10 +75,12 @@ public MockEventuallyConsistentRepository(
7275
final RepositoryMetaData metadata,
7376
final NamedXContentRegistry namedXContentRegistry,
7477
final ThreadPool threadPool,
75-
final Context context) {
78+
final Context context,
79+
final Random random) {
7680
super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath());
7781
this.context = context;
7882
this.namedXContentRegistry = namedXContentRegistry;
83+
this.random = random;
7984
}
8085

8186
// Filters out all actions that are super-seeded by subsequent actions
@@ -107,6 +112,9 @@ protected BlobStore createBlobStore() {
107112
*/
108113
public static final class Context {
109114

115+
// Eventual consistency is only simulated as long as this flag is false
116+
private boolean consistent;
117+
110118
private final List<BlobStoreAction> actions = new ArrayList<>();
111119

112120
/**
@@ -117,6 +125,7 @@ public void forceConsistent() {
117125
final List<BlobStoreAction> consistentActions = consistentView(actions);
118126
actions.clear();
119127
actions.addAll(consistentActions);
128+
consistent = true;
120129
}
121130
}
122131
}
@@ -240,14 +249,14 @@ public Map<String, BlobMetaData> listBlobs() {
240249
ensureNotClosed();
241250
final String thisPath = path.buildAsString();
242251
synchronized (context.actions) {
243-
return consistentView(context.actions).stream()
252+
return maybeMissLatestIndexN(consistentView(context.actions).stream()
244253
.filter(
245254
action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1
246255
&& action.operation == Operation.PUT)
247256
.collect(
248257
Collectors.toMap(
249258
action -> action.path.substring(thisPath.length()),
250-
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length)));
259+
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))));
251260
}
252261
}
253262

@@ -268,9 +277,21 @@ public Map<String, BlobContainer> children() {
268277

269278
@Override
270279
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) {
271-
return Maps.ofEntries(
272-
listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect(Collectors.toList())
273-
);
280+
return maybeMissLatestIndexN(
281+
Maps.ofEntries(listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix))
282+
.collect(Collectors.toList())));
283+
}
284+
285+
// Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen
286+
// overrides an inconsistent listing
287+
private Map<String, BlobMetaData> maybeMissLatestIndexN(Map<String, BlobMetaData> listing) {
288+
// Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state
289+
if (path.parent() == null && context.consistent == false && random.nextBoolean()) {
290+
final Map<String, BlobMetaData> filtered = new HashMap<>(listing);
291+
filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get());
292+
return Map.copyOf(filtered);
293+
}
294+
return listing;
274295
}
275296

276297
@Override

server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void testReadAfterWriteConsistently() throws IOException {
5050
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
5151
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
5252
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
53-
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
53+
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
5454
repository.start();
5555
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
5656
final String blobName = randomAlphaOfLength(10);
@@ -70,7 +70,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException {
7070
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
7171
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
7272
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
73-
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
73+
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
7474
repository.start();
7575
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
7676
final String blobName = randomAlphaOfLength(10);
@@ -86,7 +86,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException {
8686
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
8787
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
8888
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
89-
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
89+
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
9090
repository.start();
9191
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
9292
final String blobName = randomAlphaOfLength(10);
@@ -104,7 +104,7 @@ public void testOverwriteRandomBlobFails() throws IOException {
104104
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
105105
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
106106
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
107-
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
107+
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
108108
repository.start();
109109
final BlobContainer container = repository.blobStore().blobContainer(repository.basePath());
110110
final String blobName = randomAlphaOfLength(10);
@@ -121,7 +121,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException {
121121
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
122122
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
123123
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
124-
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
124+
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
125125
repository.start();
126126
final BlobContainer container =
127127
repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0"));
@@ -143,7 +143,7 @@ public void testOverwriteSnapshotInfoBlob() {
143143
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
144144
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
145145
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
146-
xContentRegistry(), threadPool, blobStoreContext)) {
146+
xContentRegistry(), threadPool, blobStoreContext, random())) {
147147
repository.start();
148148

149149
// We create a snap- blob for snapshot "foo" in the first generation

0 commit comments

Comments
 (0)