Skip to content

Commit af1f637

Browse files
author
Yannick Welsch
committed
Prevent TransportReplicationAction from routing requests based on a stale local routing table
Closes #16274 Closes #12573 Closes #12574
1 parent 26f77eb commit af1f637

File tree

3 files changed

+81
-0
lines changed

3 files changed

+81
-0
lines changed

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

+18
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
5555

5656
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
5757

58+
private long routedBasedOnClusterVersion = 0;
59+
5860
public ReplicationRequest() {
5961

6062
}
@@ -141,6 +143,20 @@ public final Request consistencyLevel(WriteConsistencyLevel consistencyLevel) {
141143
return (Request) this;
142144
}
143145

146+
/**
147+
* Sets the minimum version of the cluster state that is required on the next node before we redirect to another primary.
148+
* Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}
*
* @param routedBasedOnClusterVersion the cluster state version to record on this request; it is serialized with the
*                                    request by {@code writeTo} and restored by {@code readFrom}
* @return this request, cast to the concrete {@code Request} type so that calls can be chained
149+
*/
150+
@SuppressWarnings("unchecked")
151+
Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
152+
this.routedBasedOnClusterVersion = routedBasedOnClusterVersion;
153+
// unchecked cast to the self-type parameter; this is what the @SuppressWarnings above covers
return (Request) this;
154+
}
155+
156+
/**
* Returns the cluster state version recorded on this request.
* The field starts at {@code 0} and is otherwise set through {@link #routedBasedOnClusterVersion(long)}
* or when the request is read from a stream.
*
* @return the recorded cluster state version
*/
long routedBasedOnClusterVersion() {
157+
return routedBasedOnClusterVersion;
158+
}
159+
144160
@Override
145161
public ActionRequestValidationException validate() {
146162
ActionRequestValidationException validationException = null;
@@ -161,6 +177,7 @@ public void readFrom(StreamInput in) throws IOException {
161177
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
162178
timeout = TimeValue.readTimeValue(in);
163179
index = in.readString();
180+
routedBasedOnClusterVersion = in.readVLong();
164181
}
165182

166183
@Override
@@ -175,6 +192,7 @@ public void writeTo(StreamOutput out) throws IOException {
175192
out.writeByte(consistencyLevel.id());
176193
timeout.writeTo(out);
177194
out.writeString(index);
195+
out.writeVLong(routedBasedOnClusterVersion);
178196
}
179197

180198
/**

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+9
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,15 @@ protected void doRun() {
472472
}
473473
performAction(node, transportPrimaryAction, true);
474474
} else {
475+
if (state.version() < request.routedBasedOnClusterVersion()) {
476+
logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion());
477+
retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
478+
return;
479+
} else {
480+
// chasing the node with the active primary for a second hop requires that we are at least up-to-date with the current cluster state version
481+
// this prevents redirect loops between two nodes when a primary was relocated and the relocation target is not aware that it is the active primary shard already.
482+
request.routedBasedOnClusterVersion(state.version());
483+
}
475484
if (logger.isTraceEnabled()) {
476485
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName, request.shardId(), request, state.version(), primary.currentNodeId());
477486
}

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

+54
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import org.elasticsearch.cluster.routing.ShardIterator;
4343
import org.elasticsearch.cluster.routing.ShardRouting;
4444
import org.elasticsearch.cluster.routing.ShardRoutingState;
45+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
46+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
4547
import org.elasticsearch.common.collect.Tuple;
4648
import org.elasticsearch.common.io.stream.StreamInput;
4749
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -53,6 +55,7 @@
5355
import org.elasticsearch.index.shard.ShardId;
5456
import org.elasticsearch.index.shard.ShardNotFoundException;
5557
import org.elasticsearch.rest.RestStatus;
58+
import org.elasticsearch.test.ESAllocationTestCase;
5659
import org.elasticsearch.test.ESTestCase;
5760
import org.elasticsearch.test.cluster.TestClusterService;
5861
import org.elasticsearch.test.transport.CapturingTransport;
@@ -67,6 +70,7 @@
6770

6871
import java.io.IOException;
6972
import java.util.ArrayList;
73+
import java.util.Arrays;
7074
import java.util.HashMap;
7175
import java.util.HashSet;
7276
import java.util.List;
@@ -205,6 +209,56 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
205209
assertIndexShardCounter(1);
206210
}
207211

212+
/**
213+
* When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from
214+
* the relocation source to the relocation target. If relocation source receives and processes this cluster state
215+
* before the relocation target, there is a time span where relocation source believes active primary to be on
216+
* relocation target and relocation target believes active primary to be on relocation source. This results in replication
217+
* requests being sent back and forth.
218+
*
219+
* This test checks that replication request is not routed back from relocation target to relocation source in case of
220+
* stale index routing table on relocation target.
221+
*/
222+
public void testNoRerouteOnStaleClusterState() throws InterruptedException, ExecutionException {
223+
final String index = "test";
224+
final ShardId shardId = new ShardId(index, 0);
225+
// start from a state in which the primary shard is still relocating
ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
226+
String relocationTargetNode = state.getRoutingTable().shardRoutingTable(shardId).primaryShard().relocatingNodeId();
227+
// make the relocation target the local node, so the reroute phase below runs from its point of view
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(relocationTargetNode)).build();
228+
clusterService.setState(state);
229+
logger.debug("--> relocation ongoing state:\n{}", clusterService.state().prettyPrint());
230+
231+
// case 1: the request claims to have been routed on a cluster state one version newer than the local one,
// and carries a 1ms timeout; the listener is expected to fail with UnavailableShardsException
Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1);
232+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
233+
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
234+
reroutePhase.run();
235+
assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class);
236+
237+
// case 2: same stale-state request but without the short timeout; the listener must still be pending
request = new Request(shardId).routedBasedOnClusterVersion(clusterService.state().version() + 1);
238+
listener = new PlainActionFuture<>();
239+
reroutePhase = action.new ReroutePhase(request, listener);
240+
reroutePhase.run();
241+
assertFalse("cluster state too old didn't cause a retry", listener.isDone());
242+
243+
// finish relocation
// by marking the INITIALIZING copy of the shard as started and publishing the resulting routing to the cluster service
244+
ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId).shardsWithState(ShardRoutingState.INITIALIZING).get(0);
245+
AllocationService allocationService = ESAllocationTestCase.createAllocationService();
246+
RoutingAllocation.Result result = allocationService.applyStartedShards(state, Arrays.asList(relocationTarget));
247+
ClusterState updatedState = ClusterState.builder(clusterService.state()).routingResult(result).build();
248+
249+
clusterService.setState(updatedState);
250+
logger.debug("--> relocation complete state:\n{}", clusterService.state().prettyPrint());
251+
252+
// after the update, exactly one primary action must have been captured for the node that now holds the primary
// NOTE(review): presumably the pending request from case 2 is retried on the new state -- confirm against ReroutePhase
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
253+
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
254+
final List<CapturingTransport.CapturedRequest> capturedRequests =
255+
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
256+
assertThat(capturedRequests, notNullValue());
257+
assertThat(capturedRequests.size(), equalTo(1));
258+
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
259+
assertIndexShardCounter(1);
260+
}
261+
208262
public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
209263
final String index = "test";
210264
// no replicas in order to skip the replication part

0 commit comments

Comments
 (0)