Skip to content

Commit 872a16c

Browse files
authored
[7.7] Watcher dont add watches post index if stopped (#56556) (#56626)
Watcher adds watches to the trigger service on the postIndex action for the .watches index. This has the (intentional) side effect of also adding the watches to the stats. The tests rely on these stats for their assertions. The tests also start and stop Watcher between each test for a clean slate. When Watcher executes it updates the .watches index and upon this update it will go through the postIndex method and end up added that watch to the trigger service (and stats). Functionally this is not a problem, if Watcher is stopping or stopped since Watcher is also paused and will not execute the watch. However, with specific timing and expectations of a clean slate can cause issues the test assertions against the stats. This commit ensures that the postIndex action only adds to the trigger service if the Watcher state is not stopping or stopped. When started back up it will re-read index .watches. This commit also un-mutes the tests related to #53177 and #56534
1 parent 63bcd67 commit 872a16c

File tree

7 files changed

+46
-15
lines changed

7 files changed

+46
-15
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
423423
final WatcherLifeCycleService watcherLifeCycleService =
424424
new WatcherLifeCycleService(clusterService, watcherService);
425425

426-
listener = new WatcherIndexingListener(watchParser, getClock(), triggerService);
426+
listener = new WatcherIndexingListener(watchParser, getClock(), triggerService, watcherLifeCycleService.getState());
427427
clusterService.addListener(listener);
428428

429429
return Arrays.asList(registry, inputRegistry, historyStore, triggerService, triggeredWatchParser,

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.index.engine.Engine;
2626
import org.elasticsearch.index.shard.IndexingOperationListener;
2727
import org.elasticsearch.index.shard.ShardId;
28+
import org.elasticsearch.xpack.core.watcher.WatcherState;
2829
import org.elasticsearch.xpack.core.watcher.watch.Watch;
2930
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
3031
import org.elasticsearch.xpack.watcher.watch.WatchParser;
@@ -38,11 +39,13 @@
3839
import java.util.ArrayList;
3940
import java.util.Collection;
4041
import java.util.Collections;
42+
import java.util.EnumSet;
4143
import java.util.HashMap;
4244
import java.util.HashSet;
4345
import java.util.List;
4446
import java.util.Map;
4547
import java.util.Set;
48+
import java.util.function.Supplier;
4649
import java.util.stream.Collectors;
4750

4851
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -66,12 +69,14 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
6669
private final WatchParser parser;
6770
private final Clock clock;
6871
private final TriggerService triggerService;
72+
private final Supplier<WatcherState> watcherState;
6973
private volatile Configuration configuration = INACTIVE;
7074

71-
WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService) {
75+
WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService, Supplier<WatcherState> watcherState) {
7276
this.parser = parser;
7377
this.clock = clock;
7478
this.triggerService = triggerService;
79+
this.watcherState = watcherState;
7580
}
7681

7782
// package private for testing
@@ -119,16 +124,17 @@ public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResul
119124
}
120125

121126
boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
122-
if (shouldBeTriggered) {
123-
if (watch.status().state().isActive()) {
127+
WatcherState currentState = watcherState.get();
128+
if (shouldBeTriggered && EnumSet.of(WatcherState.STOPPING, WatcherState.STOPPED).contains(currentState) == false) {
129+
if (watch.status().state().isActive() ) {
124130
logger.debug("adding watch [{}] to trigger service", watch.id());
125131
triggerService.add(watch);
126132
} else {
127133
logger.debug("removing watch [{}] to trigger service", watch.id());
128134
triggerService.remove(watch.id());
129135
}
130136
} else {
131-
logger.debug("watch [{}] should not be triggered", watch.id());
137+
logger.debug("watch [{}] should not be triggered. watcher state [{}]", watch.id(), currentState);
132138
}
133139
} catch (IOException e) {
134140
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Set;
3333
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.function.Supplier;
3435
import java.util.stream.Collectors;
3536

3637
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -203,7 +204,7 @@ List<ShardRouting> shardRoutings() {
203204
return previousShardRoutings.get();
204205
}
205206

206-
public WatcherState getState() {
207-
return state.get();
207+
public Supplier<WatcherState> getState(){
208+
return () -> state.get();
208209
}
209210
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ protected WatcherStatsResponse.Node newNodeResponse(StreamInput in) throws IOExc
6666
@Override
6767
protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node request) {
6868
WatcherStatsResponse.Node statsResponse = new WatcherStatsResponse.Node(clusterService.localNode());
69-
statsResponse.setWatcherState(lifeCycleService.getState());
69+
statsResponse.setWatcherState(lifeCycleService.getState().get());
7070
statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize());
7171
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
7272
if (request.includeCurrentWatches()) {

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.index.engine.Engine;
3434
import org.elasticsearch.index.shard.ShardId;
3535
import org.elasticsearch.test.ESTestCase;
36+
import org.elasticsearch.xpack.core.watcher.WatcherState;
3637
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
3738
import org.elasticsearch.xpack.core.watcher.watch.Watch;
3839
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
@@ -89,7 +90,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
8990
@Before
9091
public void setup() throws Exception {
9192
clock.freeze();
92-
listener = new WatcherIndexingListener(parser, clock, triggerService);
93+
listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STARTED);
9394

9495
Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
9596
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
@@ -140,6 +141,29 @@ public void testPostIndex() throws Exception {
140141
}
141142
}
142143

144+
public void testPostIndexWhenStopped() throws Exception {
145+
listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STOPPED);
146+
Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
147+
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
148+
listener.setConfiguration(new Configuration(Watch.INDEX, map));
149+
when(operation.id()).thenReturn(randomAlphaOfLength(10));
150+
when(operation.source()).thenReturn(BytesArray.EMPTY);
151+
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
152+
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
153+
types.remove(Engine.Result.Type.FAILURE);
154+
when(result.getResultType()).thenReturn(randomFrom(types));
155+
156+
boolean watchActive = randomBoolean();
157+
boolean isNewWatch = randomBoolean();
158+
Watch watch = mockWatch("_id", watchActive, isNewWatch);
159+
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
160+
161+
listener.postIndex(shardId, operation, result);
162+
ZonedDateTime now = DateUtils.nowWithMillisResolution(clock);
163+
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());
164+
verifyZeroInteractions(triggerService);
165+
}
166+
143167
// this test emulates an index with 10 shards, and ensures that triggering only happens on a
144168
// single shard
145169
public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,9 @@ public void testManualStartStop() {
179179
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
180180
verify(watcherService, times(1))
181181
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
182-
assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
182+
assertEquals(WatcherState.STOPPING, lifeCycleService.getState().get());
183183
captor.getValue().run();
184-
assertEquals(WatcherState.STOPPED, lifeCycleService.getState());
184+
assertEquals(WatcherState.STOPPED, lifeCycleService.getState().get());
185185

186186
// Starting via cluster state update, as the watcher metadata block is removed/set to true
187187
reset(watcherService);
@@ -480,7 +480,7 @@ public void testMasterOnlyNodeCanStart() {
480480
new HashSet<>(roles), Version.CURRENT))).build();
481481

482482
lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
483-
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
483+
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
484484
}
485485

486486
public void testDataNodeWithoutDataCanStart() {
@@ -494,7 +494,7 @@ public void testDataNodeWithoutDataCanStart() {
494494
.build();
495495

496496
lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
497-
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
497+
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
498498
}
499499

500500
// this emulates a node outage somewhere in the cluster that carried a watcher shard
@@ -584,7 +584,7 @@ private void startWatcher() {
584584
when(watcherService.validate(state)).thenReturn(true);
585585

586586
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
587-
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
587+
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
588588
verify(watcherService, times(1)).reload(eq(state), anyString());
589589
assertThat(lifeCycleService.shardRoutings(), hasSize(1));
590590

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void setupTransportAction() {
5858
when(clusterService.state()).thenReturn(clusterState);
5959

6060
WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class);
61-
when(watcherLifeCycleService.getState()).thenReturn(WatcherState.STARTED);
61+
when(watcherLifeCycleService.getState()).thenReturn(() -> WatcherState.STARTED);
6262

6363
ExecutionService executionService = mock(ExecutionService.class);
6464
when(executionService.executionThreadPoolQueueSize()).thenReturn(100L);

0 commit comments

Comments
 (0)