Skip to content

[7.x] Create first backing index when creating data stream (#54467) #54682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
"Create data stream":
- skip:
version: " - 7.6.99"
reason: available only in 7.7+
version: " - 7.99.99"
reason: "enable in 7.8+ after back-porting https://github.com/elastic/elasticsearch/pull/54467"

- do:
indices.create_data_stream:
Expand All @@ -22,10 +22,17 @@
indices.get_data_streams: {}
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.indices: [] }
- length: { 0.indices: 1 }
- match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
- match: { 1.name: simple-data-stream2 }
- match: { 1.timestamp_field: '@timestamp2' }
- match: { 1.indices: [] }
- length: { 1.indices: 1 }
- match: { 1.indices.0.index_name: 'simple-data-stream2-000001' }

- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged

- do:
indices.delete_data_stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
Expand All @@ -33,21 +34,23 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.List;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
Expand Down Expand Up @@ -116,10 +119,14 @@ public int hashCode() {

public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {

private final MetadataCreateIndexService metadataCreateIndexService;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService metaDataCreateIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.metadataCreateIndexService = metaDataCreateIndexService;
}

@Override
Expand Down Expand Up @@ -150,7 +157,7 @@ public void onFailure(String source, Exception e) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return createDataStream(currentState, request);
return createDataStream(metadataCreateIndexService, currentState, request);
}

@Override
Expand All @@ -160,16 +167,26 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

static ClusterState createDataStream(ClusterState currentState, Request request) {
static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
Request request) throws Exception {
if (currentState.metadata().dataStreams().containsKey(request.name)) {
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
}

MetadataCreateIndexService.validateIndexOrAliasName(request.name,
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));

String firstBackingIndexName = request.name + "-000001";
CreateIndexClusterStateUpdateRequest createIndexRequest =
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
.settings(Settings.builder().put("index.hidden", true).build());
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
assert firstBackingIndex != null;

Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metadata(builder).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public interface IndexAbstraction {

/**
* A write index is a dedicated concrete index, that accepts all the new documents that belong to an index abstraction.
*
* <p>
* A write index may also be a regular concrete index of a index abstraction and may therefore also be returned
* by {@link #getIndices()}. An index abstraction may also not have a dedicated write index.
*
Expand Down Expand Up @@ -88,7 +88,14 @@ enum Type {
* An alias typically refers to many concrete indices and
* may have a write index.
*/
ALIAS("alias");
ALIAS("alias"),

/**
* An index abstraction that refers to a data stream.
* A data stream typically has multiple backing indices, the latest of which
* is the target for index requests.
*/
DATA_STREAM("data_stream");

private final String displayName;

Expand Down Expand Up @@ -182,7 +189,7 @@ public boolean isHidden() {

/**
* Returns the unique alias metadata per concrete index.
*
* <p>
* (note that although alias can point to the same concrete indices, each alias reference may have its own routing
* and filters)
*/
Expand Down Expand Up @@ -234,7 +241,7 @@ public void computeAndValidateAliasProperties() {

// Validate hidden status
final Map<Boolean, List<IndexMetadata>> groupedByHiddenStatus = referenceIndexMetadatas.stream()
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
if (isNonEmpty(groupedByHiddenStatus.get(true)) && isNonEmpty(groupedByHiddenStatus.get(false))) {
List<String> hiddenOn = groupedByHiddenStatus.get(true).stream()
.map(idx -> idx.getIndex().getName()).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
Expand Down Expand Up @@ -1443,6 +1444,7 @@ private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
});
}
}

aliasAndIndexLookup.values().stream()
.filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS)
.forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties());
Expand All @@ -1453,15 +1455,28 @@ private void validateDataStreams(SortedMap<String, IndexAbstraction> indicesLook
DataStreamMetadata dsMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE);
if (dsMetadata != null) {
for (DataStream ds : dsMetadata.dataStreams().values()) {
if (indicesLookup.containsKey(ds.getName())) {
IndexAbstraction existing = indicesLookup.get(ds.getName());
if (existing != null && existing.getType() != IndexAbstraction.Type.DATA_STREAM) {
throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias");
}

SortedMap<?, ?> map = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
if (map.size() != 0) {
throw new IllegalStateException("data stream [" + ds.getName() +
"] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" +
" including '" + map.firstKey() + "'");
SortedMap<String, IndexAbstraction> potentialConflicts =
indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
if (potentialConflicts.size() != 0) {
List<String> indexNames = ds.getIndices().stream().map(Index::getName).collect(Collectors.toList());
List<String> conflicts = new ArrayList<>();
for (Map.Entry<String, IndexAbstraction> entry : potentialConflicts.entrySet()) {
if (entry.getValue().getType() != IndexAbstraction.Type.CONCRETE_INDEX ||
indexNames.contains(entry.getKey()) == false) {
conflicts.add(entry.getKey());
}
}

if (conflicts.size() > 0) {
throw new IllegalStateException("data stream [" + ds.getName() +
"] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" +
" including '" + conflicts.get(0) + "'");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,29 @@
*/
package org.elasticsearch.action.admin.indices.datastream;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.util.Collections;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {

Expand Down Expand Up @@ -61,33 +71,60 @@ public void testValidateRequestWithoutTimestampField() {
assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
}

public void testCreateDataStream() {
public void testCreateDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue());
assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true"));
}

public void testCreateDuplicateDataStream() {
public void testCreateDuplicateDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().dataStreams(Collections.singletonMap(dataStreamName, existingDataStream)).build()).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
}

public void testCreateDataStreamWithInvalidName() {
public void testCreateDataStreamWithInvalidName() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "_My-da#ta- ,stream-";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("must not contain the following characters"));
}

private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))
.thenAnswer(mockInvocation -> {
ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1];

Metadata.Builder b = Metadata.builder(currentState.metadata())
.put(IndexMetadata.builder(request.index())
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(request.settings())
.build())
.numberOfShards(1)
.numberOfReplicas(1)
.build(), false);
return ClusterState.builder(currentState).metadata(b.build()).build();
});

return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -973,7 +975,7 @@ public void testBuilderRejectsDataStreamThatConflictsWithAlias() {

public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
final String dataStreamName = "my-data-stream";
final String conflictingIndex = dataStreamName + "-00001";
final String conflictingIndex = dataStreamName + "-000001";
Metadata.Builder b = Metadata.builder()
.put(IndexMetadata.builder(conflictingIndex)
.settings(settings(Version.CURRENT))
Expand All @@ -987,6 +989,29 @@ public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
"] could create backing indices that conflict with 1 existing index(s) or alias(s) including '" + conflictingIndex + "'"));
}

public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
final String dataStreamName = "my-data-stream";
final List<Index> backingIndices = new ArrayList<>();
final int numBackingIndices = randomIntBetween(2, 5);
int lastBackingIndexNum = randomIntBetween(9, 50);
Metadata.Builder b = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
b.put(im, false);
backingIndices.add(im.getIndex());
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
}

b.put(new DataStream(dataStreamName, "ts", backingIndices));
Metadata metadata = b.build();
assertThat(metadata.dataStreams().size(), equalTo(1));
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
}

public void testSerialization() throws IOException {
final Metadata orig = randomMetadata();
final BytesStreamOutput out = new BytesStreamOutput();
Expand Down