Skip to content

[Zen2] VotingTombstone class #35832

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 23, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -69,10 +70,10 @@ public AddVotingTombstonesRequest(StreamInput in) throws IOException {
timeout = in.readTimeValue();
}

Set<DiscoveryNode> resolveNodes(ClusterState currentState) {
Set<VotingTombstone> resolveNodes(ClusterState currentState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should now be called resolveVotingTombStones?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final DiscoveryNodes allNodes = currentState.nodes();
final Set<DiscoveryNode> resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions))
.map(allNodes::get).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet());
final Set<VotingTombstone> resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions))
.map(allNodes::get).filter(DiscoveryNode::isMasterNode).map(VotingTombstone::new).collect(Collectors.toSet());

if (resolvedNodes.isEmpty()) {
throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(nodeDescriptions)
Expand All @@ -83,8 +84,8 @@ Set<DiscoveryNode> resolveNodes(ClusterState currentState) {
return resolvedNodes;
}

Set<DiscoveryNode> resolveNodesAndCheckMaximum(ClusterState currentState, int maxTombstoneCount, String maximumSettingKey) {
final Set<DiscoveryNode> resolvedNodes = resolveNodes(currentState);
Set<VotingTombstone> resolveNodesAndCheckMaximum(ClusterState currentState, int maxTombstoneCount, String maximumSettingKey) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idem here (resolveVoting....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And forgot "AndCheckMaximumPart" 8f63927

final Set<VotingTombstone> resolvedNodes = resolveNodes(currentState);

final int oldTombstoneCount = currentState.getVotingTombstones().size();
final int newTombstoneCount = resolvedNodes.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -84,7 +84,7 @@ protected void masterOperation(AddVotingTombstonesRequest request, ClusterState

clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) {

private Set<DiscoveryNode> resolvedNodes;
private Set<VotingTombstone> resolvedNodes;

@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -110,7 +110,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
final ClusterStateObserver observer
= new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext());

final Set<String> resolvedNodeIds = resolvedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
final Set<String> resolvedNodeIds = resolvedNodes.stream().map(VotingTombstone::getNodeId).collect(Collectors.toSet());

final Predicate<ClusterState> allNodesRemoved = clusterState -> {
final Set<String> votingNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds();
Expand Down Expand Up @@ -145,7 +145,7 @@ public void onTimeout(TimeValue timeout) {
});
}

private static Set<DiscoveryNode> resolveNodesAndCheckMaximum(AddVotingTombstonesRequest request, ClusterState state) {
private static Set<VotingTombstone> resolveNodesAndCheckMaximum(AddVotingTombstonesRequest request, ClusterState state) {
return request.resolveNodesAndCheckMaximum(state,
MAXIMUM_VOTING_TOMBSTONES_SETTING.get(state.metaData().settings()), MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -77,10 +77,10 @@ protected void masterOperation(ClearVotingTombstonesRequest request, ClusterStat
final long startTimeMillis = threadPool.relativeTimeInMillis();

final Predicate<ClusterState> allTombstonedNodesRemoved = newState -> {
for (DiscoveryNode tombstone : initialState.getVotingTombstones()) {
for (VotingTombstone tombstone : initialState.getVotingTombstones()) {
// NB checking for the existence of any node with this persistent ID, because persistent IDs are how votes are counted.
// Calling nodeExists(tombstone) is insufficient because this compares on the ephemeral ID.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the second part of the comment is now superfluous as VotingTombstone does not have the ephemeral id and there is no nodeExists method that takes a voting tombstone

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (newState.nodes().nodeExists(tombstone.getId())) {
if (newState.nodes().nodeExists(tombstone.getNodeId())) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
Expand Down Expand Up @@ -277,7 +278,7 @@ public VotingConfiguration getLastCommittedConfiguration() {
return coordinationMetaData().getLastCommittedConfiguration();
}

public Set<DiscoveryNode> getVotingTombstones() {
public Set<VotingTombstone> getVotingTombstones() {
return coordinationMetaData().getVotingTombstones();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,40 +49,48 @@ public class CoordinationMetaData implements Writeable, ToXContentFragment {

private final VotingConfiguration lastAcceptedConfiguration;

private final Set<DiscoveryNode> votingTombstones;
private final Set<VotingTombstone> votingTombstones;

private static final ParseField TERM_PARSE_FIELD = new ParseField("term");
private static final ParseField LAST_COMMITTED_CONFIGURATION_FIELD = new ParseField("last_committed_config");
private static final ParseField LAST_ACCEPTED_CONFIGURATION_FIELD = new ParseField("last_accepted_config");
private static final ParseField VOTING_TOMBSTONES_FIELD = new ParseField("voting_tombstones");

private static long term(Object[] termAndConfigs) {
return (long)termAndConfigs[0];
}

@SuppressWarnings("unchecked")
private static VotingConfiguration lastCommittedConfig(Object[] termAndConfig) {
List<String> nodeIds = (List<String>) termAndConfig[1];
private static VotingConfiguration lastCommittedConfig(Object[] fields) {
List<String> nodeIds = (List<String>) fields[1];
return new VotingConfiguration(new HashSet<>(nodeIds));
}

@SuppressWarnings("unchecked")
private static VotingConfiguration lastAcceptedConfig(Object[] termAndConfig) {
List<String> nodeIds = (List<String>) termAndConfig[2];
private static VotingConfiguration lastAcceptedConfig(Object[] fields) {
List<String> nodeIds = (List<String>) fields[2];
return new VotingConfiguration(new HashSet<>(nodeIds));
}

@SuppressWarnings("unchecked")
private static Set<VotingTombstone> votingTombstones(Object[] fields) {
Set<VotingTombstone> votingTombstones = new HashSet<>((List<VotingTombstone>) fields[3]);
return votingTombstones;
}

private static final ConstructingObjectParser<CoordinationMetaData, Void> PARSER = new ConstructingObjectParser<>(
"coordination_metadata",
termAndConfigs -> new CoordinationMetaData(term(termAndConfigs), lastCommittedConfig(termAndConfigs),
lastAcceptedConfig(termAndConfigs), Collections.emptySet()));
fields -> new CoordinationMetaData(term(fields), lastCommittedConfig(fields),
lastAcceptedConfig(fields), votingTombstones(fields)));
static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TERM_PARSE_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LAST_COMMITTED_CONFIGURATION_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LAST_ACCEPTED_CONFIGURATION_FIELD);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), VotingTombstone.PARSER, VOTING_TOMBSTONES_FIELD);
}

public CoordinationMetaData(long term, VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
Set<DiscoveryNode> votingTombstones) {
Set<VotingTombstone> votingTombstones) {
this.term = term;
this.lastCommittedConfiguration = lastCommittedConfiguration;
this.lastAcceptedConfiguration = lastAcceptedConfiguration;
Expand All @@ -93,7 +101,7 @@ public CoordinationMetaData(StreamInput in) throws IOException {
term = in.readLong();
lastCommittedConfiguration = new VotingConfiguration(in);
lastAcceptedConfiguration = new VotingConfiguration(in);
votingTombstones = Collections.unmodifiableSet(in.readSet(DiscoveryNode::new));
votingTombstones = Collections.unmodifiableSet(in.readSet(VotingTombstone::new));
}

public static Builder builder() {
Expand All @@ -117,8 +125,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder
.field(TERM_PARSE_FIELD.getPreferredName(), term)
.field(LAST_COMMITTED_CONFIGURATION_FIELD.getPreferredName(), lastCommittedConfiguration)
.field(LAST_ACCEPTED_CONFIGURATION_FIELD.getPreferredName(), lastAcceptedConfiguration);
// TODO include voting tombstones here
.field(LAST_ACCEPTED_CONFIGURATION_FIELD.getPreferredName(), lastAcceptedConfiguration)
.field(VOTING_TOMBSTONES_FIELD.getPreferredName(), votingTombstones);
}

public static CoordinationMetaData fromXContent(XContentParser parser) throws IOException {
Expand All @@ -137,7 +145,7 @@ public VotingConfiguration getLastCommittedConfiguration() {
return lastCommittedConfiguration;
}

public Set<DiscoveryNode> getVotingTombstones() {
public Set<VotingTombstone> getVotingTombstones() {
return votingTombstones;
}

Expand Down Expand Up @@ -177,7 +185,7 @@ public static class Builder {
private long term = 0;
private VotingConfiguration lastCommittedConfiguration = VotingConfiguration.EMPTY_CONFIG;
private VotingConfiguration lastAcceptedConfiguration = VotingConfiguration.EMPTY_CONFIG;
private final Set<DiscoveryNode> votingTombstones = new HashSet<>();
private final Set<VotingTombstone> votingTombstones = new HashSet<>();

public Builder() {

Expand Down Expand Up @@ -205,7 +213,7 @@ public Builder lastAcceptedConfiguration(VotingConfiguration config) {
return this;
}

public Builder addVotingTombstone(DiscoveryNode tombstone) {
public Builder addVotingTombstone(VotingTombstone tombstone) {
votingTombstones.add(tombstone);
return this;
}
Expand All @@ -220,6 +228,97 @@ public CoordinationMetaData build() {
}
}

public static class VotingTombstone implements Writeable, ToXContentFragment {
private final String nodeId;
private final String nodeName;

public VotingTombstone(DiscoveryNode node) {
this(node.getId(), node.getName());
}

public VotingTombstone(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.nodeName = in.readString();
}

public VotingTombstone(String nodeId, String nodeName) {
this.nodeId = nodeId;
this.nodeName = nodeName;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeString(nodeName);
}

public String getNodeId() {
return nodeId;
}

public String getNodeName() {
return nodeName;
}

private static final ParseField NODE_ID_PARSE_FIELD = new ParseField("node_id");
private static final ParseField NODE_NAME_PARSE_FIELD = new ParseField("node_name");

private static String nodeId(Object[] nodeIdAndName) {
return (String) nodeIdAndName[0];
}

private static String nodeName(Object[] nodeIdAndName) {
return (String) nodeIdAndName[1];
}

private static final ConstructingObjectParser<VotingTombstone, Void> PARSER = new ConstructingObjectParser<>(
"voting_tombstone",
nodeIdAndName -> new VotingTombstone(nodeId(nodeIdAndName), nodeName(nodeIdAndName))
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_PARSE_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_NAME_PARSE_FIELD);
}

public static VotingTombstone fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(NODE_ID_PARSE_FIELD.getPreferredName(), nodeId)
.field(NODE_NAME_PARSE_FIELD.getPreferredName(), nodeName)
.endObject();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
VotingTombstone that = (VotingTombstone) o;
return Objects.equals(nodeId, that.nodeId) &&
Objects.equals(nodeName, that.nodeName);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, nodeName);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (nodeName.length() > 0) {
sb.append('{').append(nodeName).append('}');
}
sb.append('{').append(nodeId).append('}');
return sb.toString();
}

}

/**
* A collection of persistent node ids, denoting the voting configuration for cluster state changes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -654,7 +655,7 @@ ClusterState improveConfiguration(ClusterState clusterState) {
final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
clusterState.getVotingTombstones().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()),
clusterState.getVotingTombstones().stream().map(VotingTombstone::getNodeId).collect(Collectors.toSet()),
clusterState.getLastAcceptedConfiguration());
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
Expand Down
Loading