 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -372,7 +373,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
         } else {
             try {
                 final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
-                final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
+                final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
                 // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
                 // delete an index that was created by another master node after writing this index-N blob.
                 final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
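The comment in this hunk relies on an ordering guarantee: the index listing is captured before the new index-N blob is written, so an index created by a newer master can never end up among the deletion candidates. A minimal, self-contained sketch of that ordering follows; the class and variable names are illustrative only, not the repository API.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: capture the listing before the generation bump, so later additions are
// never deletion candidates.
public class ListBeforeUpdate {
    public static void main(String[] args) {
        Map<String, String> indices = new ConcurrentHashMap<>();
        indices.put("idx-1", "referenced by old index-N");
        indices.put("idx-2", "referenced by old index-N");

        // 1. Capture the set of index directories first.
        Set<String> foundIndices = Set.copyOf(indices.keySet());

        // 2. Another master creates a new index after the listing was taken.
        indices.put("idx-3", "created by newer master");

        // 3. Only members of the captured listing may ever be considered for deletion.
        System.out.println(foundIndices.contains("idx-3")); // false -> idx-3 is safe from this delete run
    }
}
```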
@@ -383,6 +384,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
         }
     }
 
+    /**
+     * Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well as the assumed generation.
+     *
+     * @param repositoryStateId Expected repository generation
+     * @param rootBlobs         Blobs at the repository root
+     * @return RepositoryData
+     */
+    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
+        final long generation = latestGeneration(rootBlobs.keySet());
+        final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
+        if (genToLoad > generation) {
+            // It's always possible not to see the latest index-N in the listing here on an eventually consistent blob store, so just
+            // log it at debug level. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent
+            // cleanup or snapshot delete run anyway.
+            logger.debug("Determined repository's generation from its contents to [" + generation + "] but " +
+                "current generation is at least [" + genToLoad + "]");
+        }
+        if (genToLoad != repositoryStateId) {
+            throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
+                repositoryStateId + "], actual current generation [" + genToLoad + "]");
+        }
+        return getRepositoryData(genToLoad);
+    }
+
     /**
      * After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
      * and then has all now unreferenced blobs in it deleted.
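The consistency check in `safeRepositoryData` hinges on `latestKnownRepoGen` only ever moving forward: `updateAndGet` with `Math.max` turns the field into a monotonic high-water mark, so a stale or eventually consistent listing can never drag the tracked generation backwards. A standalone sketch of that idiom, with illustrative names (not Elasticsearch code):

```java
import java.util.concurrent.atomic.AtomicLong;

public class GenerationTracker {
    // -1 stands in for an "empty repository" sentinel, playing the role of RepositoryData.EMPTY_REPO_GEN above.
    private final AtomicLong latestKnownGen = new AtomicLong(-1L);

    /** Records an observed generation and returns the highest generation seen so far. */
    public long observe(long observedGen) {
        return latestKnownGen.updateAndGet(known -> Math.max(known, observedGen));
    }

    public static void main(String[] args) {
        GenerationTracker tracker = new GenerationTracker();
        System.out.println(tracker.observe(5)); // 5
        System.out.println(tracker.observe(3)); // 5: a stale observation cannot move the tracker backwards
        System.out.println(tracker.observe(7)); // 7
    }
}
```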
@@ -610,14 +635,8 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
         if (isReadOnly()) {
             throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
         }
-        final RepositoryData repositoryData = getRepositoryData();
-        if (repositoryData.getGenId() != repositoryStateId) {
-            // Check that we are working on the expected repository version before gathering the data to clean up
-            throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
-                "expected current generation [" + repositoryStateId + "], actual current generation ["
-                + repositoryData.getGenId() + "]");
-        }
         Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
+        final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
         final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
         final Set<String> survivingIndexIds =
             repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
@@ -903,12 +922,36 @@ public void endVerification(String seed) {
         }
     }
 
+    // Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
+    // and concurrent modifications.
+    // Protected for use in MockEventuallyConsistentRepository
+    protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
+
     @Override
     public RepositoryData getRepositoryData() {
-        try {
-            return getRepositoryData(latestIndexBlobId());
-        } catch (IOException ioe) {
-            throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
+        // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
+        while (true) {
+            final long generation;
+            try {
+                generation = latestIndexBlobId();
+            } catch (IOException ioe) {
+                throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
+            }
+            final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
+            if (genToLoad > generation) {
+                logger.info("Determined repository generation [" + generation
+                    + "] from repository contents but correct generation must be at least [" + genToLoad + "]");
+            }
+            try {
+                return getRepositoryData(genToLoad);
+            } catch (RepositoryException e) {
+                if (genToLoad != latestKnownRepoGen.get()) {
+                    logger.warn("Failed to load repository data generation [" + genToLoad +
+                        "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
+                    continue;
+                }
+                throw e;
+            }
         }
     }
 
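The retry loop above only swallows a failure when it can tell the failure was caused by a concurrent generation bump, that is, when the tracker has already moved past the generation it tried to load; otherwise the exception propagates. A compilable sketch of that control flow in isolation, where the names and the exception type are stand-ins rather than the actual repository classes:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

public class RetryOnConcurrentBump {
    private final AtomicLong latestKnownGen = new AtomicLong(-1L);

    /** Loads data for the highest generation known so far, retrying while concurrent writers move it forward. */
    public <T> T load(long observedGen, LongFunction<T> loader) {
        while (true) {
            // Never load anything older than what has already been observed.
            final long genToLoad = latestKnownGen.updateAndGet(known -> Math.max(known, observedGen));
            try {
                return loader.apply(genToLoad);
            } catch (IllegalStateException e) { // stand-in for RepositoryException in the diff
                if (genToLoad != latestKnownGen.get()) {
                    // A concurrent operation bumped the generation while we were loading; try again.
                    continue;
                }
                throw e; // the generation we loaded is still current, so the failure is real
            }
        }
    }
}
```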
@@ -926,6 +969,12 @@ private RepositoryData getRepositoryData(long indexGen) {
                 return RepositoryData.snapshotsFromXContent(parser, indexGen);
             }
         } catch (IOException ioe) {
+            // If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
+            // This is done as a fail-safe in case a user manually deletes the contents of the repository, in which case subsequent
+            // operations must start from the EMPTY_REPO_GEN again.
+            if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
+                logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
+            }
             throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
         }
     }
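The `compareAndSet` here is what keeps the reset safe under concurrency: the tracker is wound back to the empty sentinel only if it still holds exactly the generation that failed to load, so knowledge of a newer generation written by a concurrent operation is never discarded. A small runnable sketch of that behaviour, with illustrative values and names:

```java
import java.util.concurrent.atomic.AtomicLong;

public class ResetOnlyIfUnchanged {
    private static final long EMPTY_GEN = -1L; // stand-in for the "empty repository" sentinel

    public static void main(String[] args) {
        AtomicLong latestKnownGen = new AtomicLong(7L);
        long failedGen = 7L; // the generation we just failed to read

        // Tracker still points at the failed generation: the reset succeeds.
        System.out.println(latestKnownGen.compareAndSet(failedGen, EMPTY_GEN)); // true, tracker is now -1

        latestKnownGen.set(9L); // a concurrent operation has moved the repository forward
        // Tracker already knows about a newer generation: the reset is a no-op.
        System.out.println(latestKnownGen.compareAndSet(failedGen, EMPTY_GEN)); // false, tracker stays at 9
    }
}
```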
@@ -951,11 +1000,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
                 "] - possibly due to simultaneous snapshot deletion requests");
         }
         final long newGen = currentGen + 1;
+        if (latestKnownRepoGen.get() >= newGen) {
+            throw new IllegalArgumentException(
+                "Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get() +
+                    "] already");
+        }
         // write the index file
         final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
         logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
         writeAtomic(indexBlob,
             BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
+        final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
+        if (newGen < latestKnownGen) {
+            // Don't mess up the index.latest blob
+            throw new IllegalStateException(
+                "Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
+        }
         // write the current generation to the index-latest file
         final BytesReference genBytes;
         try (BytesStreamOutput bStream = new BytesStreamOutput()) {
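Taken together, the two new checks bracket the index-N write: before writing, the new generation must be strictly ahead of everything the tracker has seen; after writing, the tracker must not have been advanced past the generation just written, since that would mean index.latest could end up pointing at a stale blob. A condensed sketch of the same bracketing, simplified and with illustrative names, where a Runnable stands in for the blob write:

```java
import java.util.concurrent.atomic.AtomicLong;

public class GuardedGenerationWrite {
    private final AtomicLong latestKnownGen = new AtomicLong(-1L);

    public void writeGeneration(long newGen, Runnable writeIndexBlob) {
        // Pre-write guard: never write a generation the tracker already considers current or stale.
        if (latestKnownGen.get() >= newGen) {
            throw new IllegalArgumentException("tried writing generation [" + newGen
                + "] but repository is already at least at generation [" + latestKnownGen.get() + "]");
        }
        writeIndexBlob.run(); // stand-in for writing the index-N blob

        // Post-write guard: if a concurrent writer got ahead of us, do not advertise newGen as the latest.
        final long latest = latestKnownGen.updateAndGet(known -> Math.max(known, newGen));
        if (newGen < latest) {
            throw new IllegalStateException("wrote generation [" + newGen
                + "] but the latest known generation concurrently moved to [" + latest + "]");
        }
    }
}
```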