Track Repository Gen. in BlobStoreRepository #48944
Changes from 11 commits
BlobStoreRepository.java
@@ -113,6 +113,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -366,7 +367,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
         } else {
             try {
                 final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
-                final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
+                final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
                 // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
                 // delete an index that was created by another master node after writing this index-N blob.
                 final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
@@ -377,6 +378,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
         }
     }

+    /**
+     * Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well as with the assumed generation.
+     *
+     * @param repositoryStateId Expected repository generation
+     * @param rootBlobs         Blobs at the repository root
+     * @return RepositoryData
+     */
+    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
+        final long generation = latestGeneration(rootBlobs.keySet());
+        final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
+        if (genToLoad > generation) {
+            // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
+            // debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
+            // snapshot delete run anyway.
+            logger.debug("Determined repository's generation from its contents to [" + generation + "] but " +
+                "current generation is at least [" + genToLoad + "]");
+        }
+        if (genToLoad != repositoryStateId) {
+            throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
+                repositoryStateId + "], actual current generation [" + genToLoad + "]");
+        }
+        return getRepositoryData(genToLoad);
+    }
+
     /**
      * After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
      * and then has all now unreferenced blobs in it deleted.
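To see the mechanism safeRepositoryData relies on in isolation, here is a minimal sketch (plain Java; the class name, the method names, and the -1 sentinel standing in for RepositoryData.EMPTY_REPO_GEN are all invented for illustration): the tracked generation only ever moves forward, so a stale listing is tolerated, while a caller whose expected generation is behind the tracker fails fast.

import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the safeRepositoryData check. All names are invented;
// -1 stands in for RepositoryData.EMPTY_REPO_GEN.
class SafeGenerationCheck {

    private static final AtomicLong latestKnownGen = new AtomicLong(-1);

    static long genToLoad(long expectedGen, long generationFromListing) {
        // The tracker only ever moves forward, so a lagging listing can never lower it.
        final long genToLoad = latestKnownGen.updateAndGet(known -> Math.max(known, expectedGen));
        if (genToLoad > generationFromListing) {
            // Harmless on an eventually consistent store: the listing is just stale.
            System.out.println("stale listing: saw [" + generationFromListing
                + "] but generation is at least [" + genToLoad + "]");
        }
        if (genToLoad != expectedGen) {
            // The tracker is ahead of the caller's expectation: another writer bumped
            // the generation, so fail instead of operating on stale repository data.
            throw new IllegalStateException("concurrent modification: expected [" + expectedGen
                + "], actual [" + genToLoad + "]");
        }
        return genToLoad;
    }

    public static void main(String[] args) {
        System.out.println(genToLoad(7, 6)); // stale listing tolerated, loads generation 7
        genToLoad(5, 5);                     // throws: the tracker already knows generation 7
    }
}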
@@ -604,14 +629,8 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
         if (isReadOnly()) {
             throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
         }
-        final RepositoryData repositoryData = getRepositoryData();
-        if (repositoryData.getGenId() != repositoryStateId) {
-            // Check that we are working on the expected repository version before gathering the data to clean up
-            throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
-                "expected current generation [" + repositoryStateId + "], actual current generation ["
-                + repositoryData.getGenId() + "]");
-        }
         Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
+        final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
         final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
         final Set<String> survivingIndexIds =
             repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
@@ -897,12 +916,36 @@ public void endVerification(String seed) {
         }
     }

+    // Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
+    // and concurrent modifications.
+    // Protected for use in MockEventuallyConsistentRepository
+    protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
+
     @Override
     public RepositoryData getRepositoryData() {
-        try {
-            return getRepositoryData(latestIndexBlobId());
-        } catch (IOException ioe) {
-            throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
+        // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
+        while (true) {
+            final long generation;
+            try {
+                generation = latestIndexBlobId();
+            } catch (IOException ioe) {
+                throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
+            }
+            final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
+            if (genToLoad > generation) {
+                logger.debug("Determined repository generation [" + generation
+                    + "] from repository contents but correct generation must be at least [" + genToLoad + "]");
+            }
+            try {
+                return getRepositoryData(genToLoad);
+            } catch (RepositoryException e) {
+                if (genToLoad != latestKnownRepoGen.get()) {
+                    logger.warn("Failed to load repository data generation [" + genToLoad +
+                        "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
+                    continue;
+                }
+                throw e;
+            }
         }
     }
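The retry semantics of the new getRepositoryData() reduce to the following sketch (invented names; loadGen stands in for getRepositoryData(long) and IllegalStateException for RepositoryException): a load failure only propagates if no concurrent operation moved the tracked generation in the meantime.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

// Sketch of the retry loop: a failed load is retried as long as a concurrent
// bump of the tracked generation explains the failure; otherwise it propagates.
class RetryingLoader {

    final AtomicLong latestKnownGen = new AtomicLong(-1);

    String load(long listedGen, LongFunction<String> loadGen) {
        while (true) {
            final long genToLoad = latestKnownGen.updateAndGet(known -> Math.max(known, listedGen));
            try {
                return loadGen.apply(genToLoad);
            } catch (IllegalStateException e) {
                if (genToLoad != latestKnownGen.get()) {
                    continue; // the generation moved concurrently: retry with the newer one
                }
                throw e; // nobody moved the generation: this is a genuine failure
            }
        }
    }

    public static void main(String[] args) {
        RetryingLoader loader = new RetryingLoader();
        // Simulate loading generation 3, failing once because another master already wrote generation 4.
        String result = loader.load(3, gen -> {
            if (gen < 4) {
                loader.latestKnownGen.set(4); // the concurrent bump becomes visible via the tracker
                throw new IllegalStateException("index-" + gen + " is gone");
            }
            return "repository data at generation " + gen;
        });
        System.out.println(result); // repository data at generation 4
    }
}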
@@ -920,6 +963,12 @@ private RepositoryData getRepositoryData(long indexGen) {
                 return RepositoryData.snapshotsFromXContent(parser, indexGen);
             }
         } catch (IOException ioe) {
+            // If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
+            // This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
+            // operations must start from the EMPTY_REPO_GEN again
+            if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
+                logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
+            }
             throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
         }
     }

Review thread on the reset fail-safe:

"I'm wondering if resetting is the right thing to do here. If the content of the repo has been deleted (or bucket/folder moved, or permissions changed, etc.) maybe we should keep the last generation seen around and let the user sort out the issue and re-register the repository?"

"We talked about that yesterday and I figured that we decided not to do that (yet). I'm of the same opinion, but it's quite the change in behavior if we want to just do this as a short-term fix."

"Rah, I've already forgotten about this discussion, sorry. But I'm good with the plan."
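The compareAndSet in this fail-safe is what keeps the reset safe under concurrency: only the operation that actually failed to read the tracked generation may reset the tracker, so a racing operation that has already advanced it is not clobbered. A minimal sketch (names invented):

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the reset fail-safe: the reset only succeeds if the tracker still
// points at the generation whose read just failed.
class ResetFailSafe {

    static final long EMPTY_GEN = -1; // stands in for RepositoryData.EMPTY_REPO_GEN
    static final AtomicLong latestKnownGen = new AtomicLong(EMPTY_GEN);

    static void onReadFailure(long failedGen) {
        if (latestKnownGen.compareAndSet(failedGen, EMPTY_GEN)) {
            System.out.println("reset tracker after failing to read generation [" + failedGen + "]");
        } else {
            System.out.println("no reset: tracker already moved on to [" + latestKnownGen.get() + "]");
        }
    }

    public static void main(String[] args) {
        latestKnownGen.set(9);
        onReadFailure(9);  // resets: the repository contents may have been wiped manually
        latestKnownGen.set(10);
        onReadFailure(9);  // no-op: a concurrent operation already advanced the tracker
    }
}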
@@ -945,11 +994,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
                 "] - possibly due to simultaneous snapshot deletion requests");
         }
         final long newGen = currentGen + 1;
+        if (latestKnownRepoGen.get() >= newGen) {
+            throw new IllegalArgumentException("Tried writing generation [" + newGen +
+                "] but repository is at least at generation [" + latestKnownRepoGen.get() + "] already");
+        }
         // write the index file
         final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
         logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
         writeAtomic(indexBlob,
             BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
+        final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
+        if (newGen < latestKnownGen) {
+            // Don't mess up the index.latest blob
+            throw new IllegalStateException(
+                "Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
+        }
         // write the current generation to the index-latest file
         final BytesReference genBytes;
         try (BytesStreamOutput bStream = new BytesStreamOutput()) {
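The two new guards in writeIndexGen can be illustrated in isolation (a sketch with invented names, not the actual method): the first check rejects a stale writer before any blob is written; the second refuses to advance index.latest if the generation moved while the index-N blob was being written.

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the two writeIndexGen guards. Names invented; writeIndexBlob stands
// in for the atomic write of the index-N blob itself.
class WriteGuards {

    static final AtomicLong latestKnownGen = new AtomicLong(-1);

    static void writeNewGen(long currentGen, Runnable writeIndexBlob) {
        final long newGen = currentGen + 1;
        // Guard 1: refuse to even start if the repository is already at (or past) newGen.
        if (latestKnownGen.get() >= newGen) {
            throw new IllegalArgumentException("repository already at generation ["
                + latestKnownGen.get() + "], refusing to write [" + newGen + "]");
        }
        writeIndexBlob.run();
        // Guard 2: if the generation raced past newGen while writing, abort before
        // index.latest is updated so it never points at an outdated generation.
        final long latest = latestKnownGen.updateAndGet(known -> Math.max(known, newGen));
        if (newGen < latest) {
            throw new IllegalStateException("generation moved to [" + latest
                + "] while writing [" + newGen + "], not updating index.latest");
        }
        System.out.println("index.latest can safely advance to [" + newGen + "]");
    }

    public static void main(String[] args) {
        writeNewGen(4, () -> System.out.println("wrote index-5"));
        writeNewGen(4, () -> {}); // throws: the tracker already knows generation 5
    }
}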
MockEventuallyConsistentRepository.java
@@ -47,6 +47,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -64,6 +65,8 @@
  */
 public class MockEventuallyConsistentRepository extends BlobStoreRepository {

+    private final Random random;
+
     private final Context context;

     private final NamedXContentRegistry namedXContentRegistry;
@@ -72,10 +75,12 @@ public MockEventuallyConsistentRepository(
             final RepositoryMetaData metadata,
             final NamedXContentRegistry namedXContentRegistry,
             final ThreadPool threadPool,
-            final Context context) {
+            final Context context,
+            final Random random) {
         super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath());
         this.context = context;
         this.namedXContentRegistry = namedXContentRegistry;
+        this.random = random;
     }

     // Filters out all actions that are superseded by subsequent actions
@@ -107,6 +112,9 @@ protected BlobStore createBlobStore() {
      */
     public static final class Context {

+        // Eventual consistency is only simulated as long as this flag is false
+        private boolean consistent;
+
         private final List<BlobStoreAction> actions = new ArrayList<>();

         /**
@@ -117,6 +125,7 @@ public void forceConsistent() {
                 final List<BlobStoreAction> consistentActions = consistentView(actions);
                 actions.clear();
                 actions.addAll(consistentActions);
+                consistent = true;
             }
         }
     }
@@ -240,14 +249,14 @@ public Map<String, BlobMetaData> listBlobs() {
             ensureNotClosed();
             final String thisPath = path.buildAsString();
             synchronized (context.actions) {
-                return consistentView(context.actions).stream()
+                return maybeMissLatestIndexN(consistentView(context.actions).stream()
                     .filter(
                         action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1
                             && action.operation == Operation.PUT)
                     .collect(
                         Collectors.toMap(
                             action -> action.path.substring(thisPath.length()),
-                            action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length)));
+                            action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))));
             }
         }
@@ -268,9 +277,21 @@ public Map<String, BlobContainer> children() {

         @Override
         public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) {
-            return Maps.ofEntries(
-                listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect(Collectors.toList())
-            );
+            return maybeMissLatestIndexN(
+                Maps.ofEntries(listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix))
+                    .collect(Collectors.toList())));
         }

+        // Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen
+        // overrides an inconsistent listing
+        private Map<String, BlobMetaData> maybeMissLatestIndexN(Map<String, BlobMetaData> listing) {
+            // Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state
+            if (path.parent() == null && context.consistent == false && random.nextBoolean()) {
+                final Map<String, BlobMetaData> filtered = new HashMap<>(listing);
+                filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get());
+                return Map.copyOf(filtered);
+            }
+            return listing;
+        }
+
         @Override

Review thread on maybeMissLatestIndexN:

"I am aware that this does not fully cover all possible inconsistent listing scenarios, but only the scenario of missing a known (in the latestKnownRepoGen sense) index-N blob."

"Looks sufficient to me."
Review comment:

"This may be a little controversial: by tracking the latest generation in the field, we can now identify out-of-sync listings that we would previously have missed and that would just have failed in a subsequent step where the repo generation is compared. With this change, if we fail to list the latest index-N, we can still complete a delete or cleanup just fine (assuming the value in latestKnownRepoGen is correct). I think it's better user experience to not do a perfect cleanup in this edge case but to proceed with the delete/cleanup as if nothing happened. On an eventually consistent repo, the fact that we list the correct index-N does not guarantee that we didn't miss any other root blobs in the listing anyway. Also, apart from maybe missing some stale blobs, the delete will otherwise work out perfectly fine."
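As a closing illustration of the scenario described in that comment (a self-contained sketch with invented names, mirroring what MockEventuallyConsistentRepository simulates): even when the listing drops the newest index-N, the tracker keeps the operation targeting the correct generation, at the cost of possibly leaving a few stale blobs for a later cleanup run.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: an eventually consistent listing that misses index-5 is overridden by
// the in-memory tracker, so the operation still targets the right generation.
class InconsistentListingDemo {

    public static void main(String[] args) {
        AtomicLong latestKnownGen = new AtomicLong(5); // this master wrote index-5 earlier

        // The blob store listing lags behind and only shows up to index-4.
        Map<String, Long> rootBlobs = new HashMap<>();
        rootBlobs.put("index-3", 3L);
        rootBlobs.put("index-4", 4L);

        long listedGen = rootBlobs.values().stream().mapToLong(Long::longValue).max().orElse(-1);
        long genToLoad = latestKnownGen.updateAndGet(known -> Math.max(known, listedGen));

        // The delete/cleanup proceeds against generation 5 despite the stale listing;
        // at worst, blobs missing from the listing are left for a later cleanup run.
        System.out.println("listing says [" + listedGen + "], loading generation [" + genToLoad + "]");
    }
}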