Skip to content

Commit 6bef703

Browse files
Consistent RepositoryData Load on Blobstores
This moves the blob store repository to only use the information available in the cluster state for loading `RepositoryData`, without falling back to listing blobs to determine a repository's generation. Relates elastic#49729 Closes elastic#38941
1 parent b4fa677 commit 6bef703

File tree

4 files changed

+313
-40
lines changed

4 files changed

+313
-40
lines changed

server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public final class RepositoryData {
5858
*/
5959
public static final long UNKNOWN_REPO_GEN = -2L;
6060

61+
/**
62+
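* Generation value indicating that the repository generation could not be determined because the contents of the repository
* do not match the generation tracked for it in the cluster state.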
63+
*/
64+
public static final long CORRUPTED_REPO_GEN = -3L;
65+
6166
/**
6267
* An instance initialized for an empty repository.
6368
*/

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 192 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.lucene.store.IndexOutput;
3232
import org.apache.lucene.store.RateLimiter;
3333
import org.apache.lucene.util.SetOnce;
34+
import org.elasticsearch.ExceptionsHelper;
3435
import org.elasticsearch.action.ActionListener;
3536
import org.elasticsearch.action.ActionRunnable;
3637
import org.elasticsearch.action.StepListener;
@@ -216,6 +217,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
216217

217218
private final ClusterService clusterService;
218219

220+
/**
221+
* While running in a cluster with nodes older than {@link RepositoryMetaData#REPO_GEN_IN_CS_VERSION} in it, finding the latest
222+
* repository generation is not done via the cluster state. This flag is set by {@link #updateState} in case this node finds
223+
* itself in such a mixed-version cluster containing nodes older than {@link RepositoryMetaData#REPO_GEN_IN_CS_VERSION} to ensure
224+
* appropriate backwards compatible logic is used.
225+
* TODO: Reword to include read-only repos
226+
*/
227+
private volatile boolean bestEffortConsistency;
228+
219229
/**
220230
* Constructs new BlobStoreRepository
221231
* @param metadata The metadata for this repository including name and settings
@@ -279,29 +289,38 @@ protected void doClose() {
279289
// #latestKnownRepoGen if a newer than currently known generation is found
280290
@Override
281291
public void updateState(ClusterState state) {
292+
bestEffortConsistency = readOnly || state.nodes().getMinNodeVersion().before(RepositoryMetaData.REPO_GEN_IN_CS_VERSION);
282293
if (readOnly) {
283294
// No need to waste cycles, no operations can run against a read-only repository
284295
return;
285296
}
286-
long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN;
287-
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
288-
if (snapshotsInProgress != null) {
289-
bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries());
290-
}
291-
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
292-
// Don't use generation from the delete task if we already found a generation for an in progress snapshot.
293-
// In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet exist
294-
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) {
295-
bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries());
296-
}
297-
final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
298-
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) {
299-
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
300-
}
301-
302297
metadata = getRepoMetaData(state);
303-
final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation());
304-
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
298+
if (bestEffortConsistency) {
299+
long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN;
300+
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
301+
if (snapshotsInProgress != null) {
302+
bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries());
303+
}
304+
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
305+
// Don't use generation from the delete task if we already found a generation for an in progress snapshot.
306+
// In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet exist
307+
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) {
308+
bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries());
309+
}
310+
final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
311+
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) {
312+
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
313+
}
314+
final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation());
315+
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
316+
} else {
317+
final long previousBest = latestKnownRepoGen.getAndSet(metadata.generation());
318+
if (previousBest != metadata.generation()) {
319+
assert metadata.generation() == RepositoryData.CORRUPTED_REPO_GEN || previousBest < metadata.generation() :
320+
"Illegal move from repository generation [" + previousBest + "] to generation [" + metadata.generation() + "]";
321+
logger.debug("Updated repository generation from [{}] to [{}]", previousBest, metadata.generation());
322+
}
323+
}
305324
}
306325

307326
private long bestGeneration(Collection<? extends RepositoryOperation> operations) {
@@ -446,7 +465,12 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
446465
*/
447466
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
448467
final long generation = latestGeneration(rootBlobs.keySet());
449-
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
468+
final long genToLoad;
469+
if (bestEffortConsistency) {
470+
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
471+
} else {
472+
genToLoad = latestKnownRepoGen.get();
473+
}
450474
if (genToLoad > generation) {
451475
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
452476
// debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
@@ -982,34 +1006,95 @@ public void endVerification(String seed) {
9821006

9831007
// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
9841008
// and concurrent modifications.
985-
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
1009+
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);
9861010

9871011
@Override
9881012
public void getRepositoryData(ActionListener<RepositoryData> listener) {
989-
ActionListener.completeWith(listener, () -> {
1013+
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
1014+
listener.onFailure(corruptedStateException(null));
1015+
return;
1016+
}
1017+
if (bestEffortConsistency == false) {
1018+
if (latestKnownRepoGen.get() == RepositoryData.UNKNOWN_REPO_GEN) {
1019+
initializeRepoInClusterState(ActionListener.wrap(v -> getRepositoryData(listener), listener::onFailure));
1020+
return;
1021+
}
1022+
}
1023+
threadPool.generic().execute(ActionRunnable.wrap(listener, l -> {
9901024
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
9911025
while (true) {
992-
final long generation;
993-
try {
994-
generation = latestIndexBlobId();
995-
} catch (IOException ioe) {
996-
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
997-
}
998-
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
999-
if (genToLoad > generation) {
1000-
logger.info("Determined repository generation [" + generation
1001-
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
1026+
final long genToLoad;
1027+
if (bestEffortConsistency) {
1028+
final long generation;
1029+
try {
1030+
generation = latestIndexBlobId();
1031+
} catch (IOException ioe) {
1032+
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
1033+
}
1034+
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
1035+
if (genToLoad > generation) {
1036+
logger.info("Determined repository generation [" + generation
1037+
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
1038+
}
1039+
} else {
1040+
genToLoad = latestKnownRepoGen.get();
10021041
}
10031042
try {
1004-
return getRepositoryData(genToLoad);
1043+
l.onResponse(getRepositoryData(genToLoad));
1044+
return;
10051045
} catch (RepositoryException e) {
10061046
if (genToLoad != latestKnownRepoGen.get()) {
10071047
logger.warn("Failed to load repository data generation [" + genToLoad +
10081048
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
10091049
continue;
10101050
}
1011-
throw e;
1051+
if (bestEffortConsistency == false && ExceptionsHelper.unwrap(e, NoSuchFileException.class) != null) {
1052+
markRepoCorrupted(genToLoad, e,
1053+
ActionListener.wrap(v -> l.onFailure(corruptedStateException(e)), l::onFailure));
1054+
return;
1055+
} else {
1056+
throw e;
1057+
}
1058+
}
1059+
}
1060+
}));
1061+
}
1062+
1063+
private RepositoryException corruptedStateException(@Nullable Exception cause) {
1064+
return new RepositoryException(metadata.name(),
1065+
"Could not read repository data because the contents of the repository do not match its " +
1066+
"expected state. This is likely the result of either concurrently modifying the contents of the " +
1067+
"repository by a process other than this cluster or an issue with the repository's underlying" +
1068+
"storage. The repository has been disabled to prevent corrupting its contents. To re-enable it " +
1069+
"and continue using it please remove the repository from the cluster and add it again to make " +
1070+
"the cluster recover the known state of the repository from its physical contents.", cause);
1071+
}
1072+
1073+
private void markRepoCorrupted(long corruptedGeneration, Exception originalException, ActionListener<Void> listener) {
1074+
clusterService.submitStateUpdateTask("update_repo_gen", new ClusterStateUpdateTask() {
1075+
@Override
1076+
public ClusterState execute(ClusterState currentState) {
1077+
final RepositoriesMetaData state = currentState.metaData().custom(RepositoriesMetaData.TYPE);
1078+
final RepositoryMetaData repoState = state.repository(metadata.name());
1079+
final long prevGeneration = repoState.generation();
1080+
if (prevGeneration != corruptedGeneration) {
1081+
throw new IllegalStateException("Tried to mark repo generation [" + corruptedGeneration
1082+
+ "] as corrupted but its state concurrently changed to [" + repoState + "]");
10121083
}
1084+
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.metaData()).putCustom(
1085+
RepositoriesMetaData.TYPE, state.withUpdatedGeneration(
1086+
metadata.name(), RepositoryData.CORRUPTED_REPO_GEN, repoState.pendingGeneration())).build()).build();
1087+
}
1088+
1089+
@Override
1090+
public void onFailure(String source, Exception e) {
1091+
listener.onFailure(new RepositoryException(metadata.name(), "Failed marking repository state as corrupted",
1092+
ExceptionsHelper.useOrSuppress(e, originalException)));
1093+
}
1094+
1095+
@Override
1096+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
1097+
listener.onResponse(null);
10131098
}
10141099
});
10151100
}
@@ -1028,16 +1113,85 @@ private RepositoryData getRepositoryData(long indexGen) {
10281113
return RepositoryData.snapshotsFromXContent(parser, indexGen);
10291114
}
10301115
} catch (IOException ioe) {
1031-
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
1032-
// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
1033-
// operations must start from the EMPTY_REPO_GEN again
1034-
if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
1035-
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
1116+
if (bestEffortConsistency) {
1117+
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
1118+
// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
1119+
// operations must start from the EMPTY_REPO_GEN again
1120+
if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
1121+
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
1122+
}
10361123
}
10371124
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
10381125
}
10391126
}
10401127

1128+
1129+
// Lock for {@link #initListeners}
1130+
private final Object repoGenInitMutex = new Object();
1131+
1132+
// Listeners to resolve once the repository generation has been successfully initialized in the cluster state
1133+
private List<ActionListener<Void>> initListeners = null;
1134+
1135+
/**
1136+
* Initializes this repository's generation {@code N} in the cluster state by listing all {@code index-N} blobs under the repository root
1137+
* and then updating {@link RepositoriesMetaData} accordingly.
1138+
*
1139+
* @param listener listener to invoke once repo generation has been initialized in the cluster state
1140+
*/
1141+
private void initializeRepoInClusterState(ActionListener<Void> listener) {
1142+
final boolean initInProgress;
1143+
synchronized (repoGenInitMutex) {
1144+
initInProgress = initListeners != null;
1145+
if (initInProgress == false) {
1146+
initListeners = new ArrayList<>();
1147+
}
1148+
initListeners.add(listener);
1149+
}
1150+
if (initInProgress == true) {
1151+
return;
1152+
}
1153+
final String repoName = metadata.name();
1154+
threadPool.generic().execute(ActionRunnable.supply(ActionListener.wrap(gen ->
1155+
clusterService.submitStateUpdateTask("initialize repository [" + metadata.name() + "] to generation [" + gen + "]",
1156+
new ClusterStateUpdateTask() {
1157+
@Override
1158+
public ClusterState execute(ClusterState currentState) {
1159+
final MetaData oldMetaData = currentState.metaData();
1160+
final RepositoriesMetaData repositoriesMetaData = oldMetaData.custom(RepositoriesMetaData.TYPE);
1161+
return ClusterState.builder(currentState).metaData(MetaData.builder(oldMetaData).putCustom(
1162+
RepositoriesMetaData.TYPE, repositoriesMetaData.withUpdatedGeneration(metadata.name(), gen, gen)
1163+
).build()).build();
1164+
}
1165+
1166+
@Override
1167+
public void onFailure(String source, Exception e) {
1168+
logger.warn(new ParameterizedMessage("Failed to initialize repository [{}] in cluster state [{}]",
1169+
repoName, source), e);
1170+
failInitListeners(new RepositoryException(metadata.name(),
1171+
"Failed to initialize repository with cluster update [" + source + "]", e));
1172+
}
1173+
1174+
@Override
1175+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
1176+
ActionListener.onResponse(getAndClearInitListeners(), null);
1177+
}
1178+
}), e -> failInitListeners(
1179+
new RepositoryException(repoName, "Failed to determine repository generation", e))), this::latestIndexBlobId));
1180+
}
1181+
1182+
private List<ActionListener<Void>> getAndClearInitListeners() {
1183+
final List<ActionListener<Void>> listeners;
1184+
synchronized (repoGenInitMutex) {
1185+
listeners = initListeners;
1186+
initListeners = null;
1187+
}
1188+
return listeners;
1189+
}
1190+
1191+
private void failInitListeners(final Exception e) {
1192+
ActionListener.onFailure(getAndClearInitListeners(), e);
1193+
}
1194+
10411195
private static String testBlobPrefix(String seed) {
10421196
return TESTS_FILE + seed;
10431197
}

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.cluster.routing.RecoverySource;
2828
import org.elasticsearch.cluster.routing.ShardRouting;
2929
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
30+
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.UUIDs;
3132
import org.elasticsearch.common.settings.Settings;
3233
import org.elasticsearch.core.internal.io.IOUtils;
@@ -193,13 +194,14 @@ public void testSnapshotWithConflictingName() throws IOException {
193194
private Repository createRepository() {
194195
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
195196
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
196-
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
197-
BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
197+
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
198+
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService) {
198199
@Override
199200
protected void assertSnapshotOrGenericThread() {
200201
// eliminate thread name check as we create repo manually
201202
}
202203
};
204+
clusterService.addStateApplier(event -> repository.updateState(event.state()));
203205
repository.start();
204206
return repository;
205207
}

0 commit comments

Comments
 (0)