Skip to content

Commit b253af3

Browse files
committed
The watcher indexing listener didn't handle document level exceptions. (#51466)
Prior to the change the watcher index listener didn't implement the `postIndex(ShardId, Engine.Index, Engine.IndexResult)` method. This caused document level exceptions like VersionConflictEngineException to be ignored. This commit fixes this. The watcher indexing listener did implement the `postIndex(ShardId, Engine.Index, Exception)` method, but that only handles engine level exceptions. This change also unmutes the SmokeTestWatcherTestSuiteIT#testMonitorClusterHealth test again. Relates to #32299
1 parent 206c8ac commit b253af3

File tree

3 files changed

+49
-5
lines changed

3 files changed

+49
-5
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,19 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
135135
}
136136

137137
/**
138-
*
139-
* In case of an error, we have to ensure that the triggerservice does not leave anything behind
138+
* In case of a document related failure (for example version conflict), then clean up resources for a watch
139+
* in the trigger service.
140+
*/
141+
@Override
142+
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
143+
if (result.getResultType() == Engine.Result.Type.FAILURE) {
144+
assert result.getFailure() != null;
145+
postIndex(shardId, index, result.getFailure());
146+
}
147+
}
148+
149+
/**
150+
* In case of an engine related error, we have to ensure that the triggerservice does not leave anything behind
140151
*
141152
* TODO: If the configuration changes between preindex and postindex methods and we add a
142153
* watch, that could not be indexed

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.io.IOException;
4646
import java.time.ZonedDateTime;
4747
import java.util.ArrayList;
48+
import java.util.Arrays;
4849
import java.util.BitSet;
4950
import java.util.Collections;
5051
import java.util.HashMap;
@@ -200,15 +201,48 @@ public void testPreIndexCheckParsingException() throws Exception {
200201
assertThat(exc.getMessage(), containsString(id));
201202
}
202203

203-
public void testPostIndexRemoveTriggerOnException() throws Exception {
204+
public void testPostIndexRemoveTriggerOnDocumentRelatedException() throws Exception {
205+
when(operation.id()).thenReturn("_id");
206+
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);
207+
when(result.getFailure()).thenReturn(new RuntimeException());
208+
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
209+
210+
listener.postIndex(shardId, operation, result);
211+
verify(triggerService).remove(eq("_id"));
212+
}
213+
214+
public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreOtherEngineResultTypes() throws Exception {
215+
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
216+
types.remove(Engine.Result.Type.FAILURE);
217+
218+
when(operation.id()).thenReturn("_id");
219+
when(result.getResultType()).thenReturn(randomFrom(types));
220+
when(result.getFailure()).thenReturn(new RuntimeException());
221+
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
222+
223+
listener.postIndex(shardId, operation, result);
224+
verifyZeroInteractions(triggerService);
225+
}
226+
227+
public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreNonWatcherDocument() throws Exception {
228+
when(operation.id()).thenReturn("_id");
229+
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);
230+
when(result.getFailure()).thenReturn(new RuntimeException());
231+
when(shardId.getIndexName()).thenReturn(randomAlphaOfLength(4));
232+
233+
listener.postIndex(shardId, operation, result);
234+
verifyZeroInteractions(triggerService);
235+
}
236+
237+
public void testPostIndexRemoveTriggerOnEngineLevelException() throws Exception {
204238
when(operation.id()).thenReturn("_id");
205239
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
206240

207241
listener.postIndex(shardId, operation, new ElasticsearchParseException("whatever"));
208242
verify(triggerService).remove(eq("_id"));
209243
}
210244

211-
public void testPostIndexDontInvokeForOtherDocuments() throws Exception {
245+
public void testPostIndexRemoveTriggerOnEngineLevelException_ignoreNonWatcherDocument() throws Exception {
212246
when(operation.id()).thenReturn("_id");
213247
when(shardId.getIndexName()).thenReturn("anything");
214248
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ protected Settings restAdminSettings() {
110110
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
111111
}
112112

113-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32299")
114113
public void testMonitorClusterHealth() throws Exception {
115114
final String watchId = "cluster_health_watch";
116115

0 commit comments

Comments
 (0)