From af1f6375478658fd1950184be0199128080e8a91 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 20 Jan 2016 16:19:42 +0100 Subject: [PATCH] Prevent TransportReplicationAction to route request based on stale local routing table Closes #16274 Closes #12573 Closes #12574 --- .../replication/ReplicationRequest.java | 18 +++++++ .../TransportReplicationAction.java | 9 ++++ .../TransportReplicationActionTests.java | 54 +++++++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index ed23017410eaf..4e6ec3c358473 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -55,6 +55,8 @@ public abstract class ReplicationRequest relocation ongoing state:\n{}", clusterService.state().prettyPrint()); + + Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener); + reroutePhase.run(); + assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class); + + request = new Request(shardId).routedBasedOnClusterVersion(clusterService.state().version() + 1); + listener = new PlainActionFuture<>(); + reroutePhase = action.new ReroutePhase(request, listener); + reroutePhase.run(); + assertFalse("cluster state too old didn't cause a retry", listener.isDone()); + + // finish relocation + ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId).shardsWithState(ShardRoutingState.INITIALIZING).get(0); + AllocationService allocationService = ESAllocationTestCase.createAllocationService(); + RoutingAllocation.Result result = allocationService.applyStartedShards(state, Arrays.asList(relocationTarget)); + ClusterState updatedState = ClusterState.builder(clusterService.state()).routingResult(result).build(); + + clusterService.setState(updatedState); + logger.debug("--> relocation complete state:\n{}", clusterService.state().prettyPrint()); + + IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); + final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId(); + final List capturedRequests = + transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId); + assertThat(capturedRequests, notNullValue()); + assertThat(capturedRequests.size(), equalTo(1)); + assertThat(capturedRequests.get(0).action, equalTo("testAction[p]")); + assertIndexShardCounter(1); + } + public void testUnknownIndexOrShardOnReroute() throws InterruptedException { final String index = "test"; // no replicas in oder to skip the replication part