Skip to content

Commit c8eec32

Browse files
committed
Utility methods to add and remove backing indices from data streams (elastic#77778)
1 parent fbd1330 commit c8eec32

File tree

2 files changed

+301
-1
lines changed

2 files changed

+301
-1
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

+62-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.index.PointValues;
1414
import org.elasticsearch.cluster.AbstractDiffable;
1515
import org.elasticsearch.cluster.Diff;
16+
import org.elasticsearch.common.Strings;
1617
import org.elasticsearch.core.Nullable;
1718
import org.elasticsearch.common.xcontent.ParseField;
1819
import org.elasticsearch.common.io.stream.StreamInput;
@@ -36,6 +37,7 @@
3637
import java.util.Map;
3738
import java.util.Objects;
3839
import java.util.function.LongSupplier;
40+
import java.util.stream.Collectors;
3941

4042
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
4143

@@ -185,8 +187,20 @@ public DataStream rollover(Metadata clusterMetadata, String writeIndexUuid, Vers
185187
*
186188
* @param index the backing index to remove
187189
* @return new {@code DataStream} instance with the remaining backing indices
190+
* @throws IllegalArgumentException if {@code index} is not a backing index or is the current write index of the data stream
188191
*/
189192
public DataStream removeBackingIndex(Index index) {
193+
int backingIndexPosition = indices.indexOf(index);
194+
195+
if (backingIndexPosition == -1) {
196+
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]",
197+
index.getName(), name));
198+
}
199+
if (generation == (backingIndexPosition + 1)) {
200+
throw new IllegalArgumentException(String.format(Locale.ROOT, "cannot remove backing index [%s] of data stream [%s] because " +
201+
"it is the write index", index.getName(), name));
202+
}
203+
190204
List<Index> backingIndices = new ArrayList<>(indices);
191205
backingIndices.remove(index);
192206
assert backingIndices.size() == indices.size() - 1;
@@ -207,7 +221,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
207221
List<Index> backingIndices = new ArrayList<>(indices);
208222
int backingIndexPosition = backingIndices.indexOf(existingBackingIndex);
209223
if (backingIndexPosition == -1) {
210-
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] ",
224+
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]",
211225
existingBackingIndex.getName(), name));
212226
}
213227
if (generation == (backingIndexPosition + 1)) {
@@ -218,6 +232,53 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
218232
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system);
219233
}
220234

235+
/**
236+
* Adds the specified index as a backing index and returns a new {@code DataStream} instance with the new combination
237+
* of backing indices.
238+
*
239+
* @param index index to add to the data stream
240+
* @return new {@code DataStream} instance with the added backing index
241+
* @throws IllegalArgumentException if {@code index} is ineligible to be a backing index for the data stream
242+
*/
243+
public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
244+
// validate that index is not part of another data stream
245+
final var parentDataStream = clusterMetadata.getIndicesLookup().get(index.getName()).getParentDataStream();
246+
if (parentDataStream != null) {
247+
if (parentDataStream.getDataStream().equals(this)) {
248+
return this;
249+
} else {
250+
throw new IllegalArgumentException(
251+
String.format(Locale.ROOT,
252+
"cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]",
253+
index.getName(),
254+
getName(),
255+
parentDataStream.getName()
256+
)
257+
);
258+
}
259+
}
260+
261+
// ensure that no aliases reference index
262+
IndexMetadata im = clusterMetadata.getIndicesLookup().get(index.getName()).getWriteIndex();
263+
if (im.getAliases().size() > 0) {
264+
throw new IllegalArgumentException(
265+
String.format(Locale.ROOT,
266+
"cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed",
267+
index.getName(),
268+
getName(),
269+
Strings.collectionToCommaDelimitedString(
270+
im.getAliases().stream().map(Map.Entry::getKey).sorted().collect(Collectors.toList())
271+
)
272+
)
273+
);
274+
}
275+
276+
List<Index> backingIndices = new ArrayList<>(indices);
277+
backingIndices.add(0, index);
278+
assert backingIndices.size() == indices.size() + 1;
279+
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated, system);
280+
}
281+
221282
public DataStream promoteDataStream() {
222283
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, system, timeProvider);
223284
}

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

+239
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.cluster.metadata;
99

1010
import org.elasticsearch.Version;
11+
import org.elasticsearch.common.Strings;
1112
import org.elasticsearch.common.UUIDs;
1213
import org.elasticsearch.common.io.stream.Writeable;
1314
import org.elasticsearch.common.xcontent.XContentParser;
@@ -16,6 +17,7 @@
1617

1718
import java.io.IOException;
1819
import java.util.ArrayList;
20+
import java.util.Arrays;
1921
import java.util.HashMap;
2022
import java.util.HashSet;
2123
import java.util.List;
@@ -116,6 +118,243 @@ public void testRemoveBackingIndex() {
116118
}
117119
}
118120

121+
public void testRemoveBackingIndexThatDoesNotExist() {
122+
int numBackingIndices = randomIntBetween(2, 32);
123+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
124+
125+
List<Index> indices = new ArrayList<>(numBackingIndices);
126+
for (int k = 1; k <= numBackingIndices; k++) {
127+
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random())));
128+
}
129+
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
130+
131+
final Index indexToRemove = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
132+
133+
IllegalArgumentException e = expectThrows(
134+
IllegalArgumentException.class,
135+
() -> original.removeBackingIndex(indexToRemove)
136+
);
137+
assertThat(
138+
e.getMessage(),
139+
equalTo(
140+
String.format(
141+
Locale.ROOT,
142+
"index [%s] is not part of data stream [%s]",
143+
indexToRemove.getName(),
144+
dataStreamName)
145+
)
146+
);
147+
}
148+
149+
public void testRemoveBackingWriteIndex() {
150+
int numBackingIndices = randomIntBetween(2, 32);
151+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
152+
153+
List<Index> indices = new ArrayList<>(numBackingIndices);
154+
for (int k = 1; k <= numBackingIndices; k++) {
155+
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random())));
156+
}
157+
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
158+
159+
IllegalArgumentException e = expectThrows(
160+
IllegalArgumentException.class,
161+
() -> original.removeBackingIndex(indices.get(numBackingIndices - 1))
162+
);
163+
assertThat(
164+
e.getMessage(),
165+
equalTo(
166+
String.format(
167+
Locale.ROOT,
168+
"cannot remove backing index [%s] of data stream [%s] because it is the write index",
169+
indices.get(numBackingIndices - 1).getName(),
170+
dataStreamName
171+
)
172+
)
173+
);
174+
}
175+
176+
public void testAddBackingIndex() {
177+
int numBackingIndices = randomIntBetween(2, 32);
178+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
179+
final long epochMillis = System.currentTimeMillis();
180+
181+
List<Index> indices = new ArrayList<>(numBackingIndices);
182+
for (int k = 1; k <= numBackingIndices; k++) {
183+
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
184+
}
185+
186+
Metadata.Builder builder = Metadata.builder();
187+
for (int k = 1; k <= numBackingIndices; k++) {
188+
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
189+
.settings(settings(Version.CURRENT))
190+
.numberOfShards(1)
191+
.numberOfReplicas(1)
192+
.build();
193+
builder.put(im, false);
194+
}
195+
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
196+
builder.put(original);
197+
Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
198+
builder.put(
199+
IndexMetadata
200+
.builder(indexToAdd.getName())
201+
.settings(settings(Version.CURRENT))
202+
.numberOfShards(1)
203+
.numberOfReplicas(1)
204+
.build(),
205+
false
206+
);
207+
208+
DataStream updated = original.addBackingIndex(builder.build(), indexToAdd);
209+
assertThat(updated.getName(), equalTo(original.getName()));
210+
assertThat(updated.getGeneration(), equalTo(original.getGeneration() + 1));
211+
assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField()));
212+
assertThat(updated.getIndices().size(), equalTo(numBackingIndices + 1));
213+
for (int k = 1; k <= numBackingIndices; k++) {
214+
assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k - 1)));
215+
}
216+
assertThat(updated.getIndices().get(0), equalTo(indexToAdd));
217+
}
218+
219+
public void testAddBackingIndexThatIsPartOfAnotherDataStream() {
220+
int numBackingIndices = randomIntBetween(2, 32);
221+
final String dsName1 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
222+
final String dsName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
223+
224+
final long epochMillis = System.currentTimeMillis();
225+
226+
List<Index> indices1 = new ArrayList<>(numBackingIndices);
227+
List<Index> indices2 = new ArrayList<>(numBackingIndices);
228+
for (int k = 1; k <= numBackingIndices; k++) {
229+
indices1.add(new Index(DataStream.getDefaultBackingIndexName(dsName1, k, epochMillis), UUIDs.randomBase64UUID(random())));
230+
indices2.add(new Index(DataStream.getDefaultBackingIndexName(dsName2, k, epochMillis), UUIDs.randomBase64UUID(random())));
231+
}
232+
233+
Metadata.Builder builder = Metadata.builder();
234+
for (int k = 1; k <= numBackingIndices; k++) {
235+
IndexMetadata im = IndexMetadata.builder(indices1.get(k - 1).getName())
236+
.settings(settings(Version.CURRENT))
237+
.numberOfShards(1)
238+
.numberOfReplicas(1)
239+
.build();
240+
builder.put(im, false);
241+
im = IndexMetadata.builder(indices2.get(k - 1).getName())
242+
.settings(settings(Version.CURRENT))
243+
.numberOfShards(1)
244+
.numberOfReplicas(1)
245+
.build();
246+
builder.put(im, false);
247+
}
248+
DataStream ds1 = new DataStream(dsName1, createTimestampField("@timestamp"), indices1);
249+
DataStream ds2 = new DataStream(dsName2, createTimestampField("@timestamp"), indices2);
250+
builder.put(ds1);
251+
builder.put(ds2);
252+
253+
Index indexToAdd = randomFrom(indices2.toArray(Index.EMPTY_ARRAY));
254+
255+
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ds1.addBackingIndex(builder.build(), indexToAdd));
256+
assertThat(
257+
e.getMessage(),
258+
equalTo(
259+
String.format(
260+
Locale.ROOT,
261+
"cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]",
262+
indexToAdd.getName(),
263+
ds1.getName(),
264+
ds2.getName()
265+
)
266+
)
267+
);
268+
}
269+
270+
public void testAddExistingBackingIndex() {
271+
int numBackingIndices = randomIntBetween(2, 32);
272+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
273+
final long epochMillis = System.currentTimeMillis();
274+
275+
List<Index> indices = new ArrayList<>(numBackingIndices);
276+
for (int k = 1; k <= numBackingIndices; k++) {
277+
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
278+
}
279+
280+
Metadata.Builder builder = Metadata.builder();
281+
for (int k = 1; k <= numBackingIndices; k++) {
282+
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
283+
.settings(settings(Version.CURRENT))
284+
.numberOfShards(1)
285+
.numberOfReplicas(1)
286+
.build();
287+
builder.put(im, false);
288+
}
289+
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
290+
builder.put(original);
291+
Index indexToAdd = randomFrom(indices.toArray(Index.EMPTY_ARRAY));
292+
293+
DataStream updated = original.addBackingIndex(builder.build(), indexToAdd);
294+
assertThat(updated.getName(), equalTo(original.getName()));
295+
assertThat(updated.getGeneration(), equalTo(original.getGeneration()));
296+
assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField()));
297+
assertThat(updated.getIndices().size(), equalTo(numBackingIndices));
298+
for (int k = 0; k < numBackingIndices; k++) {
299+
assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k)));
300+
}
301+
}
302+
303+
public void testAddBackingIndexWithAliases() {
304+
int numBackingIndices = randomIntBetween(2, 32);
305+
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
306+
final long epochMillis = System.currentTimeMillis();
307+
308+
List<Index> indices = new ArrayList<>(numBackingIndices);
309+
for (int k = 1; k <= numBackingIndices; k++) {
310+
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
311+
}
312+
313+
Metadata.Builder builder = Metadata.builder();
314+
for (int k = 1; k <= numBackingIndices; k++) {
315+
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
316+
.settings(settings(Version.CURRENT))
317+
.numberOfShards(1)
318+
.numberOfReplicas(1)
319+
.build();
320+
builder.put(im, false);
321+
}
322+
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
323+
builder.put(original);
324+
325+
Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
326+
IndexMetadata.Builder b = IndexMetadata
327+
.builder(indexToAdd.getName())
328+
.settings(settings(Version.CURRENT))
329+
.numberOfShards(1)
330+
.numberOfReplicas(1);
331+
final int numAliases = randomIntBetween(1, 3);
332+
final String[] aliasNames = new String[numAliases];
333+
for (int k = 0; k < numAliases; k++) {
334+
aliasNames[k] = randomAlphaOfLength(6);
335+
b.putAlias(AliasMetadata.builder(aliasNames[k]));
336+
}
337+
builder.put(b.build(), false);
338+
Arrays.sort(aliasNames);
339+
340+
IllegalArgumentException e = expectThrows(
341+
IllegalArgumentException.class,
342+
() -> original.addBackingIndex(builder.build(), indexToAdd)
343+
);
344+
assertThat(
345+
e.getMessage(),
346+
equalTo(
347+
String.format(
348+
Locale.ROOT,
349+
"cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed",
350+
indexToAdd.getName(),
351+
original.getName(),
352+
Strings.arrayToCommaDelimitedString(aliasNames)
353+
)
354+
)
355+
);
356+
}
357+
119358
public void testDefaultBackingIndexName() {
120359
// this test does little more than flag that changing the default naming convention for backing indices
121360
// will also require changing a lot of hard-coded values in REST tests and docs

0 commit comments

Comments
 (0)