Skip to content

Commit 2031ee0

Browse files
authored
Prevent TransportReplicationAction from routing requests based on a stale local routing table (#19296)
When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from the relocation source to the relocation target. If the relocation source receives and processes this cluster state before the relocation target does, there is a time span during which the relocation source believes the active primary to be on the relocation target, while the relocation target still believes the active primary to be on the relocation source. This results in index/delete/flush requests being sent back and forth between the two nodes and can end in an OOM on both of them. Backport of #16274 to 2.4.0.
1 parent 184dd70 commit 2031ee0

File tree

3 files changed

+88
-0
lines changed

3 files changed

+88
-0
lines changed

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

+23
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.support.replication;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionRequest;
2324
import org.elasticsearch.action.ActionRequestValidationException;
2425
import org.elasticsearch.action.IndicesRequest;
@@ -58,6 +59,8 @@ public class ReplicationRequest<T extends ReplicationRequest<T>> extends ChildTa
5859
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
5960
private volatile boolean canHaveDuplicates = false;
6061

62+
private long routedBasedOnClusterVersion = 0;
63+
6164
public ReplicationRequest() {
6265

6366
}
@@ -170,6 +173,20 @@ public final T consistencyLevel(WriteConsistencyLevel consistencyLevel) {
170173
return (T) this;
171174
}
172175

176+
/**
177+
* Sets the minimum version of the cluster state that is required on the next node before we redirect to another primary.
178+
* Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}
179+
*/
180+
@SuppressWarnings("unchecked")
181+
T routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
182+
this.routedBasedOnClusterVersion = routedBasedOnClusterVersion;
183+
return (T) this;
184+
}
185+
186+
long routedBasedOnClusterVersion() {
187+
return routedBasedOnClusterVersion;
188+
}
189+
173190
@Override
174191
public ActionRequestValidationException validate() {
175192
ActionRequestValidationException validationException = null;
@@ -192,6 +209,9 @@ public void readFrom(StreamInput in) throws IOException {
192209
index = in.readString();
193210
canHaveDuplicates = in.readBoolean();
194211
// no need to serialize threaded* parameters, since they only matter locally
212+
if (in.getVersion().onOrAfter(Version.V_2_4_0)) {
213+
routedBasedOnClusterVersion = in.readVLong();
214+
}
195215
}
196216

197217
@Override
@@ -207,6 +227,9 @@ public void writeTo(StreamOutput out) throws IOException {
207227
timeout.writeTo(out);
208228
out.writeString(index);
209229
out.writeBoolean(canHaveDuplicates);
230+
if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
231+
out.writeVLong(routedBasedOnClusterVersion);
232+
}
210233
}
211234

212235
@Override

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+9
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,15 @@ protected void doRun() {
479479
}
480480
performAction(node, transportPrimaryAction, true);
481481
} else {
482+
if (state.version() < request.routedBasedOnClusterVersion()) {
483+
logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion());
484+
retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
485+
return;
486+
} else {
487+
// chasing the node with the active primary for a second hop requires that we are at least up-to-date with the current cluster state version
488+
// this prevents redirect loops between two nodes when a primary was relocated and the relocation target is not aware that it is the active primary shard already.
489+
request.routedBasedOnClusterVersion(state.version());
490+
}
482491
if (logger.isTraceEnabled()) {
483492
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName, request.shardId(), request, state.version(), primary.currentNodeId());
484493
}

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

+56
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.cluster.metadata.IndexMetaData;
3939
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4040
import org.elasticsearch.cluster.metadata.MetaData;
41+
import org.elasticsearch.cluster.node.DiscoveryNodes;
4142
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
4243
import org.elasticsearch.cluster.routing.ShardIterator;
4344
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -56,6 +57,7 @@
5657
import org.elasticsearch.index.shard.ShardId;
5758
import org.elasticsearch.index.shard.ShardNotFoundException;
5859
import org.elasticsearch.rest.RestStatus;
60+
import org.elasticsearch.test.ESAllocationTestCase;
5961
import org.elasticsearch.test.ESTestCase;
6062
import org.elasticsearch.test.cluster.TestClusterService;
6163
import org.elasticsearch.test.transport.CapturingTransport;
@@ -72,6 +74,7 @@
7274

7375
import java.io.IOException;
7476
import java.util.ArrayList;
77+
import java.util.Arrays;
7578
import java.util.HashMap;
7679
import java.util.HashSet;
7780
import java.util.List;
@@ -217,6 +220,59 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
217220
assertIndexShardCounter(1);
218221
}
219222

223+
/**
224+
* When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from
225+
* the relocation source to the relocation target. If relocation source receives and processes this cluster state
226+
* before the relocation target, there is a time span where relocation source believes active primary to be on
227+
* relocation target and relocation target believes active primary to be on relocation source. This results in replication
228+
* requests being sent back and forth.
229+
*
230+
* This test checks that replication request is not routed back from relocation target to relocation source in case of
231+
* stale index routing table on relocation target.
232+
*/
233+
@Test
234+
public void testNoRerouteOnStaleClusterState() throws InterruptedException, ExecutionException {
235+
final String index = "test";
236+
final ShardId shardId = new ShardId(index, 0);
237+
ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
238+
IndexShardRoutingTable shardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId.getIndex(), shardId.id());
239+
String relocationTargetNode = shardRoutingTable.primaryShard().relocatingNodeId();
240+
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(relocationTargetNode)).build();
241+
clusterService.setState(state);
242+
logger.debug("--> relocation ongoing state:\n{}", clusterService.state().prettyPrint());
243+
244+
Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1);
245+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
246+
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
247+
reroutePhase.run();
248+
assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class);
249+
250+
request = new Request(shardId).routedBasedOnClusterVersion(clusterService.state().version() + 1);
251+
listener = new PlainActionFuture<>();
252+
reroutePhase = action.new ReroutePhase(null, request, listener);
253+
reroutePhase.run();
254+
assertFalse("cluster state too old didn't cause a retry", listener.isDone());
255+
256+
// finish relocation
257+
shardRoutingTable = clusterService.state().getRoutingTable().shardRoutingTable(shardId.getIndex(), shardId.id());
258+
ShardRouting relocationTarget = shardRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).get(0);
259+
AllocationService allocationService = ESAllocationTestCase.createAllocationService();
260+
RoutingAllocation.Result result = allocationService.applyStartedShards(state, Arrays.asList(relocationTarget));
261+
ClusterState updatedState = ClusterState.builder(clusterService.state()).routingResult(result).build();
262+
263+
clusterService.setState(updatedState);
264+
logger.debug("--> relocation complete state:\n{}", clusterService.state().prettyPrint());
265+
266+
shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
267+
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
268+
final List<CapturingTransport.CapturedRequest> capturedRequests =
269+
transport.capturedRequestsByTargetNode().get(primaryNodeId);
270+
assertThat(capturedRequests, notNullValue());
271+
assertThat(capturedRequests.size(), equalTo(1));
272+
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
273+
assertIndexShardCounter(1);
274+
}
275+
220276
@Test
221277
public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
222278
final String index = "test";

0 commit comments

Comments
 (0)