Commit f2778de

Watcher: Reload properly on remote shard change (#33167)
When a node that carries a watcher shard dies, or a shard is relocated to another node, watcher must trigger a reload not only on the node where the shard change happened, but also on the other nodes that hold copies of that shard, as different watches may need to be loaded there. This commit takes changes on remote nodes into account by storing a list of ShardRoutings based on the local active shards in the WatcherLifeCycleService, instead of only the local shard allocation ids. This also fixes some tests that relied on a wrong assumption: using `TestShardRouting.newShardRouting` for cluster state creation always created new allocation ids, which implicitly led to a reload.
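
To illustrate the idea (a minimal, self-contained sketch with plain-Java stand-ins, not the Elasticsearch classes or the committed code): the old check compared only the allocation ids of shards on the local node, so a remote copy disappearing left that list unchanged; comparing the full set of ShardRoutings for every copy of a locally-held shard catches the change.

```java
// Sketch only: `Routing` is a simplified stand-in for ShardRouting.
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class ReloadCheckSketch {

    // simplified stand-in for ShardRouting: shard id, node id, allocation id
    record Routing(int shardId, String nodeId, String allocationId) {}

    static List<String> localAllocationIds(List<Routing> routings, String localNode) {
        // old approach: only the allocation ids of shards on the local node
        return routings.stream()
            .filter(r -> r.nodeId().equals(localNode))
            .map(Routing::allocationId)
            .sorted()
            .collect(Collectors.toList());
    }

    static List<Routing> routingsOfLocalShards(List<Routing> routings, String localNode) {
        // new approach: all routings whose shard id is also held locally,
        // sorted by hashCode only to get a deterministic order for equals()
        var localShardIds = routings.stream()
            .filter(r -> r.nodeId().equals(localNode))
            .map(Routing::shardId)
            .collect(Collectors.toSet());
        return routings.stream()
            .filter(r -> localShardIds.contains(r.shardId()))
            .sorted(Comparator.comparing(Routing::hashCode))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String localNode = "node_1";
        // before: shard 0 has a copy on node_1 and a copy on node_2
        List<Routing> before = List.of(
            new Routing(0, "node_1", "alloc-a"),
            new Routing(0, "node_2", "alloc-b"));
        // after: node_2 died, its copy of shard 0 is gone
        List<Routing> after = List.of(new Routing(0, "node_1", "alloc-a"));

        // old check misses the remote change:
        System.out.println(localAllocationIds(before, localNode)
            .equals(localAllocationIds(after, localNode)));    // true -> no reload
        // new check detects it:
        System.out.println(routingsOfLocalShards(before, localNode)
            .equals(routingsOfLocalShards(after, localNode))); // false -> reload
    }
}
```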
1 parent 510ebb2 commit f2778de

2 files changed: +97 -20 lines
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 18 additions & 11 deletions
```diff
@@ -12,7 +12,6 @@
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -23,13 +22,16 @@
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
 import org.elasticsearch.xpack.core.watcher.WatcherState;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
 import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -46,7 +48,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
         Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);
 
     private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
-    private final AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
+    private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
     private final boolean requireManualStart;
     private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
     private volatile WatcherService watcherService;
@@ -147,15 +149,20 @@ public void clusterChanged(ClusterChangedEvent event) {
             return;
         }
 
-        List<String> currentAllocationIds = localShards.stream()
-            .map(ShardRouting::allocationId)
-            .map(AllocationId::getId)
-            .sorted()
+        // also check if non-local shards have changed, as losing a shard on a
+        // remote node or adding a replica on a remote node needs to trigger a reload too
+        Set<ShardId> localShardIds = localShards.stream().map(ShardRouting::shardId).collect(Collectors.toSet());
+        List<ShardRouting> allShards = event.state().routingTable().index(watchIndex).shardsWithState(STARTED);
+        allShards.addAll(event.state().routingTable().index(watchIndex).shardsWithState(RELOCATING));
+        List<ShardRouting> localAffectedShardRoutings = allShards.stream()
+            .filter(shardRouting -> localShardIds.contains(shardRouting.shardId()))
+            // ShardRouting is not comparable, so we need some order mechanism
+            .sorted(Comparator.comparing(ShardRouting::hashCode))
             .collect(Collectors.toList());
 
-        if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
+        if (previousShardRoutings.get().equals(localAffectedShardRoutings) == false) {
             if (watcherService.validate(event.state())) {
-                previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
+                previousShardRoutings.set(localAffectedShardRoutings);
                 if (state.get() == WatcherState.STARTED) {
                     watcherService.reload(event.state(), "new local watcher shard allocation ids");
                 } else if (state.get() == WatcherState.STOPPED) {
@@ -219,13 +226,13 @@ private boolean isWatcherStoppedManually(ClusterState state) {
      * @return true, if existing allocation ids were cleaned out, false otherwise
      */
     private boolean clearAllocationIds() {
-        List<String> previousIds = previousAllocationIds.getAndSet(Collections.emptyList());
+        List<ShardRouting> previousIds = previousShardRoutings.getAndSet(Collections.emptyList());
         return previousIds.equals(Collections.emptyList()) == false;
     }
 
     // for testing purposes only
-    List<String> allocationIds() {
-        return previousAllocationIds.get();
+    List<ShardRouting> shardRoutings() {
+        return previousShardRoutings.get();
     }
 
     public WatcherState getState() {
```
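
A side note on the `Comparator.comparing(ShardRouting::hashCode)` sort above: `ShardRouting` is not `Comparable`, and the sort exists only to give both lists a deterministic order before the `equals` check; any stable order would do. A hypothetical simplification (not what the commit does, plain-Java stand-ins again) is to compare the routings as sets, which makes ordering irrelevant:

```java
// Hypothetical alternative: set equality ignores order, so no sort is needed.
import java.util.List;
import java.util.Set;

public class SetComparisonSketch {

    record Routing(int shardId, String nodeId, String allocationId) {}

    public static void main(String[] args) {
        // same copies, listed in a different order
        List<Routing> previous = List.of(
            new Routing(0, "node_1", "a"),
            new Routing(0, "node_2", "b"));
        List<Routing> current = List.of(
            new Routing(0, "node_2", "b"),
            new Routing(0, "node_1", "a"));

        // list equality is order-sensitive, set equality is not
        System.out.println(previous.equals(current));                         // false
        System.out.println(Set.copyOf(previous).equals(Set.copyOf(current))); // true
    }
}
```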

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 79 additions & 9 deletions
```diff
@@ -255,9 +255,12 @@ public void testReplicaWasAddedOrRemoved() {
             .add(newNode("node_2"))
             .build();
 
+        ShardRouting firstShardOnSecondNode = TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED);
+        ShardRouting secondShardOnFirstNode = TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED);
+
         IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
-            .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED))
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
+            .addShard(secondShardOnFirstNode)
+            .addShard(firstShardOnSecondNode)
             .build();
 
         IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
@@ -274,10 +277,19 @@ public void testReplicaWasAddedOrRemoved() {
             .metaData(MetaData.builder().put(indexMetaData, false))
             .build();
 
+        // add a replica on the local node
+        boolean addShardOnLocalNode = randomBoolean();
+        final ShardRouting addedShardRouting;
+        if (addShardOnLocalNode) {
+            addedShardRouting = TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED);
+        } else {
+            addedShardRouting = TestShardRouting.newShardRouting(secondShardId, "node_2", false, STARTED);
+        }
+
         IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED))
-            .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED))
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
+            .addShard(secondShardOnFirstNode)
+            .addShard(firstShardOnSecondNode)
+            .addShard(addedShardRouting)
             .build();
 
         ClusterState stateWithReplicaAdded = ClusterState.builder(new ClusterName("my-cluster"))
@@ -563,7 +575,67 @@ public void testDataNodeWithoutDataCanStart() {
         assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
     }
 
-    private ClusterState startWatcher() {
+    // this emulates a node outage somewhere in the cluster that carried a watcher shard
+    // the number of shards remains the same, but we need to ensure that watcher properly reloads
+    // previously we only checked the local shard allocations, but we also need to check if shards in the cluster have changed
+    public void testWatcherReloadsOnNodeOutageWithWatcherShard() {
+        Index watchIndex = new Index(Watch.INDEX, "foo");
+        ShardId shardId = new ShardId(watchIndex, 0);
+        String localNodeId = randomFrom("node_1", "node_2");
+        String outageNodeId = localNodeId.equals("node_1") ? "node_2" : "node_1";
+        DiscoveryNodes previousDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId)
+            .add(newNode(localNodeId))
+            .add(newNode(outageNodeId))
+            .build();
+
+        ShardRouting replicaShardRouting = TestShardRouting.newShardRouting(shardId, localNodeId, false, STARTED);
+        ShardRouting primaryShardRouting = TestShardRouting.newShardRouting(shardId, outageNodeId, true, STARTED);
+        IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
+            .addShard(replicaShardRouting)
+            .addShard(primaryShardRouting)
+            .build();
+
+        IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
+            .settings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)
+            ).build();
+
+        ClusterState previousState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(previousDiscoveryNodes)
+            .routingTable(RoutingTable.builder().add(previousWatchRoutingTable).build())
+            .metaData(MetaData.builder().put(indexMetaData, false))
+            .build();
+
+        ShardRouting nowPrimaryShardRouting = replicaShardRouting.moveActiveReplicaToPrimary();
+        IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
+            .addShard(nowPrimaryShardRouting)
+            .build();
+
+        DiscoveryNodes currentDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId)
+            .add(newNode(localNodeId))
+            .build();
+
+        ClusterState currentState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(currentDiscoveryNodes)
+            .routingTable(RoutingTable.builder().add(currentWatchRoutingTable).build())
+            .metaData(MetaData.builder().put(indexMetaData, false))
+            .build();
+
+        // initialize the previous state, so all the allocation ids are loaded
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("whatever", previousState, currentState));
+
+        reset(watcherService);
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        ClusterChangedEvent event = new ClusterChangedEvent("whatever", currentState, previousState);
+        lifeCycleService.clusterChanged(event);
+        verify(watcherService).reload(eq(event.state()), anyString());
+    }
+
+    private void startWatcher() {
         Index index = new Index(Watch.INDEX, "uuid");
         IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
         indexRoutingTableBuilder.addShard(
@@ -593,12 +665,10 @@ private ClusterState startWatcher() {
         lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
         assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
         verify(watcherService, times(1)).reload(eq(state), anyString());
-        assertThat(lifeCycleService.allocationIds(), hasSize(1));
+        assertThat(lifeCycleService.shardRoutings(), hasSize(1));
 
         // reset the mock, the user has to mock everything themselves again
         reset(watcherService);
-
-        return state;
     }
 
     private List<String> randomIndexPatterns() {
```
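
The commit message also notes the test pitfall fixed above: each `TestShardRouting.newShardRouting` call creates a fresh random allocation id, so building "the same" shard separately for the before and after cluster states makes them compare as different and implicitly triggers a reload, which is why the tests now reuse the same `ShardRouting` instances. A minimal sketch of the pitfall with plain-Java stand-ins (not the real TestShardRouting API):

```java
// Sketch only: mimics TestShardRouting.newShardRouting assigning a new random
// allocation id on every call.
import java.util.List;
import java.util.UUID;

public class FreshAllocationIdPitfall {

    record Routing(int shardId, String nodeId, String allocationId) {}

    static Routing newShardRouting(int shardId, String nodeId) {
        return new Routing(shardId, nodeId, UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        // wrong: two calls yield different allocation ids, so the "unchanged"
        // before/after states never compare equal and always force a reload
        List<Routing> previous = List.of(newShardRouting(0, "node_1"));
        List<Routing> current = List.of(newShardRouting(0, "node_1"));
        System.out.println(previous.equals(current)); // false: spurious "change"

        // right: reuse the same routing instance in both cluster states
        Routing shared = newShardRouting(0, "node_1");
        System.out.println(List.of(shared).equals(List.of(shared))); // true
    }
}
```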
