Skip to content

Commit f47636b

Browse files
authored
[Zen2] Introduce VotingTombstone class (#35832)
Today voting tombstones are stored in CoordinationMetaData as Set<DiscoveryNode>. DiscoveryNode is not a lightweight object and has a lot of fields. It also has a toXContent method but no fromXContent method, and the output of toXContent is not enough to re-create a DiscoveryNode object. However, the votingTombstones set should be persisted as part of MetaData. On the other hand, the only thing required from a tombstone is the nodeId. This PR adds a VotingTombstone class for voting tombstones, which consists of two fields for now - nodeId and nodeName. It could be extended/shrunk in the future if needed. This PR also resolves the TODOs related to the voting tombstones xcontent story. Example of CoordinationMetaData.toXContent with voting tombstones: { "term": 1, "last_committed_config": [ "fkwLdOBvXSlgRTBfgNAL", "tmQiPGHvUxXzPkkCDSJo", "HhOmtQBZAThpHIGWhxpz", "qZHWGpoDNPYRNIiqKsDl" ], "last_accepted_config": [ "lhqacKmriwhHGFZcvqbx", "MYysmBuROkvJRlDcusyd" ], "voting_tombstones": [ { "node_id": "McjbZbRkEz", "node_name": "pdKIWeNJUO" }, { "node_id": "cpXkVibGwo", "node_name": "UnCvFgdVsc" }, { "node_id": "EylRNOztbc", "node_name": "ohOhkbMWZX" } ] }
1 parent cfdf666 commit f47636b

13 files changed

+238
-88
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.action.ActionRequestValidationException;
2222
import org.elasticsearch.action.support.master.MasterNodeRequest;
2323
import org.elasticsearch.cluster.ClusterState;
24+
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.cluster.node.DiscoveryNodes;
2627
import org.elasticsearch.common.io.stream.StreamInput;
@@ -69,10 +70,10 @@ public AddVotingTombstonesRequest(StreamInput in) throws IOException {
6970
timeout = in.readTimeValue();
7071
}
7172

72-
Set<DiscoveryNode> resolveNodes(ClusterState currentState) {
73+
Set<VotingTombstone> resolveVotingTombstones(ClusterState currentState) {
7374
final DiscoveryNodes allNodes = currentState.nodes();
74-
final Set<DiscoveryNode> resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions))
75-
.map(allNodes::get).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet());
75+
final Set<VotingTombstone> resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions))
76+
.map(allNodes::get).filter(DiscoveryNode::isMasterNode).map(VotingTombstone::new).collect(Collectors.toSet());
7677

7778
if (resolvedNodes.isEmpty()) {
7879
throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(nodeDescriptions)
@@ -83,8 +84,9 @@ Set<DiscoveryNode> resolveNodes(ClusterState currentState) {
8384
return resolvedNodes;
8485
}
8586

86-
Set<DiscoveryNode> resolveNodesAndCheckMaximum(ClusterState currentState, int maxTombstoneCount, String maximumSettingKey) {
87-
final Set<DiscoveryNode> resolvedNodes = resolveNodes(currentState);
87+
Set<VotingTombstone> resolveVotingTombstonesAndCheckMaximum(ClusterState currentState, int maxTombstoneCount,
88+
String maximumSettingKey) {
89+
final Set<VotingTombstone> resolvedNodes = resolveVotingTombstones(currentState);
8890

8991
final int oldTombstoneCount = currentState.getVotingTombstones().size();
9092
final int newTombstoneCount = resolvedNodes.size();

server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.elasticsearch.cluster.block.ClusterBlockException;
3131
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3232
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
33+
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
3334
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3435
import org.elasticsearch.cluster.metadata.MetaData;
35-
import org.elasticsearch.cluster.node.DiscoveryNode;
3636
import org.elasticsearch.cluster.service.ClusterService;
3737
import org.elasticsearch.common.Priority;
3838
import org.elasticsearch.common.inject.Inject;
@@ -80,16 +80,16 @@ protected AddVotingTombstonesResponse read(StreamInput in) throws IOException {
8080
protected void masterOperation(AddVotingTombstonesRequest request, ClusterState state,
8181
ActionListener<AddVotingTombstonesResponse> listener) throws Exception {
8282

83-
resolveNodesAndCheckMaximum(request, state); // throws IllegalArgumentException if no nodes matched or maximum exceeded
83+
resolveVotingTombstonesAndCheckMaximum(request, state); // throws IllegalArgumentException if no nodes matched or maximum exceeded
8484

8585
clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) {
8686

87-
private Set<DiscoveryNode> resolvedNodes;
87+
private Set<VotingTombstone> resolvedNodes;
8888

8989
@Override
9090
public ClusterState execute(ClusterState currentState) {
9191
assert resolvedNodes == null : resolvedNodes;
92-
resolvedNodes = resolveNodesAndCheckMaximum(request, currentState);
92+
resolvedNodes = resolveVotingTombstonesAndCheckMaximum(request, currentState);
9393

9494
final CoordinationMetaData.Builder builder = CoordinationMetaData.builder(currentState.coordinationMetaData());
9595
resolvedNodes.forEach(builder::addVotingTombstone);
@@ -110,7 +110,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
110110
final ClusterStateObserver observer
111111
= new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext());
112112

113-
final Set<String> resolvedNodeIds = resolvedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
113+
final Set<String> resolvedNodeIds = resolvedNodes.stream().map(VotingTombstone::getNodeId).collect(Collectors.toSet());
114114

115115
final Predicate<ClusterState> allNodesRemoved = clusterState -> {
116116
final Set<String> votingNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds();
@@ -145,8 +145,8 @@ public void onTimeout(TimeValue timeout) {
145145
});
146146
}
147147

148-
private static Set<DiscoveryNode> resolveNodesAndCheckMaximum(AddVotingTombstonesRequest request, ClusterState state) {
149-
return request.resolveNodesAndCheckMaximum(state,
148+
private static Set<VotingTombstone> resolveVotingTombstonesAndCheckMaximum(AddVotingTombstonesRequest request, ClusterState state) {
149+
return request.resolveVotingTombstonesAndCheckMaximum(state,
150150
MAXIMUM_VOTING_TOMBSTONES_SETTING.get(state.metaData().settings()), MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey());
151151
}
152152

server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.elasticsearch.cluster.block.ClusterBlockException;
3131
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3232
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
33+
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
3334
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3435
import org.elasticsearch.cluster.metadata.MetaData;
35-
import org.elasticsearch.cluster.node.DiscoveryNode;
3636
import org.elasticsearch.cluster.service.ClusterService;
3737
import org.elasticsearch.common.Priority;
3838
import org.elasticsearch.common.inject.Inject;
@@ -77,10 +77,9 @@ protected void masterOperation(ClearVotingTombstonesRequest request, ClusterStat
7777
final long startTimeMillis = threadPool.relativeTimeInMillis();
7878

7979
final Predicate<ClusterState> allTombstonedNodesRemoved = newState -> {
80-
for (DiscoveryNode tombstone : initialState.getVotingTombstones()) {
80+
for (VotingTombstone tombstone : initialState.getVotingTombstones()) {
8181
// NB checking for the existence of any node with this persistent ID, because persistent IDs are how votes are counted.
82-
// Calling nodeExists(tombstone) is insufficient because this compares on the ephemeral ID.
83-
if (newState.nodes().nodeExists(tombstone.getId())) {
82+
if (newState.nodes().nodeExists(tombstone.getNodeId())) {
8483
return false;
8584
}
8685
}

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.block.ClusterBlocks;
2929
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
3030
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
31+
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
3132
import org.elasticsearch.cluster.metadata.IndexMetaData;
3233
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
3334
import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -277,7 +278,7 @@ public VotingConfiguration getLastCommittedConfiguration() {
277278
return coordinationMetaData().getLastCommittedConfiguration();
278279
}
279280

280-
public Set<DiscoveryNode> getVotingTombstones() {
281+
public Set<VotingTombstone> getVotingTombstones() {
281282
return coordinationMetaData().getVotingTombstones();
282283
}
283284

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java

+113-14
Original file line numberDiff line numberDiff line change
@@ -49,40 +49,48 @@ public class CoordinationMetaData implements Writeable, ToXContentFragment {
4949

5050
private final VotingConfiguration lastAcceptedConfiguration;
5151

52-
private final Set<DiscoveryNode> votingTombstones;
52+
private final Set<VotingTombstone> votingTombstones;
5353

5454
private static final ParseField TERM_PARSE_FIELD = new ParseField("term");
5555
private static final ParseField LAST_COMMITTED_CONFIGURATION_FIELD = new ParseField("last_committed_config");
5656
private static final ParseField LAST_ACCEPTED_CONFIGURATION_FIELD = new ParseField("last_accepted_config");
57+
private static final ParseField VOTING_TOMBSTONES_FIELD = new ParseField("voting_tombstones");
5758

5859
private static long term(Object[] termAndConfigs) {
5960
return (long)termAndConfigs[0];
6061
}
6162

6263
@SuppressWarnings("unchecked")
63-
private static VotingConfiguration lastCommittedConfig(Object[] termAndConfig) {
64-
List<String> nodeIds = (List<String>) termAndConfig[1];
64+
private static VotingConfiguration lastCommittedConfig(Object[] fields) {
65+
List<String> nodeIds = (List<String>) fields[1];
6566
return new VotingConfiguration(new HashSet<>(nodeIds));
6667
}
6768

6869
@SuppressWarnings("unchecked")
69-
private static VotingConfiguration lastAcceptedConfig(Object[] termAndConfig) {
70-
List<String> nodeIds = (List<String>) termAndConfig[2];
70+
private static VotingConfiguration lastAcceptedConfig(Object[] fields) {
71+
List<String> nodeIds = (List<String>) fields[2];
7172
return new VotingConfiguration(new HashSet<>(nodeIds));
7273
}
7374

75+
@SuppressWarnings("unchecked")
76+
private static Set<VotingTombstone> votingTombstones(Object[] fields) {
77+
Set<VotingTombstone> votingTombstones = new HashSet<>((List<VotingTombstone>) fields[3]);
78+
return votingTombstones;
79+
}
80+
7481
private static final ConstructingObjectParser<CoordinationMetaData, Void> PARSER = new ConstructingObjectParser<>(
7582
"coordination_metadata",
76-
termAndConfigs -> new CoordinationMetaData(term(termAndConfigs), lastCommittedConfig(termAndConfigs),
77-
lastAcceptedConfig(termAndConfigs), Collections.emptySet()));
83+
fields -> new CoordinationMetaData(term(fields), lastCommittedConfig(fields),
84+
lastAcceptedConfig(fields), votingTombstones(fields)));
7885
static {
7986
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TERM_PARSE_FIELD);
8087
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LAST_COMMITTED_CONFIGURATION_FIELD);
8188
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LAST_ACCEPTED_CONFIGURATION_FIELD);
89+
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), VotingTombstone.PARSER, VOTING_TOMBSTONES_FIELD);
8290
}
8391

8492
public CoordinationMetaData(long term, VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
85-
Set<DiscoveryNode> votingTombstones) {
93+
Set<VotingTombstone> votingTombstones) {
8694
this.term = term;
8795
this.lastCommittedConfiguration = lastCommittedConfiguration;
8896
this.lastAcceptedConfiguration = lastAcceptedConfiguration;
@@ -93,7 +101,7 @@ public CoordinationMetaData(StreamInput in) throws IOException {
93101
term = in.readLong();
94102
lastCommittedConfiguration = new VotingConfiguration(in);
95103
lastAcceptedConfiguration = new VotingConfiguration(in);
96-
votingTombstones = Collections.unmodifiableSet(in.readSet(DiscoveryNode::new));
104+
votingTombstones = Collections.unmodifiableSet(in.readSet(VotingTombstone::new));
97105
}
98106

99107
public static Builder builder() {
@@ -117,8 +125,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
117125
return builder
118126
.field(TERM_PARSE_FIELD.getPreferredName(), term)
119127
.field(LAST_COMMITTED_CONFIGURATION_FIELD.getPreferredName(), lastCommittedConfiguration)
120-
.field(LAST_ACCEPTED_CONFIGURATION_FIELD.getPreferredName(), lastAcceptedConfiguration);
121-
// TODO include voting tombstones here
128+
.field(LAST_ACCEPTED_CONFIGURATION_FIELD.getPreferredName(), lastAcceptedConfiguration)
129+
.field(VOTING_TOMBSTONES_FIELD.getPreferredName(), votingTombstones);
122130
}
123131

124132
public static CoordinationMetaData fromXContent(XContentParser parser) throws IOException {
@@ -137,7 +145,7 @@ public VotingConfiguration getLastCommittedConfiguration() {
137145
return lastCommittedConfiguration;
138146
}
139147

140-
public Set<DiscoveryNode> getVotingTombstones() {
148+
public Set<VotingTombstone> getVotingTombstones() {
141149
return votingTombstones;
142150
}
143151

@@ -177,7 +185,7 @@ public static class Builder {
177185
private long term = 0;
178186
private VotingConfiguration lastCommittedConfiguration = VotingConfiguration.EMPTY_CONFIG;
179187
private VotingConfiguration lastAcceptedConfiguration = VotingConfiguration.EMPTY_CONFIG;
180-
private final Set<DiscoveryNode> votingTombstones = new HashSet<>();
188+
private final Set<VotingTombstone> votingTombstones = new HashSet<>();
181189

182190
public Builder() {
183191

@@ -205,7 +213,7 @@ public Builder lastAcceptedConfiguration(VotingConfiguration config) {
205213
return this;
206214
}
207215

208-
public Builder addVotingTombstone(DiscoveryNode tombstone) {
216+
public Builder addVotingTombstone(VotingTombstone tombstone) {
209217
votingTombstones.add(tombstone);
210218
return this;
211219
}
@@ -220,6 +228,97 @@ public CoordinationMetaData build() {
220228
}
221229
}
222230

231+
public static class VotingTombstone implements Writeable, ToXContentFragment {
232+
private final String nodeId;
233+
private final String nodeName;
234+
235+
public VotingTombstone(DiscoveryNode node) {
236+
this(node.getId(), node.getName());
237+
}
238+
239+
public VotingTombstone(StreamInput in) throws IOException {
240+
this.nodeId = in.readString();
241+
this.nodeName = in.readString();
242+
}
243+
244+
public VotingTombstone(String nodeId, String nodeName) {
245+
this.nodeId = nodeId;
246+
this.nodeName = nodeName;
247+
}
248+
249+
@Override
250+
public void writeTo(StreamOutput out) throws IOException {
251+
out.writeString(nodeId);
252+
out.writeString(nodeName);
253+
}
254+
255+
public String getNodeId() {
256+
return nodeId;
257+
}
258+
259+
public String getNodeName() {
260+
return nodeName;
261+
}
262+
263+
private static final ParseField NODE_ID_PARSE_FIELD = new ParseField("node_id");
264+
private static final ParseField NODE_NAME_PARSE_FIELD = new ParseField("node_name");
265+
266+
private static String nodeId(Object[] nodeIdAndName) {
267+
return (String) nodeIdAndName[0];
268+
}
269+
270+
private static String nodeName(Object[] nodeIdAndName) {
271+
return (String) nodeIdAndName[1];
272+
}
273+
274+
private static final ConstructingObjectParser<VotingTombstone, Void> PARSER = new ConstructingObjectParser<>(
275+
"voting_tombstone",
276+
nodeIdAndName -> new VotingTombstone(nodeId(nodeIdAndName), nodeName(nodeIdAndName))
277+
);
278+
279+
static {
280+
PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_PARSE_FIELD);
281+
PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_NAME_PARSE_FIELD);
282+
}
283+
284+
public static VotingTombstone fromXContent(XContentParser parser) throws IOException {
285+
return PARSER.parse(parser, null);
286+
}
287+
288+
@Override
289+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
290+
return builder.startObject()
291+
.field(NODE_ID_PARSE_FIELD.getPreferredName(), nodeId)
292+
.field(NODE_NAME_PARSE_FIELD.getPreferredName(), nodeName)
293+
.endObject();
294+
}
295+
296+
@Override
297+
public boolean equals(Object o) {
298+
if (this == o) return true;
299+
if (o == null || getClass() != o.getClass()) return false;
300+
VotingTombstone that = (VotingTombstone) o;
301+
return Objects.equals(nodeId, that.nodeId) &&
302+
Objects.equals(nodeName, that.nodeName);
303+
}
304+
305+
@Override
306+
public int hashCode() {
307+
return Objects.hash(nodeId, nodeName);
308+
}
309+
310+
@Override
311+
public String toString() {
312+
StringBuilder sb = new StringBuilder();
313+
if (nodeName.length() > 0) {
314+
sb.append('{').append(nodeName).append('}');
315+
}
316+
sb.append('{').append(nodeId).append('}');
317+
return sb.toString();
318+
}
319+
320+
}
321+
223322
/**
224323
* A collection of persistent node ids, denoting the voting configuration for cluster state changes.
225324
*/

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.ClusterStateUpdateTask;
3434
import org.elasticsearch.cluster.block.ClusterBlocks;
3535
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
36+
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
3637
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
3738
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
3839
import org.elasticsearch.cluster.metadata.MetaData;
@@ -654,7 +655,7 @@ ClusterState improveConfiguration(ClusterState clusterState) {
654655
final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
655656
.filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
656657
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
657-
clusterState.getVotingTombstones().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()),
658+
clusterState.getVotingTombstones().stream().map(VotingTombstone::getNodeId).collect(Collectors.toSet()),
658659
clusterState.getLastAcceptedConfiguration());
659660
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
660661
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);

0 commit comments

Comments
 (0)