Skip to content

Commit 0a561dc

Browse files
authored
[7.x] Consolidate data stream alias logic (#73756)
Backporting #73724 to the 7.x branch. Move data stream alias logic that was scattered in several places to the `DataStreamAlias` class. Relates to #66163
1 parent 9a0b8fd commit 0a561dc

File tree

6 files changed

+316
-78
lines changed

6 files changed

+316
-78
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAlias.java

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,22 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
17-
import org.elasticsearch.common.xcontent.ToXContentObject;
17+
import org.elasticsearch.common.xcontent.ToXContentFragment;
1818
import org.elasticsearch.common.xcontent.XContentBuilder;
1919
import org.elasticsearch.common.xcontent.XContentParser;
2020

2121
import java.io.IOException;
2222
import java.util.ArrayList;
2323
import java.util.Collection;
2424
import java.util.Collections;
25+
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Objects;
28+
import java.util.Set;
29+
import java.util.function.Predicate;
30+
import java.util.stream.Collectors;
2731

28-
public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implements ToXContentObject {
32+
public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implements ToXContentFragment {
2933

3034
public static final ParseField DATA_STREAMS_FIELD = new ParseField("data_streams");
3135
public static final ParseField WRITE_DATA_STREAM_FIELD = new ParseField("write_data_stream");
@@ -50,6 +54,7 @@ public DataStreamAlias(String name, Collection<String> dataStreams, String write
5054
this.name = Objects.requireNonNull(name);
5155
this.dataStreams = Collections.unmodifiableList(new ArrayList<>(dataStreams));
5256
this.writeDataStream = writeDataStream;
57+
assert writeDataStream == null || dataStreams.contains(writeDataStream);
5358
}
5459

5560
public DataStreamAlias(StreamInput in) throws IOException {
@@ -58,18 +63,136 @@ public DataStreamAlias(StreamInput in) throws IOException {
5863
this.writeDataStream = in.readOptionalString();
5964
}
6065

66+
/**
67+
* Returns the name of this data stream alias.
68+
*/
6169
public String getName() {
6270
return name;
6371
}
6472

73+
/**
74+
* Returns the data streams that are referenced
75+
*/
6576
public List<String> getDataStreams() {
6677
return dataStreams;
6778
}
6879

80+
/**
81+
* Returns the write data stream this data stream alias is referring to.
82+
* Write requests targeting this instance will resolve the write index
83+
* of the write data stream this alias is referring to.
84+
*
85+
* Note that the write data stream is also included in {@link #getDataStreams()}.
86+
*/
6987
public String getWriteDataStream() {
7088
return writeDataStream;
7189
}
7290

91+
/**
92+
* Returns a new {@link DataStreamAlias} instance with the provided data stream name added to it as a new member.
93+
* If the provided isWriteDataStream is set to <code>true</code> then the provided data stream is also set as write data stream.
94+
* If the provided isWriteDataStream is set to <code>false</code> and the provided data stream is also the write data stream of
95+
* this instance then the returned data stream alias instance's write data stream is unset.
96+
*
97+
* The same instance is returned if the attempted addition of the provided data stream didn't change this instance.
98+
*/
99+
public DataStreamAlias addDataStream(String dataStream, Boolean isWriteDataStream) {
100+
String writeDataStream = this.writeDataStream;
101+
if (isWriteDataStream != null) {
102+
if (isWriteDataStream) {
103+
writeDataStream = dataStream;
104+
} else {
105+
if (dataStream.equals(writeDataStream)) {
106+
writeDataStream = null;
107+
}
108+
}
109+
}
110+
111+
Set<String> dataStreams = new HashSet<>(this.dataStreams);
112+
boolean added = dataStreams.add(dataStream);
113+
if (added || Objects.equals(this.writeDataStream, writeDataStream) == false) {
114+
return new DataStreamAlias(name, dataStreams, writeDataStream);
115+
} else {
116+
return this;
117+
}
118+
}
119+
120+
/**
121+
* Returns a {@link DataStreamAlias} instance based on this instance but with the specified data stream no longer referenced.
122+
* Returns <code>null</code> if because of the removal of the provided data stream name a new instance wouldn't reference to
123+
* any data stream. The same instance is returned if the attempted removal of the provided data stream didn't change this instance.
124+
*/
125+
public DataStreamAlias removeDataStream(String dataStream) {
126+
Set<String> dataStreams = new HashSet<>(this.dataStreams);
127+
boolean removed = dataStreams.remove(dataStream);
128+
if (removed == false) {
129+
return this;
130+
}
131+
132+
if (dataStreams.isEmpty()) {
133+
return null;
134+
} else {
135+
String writeDataStream = this.writeDataStream;
136+
if (dataStream.equals(writeDataStream)) {
137+
writeDataStream = null;
138+
}
139+
return new DataStreamAlias(name, dataStreams, writeDataStream);
140+
}
141+
}
142+
143+
/**
144+
* Returns a new {@link DataStreamAlias} instance that contains a new intersection
145+
* of data streams from this instance and the provided filter.
146+
*
147+
* The write data stream gets set to null in the returned instance if the write
148+
* data stream no longer appears in the intersection.
149+
*/
150+
public DataStreamAlias intersect(Predicate<String> filter) {
151+
List<String> intersectingDataStreams = this.dataStreams.stream()
152+
.filter(filter)
153+
.collect(Collectors.toList());
154+
String writeDataStream = this.writeDataStream;
155+
if (intersectingDataStreams.contains(writeDataStream) == false) {
156+
writeDataStream = null;
157+
}
158+
return new DataStreamAlias(this.name, intersectingDataStreams, writeDataStream);
159+
}
160+
161+
/**
162+
* Returns a new {@link DataStreamAlias} instance containing data streams referenced in this instance
163+
* and the other instance. If this instance doesn't have a write data stream then the write index of
164+
* the other data stream becomes the write data stream of the returned instance.
165+
*/
166+
public DataStreamAlias merge(DataStreamAlias other) {
167+
Set<String> mergedDataStreams = new HashSet<>(other.getDataStreams());
168+
mergedDataStreams.addAll(this.getDataStreams());
169+
170+
String writeDataStream = this.writeDataStream;
171+
if (writeDataStream == null) {
172+
if (other.getWriteDataStream() != null && mergedDataStreams.contains(other.getWriteDataStream())) {
173+
writeDataStream = other.getWriteDataStream();
174+
}
175+
}
176+
177+
return new DataStreamAlias(this.name, mergedDataStreams, writeDataStream);
178+
}
179+
180+
/**
181+
* Returns a new instance with potentially renamed data stream names and write data stream name.
182+
* If a data stream name matches with the provided rename pattern then it is renamed according
183+
* to the provided rename replacement.
184+
*/
185+
public DataStreamAlias renameDataStreams(String renamePattern, String renameReplacement) {
186+
List<String> renamedDataStreams = this.dataStreams.stream()
187+
.map(s -> s.replaceAll(renamePattern, renameReplacement))
188+
.collect(Collectors.toList());
189+
String writeDataStream = this.writeDataStream;
190+
if (writeDataStream != null) {
191+
writeDataStream = writeDataStream.replaceAll(renamePattern, renameReplacement);
192+
}
193+
return new DataStreamAlias(this.name, renamedDataStreams, writeDataStream);
194+
}
195+
73196
public static Diff<DataStreamAlias> readDiffFrom(StreamInput in) throws IOException {
74197
return readDiffFrom(DataStreamAlias::new, in);
75198
}

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,20 +1319,11 @@ public boolean put(String aliasName, String dataStream, Boolean isWriteDataStrea
13191319
String writeDataStream = isWriteDataStream != null && isWriteDataStream ? dataStream : null;
13201320
alias = new DataStreamAlias(aliasName, Collections.singletonList(dataStream), writeDataStream);
13211321
} else {
1322-
Set<String> dataStreams = new HashSet<>(alias.getDataStreams());
1323-
String writeDataStream = alias.getWriteDataStream();
1324-
if (isWriteDataStream == null || isWriteDataStream == false) {
1325-
if (dataStream.equals(writeDataStream)) {
1326-
writeDataStream = null;
1327-
}
1328-
} else if (isWriteDataStream) {
1329-
writeDataStream = dataStream;
1330-
}
1331-
boolean added = dataStreams.add(dataStream);
1332-
if (added == false && Objects.equals(alias.getWriteDataStream(), writeDataStream)) {
1322+
DataStreamAlias copy = alias.addDataStream(dataStream, isWriteDataStream);
1323+
if (copy == alias) {
13331324
return false;
13341325
}
1335-
alias = new DataStreamAlias(aliasName, dataStreams, writeDataStream);
1326+
alias = copy;
13361327
}
13371328
dataStreamAliases.put(aliasName, alias);
13381329

@@ -1355,18 +1346,14 @@ public Builder removeDataStream(String name) {
13551346
Set<String> aliasesToDelete = new HashSet<>();
13561347
List<DataStreamAlias> aliasesToUpdate = new ArrayList<>();
13571348
for (DataStreamAlias alias : existingDataStreamAliases.values()) {
1358-
Set<String> dataStreams = new HashSet<>(alias.getDataStreams());
1359-
if (dataStreams.contains(name)) {
1360-
dataStreams.remove(name);
1361-
if (dataStreams.isEmpty()) {
1362-
aliasesToDelete.add(alias.getName());
1363-
} else {
1364-
String writeDataStream = alias.getWriteDataStream();
1365-
if (dataStreams.contains(writeDataStream) == false) {
1366-
writeDataStream = null;
1367-
}
1368-
aliasesToUpdate.add(new DataStreamAlias(alias.getName(), dataStreams, writeDataStream));
1349+
DataStreamAlias copy = alias.removeDataStream(name);
1350+
if (copy != null) {
1351+
if (copy == alias) {
1352+
continue;
13691353
}
1354+
aliasesToUpdate.add(copy);
1355+
} else {
1356+
aliasesToDelete.add(alias.getName());
13701357
}
13711358
}
13721359
for (DataStreamAlias alias : aliasesToUpdate) {
@@ -1392,19 +1379,15 @@ public boolean removeDataStreamAlias(String aliasName, String dataStreamName, bo
13921379
} else if (existing == null) {
13931380
return false;
13941381
}
1395-
Set<String> dataStreams = new HashSet<>(existing.getDataStreams());
1396-
dataStreams.remove(dataStreamName);
1397-
if (dataStreams.isEmpty()) {
1398-
dataStreamAliases.remove(aliasName);
1382+
DataStreamAlias copy = existing.removeDataStream(dataStreamName);
1383+
if (copy == existing) {
1384+
return false;
1385+
}
1386+
if (copy != null) {
1387+
dataStreamAliases.put(aliasName, copy);
13991388
} else {
1400-
String writeDataStream = existing.getWriteDataStream();
1401-
if (dataStreamName.equals(writeDataStream)) {
1402-
writeDataStream = null;
1403-
}
1404-
dataStreamAliases.put(aliasName,
1405-
new DataStreamAlias(existing.getName(), dataStreams, writeDataStream));
1389+
dataStreamAliases.remove(aliasName);
14061390
}
1407-
14081391
Map<String, DataStream> existingDataStream =
14091392
Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE))
14101393
.map(dsmd -> new HashMap<>(dsmd.dataStreams()))

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -521,33 +521,15 @@ restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
521521
// Optionally rename the data stream names for each alias
522522
.map(alias -> {
523523
if (request.renamePattern() != null && request.renameReplacement() != null) {
524-
List<String> renamedDataStreams = alias.getDataStreams().stream()
525-
.map(s -> s.replaceAll(request.renamePattern(), request.renameReplacement()))
526-
.collect(Collectors.toList());
527-
String writeDataStream = alias.getWriteDataStream();
528-
if (writeDataStream != null) {
529-
writeDataStream = writeDataStream.replaceAll(request.renamePattern(), request.renameReplacement());
530-
}
531-
return new DataStreamAlias(alias.getName(), renamedDataStreams, writeDataStream);
524+
return alias.renameDataStreams(request.renamePattern(), request.renameReplacement());
532525
} else {
533526
return alias;
534527
}
535528
}).forEach(alias -> {
536529
DataStreamAlias current = updatedDataStreamAliases.putIfAbsent(alias.getName(), alias);
537530
if (current != null) {
538531
// Merge data stream alias from snapshot with an existing data stream aliases in target cluster:
539-
Set<String> mergedDataStreams = new HashSet<>(current.getDataStreams());
540-
mergedDataStreams.addAll(alias.getDataStreams());
541-
542-
String writeDataStream = alias.getWriteDataStream();
543-
if (writeDataStream == null) {
544-
if (current.getWriteDataStream() != null &&
545-
mergedDataStreams.contains(current.getWriteDataStream())) {
546-
writeDataStream = current.getWriteDataStream();
547-
}
548-
}
549-
550-
DataStreamAlias newInstance = new DataStreamAlias(alias.getName(), mergedDataStreams, writeDataStream);
532+
DataStreamAlias newInstance = alias.merge(current);
551533
updatedDataStreamAliases.put(alias.getName(), newInstance);
552534
}
553535
});
@@ -841,15 +823,8 @@ private Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> getDataStre
841823
dataStreamAliases = new HashMap<>();
842824
final Map<String, DataStreamAlias> dataStreamAliasesInSnapshot = globalMetadata.dataStreamAliases();
843825
for (DataStreamAlias alias : dataStreamAliasesInSnapshot.values()) {
844-
List<String> intersectingDataStreams = alias.getDataStreams().stream()
845-
.filter(requestedDataStreams::contains)
846-
.collect(Collectors.toList());
847-
String writeDateStream = alias.getWriteDataStream();
848-
if (intersectingDataStreams.contains(writeDateStream) == false) {
849-
writeDateStream = null;
850-
}
851-
if (intersectingDataStreams.isEmpty() == false) {
852-
DataStreamAlias copy = new DataStreamAlias(alias.getName(), intersectingDataStreams, writeDateStream);
826+
DataStreamAlias copy = alias.intersect(requestedDataStreams::contains);
827+
if (copy.getDataStreams().isEmpty() == false) {
853828
dataStreamAliases.put(alias.getName(), copy);
854829
}
855830
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2996,16 +2996,8 @@ static Map<String, DataStreamAlias> filterDataStreamAliases(Map<String, DataStre
29962996

29972997
return dataStreamAliases.values().stream()
29982998
.filter(alias -> alias.getDataStreams().stream().anyMatch(dataStreams::containsKey))
2999-
.map(alias -> {
3000-
List<String> intersectingDataStreams = alias.getDataStreams().stream()
3001-
.filter(dataStreams::containsKey)
3002-
.collect(Collectors.toList());
3003-
String writeDataStream = alias.getWriteDataStream();
3004-
if (intersectingDataStreams.contains(writeDataStream) == false) {
3005-
writeDataStream = null;
3006-
}
3007-
return new DataStreamAlias(alias.getName(), intersectingDataStreams, writeDataStream);
3008-
}).collect(Collectors.toMap(DataStreamAlias::getName, Function.identity()));
2999+
.map(alias -> alias.intersect(dataStreams::containsKey))
3000+
.collect(Collectors.toMap(DataStreamAlias::getName, Function.identity()));
30093001
}
30103002

30113003
/**

0 commit comments

Comments
 (0)