Skip to content

Zen2: Add basic Zen1 transport-level BWC #35443

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -1110,7 +1111,18 @@ protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publ
@Override
protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit,
ActionListener<Empty> responseActionListener) {
publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener));
publicationContext.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener));
}
}

// TODO: only here temporarily for BWC development, remove once complete
public static Settings.Builder addZen1Attribute(Settings.Builder builder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps insert:

// only here temporarily for BWC development, TODO remove once complete

Assuming we are going to remove this before GA?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in 9b12b05

return builder.put("node.attr.zen1", true);
}

// TODO: only here temporarily for BWC development, remove once complete
/**
 * Decides whether the given node should be addressed as a Zen1 node.
 *
 * @param discoveryNode the node to inspect
 * @return {@code true} if the node's version is before {@code Version.V_7_0_0}, or if its attribute map
 *         contains the key {@code "zen1"} (only the key's presence is checked, not its value)
 */
public static boolean isZen1Node(DiscoveryNode discoveryNode) {
    final boolean olderThanSevenZero = discoveryNode.getVersion().before(Version.V_7_0_0);
    // the attribute map is only consulted when the version check does not already decide the answer
    return olderThanSevenZero || discoveryNode.getAttributes().containsKey("zen1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -30,6 +32,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.NodesFaultDetection;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -77,6 +80,8 @@ public class FollowersChecker extends AbstractComponent {
public static final Setting<Integer> FOLLOWER_CHECK_RETRY_COUNT_SETTING =
Setting.intSetting("cluster.fault_detection.follower_check.retry_count", 3, 1, Setting.Property.NodeScope);

private final Settings settings;

private final TimeValue followerCheckInterval;
private final TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
Expand All @@ -94,6 +99,7 @@ public class FollowersChecker extends AbstractComponent {
public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure) {
this.settings = settings;
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
this.onNodeFailure = onNodeFailure;
Expand All @@ -103,8 +109,12 @@ public FollowersChecker(Settings settings, TransportService transportService,
followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);

updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new,
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, false, false, FollowerCheckRequest::new,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell this change is to set canTripCircuitBreaker to false. Do we definitely want to count a follower as healthy if a ping would trip its circuit breaker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. I wonder if it's something we should discuss with a larger group, but let's go with the Zen1 defaults for now.

(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
transportService.registerRequestHandler(
NodesFaultDetection.PING_ACTION_NAME, NodesFaultDetection.PingRequest::new, Names.SAME, false, false,
(request, channel, task) -> // TODO: check that we're a follower of the requesting node?
channel.sendResponse(new NodesFaultDetection.PingResponse()));
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
Expand Down Expand Up @@ -290,7 +300,18 @@ private void handleWakeUp() {

final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode());
logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request);
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,

final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(discoveryNode)) {
actionName = NodesFaultDetection.PING_ACTION_NAME;
transportRequest = new NodesFaultDetection.PingRequest(discoveryNode, ClusterName.CLUSTER_NAME_SETTING.get(settings),
transportService.getLocalNode(), ClusterState.UNKNOWN_VERSION);
} else {
actionName = FOLLOWER_CHECK_ACTION_NAME;
transportRequest = request;
}
transportService.sendRequest(discoveryNode, actionName, transportRequest,
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler<Empty>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -98,32 +101,12 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
};

transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new,
(request, channel, task) -> joinHandler.accept(request, new JoinCallback() {
(request, channel, task) -> joinHandler.accept(request, transportJoinCallback(request, channel)));

@Override
public void onSuccess() {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (IOException e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send back failure on join request", inner);
}
}

@Override
public String toString() {
return "JoinCallback{request=" + request + "}";
}
}));
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME, MembershipAction.JoinRequest::new,
ThreadPool.Names.GENERIC, false, false,
(request, channel, task) -> joinHandler.accept(new JoinRequest(request.node, Optional.empty()), // treat as non-voting join
transportJoinCallback(request, channel)));

transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
StartJoinRequest::new,
Expand All @@ -132,14 +115,64 @@ public String toString() {
sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
channel.sendResponse(Empty.INSTANCE);
});

transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new MembershipAction.ValidateJoinRequest(), ThreadPool.Names.GENERIC,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: implement join validation

transportService.registerRequestHandler(
ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ZenDiscovery.RejoinClusterRequest::new, ThreadPool.Names.SAME,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?

transportService.registerRequestHandler(
MembershipAction.DISCOVERY_LEAVE_ACTION_NAME, MembershipAction.LeaveRequest::new, ThreadPool.Names.SAME,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?
}

/**
 * Creates a {@code JoinCallback} that relays the outcome of handling a join request back over the
 * transport channel the request arrived on.
 *
 * @param request the incoming request; used only in the callback's {@code toString()} description
 * @param channel the channel on which the success or failure response is sent
 * @return a callback that sends {@code Empty.INSTANCE} on success and the exception on failure
 */
private JoinCallback transportJoinCallback(TransportRequest request, TransportChannel channel) {
    return new JoinCallback() {

        @Override
        public String toString() {
            return "JoinCallback{request=" + request + "}";
        }

        @Override
        public void onFailure(Exception e) {
            try {
                channel.sendResponse(e);
            } catch (Exception inner) {
                // the failure could not be delivered either: keep the original failure attached and log it
                inner.addSuppressed(e);
                logger.warn("failed to send back failure on join request", inner);
            }
        }

        @Override
        public void onSuccess() {
            try {
                channel.sendResponse(Empty.INSTANCE);
            } catch (IOException sendFailure) {
                // the success response could not be sent, so report that through the failure path
                onFailure(sendFailure);
            }
        }
    };
}

public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
logger.debug("attempting to join {} with {}", destination, joinRequest);
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(destination)) {
actionName = MembershipAction.DISCOVERY_JOIN_ACTION_NAME;
transportRequest = new MembershipAction.JoinRequest(transportService.getLocalNode());
} else {
actionName = JOIN_ACTION_NAME;
transportRequest = joinRequest;
}
transportService.sendRequest(destination, actionName, transportRequest,
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
new TransportResponseHandler<Empty>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
Expand All @@ -30,10 +31,9 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.discovery.zen.MasterFaultDetection;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
Expand Down Expand Up @@ -74,6 +74,8 @@ public class LeaderChecker extends AbstractComponent {
public static final Setting<Integer> LEADER_CHECK_RETRY_COUNT_SETTING =
Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope);

private final Settings settings;

private final TimeValue leaderCheckInterval;
private final TimeValue leaderCheckTimeout;
private final int leaderCheckRetryCount;
Expand All @@ -85,13 +87,29 @@ public class LeaderChecker extends AbstractComponent {
private volatile DiscoveryNodes discoveryNodes;

public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;

transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck);
transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, false, false, LeaderCheckRequest::new,
(request, channel, task) -> {
handleLeaderCheck(request);
channel.sendResponse(Empty.INSTANCE);
});

transportService.registerRequestHandler(MasterFaultDetection.MASTER_PING_ACTION_NAME, MasterFaultDetection.MasterPingRequest::new,
Names.SAME, false, false, (request, channel, task) -> {
try {
handleLeaderCheck(new LeaderCheckRequest(request.sourceNode));
} catch (CoordinationStateRejectedException e) {
throw new MasterFaultDetection.ThisIsNotTheMasterYouAreLookingForException(e.getMessage());
}
channel.sendResponse(new MasterFaultDetection.MasterPingResponseResponse());
});

transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
Expand Down Expand Up @@ -145,19 +163,18 @@ boolean currentNodeIsMaster() {
return discoveryNodes.isLocalNodeElectedMaster();
}

private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException {
private void handleLeaderCheck(LeaderCheckRequest request) {
final DiscoveryNodes discoveryNodes = this.discoveryNodes;
assert discoveryNodes != null;

if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("non-master handling {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check"));
throw new CoordinationStateRejectedException("non-leader rejecting leader check");
} else if (discoveryNodes.nodeExists(request.getSender()) == false) {
logger.debug("leader check from unknown node: {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node"));
throw new CoordinationStateRejectedException("leader check from unknown node");
} else {
logger.trace("handling {}", request);
transportChannel.sendResponse(Empty.INSTANCE);
}
}

Expand Down Expand Up @@ -197,11 +214,21 @@ void handleWakeUp() {

logger.trace("checking {} with [{}] = {}", leader, LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeout);

final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(leader)) {
actionName = MasterFaultDetection.MASTER_PING_ACTION_NAME;
transportRequest = new MasterFaultDetection.MasterPingRequest(
transportService.getLocalNode(), leader, ClusterName.CLUSTER_NAME_SETTING.get(settings));
} else {
actionName = LEADER_CHECK_ACTION_NAME;
transportRequest = new LeaderCheckRequest(transportService.getLocalNode());
}
// TODO lag detection:
// In the PoC, the leader sent its current version to the follower in the response to a LeaderCheck, so the follower
// could detect if it was lagging. We'd prefer this to be implemented on the leader, so the response is just
// TransportResponse.Empty here.
transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()),
transportService.sendRequest(leader, actionName, transportRequest,
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),

new TransportResponseHandler<TransportResponse.Empty>() {
Expand Down
Loading