Skip to content

Commit 42f513c

Browse files
authored
Create first backing index when creating data stream (#54467)
1 parent db572af commit 42f513c

File tree

6 files changed

+134
-26
lines changed

6 files changed

+134
-26
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

+11-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
---
22
"Create data stream":
33
- skip:
4-
version: " - 7.6.99"
5-
reason: available only in 7.7+
4+
version: " - 7.99.99"
5+
reason: "enable in 7.8+ after back-porting https://github.com/elastic/elasticsearch/pull/54467"
66

77
- do:
88
indices.create_data_stream:
@@ -22,10 +22,17 @@
2222
indices.get_data_streams: {}
2323
- match: { 0.name: simple-data-stream1 }
2424
- match: { 0.timestamp_field: '@timestamp' }
25-
- match: { 0.indices: [] }
25+
- length: { 0.indices: 1 }
26+
- match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
2627
- match: { 1.name: simple-data-stream2 }
2728
- match: { 1.timestamp_field: '@timestamp2' }
28-
- match: { 1.indices: [] }
29+
- length: { 1.indices: 1 }
30+
- match: { 1.indices.0.index_name: 'simple-data-stream2-000001' }
31+
32+
- do:
33+
indices.delete_data_stream:
34+
name: simple-data-stream1
35+
- is_true: acknowledged
2936

3037
- do:
3138
indices.delete_data_stream:

server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.ActionRequestValidationException;
2525
import org.elasticsearch.action.ActionType;
2626
import org.elasticsearch.action.ValidateActions;
27+
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
2728
import org.elasticsearch.action.support.ActionFilters;
2829
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2930
import org.elasticsearch.action.support.master.MasterNodeRequest;
@@ -33,6 +34,7 @@
3334
import org.elasticsearch.cluster.block.ClusterBlockException;
3435
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3536
import org.elasticsearch.cluster.metadata.DataStream;
37+
import org.elasticsearch.cluster.metadata.IndexMetadata;
3638
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3739
import org.elasticsearch.cluster.metadata.Metadata;
3840
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
@@ -42,13 +44,14 @@
4244
import org.elasticsearch.common.inject.Inject;
4345
import org.elasticsearch.common.io.stream.StreamInput;
4446
import org.elasticsearch.common.io.stream.StreamOutput;
47+
import org.elasticsearch.common.settings.Settings;
4548
import org.elasticsearch.common.unit.TimeValue;
4649
import org.elasticsearch.tasks.Task;
4750
import org.elasticsearch.threadpool.ThreadPool;
4851
import org.elasticsearch.transport.TransportService;
4952

5053
import java.io.IOException;
51-
import java.util.Collections;
54+
import java.util.List;
5255
import java.util.Objects;
5356

5457
public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
@@ -117,10 +120,14 @@ public int hashCode() {
117120

118121
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
119122

123+
private final MetadataCreateIndexService metadataCreateIndexService;
124+
120125
@Inject
121126
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
122-
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
127+
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
128+
MetadataCreateIndexService metaDataCreateIndexService) {
123129
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
130+
this.metadataCreateIndexService = metaDataCreateIndexService;
124131
}
125132

126133
@Override
@@ -151,7 +158,7 @@ public void onFailure(String source, Exception e) {
151158

152159
@Override
153160
public ClusterState execute(ClusterState currentState) throws Exception {
154-
return createDataStream(currentState, request);
161+
return createDataStream(metadataCreateIndexService, currentState, request);
155162
}
156163

157164
@Override
@@ -161,16 +168,26 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
161168
});
162169
}
163170

164-
static ClusterState createDataStream(ClusterState currentState, Request request) {
171+
static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
172+
ClusterState currentState,
173+
Request request) throws Exception {
165174
if (currentState.metadata().dataStreams().containsKey(request.name)) {
166175
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
167176
}
168177

169178
MetadataCreateIndexService.validateIndexOrAliasName(request.name,
170179
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
171180

181+
String firstBackingIndexName = request.name + "-000001";
182+
CreateIndexClusterStateUpdateRequest createIndexRequest =
183+
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
184+
.settings(Settings.builder().put("index.hidden", true).build());
185+
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
186+
IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
187+
assert firstBackingIndex != null;
188+
172189
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
173-
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
190+
new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
174191
logger.info("adding data stream [{}]", request.name);
175192
return ClusterState.builder(currentState).metadata(builder).build();
176193
}

server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public interface IndexAbstraction {
5656

5757
/**
5858
* A write index is a dedicated concrete index, that accepts all the new documents that belong to an index abstraction.
59-
*
59+
* <p>
6060
* A write index may also be a regular concrete index of a index abstraction and may therefore also be returned
6161
* by {@link #getIndices()}. An index abstraction may also not have a dedicated write index.
6262
*
@@ -87,7 +87,14 @@ enum Type {
8787
* An alias typically refers to many concrete indices and
8888
* may have a write index.
8989
*/
90-
ALIAS("alias");
90+
ALIAS("alias"),
91+
92+
/**
93+
* An index abstraction that refers to a data stream.
94+
* A data stream typically has multiple backing indices, the latest of which
95+
* is the target for index requests.
96+
*/
97+
DATA_STREAM("data_stream");
9198

9299
private final String displayName;
93100

@@ -181,7 +188,7 @@ public boolean isHidden() {
181188

182189
/**
183190
* Returns the unique alias metadata per concrete index.
184-
*
191+
* <p>
185192
* (note that although alias can point to the same concrete indices, each alias reference may have its own routing
186193
* and filters)
187194
*/
@@ -233,7 +240,7 @@ public void computeAndValidateAliasProperties() {
233240

234241
// Validate hidden status
235242
final Map<Boolean, List<IndexMetadata>> groupedByHiddenStatus = referenceIndexMetadatas.stream()
236-
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
243+
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
237244
if (isNonEmpty(groupedByHiddenStatus.get(true)) && isNonEmpty(groupedByHiddenStatus.get(false))) {
238245
List<String> hiddenOn = groupedByHiddenStatus.get(true).stream()
239246
.map(idx -> idx.getIndex().getName()).collect(Collectors.toList());

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

+21-6
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import java.util.TreeMap;
7878
import java.util.function.Function;
7979
import java.util.function.Predicate;
80+
import java.util.stream.Collectors;
8081
import java.util.stream.StreamSupport;
8182

8283
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
@@ -1367,6 +1368,7 @@ private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
13671368
});
13681369
}
13691370
}
1371+
13701372
aliasAndIndexLookup.values().stream()
13711373
.filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS)
13721374
.forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties());
@@ -1377,15 +1379,28 @@ private void validateDataStreams(SortedMap<String, IndexAbstraction> indicesLook
13771379
DataStreamMetadata dsMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE);
13781380
if (dsMetadata != null) {
13791381
for (DataStream ds : dsMetadata.dataStreams().values()) {
1380-
if (indicesLookup.containsKey(ds.getName())) {
1382+
IndexAbstraction existing = indicesLookup.get(ds.getName());
1383+
if (existing != null && existing.getType() != IndexAbstraction.Type.DATA_STREAM) {
13811384
throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias");
13821385
}
13831386

1384-
SortedMap<?, ?> map = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
1385-
if (map.size() != 0) {
1386-
throw new IllegalStateException("data stream [" + ds.getName() +
1387-
"] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" +
1388-
" including '" + map.firstKey() + "'");
1387+
SortedMap<String, IndexAbstraction> potentialConflicts =
1388+
indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
1389+
if (potentialConflicts.size() != 0) {
1390+
List<String> indexNames = ds.getIndices().stream().map(Index::getName).collect(Collectors.toList());
1391+
List<String> conflicts = new ArrayList<>();
1392+
for (Map.Entry<String, IndexAbstraction> entry : potentialConflicts.entrySet()) {
1393+
if (entry.getValue().getType() != IndexAbstraction.Type.CONCRETE_INDEX ||
1394+
indexNames.contains(entry.getKey()) == false) {
1395+
conflicts.add(entry.getKey());
1396+
}
1397+
}
1398+
1399+
if (conflicts.size() > 0) {
1400+
throw new IllegalStateException("data stream [" + ds.getName() +
1401+
"] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" +
1402+
" including '" + conflicts.get(0) + "'");
1403+
}
13891404
}
13901405
}
13911406
}

server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java

+43-6
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,30 @@
1818
*/
1919
package org.elasticsearch.action.admin.indices.datastream;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.action.ActionRequestValidationException;
23+
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
2224
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
2325
import org.elasticsearch.cluster.ClusterName;
2426
import org.elasticsearch.cluster.ClusterState;
2527
import org.elasticsearch.cluster.metadata.DataStream;
28+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2629
import org.elasticsearch.cluster.metadata.Metadata;
30+
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
2731
import org.elasticsearch.common.io.stream.Writeable;
32+
import org.elasticsearch.common.settings.Settings;
2833
import org.elasticsearch.test.AbstractWireSerializingTestCase;
2934

3035
import java.util.Collections;
3136
import java.util.Map;
3237

3338
import static org.hamcrest.Matchers.containsString;
3439
import static org.hamcrest.Matchers.equalTo;
40+
import static org.hamcrest.Matchers.notNullValue;
41+
import static org.mockito.Matchers.any;
42+
import static org.mockito.Matchers.anyBoolean;
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.when;
3545

3646
public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
3747

@@ -62,33 +72,60 @@ public void testValidateRequestWithoutTimestampField() {
6272
assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
6373
}
6474

65-
public void testCreateDataStream() {
75+
public void testCreateDataStream() throws Exception {
76+
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
6677
final String dataStreamName = "my-data-stream";
6778
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
6879
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
69-
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req);
80+
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
7081
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
7182
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
83+
assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue());
84+
assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true"));
7285
}
7386

74-
public void testCreateDuplicateDataStream() {
87+
public void testCreateDuplicateDataStream() throws Exception {
88+
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
7589
final String dataStreamName = "my-data-stream";
7690
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
7791
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
7892
.metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
7993
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
8094

8195
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
82-
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
96+
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
8397
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
8498
}
8599

86-
public void testCreateDataStreamWithInvalidName() {
100+
public void testCreateDataStreamWithInvalidName() throws Exception {
101+
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
87102
final String dataStreamName = "_My-da#ta- ,stream-";
88103
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
89104
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
90105
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
91-
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
106+
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
92107
assertThat(e.getMessage(), containsString("must not contain the following characters"));
93108
}
109+
110+
private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
111+
MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
112+
when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))
113+
.thenAnswer(mockInvocation -> {
114+
ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
115+
CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1];
116+
117+
Metadata.Builder b = Metadata.builder(currentState.metadata())
118+
.put(IndexMetadata.builder(request.index())
119+
.settings(Settings.builder()
120+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
121+
.put(request.settings())
122+
.build())
123+
.numberOfShards(1)
124+
.numberOfReplicas(1)
125+
.build(), false);
126+
return ClusterState.builder(currentState).metadata(b.build()).build();
127+
});
128+
129+
return s;
130+
}
94131
}

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@
4343
import org.elasticsearch.test.ESTestCase;
4444

4545
import java.io.IOException;
46+
import java.util.ArrayList;
4647
import java.util.Arrays;
4748
import java.util.Collections;
4849
import java.util.HashMap;
4950
import java.util.HashSet;
5051
import java.util.List;
52+
import java.util.Locale;
5153
import java.util.Map;
5254
import java.util.Set;
5355

@@ -939,7 +941,7 @@ public void testBuilderRejectsDataStreamThatConflictsWithAlias() {
939941

940942
public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
941943
final String dataStreamName = "my-data-stream";
942-
final String conflictingIndex = dataStreamName + "-00001";
944+
final String conflictingIndex = dataStreamName + "-000001";
943945
Metadata.Builder b = Metadata.builder()
944946
.put(IndexMetadata.builder(conflictingIndex)
945947
.settings(settings(Version.CURRENT))
@@ -953,6 +955,29 @@ public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
953955
"] could create backing indices that conflict with 1 existing index(s) or alias(s) including '" + conflictingIndex + "'"));
954956
}
955957

958+
public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
959+
final String dataStreamName = "my-data-stream";
960+
final List<Index> backingIndices = new ArrayList<>();
961+
final int numBackingIndices = randomIntBetween(2, 5);
962+
int lastBackingIndexNum = randomIntBetween(9, 50);
963+
Metadata.Builder b = Metadata.builder();
964+
for (int k = 1; k <= numBackingIndices; k++) {
965+
IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum))
966+
.settings(settings(Version.CURRENT))
967+
.numberOfShards(1)
968+
.numberOfReplicas(1)
969+
.build();
970+
b.put(im, false);
971+
backingIndices.add(im.getIndex());
972+
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
973+
}
974+
975+
b.put(new DataStream(dataStreamName, "ts", backingIndices));
976+
Metadata metadata = b.build();
977+
assertThat(metadata.dataStreams().size(), equalTo(1));
978+
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
979+
}
980+
956981
public void testSerialization() throws IOException {
957982
final Metadata orig = randomMetadata();
958983
final BytesStreamOutput out = new BytesStreamOutput();

0 commit comments

Comments
 (0)