105
105
import java .util .Set ;
106
106
import java .util .concurrent .Executor ;
107
107
import java .util .concurrent .atomic .AtomicBoolean ;
108
+ import java .util .concurrent .atomic .AtomicLong ;
108
109
import java .util .stream .Collectors ;
109
110
import java .util .stream .Stream ;
110
111
@@ -400,7 +401,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
400
401
} else {
401
402
try {
402
403
final Map <String , BlobMetaData > rootBlobs = blobContainer ().listBlobs ();
403
- final RepositoryData repositoryData = getRepositoryData ( latestGeneration ( rootBlobs . keySet ()) );
404
+ final RepositoryData repositoryData = safeRepositoryData ( repositoryStateId , rootBlobs );
404
405
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
405
406
// delete an index that was created by another master node after writing this index-N blob.
406
407
final Map <String , BlobContainer > foundIndices = blobStore ().blobContainer (indicesPath ()).children ();
@@ -411,6 +412,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
411
412
}
412
413
}
413
414
415
+ /**
416
+ * Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well of the assumed generation.
417
+ *
418
+ * @param repositoryStateId Expected repository generation
419
+ * @param rootBlobs Blobs at the repository root
420
+ * @return RepositoryData
421
+ */
422
+ private RepositoryData safeRepositoryData (long repositoryStateId , Map <String , BlobMetaData > rootBlobs ) {
423
+ final long generation = latestGeneration (rootBlobs .keySet ());
424
+ final long genToLoad = latestKnownRepoGen .updateAndGet (known -> Math .max (known , repositoryStateId ));
425
+ if (genToLoad > generation ) {
426
+ // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
427
+ // debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
428
+ // snapshot delete run anyway.
429
+ logger .debug ("Determined repository's generation from its contents to [" + generation + "] but " +
430
+ "current generation is at least [" + genToLoad + "]" );
431
+ }
432
+ if (genToLoad != repositoryStateId ) {
433
+ throw new RepositoryException (metadata .name (), "concurrent modification of the index-N file, expected current generation [" +
434
+ repositoryStateId + "], actual current generation [" + genToLoad + "]" );
435
+ }
436
+ return getRepositoryData (genToLoad );
437
+ }
438
+
414
439
/**
415
440
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
416
441
* and then has all now unreferenced blobs in it deleted.
@@ -514,14 +539,8 @@ public void cleanup(long repositoryStateId, ActionListener<RepositoryCleanupResu
514
539
if (isReadOnly ()) {
515
540
throw new RepositoryException (metadata .name (), "cannot run cleanup on readonly repository" );
516
541
}
517
- final RepositoryData repositoryData = getRepositoryData ();
518
- if (repositoryData .getGenId () != repositoryStateId ) {
519
- // Check that we are working on the expected repository version before gathering the data to clean up
520
- throw new RepositoryException (metadata .name (), "concurrent modification of the repository before cleanup started, " +
521
- "expected current generation [" + repositoryStateId + "], actual current generation ["
522
- + repositoryData .getGenId () + "]" );
523
- }
524
542
Map <String , BlobMetaData > rootBlobs = blobContainer ().listBlobs ();
543
+ final RepositoryData repositoryData = safeRepositoryData (repositoryStateId , rootBlobs );
525
544
final Map <String , BlobContainer > foundIndices = blobStore ().blobContainer (indicesPath ()).children ();
526
545
final Set <String > survivingIndexIds =
527
546
repositoryData .getIndices ().values ().stream ().map (IndexId ::getId ).collect (Collectors .toSet ());
@@ -845,12 +864,36 @@ public void endVerification(String seed) {
845
864
}
846
865
}
847
866
867
+ // Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
868
+ // and concurrent modifications.
869
+ // Protected for use in MockEventuallyConsistentRepository
870
+ protected final AtomicLong latestKnownRepoGen = new AtomicLong (RepositoryData .EMPTY_REPO_GEN );
871
+
848
872
@ Override
849
873
public RepositoryData getRepositoryData () {
850
- try {
851
- return getRepositoryData (latestIndexBlobId ());
852
- } catch (IOException ioe ) {
853
- throw new RepositoryException (metadata .name (), "Could not determine repository generation from root blobs" , ioe );
874
+ // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
875
+ while (true ) {
876
+ final long generation ;
877
+ try {
878
+ generation = latestIndexBlobId ();
879
+ } catch (IOException ioe ) {
880
+ throw new RepositoryException (metadata .name (), "Could not determine repository generation from root blobs" , ioe );
881
+ }
882
+ final long genToLoad = latestKnownRepoGen .updateAndGet (known -> Math .max (known , generation ));
883
+ if (genToLoad > generation ) {
884
+ logger .info ("Determined repository generation [" + generation
885
+ + "] from repository contents but correct generation must be at least [" + genToLoad + "]" );
886
+ }
887
+ try {
888
+ return getRepositoryData (genToLoad );
889
+ } catch (RepositoryException e ) {
890
+ if (genToLoad != latestKnownRepoGen .get ()) {
891
+ logger .warn ("Failed to load repository data generation [" + genToLoad +
892
+ "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen .get () + "]" , e );
893
+ continue ;
894
+ }
895
+ throw e ;
896
+ }
854
897
}
855
898
}
856
899
@@ -868,6 +911,12 @@ private RepositoryData getRepositoryData(long indexGen) {
868
911
return RepositoryData .snapshotsFromXContent (parser , indexGen );
869
912
}
870
913
} catch (IOException ioe ) {
914
+ // If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
915
+ // This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
916
+ // operations must start from the EMPTY_REPO_GEN again
917
+ if (latestKnownRepoGen .compareAndSet (indexGen , RepositoryData .EMPTY_REPO_GEN )) {
918
+ logger .warn ("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]" , ioe );
919
+ }
871
920
throw new RepositoryException (metadata .name (), "could not read repository data from index blob" , ioe );
872
921
}
873
922
}
@@ -892,10 +941,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
892
941
"] - possibly due to simultaneous snapshot deletion requests" );
893
942
}
894
943
final long newGen = currentGen + 1 ;
944
+ if (latestKnownRepoGen .get () >= newGen ) {
945
+ throw new IllegalArgumentException (
946
+ "Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already" );
947
+ }
895
948
// write the index file
896
949
final String indexBlob = INDEX_FILE_PREFIX + Long .toString (newGen );
897
950
logger .debug ("Repository [{}] writing new index generational blob [{}]" , metadata .name (), indexBlob );
898
- writeAtomic (indexBlob , BytesReference .bytes (repositoryData .snapshotsToXContent (XContentFactory .jsonBuilder ())), true );
951
+ writeAtomic (indexBlob ,
952
+ BytesReference .bytes (repositoryData .snapshotsToXContent (XContentFactory .jsonBuilder ())), true );
953
+ final long latestKnownGen = latestKnownRepoGen .updateAndGet (known -> Math .max (known , newGen ));
954
+ if (newGen < latestKnownGen ) {
955
+ // Don't mess up the index.latest blob
956
+ throw new IllegalStateException (
957
+ "Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]" );
958
+ }
899
959
// write the current generation to the index-latest file
900
960
final BytesReference genBytes ;
901
961
try (BytesStreamOutput bStream = new BytesStreamOutput ()) {
0 commit comments