Skip to content

Prevent TransportReplicationAction to route request based on stale local routing table #16274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ

private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;

private long routedBasedOnClusterVersion = 0;

public ReplicationRequest() {

}
Expand Down Expand Up @@ -141,6 +143,20 @@ public final Request consistencyLevel(WriteConsistencyLevel consistencyLevel) {
return (Request) this;
}

/**
* Sets the minimum version of the cluster state that is required on the next node before we redirect to another primary.
* Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}
*/
@SuppressWarnings("unchecked")
Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
this.routedBasedOnClusterVersion = routedBasedOnClusterVersion;
return (Request) this;
}

long routedBasedOnClusterVersion() {
return routedBasedOnClusterVersion;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -161,6 +177,7 @@ public void readFrom(StreamInput in) throws IOException {
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
}

@Override
Expand All @@ -175,6 +192,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,15 @@ protected void doRun() {
}
performAction(node, transportPrimaryAction, true);
} else {
if (state.version() < request.routedBasedOnClusterVersion()) {
logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion());
retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
return;
} else {
// chasing the node with the active primary for a second hop requires that we are at least up-to-date with the current cluster state version
// this prevents redirect loops between two nodes when a primary was relocated and the relocation target is not aware that it is the active primary shard already.
request.routedBasedOnClusterVersion(state.version());
}
if (logger.isTraceEnabled()) {
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName, request.shardId(), request, state.version(), primary.currentNodeId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -53,6 +55,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
Expand All @@ -67,6 +70,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -205,6 +209,56 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
assertIndexShardCounter(1);
}

/**
* When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from
* the relocation source to the relocation target. If relocation source receives and processes this cluster state
* before the relocation target, there is a time span where relocation source believes active primary to be on
* relocation target and relocation target believes active primary to be on relocation source. This results in replication
* requests being sent back and forth.
*
* This test checks that replication request is not routed back from relocation target to relocation source in case of
* stale index routing table on relocation target.
*/
public void testNoRerouteOnStaleClusterState() throws InterruptedException, ExecutionException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
String relocationTargetNode = state.getRoutingTable().shardRoutingTable(shardId).primaryShard().relocatingNodeId();
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(relocationTargetNode)).build();
clusterService.setState(state);
logger.debug("--> relocation ongoing state:\n{}", clusterService.state().prettyPrint());

Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
reroutePhase.run();
assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class);

request = new Request(shardId).routedBasedOnClusterVersion(clusterService.state().version() + 1);
listener = new PlainActionFuture<>();
reroutePhase = action.new ReroutePhase(request, listener);
reroutePhase.run();
assertFalse("cluster state too old didn't cause a retry", listener.isDone());

// finish relocation
ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId).shardsWithState(ShardRoutingState.INITIALIZING).get(0);
AllocationService allocationService = ESAllocationTestCase.createAllocationService();
RoutingAllocation.Result result = allocationService.applyStartedShards(state, Arrays.asList(relocationTarget));
ClusterState updatedState = ClusterState.builder(clusterService.state()).routingResult(result).build();

clusterService.setState(updatedState);
logger.debug("--> relocation complete state:\n{}", clusterService.state().prettyPrint());

IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
final List<CapturingTransport.CapturedRequest> capturedRequests =
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
assertThat(capturedRequests, notNullValue());
assertThat(capturedRequests.size(), equalTo(1));
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
assertIndexShardCounter(1);
}

public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
final String index = "test";
// no replicas in oder to skip the replication part
Expand Down