Skip to content

Simplify watcher indexing listener. #52627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,26 @@ void setConfiguration(Configuration configuration) {
*
* @param shardId The shard id object of the document being processed
* @param operation The index operation
* @return The index operation
* @param result The result of the operation
*/
@Override
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResult result) {

if (isWatchDocument(shardId.getIndexName())) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
postIndex(shardId, operation, result.getFailure());
return;
}

ZonedDateTime now = Instant.ofEpochMilli(clock.millis()).atZone(ZoneOffset.UTC);
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
operation.getIfSeqNo(), operation.getIfPrimaryTerm());
ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId);
if (shardAllocationConfiguration == null) {
logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}",
watch.id(), shardId, configuration.localShards.keySet());
return operation;
watch.id(), shardId, configuration.localShards.keySet());
return;
}

boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
Expand All @@ -128,21 +134,6 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
} catch (IOException e) {
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());
}

}

return operation;
}

/**
* In case of a document related failure (for example version conflict), then clean up resources for a watch
* in the trigger service.
*/
@Override
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
assert result.getFailure() != null;
postIndex(shardId, index, result.getFailure());
}
}

Expand All @@ -162,8 +153,7 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re
@Override
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
if (isWatchDocument(shardId.getIndexName())) {
logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex);
triggerService.remove(index.id());
logger.debug(() -> new ParameterizedMessage("failed to add watch [{}] to trigger service", index.id()), ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.watcher;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -73,6 +74,7 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

@LuceneTestCase.AwaitsFix(bugUrl = "")
public class WatcherIndexingListenerTests extends ESTestCase {

private WatcherIndexingListener listener;
Expand Down