Skip to content

Commit 81b6d10

Browse files
Add system data streams to feature state snapshots (#75902)
Add system data streams to the "snapshot feature state" code block, so that if we're snapshotting a feature by name we grab that feature's system data streams too. Handle these data streams on the restore side as well.

* Add system data streams to feature state snapshots
* Don't pass system data streams through index name resolution
* Don't add no-op features to snapshots
* Hook in system data streams for snapshot restoration
1 parent 82919da commit 81b6d10

File tree

4 files changed

+185
-41
lines changed

4 files changed

+185
-41
lines changed

server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,20 @@
88

99
package org.elasticsearch.indices;
1010

11+
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
1112
import org.elasticsearch.cluster.metadata.ComponentTemplate;
1213
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
1314
import org.elasticsearch.cluster.metadata.DataStream;
15+
import org.elasticsearch.cluster.metadata.Metadata;
1416

17+
import java.util.ArrayList;
18+
import java.util.Collections;
1519
import java.util.List;
1620
import java.util.Map;
1721
import java.util.Objects;
1822

23+
import static org.elasticsearch.indices.AssociatedIndexDescriptor.buildAutomaton;
24+
1925
/**
2026
* Describes a {@link DataStream} that is reserved for use by a system component. The data stream will be managed by the system and also
2127
* protected by the system against user modification so that system features are not broken by inadvertent user operations.
@@ -29,6 +35,7 @@ public class SystemDataStreamDescriptor {
2935
private final Map<String, ComponentTemplate> componentTemplates;
3036
private final List<String> allowedElasticProductOrigins;
3137
private final ExecutorNames executorNames;
38+
private final CharacterRunAutomaton characterRunAutomaton;
3239

3340
/**
3441
* Creates a new descriptor for a system data stream
@@ -70,12 +77,31 @@ public SystemDataStreamDescriptor(String dataStreamName, String description, Typ
7077
this.executorNames = Objects.nonNull(executorNames)
7178
? executorNames
7279
: ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;
80+
81+
this.characterRunAutomaton = new CharacterRunAutomaton(
82+
buildAutomaton(backingIndexPatternForDataStream(this.dataStreamName)));
7383
}
7484

7585
public String getDataStreamName() {
7686
return dataStreamName;
7787
}
7888

89+
/**
90+
* Retrieve backing indices for this system data stream
91+
* @param metadata Metadata in which to look for indices
92+
* @return List of names of backing indices
93+
*/
94+
public List<String> getBackingIndexNames(Metadata metadata) {
95+
ArrayList<String> matchingIndices = new ArrayList<>();
96+
metadata.indices().keysIt().forEachRemaining(indexName -> {
97+
if (this.characterRunAutomaton.run(indexName)) {
98+
matchingIndices.add(indexName);
99+
}
100+
});
101+
102+
return Collections.unmodifiableList(matchingIndices);
103+
}
104+
79105
public String getDescription() {
80106
return description;
81107
}
@@ -89,7 +115,11 @@ public boolean isExternal() {
89115
}
90116

91117
public String getBackingIndexPattern() {
92-
return DataStream.BACKING_INDEX_PREFIX + getDataStreamName() + "-*";
118+
return backingIndexPatternForDataStream(getDataStreamName());
119+
}
120+
121+
private static String backingIndexPatternForDataStream(String dataStream) {
122+
return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*";
93123
}
94124

95125
public List<String> getAllowedElasticProductOrigins() {

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.elasticsearch.index.shard.IndexShard;
7070
import org.elasticsearch.index.shard.ShardId;
7171
import org.elasticsearch.indices.ShardLimitValidator;
72+
import org.elasticsearch.indices.SystemDataStreamDescriptor;
7273
import org.elasticsearch.indices.SystemIndices;
7374
import org.elasticsearch.repositories.IndexId;
7475
import org.elasticsearch.repositories.RepositoriesService;
@@ -323,13 +324,40 @@ private void startRestore(
323324
Collections.addAll(requestIndices, indicesInRequest);
324325
}
325326

327+
// Determine system indices to restore from requested feature states
328+
final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
329+
final Set<String> featureStateIndices = featureStatesToRestore.values()
330+
.stream()
331+
.flatMap(Collection::stream)
332+
.collect(Collectors.toSet());
333+
334+
final Map<String, SystemIndices.Feature> featureSet = systemIndices.getFeatures();
335+
final Set<String> featureStateDataStreams = featureStatesToRestore.keySet().stream().filter(featureName -> {
336+
if (featureSet.containsKey(featureName)) {
337+
return true;
338+
}
339+
logger.warn(
340+
() -> new ParameterizedMessage(
341+
"Restoring snapshot[{}] skipping feature [{}] because it is not available in this cluster",
342+
snapshotInfo.snapshotId(),
343+
featureName
344+
)
345+
);
346+
return false;
347+
})
348+
.map(name -> systemIndices.getFeatures().get(name))
349+
.flatMap(feature -> feature.getDataStreamDescriptors().stream())
350+
.map(SystemDataStreamDescriptor::getDataStreamName)
351+
.collect(Collectors.toSet());
352+
326353
// Get data stream metadata for requested data streams
327354
Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> result = getDataStreamsToRestore(
328355
repository,
329356
snapshotId,
330357
snapshotInfo,
331358
globalMetadata,
332-
requestIndices,
359+
// include system data stream names in argument to this method
360+
Stream.concat(requestIndices.stream(), featureStateDataStreams.stream()).collect(Collectors.toList()),
333361
request.includeAliases()
334362
);
335363
Map<String, DataStream> dataStreamsToRestore = result.v1();
@@ -346,13 +374,6 @@ private void startRestore(
346374
.collect(Collectors.toSet());
347375
requestIndices.addAll(dataStreamIndices);
348376

349-
// Determine system indices to restore from requested feature states
350-
final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
351-
final Set<String> featureStateIndices = featureStatesToRestore.values()
352-
.stream()
353-
.flatMap(Collection::stream)
354-
.collect(Collectors.toSet());
355-
356377
// Resolve the indices that were directly requested
357378
final List<String> requestedIndicesInSnapshot = filterIndices(
358379
snapshotInfo.indices(),

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
import org.elasticsearch.core.Tuple;
7171
import org.elasticsearch.index.Index;
7272
import org.elasticsearch.index.shard.ShardId;
73-
import org.elasticsearch.indices.AssociatedIndexDescriptor;
73+
import org.elasticsearch.indices.SystemDataStreamDescriptor;
7474
import org.elasticsearch.indices.SystemIndices;
7575
import org.elasticsearch.repositories.FinalizeSnapshotContext;
7676
import org.elasticsearch.repositories.IndexId;
@@ -308,36 +308,43 @@ public ClusterState execute(ClusterState currentState) {
308308
// Store newSnapshot here to be processed in clusterStateProcessed
309309
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
310310

311-
final List<SnapshotFeatureInfo> featureStates;
311+
final Set<SnapshotFeatureInfo> featureStates = new HashSet<>();
312+
final Set<String> systemDataStreamNames = new HashSet<>();
312313
// if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't
313314
// been requested by the request directly
314-
if (featureStatesSet.isEmpty()) {
315-
featureStates = Collections.emptyList();
316-
} else {
317-
final Set<String> indexNames = new HashSet<>(indices);
318-
featureStates = featureStatesSet.stream()
319-
.map(
320-
feature -> new SnapshotFeatureInfo(
321-
feature,
322-
systemIndexDescriptorMap.get(feature)
323-
.getIndexDescriptors()
324-
.stream()
325-
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
326-
.collect(Collectors.toList())
327-
)
328-
)
329-
.filter(featureInfo -> featureInfo.getIndices().isEmpty() == false) // Omit any empty featureStates
330-
.collect(Collectors.toList());
331-
for (SnapshotFeatureInfo featureState : featureStates) {
332-
indexNames.addAll(featureState.getIndices());
333-
}
315+
final Set<String> indexNames = new HashSet<>(indices);
316+
for (String featureName : featureStatesSet) {
317+
SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName);
334318

335-
// Add all resolved indices from the feature states to the list of indices
336-
for (String feature : featureStatesSet) {
337-
for (AssociatedIndexDescriptor aid : systemIndexDescriptorMap.get(feature).getAssociatedIndexDescriptors()) {
338-
indexNames.addAll(aid.getMatchingIndices(currentState.metadata()));
319+
Set<String> featureSystemIndices = feature.getIndexDescriptors()
320+
.stream()
321+
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
322+
.collect(Collectors.toSet());
323+
Set<String> featureAssociatedIndices = feature.getAssociatedIndexDescriptors()
324+
.stream()
325+
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
326+
.collect(Collectors.toSet());
327+
328+
Set<String> featureSystemDataStreams = new HashSet<>();
329+
Set<String> featureDataStreamBackingIndices = new HashSet<>();
330+
for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) {
331+
List<String> backingIndexNames = sdd.getBackingIndexNames(currentState.metadata());
332+
if (backingIndexNames.size() > 0) {
333+
featureDataStreamBackingIndices.addAll(backingIndexNames);
334+
featureSystemDataStreams.add(sdd.getDataStreamName());
339335
}
340336
}
337+
338+
if (featureSystemIndices.size() > 0
339+
|| featureAssociatedIndices.size() > 0
340+
|| featureDataStreamBackingIndices.size() > 0) {
341+
342+
featureStates.add(new SnapshotFeatureInfo(featureName, List.copyOf(featureSystemIndices)));
343+
indexNames.addAll(featureSystemIndices);
344+
indexNames.addAll(featureAssociatedIndices);
345+
indexNames.addAll(featureDataStreamBackingIndices);
346+
systemDataStreamNames.addAll(featureSystemDataStreams);
347+
}
341348
indices = List.copyOf(indexNames);
342349
}
343350

@@ -346,6 +353,7 @@ public ClusterState execute(ClusterState currentState) {
346353
request.indicesOptions(),
347354
request.indices()
348355
);
356+
dataStreams.addAll(systemDataStreamNames);
349357

350358
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
351359

@@ -388,7 +396,7 @@ public ClusterState execute(ClusterState currentState) {
388396
shards,
389397
userMeta,
390398
version,
391-
featureStates
399+
List.copyOf(featureStates)
392400
);
393401
return ClusterState.builder(currentState)
394402
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(CollectionUtils.appendToCopy(runningSnapshots, newEntry)))

x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.datastreams;
99

1010
import org.elasticsearch.action.DocWriteRequest;
11-
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
1211
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
1312
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
1413
import org.elasticsearch.action.index.IndexResponse;
@@ -20,6 +19,7 @@
2019
import org.elasticsearch.plugins.Plugin;
2120
import org.elasticsearch.plugins.SystemIndexPlugin;
2221
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
22+
import org.elasticsearch.snapshots.SnapshotInfo;
2323
import org.elasticsearch.snapshots.mockstore.MockRepository;
2424
import org.elasticsearch.threadpool.ThreadPool;
2525
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
@@ -35,8 +35,11 @@
3535
import java.util.Map;
3636

3737
import static org.elasticsearch.datastreams.SystemDataStreamSnapshotIT.SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME;
38+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3839
import static org.hamcrest.Matchers.arrayWithSize;
40+
import static org.hamcrest.Matchers.empty;
3941
import static org.hamcrest.Matchers.hasSize;
42+
import static org.hamcrest.Matchers.not;
4043
import static org.hamcrest.Matchers.oneOf;
4144

4245
public class SystemDataStreamSnapshotIT extends AbstractSnapshotIntegTestCase {
@@ -82,12 +85,91 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
8285
assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
8386
}
8487

85-
CreateSnapshotResponse createSnapshotResponse = client().admin()
88+
assertSuccessful(
89+
client().admin()
90+
.cluster()
91+
.prepareCreateSnapshot(REPO, SNAPSHOT)
92+
.setWaitForCompletion(true)
93+
.setIncludeGlobalState(false)
94+
.execute()
95+
);
96+
97+
// We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
98+
// See https://github.com/elastic/elasticsearch/issues/75818
99+
{
100+
DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
101+
AcknowledgedResponse response = client().execute(DeleteDataStreamAction.INSTANCE, request).get();
102+
assertTrue(response.isAcknowledged());
103+
}
104+
105+
{
106+
GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
107+
assertThat(indicesRemaining.indices(), arrayWithSize(0));
108+
}
109+
110+
RestoreSnapshotResponse restoreSnapshotResponse = client().admin()
86111
.cluster()
87-
.prepareCreateSnapshot(REPO, SNAPSHOT)
112+
.prepareRestoreSnapshot(REPO, SNAPSHOT)
88113
.setWaitForCompletion(true)
89-
.setIncludeGlobalState(false)
114+
.setRestoreGlobalState(false)
90115
.get();
116+
assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());
117+
118+
{
119+
GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
120+
GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
121+
assertThat(response.getDataStreams(), hasSize(1));
122+
assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
123+
}
124+
}
125+
126+
public void testSystemDataStreamInFeatureState() throws Exception {
127+
Path location = randomRepoPath();
128+
createRepository(REPO, "fs", location);
129+
130+
{
131+
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME);
132+
final AcknowledgedResponse response = client().execute(CreateDataStreamAction.INSTANCE, request).get();
133+
assertTrue(response.isAcknowledged());
134+
}
135+
136+
// Index a doc so that a concrete backing index will be created
137+
IndexResponse indexToDataStreamResponse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME)
138+
.setId("42")
139+
.setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON)
140+
.setOpType(DocWriteRequest.OpType.CREATE)
141+
.execute()
142+
.actionGet();
143+
assertThat(indexToDataStreamResponse.status().getStatus(), oneOf(200, 201));
144+
145+
// Index a doc into a regular index ("my-index") so that the snapshot also contains an index requested by name
146+
IndexResponse indexResponse = client().prepareIndex("my-index")
147+
.setId("42")
148+
.setSource("{ \"name\": \"my-name\" }", XContentType.JSON)
149+
.setOpType(DocWriteRequest.OpType.CREATE)
150+
.execute()
151+
.get();
152+
assertThat(indexResponse.status().getStatus(), oneOf(200, 201));
153+
154+
{
155+
GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
156+
GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
157+
assertThat(response.getDataStreams(), hasSize(1));
158+
assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
159+
}
160+
161+
SnapshotInfo snapshotInfo = assertSuccessful(
162+
client().admin()
163+
.cluster()
164+
.prepareCreateSnapshot(REPO, SNAPSHOT)
165+
.setIndices("my-index")
166+
.setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
167+
.setWaitForCompletion(true)
168+
.setIncludeGlobalState(false)
169+
.execute()
170+
);
171+
172+
assertThat(snapshotInfo.dataStreams(), not(empty()));
91173

92174
// We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
93175
// See https://github.com/elastic/elasticsearch/issues/75818
@@ -97,6 +179,8 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
97179
assertTrue(response.isAcknowledged());
98180
}
99181

182+
assertAcked(client().admin().indices().prepareDelete("my-index"));
183+
100184
{
101185
GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
102186
assertThat(indicesRemaining.indices(), arrayWithSize(0));
@@ -106,7 +190,8 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
106190
.cluster()
107191
.prepareRestoreSnapshot(REPO, SNAPSHOT)
108192
.setWaitForCompletion(true)
109-
.setRestoreGlobalState(false)
193+
.setIndices("my-index")
194+
.setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
110195
.get();
111196
assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());
112197

0 commit comments

Comments
 (0)