diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index 20aaf9714171b..d138b4b90da75 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -97,11 +97,16 @@ void setConfiguration(Configuration configuration) { * * @param shardId The shard id object of the document being processed * @param operation The index operation - * @return The index operation + * @param result The result of the operation */ @Override - public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { + public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResult result) { if (isWatchDocument(shardId.getIndexName())) { + if (result.getResultType() == Engine.Result.Type.FAILURE) { + postIndex(shardId, operation, result.getFailure()); + return; + } + ZonedDateTime now = Instant.ofEpochMilli(clock.millis()).atZone(ZoneOffset.UTC); try { Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON, @@ -109,8 +114,8 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId); if (shardAllocationConfiguration == null) { logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}", - watch.id(), shardId, configuration.localShards.keySet()); - return operation; + watch.id(), shardId, configuration.localShards.keySet()); + return; } boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id()); @@ -128,32 +133,12 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { } catch (IOException e) { throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id()); } - } - - return operation; } /** - * In case of a document related failure (for example version conflict), then clean up resources for a watch - * in the trigger service. - */ - @Override - public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) { - if (result.getResultType() == Engine.Result.Type.FAILURE) { - assert result.getFailure() != null; - postIndex(shardId, index, result.getFailure()); - } - } - - /** - * In case of an engine related error, we have to ensure that the triggerservice does not leave anything behind - * - * TODO: If the configuration changes between preindex and postindex methods and we add a - * watch, that could not be indexed - * TODO: this watch might not be deleted from the triggerservice. Are we willing to accept this? - * TODO: This could be circumvented by using a threadlocal in preIndex(), that contains the - * watch and is cleared afterwards + * In case of an engine related error, we just log that we failed the add the watch to the trigger service. + * No need to interact with the trigger service. * * @param shardId The shard id object of the document being processed * @param index The index operation @@ -162,8 +147,7 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re @Override public void postIndex(ShardId shardId, Engine.Index index, Exception ex) { if (isWatchDocument(shardId.getIndexName())) { - logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex); - triggerService.remove(index.id()); + logger.debug(() -> new ParameterizedMessage("failed to add watch [{}] to trigger service", index.id()), ex); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java index c4baea3e9f61f..0d8af470df056 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java @@ -113,18 +113,20 @@ public void testPreIndexCheckActive() throws Exception { verifyZeroInteractions(parser); } - public void testPreIndex() throws Exception { + public void testPostIndex() throws Exception { when(operation.id()).thenReturn(randomAlphaOfLength(10)); when(operation.source()).thenReturn(BytesArray.EMPTY); when(shardId.getIndexName()).thenReturn(Watch.INDEX); + List types = new ArrayList<>(List.of(Engine.Result.Type.values())); + types.remove(Engine.Result.Type.FAILURE); + when(result.getResultType()).thenReturn(randomFrom(types)); boolean watchActive = randomBoolean(); boolean isNewWatch = randomBoolean(); Watch watch = mockWatch("_id", watchActive, isNewWatch); when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch); - Engine.Index returnedOperation = listener.preIndex(shardId, operation); - assertThat(returnedOperation, is(operation)); + listener.postIndex(shardId, operation, result); ZonedDateTime now = DateUtils.nowWithMillisResolution(clock); verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong()); @@ -139,12 +141,13 @@ public void testPreIndex() throws Exception { // this test emulates an index with 10 shards, and ensures that triggering only happens on a // single shard - public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception { + public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception { String id = randomAlphaOfLength(10); int totalShardCount = randomIntBetween(1, 10); boolean watchActive = randomBoolean(); boolean isNewWatch = randomBoolean(); Watch watch = mockWatch(id, watchActive, isNewWatch); + when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS); when(shardId.getIndexName()).thenReturn(Watch.INDEX); when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch); @@ -154,7 +157,7 @@ public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Excep localShards.put(shardId, new ShardAllocationConfiguration(idx, totalShardCount, Collections.emptyList())); Configuration configuration = new Configuration(Watch.INDEX, localShards); listener.setConfiguration(configuration); - listener.preIndex(shardId, operation); + listener.postIndex(shardId, operation, result); } // no matter how many shards we had, this should have been only called once @@ -186,16 +189,17 @@ private Watch mockWatch(String id, boolean active, boolean isNewWatch) { return watch; } - public void testPreIndexCheckParsingException() throws Exception { + public void testPostIndexCheckParsingException() throws Exception { String id = randomAlphaOfLength(10); when(operation.id()).thenReturn(id); when(operation.source()).thenReturn(BytesArray.EMPTY); when(shardId.getIndexName()).thenReturn(Watch.INDEX); when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())) .thenThrow(new IOException("self thrown")); + when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS); ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class, - () -> listener.preIndex(shardId, operation)); + () -> listener.postIndex(shardId, operation, result)); assertThat(exc.getMessage(), containsString("Could not parse watch")); assertThat(exc.getMessage(), containsString(id)); } @@ -206,19 +210,6 @@ public void testPostIndexRemoveTriggerOnDocumentRelatedException() throws Except when(result.getFailure()).thenReturn(new RuntimeException()); when(shardId.getIndexName()).thenReturn(Watch.INDEX); - listener.postIndex(shardId, operation, result); - verify(triggerService).remove(eq("_id")); - } - - public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreOtherEngineResultTypes() throws Exception { - List types = new ArrayList<>(List.of(Engine.Result.Type.values())); - types.remove(Engine.Result.Type.FAILURE); - - when(operation.id()).thenReturn("_id"); - when(result.getResultType()).thenReturn(randomFrom(types)); - when(result.getFailure()).thenReturn(new RuntimeException()); - when(shardId.getIndexName()).thenReturn(Watch.INDEX); - listener.postIndex(shardId, operation, result); verifyZeroInteractions(triggerService); } @@ -238,7 +229,7 @@ public void testPostIndexRemoveTriggerOnEngineLevelException() throws Exception when(shardId.getIndexName()).thenReturn(Watch.INDEX); listener.postIndex(shardId, operation, new ElasticsearchParseException("whatever")); - verify(triggerService).remove(eq("_id")); + verifyZeroInteractions(triggerService); } public void testPostIndexRemoveTriggerOnEngineLevelException_ignoreNonWatcherDocument() throws Exception {