Skip to content

Commit e891d5e

Browse files
Simplify Feature State Snapshot Code (#69433) (#69523)
We can do a lot of the checks on the request and its version compatibility off the cluster state update thread. Also, the logic for working out the indices to add to the snapshot due to requested features was quit complicated and did a lot of redundant checks and loops so I simplified it where possible.
1 parent 044085d commit e891d5e

File tree

1 file changed

+60
-70
lines changed

1 file changed

+60
-70
lines changed

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 60 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,44 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
385385
return;
386386
}
387387
final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
388+
389+
// We should only use the feature states logic if we're sure we'll be able to finish the snapshot without a lower-version
390+
// node taking over and causing problems. Therefore, if we're in a mixed cluster with versions that don't know how to handle
391+
// feature states, skip all feature states logic, and if `feature_states` is explicitly configured, throw an exception.
392+
final List<String> requestedStates = Arrays.asList(request.featureStates());
393+
final Version initialMinNodeVersion = clusterService.state().nodes().getMinNodeVersion();
394+
final Set<String> featureStatesSet;
395+
if (initialMinNodeVersion.onOrAfter(FEATURE_STATES_VERSION)) {
396+
if (request.includeGlobalState() || requestedStates.isEmpty() == false) {
397+
if (request.includeGlobalState() && requestedStates.isEmpty()) {
398+
// If we're including global state and feature states aren't specified, include all of them
399+
featureStatesSet = systemIndexDescriptorMap.keySet();
400+
} else if (requestedStates.size() == 1 && NO_FEATURE_STATES_VALUE.equalsIgnoreCase(requestedStates.get(0))) {
401+
// If there's exactly one value and it's "none", include no states
402+
featureStatesSet = Collections.emptySet();
403+
} else {
404+
// Otherwise, check for "none" then use the list of requested states
405+
if (requestedStates.contains(NO_FEATURE_STATES_VALUE)) {
406+
listener.onFailure(new IllegalArgumentException("the feature_states value [" +
407+
SnapshotsService.NO_FEATURE_STATES_VALUE + "] indicates that no feature states should be snapshotted, " +
408+
"but other feature states were requested: " + requestedStates));
409+
return;
410+
}
411+
featureStatesSet = new HashSet<>(requestedStates);
412+
featureStatesSet.retainAll(systemIndexDescriptorMap.keySet());
413+
}
414+
} else {
415+
featureStatesSet = Collections.emptySet();
416+
}
417+
} else if (requestedStates.isEmpty() == false) {
418+
listener.onFailure(new SnapshotException(snapshot, "feature_states can only be used when all nodes in cluster are version ["
419+
+ FEATURE_STATES_VERSION + "] or higher, but at least one node in this cluster is on version ["
420+
+ initialMinNodeVersion + "]"));
421+
return;
422+
} else {
423+
featureStatesSet = Collections.emptySet();
424+
}
425+
388426
final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());
389427
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
390428

@@ -421,57 +459,32 @@ public ClusterState execute(ClusterState currentState) {
421459
// Store newSnapshot here to be processed in clusterStateProcessed
422460
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
423461

424-
List<SnapshotFeatureInfo> featureStates = Collections.emptyList();
425-
final List<String> requestedStates = Arrays.asList(request.featureStates());
426-
427-
// We should only use the feature states logic if we're sure we'll be able to finish the snapshot without a lower-version
428-
// node taking over and causing problems. Therefore, if we're in a mixed cluster with versions that don't know how to handle
429-
// feature states, skip all feature states logic, and if `feature_states` is explicitly configured, throw an exception.
430-
if (currentState.nodes().getMinNodeVersion().onOrAfter(FEATURE_STATES_VERSION)) {
431-
if (request.includeGlobalState() || requestedStates.isEmpty() == false) {
432-
final Set<String> featureStatesSet;
433-
if (request.includeGlobalState() && requestedStates.isEmpty()) {
434-
// If we're including global state and feature states aren't specified, include all of them
435-
featureStatesSet = new HashSet<>(systemIndexDescriptorMap.keySet());
436-
} else if (requestedStates.size() == 1 && NO_FEATURE_STATES_VALUE.equalsIgnoreCase(requestedStates.get(0))) {
437-
// If there's exactly one value and it's "none", include no states
438-
featureStatesSet = Collections.emptySet();
439-
} else {
440-
// Otherwise, check for "none" then use the list of requested states
441-
if (requestedStates.contains(NO_FEATURE_STATES_VALUE)) {
442-
throw new IllegalArgumentException("the feature_states value [" + SnapshotsService.NO_FEATURE_STATES_VALUE +
443-
"] indicates that no feature states should be snapshotted, but other feature states were requested: " +
444-
requestedStates);
445-
}
446-
featureStatesSet = new HashSet<>(requestedStates);
447-
}
448-
449-
featureStates = systemIndexDescriptorMap.keySet().stream()
450-
.filter(feature -> featureStatesSet.contains(feature))
451-
.map(feature -> new SnapshotFeatureInfo(feature, resolveFeatureIndexNames(currentState, feature)))
462+
final List<SnapshotFeatureInfo> featureStates;
463+
// if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't
464+
// been requested by the request directly
465+
if (featureStatesSet.isEmpty()) {
466+
featureStates = Collections.emptyList();
467+
} else {
468+
final Set<String> indexNames = new HashSet<>(indices);
469+
featureStates = featureStatesSet.stream()
470+
.map(feature -> new SnapshotFeatureInfo(feature,
471+
systemIndexDescriptorMap.get(feature).getIndexDescriptors().stream()
472+
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
473+
.collect(Collectors.toList())))
452474
.filter(featureInfo -> featureInfo.getIndices().isEmpty() == false) // Omit any empty featureStates
453475
.collect(Collectors.toList());
454-
final Stream<String> featureStateIndices = featureStates.stream().flatMap(feature -> feature.getIndices().stream());
455-
456-
final Stream<String> associatedIndices = systemIndexDescriptorMap.keySet().stream()
457-
.filter(feature -> featureStatesSet.contains(feature))
458-
.flatMap(feature -> resolveAssociatedIndices(currentState, feature).stream());
476+
for (SnapshotFeatureInfo featureState : featureStates) {
477+
indexNames.addAll(featureState.getIndices());
478+
}
459479

460-
// Add all resolved indices from the feature states to the list of indices
461-
indices = Stream.of(indices.stream(), featureStateIndices, associatedIndices)
462-
.flatMap(s -> s)
463-
.distinct()
464-
.collect(Collectors.toList());
480+
// Add all resolved indices from the feature states to the list of indices
481+
for (String feature : featureStatesSet) {
482+
for (String pattern : systemIndexDescriptorMap.get(feature).getAssociatedIndexPatterns()) {
483+
Collections.addAll(indexNames, indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(
484+
currentState, LENIENT_EXPAND_OPEN_CLOSED_HIDDEN, pattern));
485+
}
465486
}
466-
} else if (requestedStates.isEmpty() == false) {
467-
throw new SnapshotException(
468-
new Snapshot(repositoryName, snapshotId),
469-
"feature_states can only be used when all nodes in cluster are version ["
470-
+ FEATURE_STATES_VERSION
471-
+ "] or higher, but at least one node in this cluster is on version ["
472-
+ currentState.nodes().getMinNodeVersion()
473-
+ "]"
474-
);
487+
indices = Collections.unmodifiableList(new ArrayList<>(indexNames));
475488
}
476489

477490
final List<String> dataStreams =
@@ -524,29 +537,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
524537
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
525538
}
526539

527-
private List<String> resolveFeatureIndexNames(ClusterState currentState, String featureName) {
528-
if (systemIndexDescriptorMap.containsKey(featureName) == false) {
529-
throw new IllegalArgumentException("requested snapshot of feature state for unknown feature [" + featureName + "]");
530-
}
531-
532-
final SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName);
533-
return feature.getIndexDescriptors().stream()
534-
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
535-
.collect(Collectors.toList());
536-
}
537-
538-
private List<String> resolveAssociatedIndices(ClusterState currentState, String featureName) {
539-
if (systemIndexDescriptorMap.containsKey(featureName) == false) {
540-
throw new IllegalArgumentException("requested associated indices for feature state for unknown feature [" + featureName + "]");
541-
}
542-
543-
final SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName);
544-
return feature.getAssociatedIndexPatterns().stream()
545-
.flatMap(pattern -> Arrays.stream(indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(currentState,
546-
LENIENT_EXPAND_OPEN_CLOSED_HIDDEN, pattern)))
547-
.collect(Collectors.toList());
548-
}
549-
550540
private static void ensureSnapshotNameNotRunning(List<SnapshotsInProgress.Entry> runningSnapshots, String repositoryName,
551541
String snapshotName) {
552542
if (runningSnapshots.stream().anyMatch(s -> {

0 commit comments

Comments
 (0)