Skip to content

Commit c2b5d5d

Browse files
committed
Add custom metadata support to data steams.
Composable index template may hold custom metadata. This change adds behaviour that when a data stream gets created the custom metadata of the matching composable index template is copied to new data stream. The get data stream api can then be used to view the custom metadata. Example: ``` PUT /_index_template/my-logs-template { "index_patterns": [ "logs-*" ], "data_stream": { }, "template": { "settings": { "index.number_of_replicas": 0 } }, "_meta": { "managed": true } } PUT /_data_stream/logs-myapp GET /_data_stream ``` The get data stream api then yields the following response: ``` { "data_streams": [ { "name": "logs-myapp", "timestamp_field": { "name": "@timestamp" }, "indices": [ { "index_name": ".ds-logs-myapp-000001", "index_uuid": "3UaBxM3mQXuHR6qx0IDVCw" } ], "generation": 1, "_meta": { "managed": true }, "status": "GREEN", "template": "my-logs-template" } ] } ``` Closes elastic#59195
1 parent 265a214 commit c2b5d5d

File tree

15 files changed

+140
-55
lines changed

15 files changed

+140
-55
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java

+4-22
Original file line numberDiff line numberDiff line change
@@ -19,43 +19,25 @@
1919

2020
package org.elasticsearch.client.indices;
2121

22-
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
23-
import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;
2422
import org.elasticsearch.client.AbstractResponseTestCase;
23+
import org.elasticsearch.cluster.DataStreamTestHelper;
2524
import org.elasticsearch.cluster.health.ClusterHealthStatus;
2625
import org.elasticsearch.cluster.metadata.DataStream;
27-
import org.elasticsearch.common.UUIDs;
2826
import org.elasticsearch.common.xcontent.XContentParser;
2927
import org.elasticsearch.common.xcontent.XContentType;
3028
import org.elasticsearch.index.Index;
29+
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
30+
import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;
3131

3232
import java.io.IOException;
3333
import java.util.ArrayList;
3434
import java.util.Iterator;
35-
import java.util.List;
36-
import java.util.Locale;
3735
import java.util.stream.Collectors;
3836

39-
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
40-
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
41-
4237
public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetDataStreamAction.Response, GetDataStreamResponse> {
4338

44-
private static List<Index> randomIndexInstances() {
45-
int numIndices = randomIntBetween(0, 128);
46-
List<Index> indices = new ArrayList<>(numIndices);
47-
for (int i = 0; i < numIndices; i++) {
48-
indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())));
49-
}
50-
return indices;
51-
}
52-
5339
private static DataStreamInfo randomInstance() {
54-
List<Index> indices = randomIndexInstances();
55-
long generation = indices.size() + randomLongBetween(1, 128);
56-
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
57-
indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
58-
DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation);
40+
DataStream dataStream = DataStreamTestHelper.randomInstance();
5941
return new DataStreamInfo(dataStream, ClusterHealthStatus.YELLOW, randomAlphaOfLengthBetween(2, 10),
6042
randomAlphaOfLengthBetween(2, 10));
6143
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

+30-9
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.elasticsearch.cluster.metadata;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.cluster.AbstractDiffable;
2223
import org.elasticsearch.cluster.Diff;
24+
import org.elasticsearch.common.Nullable;
2325
import org.elasticsearch.common.ParseField;
2426
import org.elasticsearch.common.io.stream.StreamInput;
2527
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -35,6 +37,7 @@
3537
import java.util.Collections;
3638
import java.util.List;
3739
import java.util.Locale;
40+
import java.util.Map;
3841
import java.util.Objects;
3942

4043
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
@@ -45,18 +48,20 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
4548
private final TimestampField timeStampField;
4649
private final List<Index> indices;
4750
private final long generation;
51+
private final Map<String, Object> metadata;
4852

49-
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation) {
53+
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
5054
this.name = name;
5155
this.timeStampField = timeStampField;
5256
this.indices = Collections.unmodifiableList(indices);
5357
this.generation = generation;
58+
this.metadata = metadata;
5459
assert indices.size() > 0;
5560
assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation));
5661
}
5762

5863
public DataStream(String name, TimestampField timeStampField, List<Index> indices) {
59-
this(name, timeStampField, indices, indices.size());
64+
this(name, timeStampField, indices, indices.size(), null);
6065
}
6166

6267
public String getName() {
@@ -75,6 +80,11 @@ public long getGeneration() {
7580
return generation;
7681
}
7782

83+
@Nullable
84+
public Map<String, Object> getMetadata() {
85+
return metadata;
86+
}
87+
7888
/**
7989
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
8090
* the updated list of backing indices and incremented generation.
@@ -87,7 +97,7 @@ public DataStream rollover(Index newWriteIndex) {
8797
assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1));
8898
List<Index> backingIndices = new ArrayList<>(indices);
8999
backingIndices.add(newWriteIndex);
90-
return new DataStream(name, timeStampField, backingIndices, generation + 1);
100+
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata);
91101
}
92102

93103
/**
@@ -101,7 +111,7 @@ public DataStream removeBackingIndex(Index index) {
101111
List<Index> backingIndices = new ArrayList<>(indices);
102112
backingIndices.remove(index);
103113
assert backingIndices.size() == indices.size() - 1;
104-
return new DataStream(name, timeStampField, backingIndices, generation);
114+
return new DataStream(name, timeStampField, backingIndices, generation, metadata);
105115
}
106116

107117
/**
@@ -126,7 +136,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
126136
"it is the write index", existingBackingIndex.getName(), name));
127137
}
128138
backingIndices.set(backingIndexPosition, newBackingIndex);
129-
return new DataStream(name, timeStampField, backingIndices, generation);
139+
return new DataStream(name, timeStampField, backingIndices, generation, metadata);
130140
}
131141

132142
/**
@@ -142,7 +152,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene
142152
}
143153

144154
public DataStream(StreamInput in) throws IOException {
145-
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong());
155+
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
156+
in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readMap(): null);
146157
}
147158

148159
public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@@ -155,22 +166,28 @@ public void writeTo(StreamOutput out) throws IOException {
155166
timeStampField.writeTo(out);
156167
out.writeList(indices);
157168
out.writeVLong(generation);
169+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
170+
out.writeMap(metadata);
171+
}
158172
}
159173

160174
public static final ParseField NAME_FIELD = new ParseField("name");
161175
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
162176
public static final ParseField INDICES_FIELD = new ParseField("indices");
163177
public static final ParseField GENERATION_FIELD = new ParseField("generation");
178+
public static final ParseField METADATA_FIELD = new ParseField("_meta");
164179

165180
@SuppressWarnings("unchecked")
166181
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
167-
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3]));
182+
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
183+
(Map<String, Object>) args[4]));
168184

169185
static {
170186
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
171187
PARSER.declareObject(ConstructingObjectParser.constructorArg(), TimestampField.PARSER, TIMESTAMP_FIELD_FIELD);
172188
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD);
173189
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
190+
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
174191
}
175192

176193
public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -184,6 +201,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
184201
builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
185202
builder.field(INDICES_FIELD.getPreferredName(), indices);
186203
builder.field(GENERATION_FIELD.getPreferredName(), generation);
204+
if (metadata != null) {
205+
builder.field(METADATA_FIELD.getPreferredName(), metadata);
206+
}
187207
builder.endObject();
188208
return builder;
189209
}
@@ -196,12 +216,13 @@ public boolean equals(Object o) {
196216
return name.equals(that.name) &&
197217
timeStampField.equals(that.timeStampField) &&
198218
indices.equals(that.indices) &&
199-
generation == that.generation;
219+
generation == that.generation &&
220+
Objects.equals(metadata, that.metadata);
200221
}
201222

202223
@Override
203224
public int hashCode() {
204-
return Objects.hash(name, timeStampField, indices, generation);
225+
return Objects.hash(name, timeStampField, indices, generation, metadata);
205226
}
206227

207228
public static final class TimestampField implements Writeable, ToXContentObject {

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,9 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
158158

159159
String fieldName = template.getDataStreamTemplate().getTimestampField();
160160
DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName);
161-
DataStream newDataStream = new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()));
161+
DataStream newDataStream =
162+
new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()), 1L,
163+
template.metadata() != null ? Map.copyOf(template.metadata()) : null);
162164
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
163165
logger.info("adding data stream [{}]", request.name);
164166
return ClusterState.builder(currentState).metadata(builder).build();

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,8 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
608608
List<Index> updatedIndices = dataStream.getIndices().stream()
609609
.map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
610610
.collect(Collectors.toList());
611-
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration());
611+
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(),
612+
dataStream.getMetadata());
612613
}
613614

614615
public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {

server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1727,7 +1727,7 @@ public void testIndicesAliasesRequestTargetDataStreams() {
17271727

17281728
Metadata.Builder mdBuilder = Metadata.builder()
17291729
.put(backingIndex, false)
1730-
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(backingIndex.getIndex()), 1));
1730+
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(backingIndex.getIndex())));
17311731
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
17321732

17331733
{
@@ -1916,7 +1916,7 @@ public void testDataStreams() {
19161916
Metadata.Builder mdBuilder = Metadata.builder()
19171917
.put(index1, false)
19181918
.put(index2, false)
1919-
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex()), 2));
1919+
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex())));
19201920
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
19211921

19221922
{

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -1016,7 +1016,7 @@ public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
10161016
backingIndices.add(im.getIndex());
10171017
}
10181018

1019-
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum));
1019+
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null));
10201020
Metadata metadata = b.build();
10211021
assertThat(metadata.dataStreams().size(), equalTo(1));
10221022
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -1034,7 +1034,7 @@ public void testBuildIndicesLookupForDataStreams() {
10341034
indices.add(idx.getIndex());
10351035
b.put(idx, true);
10361036
}
1037-
b.put(new DataStream(name, createTimestampField("@timestamp"), indices, indices.size()));
1037+
b.put(new DataStream(name, createTimestampField("@timestamp"), indices));
10381038
}
10391039

10401040
Metadata metadata = b.build();
@@ -1100,8 +1100,7 @@ public void testValidateDataStreamsThrowsExceptionOnConflict() {
11001100
DataStream dataStream = new DataStream(
11011101
dataStreamName,
11021102
createTimestampField("@timestamp"),
1103-
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
1104-
backingIndices.size()
1103+
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
11051104
);
11061105

11071106
IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices);
@@ -1174,8 +1173,7 @@ public void testValidateDataStreamsAllowsPrefixedBackingIndices() {
11741173
DataStream dataStream = new DataStream(
11751174
dataStreamName,
11761175
createTimestampField("@timestamp"),
1177-
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
1178-
backingIndices.size()
1176+
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
11791177
);
11801178

11811179
IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices);
@@ -1275,7 +1273,7 @@ private static CreateIndexResult createIndices(int numIndices, int numBackingInd
12751273
b.put(im, false);
12761274
backingIndices.add(im.getIndex());
12771275
}
1278-
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum));
1276+
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null));
12791277
return new CreateIndexResult(indices, backingIndices, b.build());
12801278
}
12811279

test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
import java.util.ArrayList;
3333
import java.util.List;
3434
import java.util.Locale;
35+
import java.util.Map;
3536
import java.util.stream.Collectors;
3637

3738
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
3839
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
3940
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
41+
import static org.elasticsearch.test.ESTestCase.randomBoolean;
4042

4143
public final class DataStreamTestHelper {
4244

@@ -103,7 +105,11 @@ public static DataStream randomInstance() {
103105
long generation = indices.size() + ESTestCase.randomLongBetween(1, 128);
104106
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
105107
indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(LuceneTestCase.random())));
106-
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation);
108+
Map<String, Object> metadata = null;
109+
if (randomBoolean()) {
110+
metadata = Map.of("key", "value");
111+
}
112+
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata);
107113
}
108114

109115
/**
@@ -127,7 +133,8 @@ public static ClusterState getClusterStateWithDataStreams(List<Tuple<String, Int
127133
dsTuple.v1(),
128134
createTimestampField("@timestamp"),
129135
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
130-
dsTuple.v2()
136+
dsTuple.v2(),
137+
null
131138
);
132139
builder.put(ds);
133140
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java

+3
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
159159
builder.field(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName(), dataStream.getTimeStampField());
160160
builder.field(DataStream.INDICES_FIELD.getPreferredName(), dataStream.getIndices());
161161
builder.field(DataStream.GENERATION_FIELD.getPreferredName(), dataStream.getGeneration());
162+
if (dataStream.getMetadata() != null) {
163+
builder.field(DataStream.METADATA_FIELD.getPreferredName(), dataStream.getMetadata());
164+
}
162165
builder.field(STATUS_FIELD.getPreferredName(), dataStreamStatus);
163166
if (indexTemplate != null) {
164167
builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate);

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void testPerformActionOnDataStream() {
136136
.metadata(
137137
Metadata.builder()
138138
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
139-
List.of(indexMetadata.getIndex()), 1L))
139+
List.of(indexMetadata.getIndex())))
140140
.put(indexMetadata, true)
141141
)
142142
.build();

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void testPerformActionOnDataStream() {
9898
.metadata(
9999
Metadata.builder()
100100
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
101-
List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
101+
List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex())))
102102
.put(originalIndexMeta, true)
103103
.put(rolledIndexMeta, true)
104104
).build();

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public void testResultEvaluatedOnDataStream() throws IOException {
169169
.metadata(
170170
Metadata.builder()
171171
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
172-
List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
172+
List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex())))
173173
.put(originalIndexMeta, true)
174174
.put(rolledIndexMeta, true)
175175
)

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void testEvaluateConditionOnDataStreamTarget() {
152152
SetOnce<Boolean> conditionsMet = new SetOnce<>();
153153
Metadata metadata = Metadata.builder().put(indexMetadata, true)
154154
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
155-
List.of(indexMetadata.getIndex()), 1L))
155+
List.of(indexMetadata.getIndex())))
156156
.build();
157157
step.evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
158158

0 commit comments

Comments
 (0)