Skip to content

Commit 39c4ec6

Browse files
authored
[7.x] Create first backing index when creating data stream
1 parent 6e73f67 commit 39c4ec6

File tree

6 files changed

+134
-26
lines changed

6 files changed

+134
-26
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
---
22
"Create data stream":
33
- skip:
4-
version: " - 7.6.99"
5-
reason: available only in 7.7+
4+
version: " - 7.99.99"
5+
reason: "enable in 7.8+ after back-porting https://github.com/elastic/elasticsearch/pull/54467"
66

77
- do:
88
indices.create_data_stream:
@@ -22,10 +22,17 @@
2222
indices.get_data_streams: {}
2323
- match: { 0.name: simple-data-stream1 }
2424
- match: { 0.timestamp_field: '@timestamp' }
25-
- match: { 0.indices: [] }
25+
- length: { 0.indices: 1 }
26+
- match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
2627
- match: { 1.name: simple-data-stream2 }
2728
- match: { 1.timestamp_field: '@timestamp2' }
28-
- match: { 1.indices: [] }
29+
- length: { 1.indices: 1 }
30+
- match: { 1.indices.0.index_name: 'simple-data-stream2-000001' }
31+
32+
- do:
33+
indices.delete_data_stream:
34+
name: simple-data-stream1
35+
- is_true: acknowledged
2936

3037
- do:
3138
indices.delete_data_stream:

server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.ActionRequestValidationException;
2525
import org.elasticsearch.action.ActionType;
2626
import org.elasticsearch.action.ValidateActions;
27+
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
2728
import org.elasticsearch.action.support.ActionFilters;
2829
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2930
import org.elasticsearch.action.support.master.MasterNodeRequest;
@@ -33,21 +34,23 @@
3334
import org.elasticsearch.cluster.block.ClusterBlockException;
3435
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3536
import org.elasticsearch.cluster.metadata.DataStream;
37+
import org.elasticsearch.cluster.metadata.IndexMetadata;
3638
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3739
import org.elasticsearch.cluster.metadata.Metadata;
3840
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
3941
import org.elasticsearch.cluster.service.ClusterService;
4042
import org.elasticsearch.common.Priority;
4143
import org.elasticsearch.common.Strings;
44+
import org.elasticsearch.common.collect.List;
4245
import org.elasticsearch.common.inject.Inject;
4346
import org.elasticsearch.common.io.stream.StreamInput;
4447
import org.elasticsearch.common.io.stream.StreamOutput;
48+
import org.elasticsearch.common.settings.Settings;
4549
import org.elasticsearch.common.unit.TimeValue;
4650
import org.elasticsearch.threadpool.ThreadPool;
4751
import org.elasticsearch.transport.TransportService;
4852

4953
import java.io.IOException;
50-
import java.util.Collections;
5154
import java.util.Objects;
5255

5356
public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
@@ -116,10 +119,14 @@ public int hashCode() {
116119

117120
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
118121

122+
private final MetadataCreateIndexService metadataCreateIndexService;
123+
119124
@Inject
120125
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
121-
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
126+
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
127+
MetadataCreateIndexService metaDataCreateIndexService) {
122128
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
129+
this.metadataCreateIndexService = metaDataCreateIndexService;
123130
}
124131

125132
@Override
@@ -150,7 +157,7 @@ public void onFailure(String source, Exception e) {
150157

151158
@Override
152159
public ClusterState execute(ClusterState currentState) throws Exception {
153-
return createDataStream(currentState, request);
160+
return createDataStream(metadataCreateIndexService, currentState, request);
154161
}
155162

156163
@Override
@@ -160,16 +167,26 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
160167
});
161168
}
162169

163-
static ClusterState createDataStream(ClusterState currentState, Request request) {
170+
static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
171+
ClusterState currentState,
172+
Request request) throws Exception {
164173
if (currentState.metadata().dataStreams().containsKey(request.name)) {
165174
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
166175
}
167176

168177
MetadataCreateIndexService.validateIndexOrAliasName(request.name,
169178
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
170179

180+
String firstBackingIndexName = request.name + "-000001";
181+
CreateIndexClusterStateUpdateRequest createIndexRequest =
182+
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
183+
.settings(Settings.builder().put("index.hidden", true).build());
184+
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
185+
IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
186+
assert firstBackingIndex != null;
187+
171188
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
172-
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
189+
new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
173190
logger.info("adding data stream [{}]", request.name);
174191
return ClusterState.builder(currentState).metadata(builder).build();
175192
}

server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public interface IndexAbstraction {
5757

5858
/**
5959
* A write index is a dedicated concrete index, that accepts all the new documents that belong to an index abstraction.
60-
*
60+
* <p>
6161
* A write index may also be a regular concrete index of a index abstraction and may therefore also be returned
6262
* by {@link #getIndices()}. An index abstraction may also not have a dedicated write index.
6363
*
@@ -88,7 +88,14 @@ enum Type {
8888
* An alias typically refers to many concrete indices and
8989
* may have a write index.
9090
*/
91-
ALIAS("alias");
91+
ALIAS("alias"),
92+
93+
/**
94+
* An index abstraction that refers to a data stream.
95+
* A data stream typically has multiple backing indices, the latest of which
96+
* is the target for index requests.
97+
*/
98+
DATA_STREAM("data_stream");
9299

93100
private final String displayName;
94101

@@ -182,7 +189,7 @@ public boolean isHidden() {
182189

183190
/**
184191
* Returns the unique alias metadata per concrete index.
185-
*
192+
* <p>
186193
* (note that although alias can point to the same concrete indices, each alias reference may have its own routing
187194
* and filters)
188195
*/
@@ -234,7 +241,7 @@ public void computeAndValidateAliasProperties() {
234241

235242
// Validate hidden status
236243
final Map<Boolean, List<IndexMetadata>> groupedByHiddenStatus = referenceIndexMetadatas.stream()
237-
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
244+
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
238245
if (isNonEmpty(groupedByHiddenStatus.get(true)) && isNonEmpty(groupedByHiddenStatus.get(false))) {
239246
List<String> hiddenOn = groupedByHiddenStatus.get(true).stream()
240247
.map(idx -> idx.getIndex().getName()).collect(Collectors.toList());

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import java.util.TreeMap;
8080
import java.util.function.Function;
8181
import java.util.function.Predicate;
82+
import java.util.stream.Collectors;
8283
import java.util.stream.StreamSupport;
8384

8485
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
@@ -1443,6 +1444,7 @@ private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
14431444
});
14441445
}
14451446
}
1447+
14461448
aliasAndIndexLookup.values().stream()
14471449
.filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS)
14481450
.forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties());
@@ -1453,15 +1455,28 @@ private void validateDataStreams(SortedMap<String, IndexAbstraction> indicesLook
14531455
DataStreamMetadata dsMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE);
14541456
if (dsMetadata != null) {
14551457
for (DataStream ds : dsMetadata.dataStreams().values()) {
1456-
if (indicesLookup.containsKey(ds.getName())) {
1458+
IndexAbstraction existing = indicesLookup.get(ds.getName());
1459+
if (existing != null && existing.getType() != IndexAbstraction.Type.DATA_STREAM) {
14571460
throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias");
14581461
}
14591462

1460-
SortedMap<?, ?> map = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
1461-
if (map.size() != 0) {
1462-
throw new IllegalStateException("data stream [" + ds.getName() +
1463-
"] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" +
1464-
" including '" + map.firstKey() + "'");
1463+
SortedMap<String, IndexAbstraction> potentialConflicts =
1464+
indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
1465+
if (potentialConflicts.size() != 0) {
1466+
List<String> indexNames = ds.getIndices().stream().map(Index::getName).collect(Collectors.toList());
1467+
List<String> conflicts = new ArrayList<>();
1468+
for (Map.Entry<String, IndexAbstraction> entry : potentialConflicts.entrySet()) {
1469+
if (entry.getValue().getType() != IndexAbstraction.Type.CONCRETE_INDEX ||
1470+
indexNames.contains(entry.getKey()) == false) {
1471+
conflicts.add(entry.getKey());
1472+
}
1473+
}
1474+
1475+
if (conflicts.size() > 0) {
1476+
throw new IllegalStateException("data stream [" + ds.getName() +
1477+
"] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" +
1478+
" including '" + conflicts.get(0) + "'");
1479+
}
14651480
}
14661481
}
14671482
}

server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,29 @@
1818
*/
1919
package org.elasticsearch.action.admin.indices.datastream;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.action.ActionRequestValidationException;
23+
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
2224
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
2325
import org.elasticsearch.cluster.ClusterName;
2426
import org.elasticsearch.cluster.ClusterState;
2527
import org.elasticsearch.cluster.metadata.DataStream;
28+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2629
import org.elasticsearch.cluster.metadata.Metadata;
30+
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
2731
import org.elasticsearch.common.io.stream.Writeable;
32+
import org.elasticsearch.common.settings.Settings;
2833
import org.elasticsearch.test.AbstractWireSerializingTestCase;
2934

3035
import java.util.Collections;
3136

3237
import static org.hamcrest.Matchers.containsString;
3338
import static org.hamcrest.Matchers.equalTo;
39+
import static org.hamcrest.Matchers.notNullValue;
40+
import static org.mockito.Matchers.any;
41+
import static org.mockito.Matchers.anyBoolean;
42+
import static org.mockito.Mockito.mock;
43+
import static org.mockito.Mockito.when;
3444

3545
public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
3646

@@ -61,33 +71,60 @@ public void testValidateRequestWithoutTimestampField() {
6171
assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
6272
}
6373

64-
public void testCreateDataStream() {
74+
public void testCreateDataStream() throws Exception {
75+
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
6576
final String dataStreamName = "my-data-stream";
6677
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
6778
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
68-
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req);
79+
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
6980
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
7081
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
82+
assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue());
83+
assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true"));
7184
}
7285

73-
public void testCreateDuplicateDataStream() {
86+
public void testCreateDuplicateDataStream() throws Exception {
87+
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
7488
final String dataStreamName = "my-data-stream";
7589
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
7690
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
7791
.metadata(Metadata.builder().dataStreams(Collections.singletonMap(dataStreamName, existingDataStream)).build()).build();
7892
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
7993

8094
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
81-
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
95+
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
8296
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
8397
}
8498

85-
public void testCreateDataStreamWithInvalidName() {
99+
public void testCreateDataStreamWithInvalidName() throws Exception {
100+
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
86101
final String dataStreamName = "_My-da#ta- ,stream-";
87102
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
88103
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
89104
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
90-
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
105+
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
91106
assertThat(e.getMessage(), containsString("must not contain the following characters"));
92107
}
108+
109+
private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
110+
MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
111+
when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))
112+
.thenAnswer(mockInvocation -> {
113+
ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
114+
CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1];
115+
116+
Metadata.Builder b = Metadata.builder(currentState.metadata())
117+
.put(IndexMetadata.builder(request.index())
118+
.settings(Settings.builder()
119+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
120+
.put(request.settings())
121+
.build())
122+
.numberOfShards(1)
123+
.numberOfReplicas(1)
124+
.build(), false);
125+
return ClusterState.builder(currentState).metadata(b.build()).build();
126+
});
127+
128+
return s;
129+
}
93130
}

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@
4343
import org.elasticsearch.test.ESTestCase;
4444

4545
import java.io.IOException;
46+
import java.util.ArrayList;
4647
import java.util.Arrays;
4748
import java.util.Collections;
4849
import java.util.HashMap;
4950
import java.util.HashSet;
5051
import java.util.List;
52+
import java.util.Locale;
5153
import java.util.Map;
5254
import java.util.Set;
5355

@@ -973,7 +975,7 @@ public void testBuilderRejectsDataStreamThatConflictsWithAlias() {
973975

974976
public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
975977
final String dataStreamName = "my-data-stream";
976-
final String conflictingIndex = dataStreamName + "-00001";
978+
final String conflictingIndex = dataStreamName + "-000001";
977979
Metadata.Builder b = Metadata.builder()
978980
.put(IndexMetadata.builder(conflictingIndex)
979981
.settings(settings(Version.CURRENT))
@@ -987,6 +989,29 @@ public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
987989
"] could create backing indices that conflict with 1 existing index(s) or alias(s) including '" + conflictingIndex + "'"));
988990
}
989991

992+
public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
993+
final String dataStreamName = "my-data-stream";
994+
final List<Index> backingIndices = new ArrayList<>();
995+
final int numBackingIndices = randomIntBetween(2, 5);
996+
int lastBackingIndexNum = randomIntBetween(9, 50);
997+
Metadata.Builder b = Metadata.builder();
998+
for (int k = 1; k <= numBackingIndices; k++) {
999+
IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum))
1000+
.settings(settings(Version.CURRENT))
1001+
.numberOfShards(1)
1002+
.numberOfReplicas(1)
1003+
.build();
1004+
b.put(im, false);
1005+
backingIndices.add(im.getIndex());
1006+
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
1007+
}
1008+
1009+
b.put(new DataStream(dataStreamName, "ts", backingIndices));
1010+
Metadata metadata = b.build();
1011+
assertThat(metadata.dataStreams().size(), equalTo(1));
1012+
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
1013+
}
1014+
9901015
public void testSerialization() throws IOException {
9911016
final Metadata orig = randomMetadata();
9921017
final BytesStreamOutput out = new BytesStreamOutput();

0 commit comments

Comments
 (0)