diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherClient.java index ed0043c801c7b..9d75132a903c3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherClient.java @@ -26,6 +26,8 @@ import org.elasticsearch.client.watcher.ActivateWatchResponse; import org.elasticsearch.client.watcher.AckWatchRequest; import org.elasticsearch.client.watcher.AckWatchResponse; +import org.elasticsearch.client.watcher.ExecuteWatchRequest; +import org.elasticsearch.client.watcher.ExecuteWatchResponse; import org.elasticsearch.client.watcher.GetWatchRequest; import org.elasticsearch.client.watcher.GetWatchResponse; import org.elasticsearch.client.watcher.StartWatchServiceRequest; @@ -269,6 +271,33 @@ public void activateWatchAsync(ActivateWatchRequest request, RequestOptions opti ActivateWatchResponse::fromXContent, listener, singleton(404)); } + /** + * Execute a watch on the cluster + * See + * the docs for more. + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException if there is a problem sending the request or parsing the response + */ + public ExecuteWatchResponse executeWatch(ExecuteWatchRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, WatcherRequestConverters::executeWatch, options, + ExecuteWatchResponse::fromXContent, emptySet()); + } + + /** + * Asynchronously execute a watch on the cluster + * See + * the docs for more. + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notifed upon request completion + */ + public void executeWatchAsync(ExecuteWatchRequest request, RequestOptions options, ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity(request, WatcherRequestConverters::executeWatch, options, + ExecuteWatchResponse::fromXContent, listener, emptySet()); + } + /** * Get the watcher stats * See diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java index 1da7ef4c617ff..3cc6826e837dc 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java @@ -25,16 +25,20 @@ import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.entity.ContentType; -import org.elasticsearch.client.watcher.DeactivateWatchRequest; -import org.elasticsearch.client.watcher.ActivateWatchRequest; import org.elasticsearch.client.watcher.AckWatchRequest; +import org.elasticsearch.client.watcher.ActivateWatchRequest; +import org.elasticsearch.client.watcher.DeactivateWatchRequest; import org.elasticsearch.client.watcher.DeleteWatchRequest; +import org.elasticsearch.client.watcher.ExecuteWatchRequest; import org.elasticsearch.client.watcher.GetWatchRequest; import org.elasticsearch.client.watcher.PutWatchRequest; import org.elasticsearch.client.watcher.StartWatchServiceRequest; import org.elasticsearch.client.watcher.StopWatchServiceRequest; import org.elasticsearch.client.watcher.WatcherStatsRequest; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; final class WatcherRequestConverters { @@ -108,6 +112,28 @@ static Request deleteWatch(DeleteWatchRequest deleteWatchRequest) { return request; } + static Request executeWatch(ExecuteWatchRequest executeWatchRequest) throws IOException { + String endpoint = new RequestConverters.EndpointBuilder() + .addPathPartAsIs("_xpack", "watcher", "watch") + .addPathPart(executeWatchRequest.getId()) // will ignore if ID is null + .addPathPartAsIs("_execute").build(); + + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + RequestConverters.Params params = new RequestConverters.Params(request); + if (executeWatchRequest.isDebug()) { + params.putParam("debug", "true"); + } + if (executeWatchRequest.ignoreCondition()) { + params.putParam("ignore_condition", "true"); + } + if (executeWatchRequest.recordExecution()) { + params.putParam("record_execution", "true"); + } + + request.setEntity(RequestConverters.createEntity(executeWatchRequest, XContentType.JSON)); + return request; + } + public static Request ackWatch(AckWatchRequest ackWatchRequest) { String endpoint = new RequestConverters.EndpointBuilder() .addPathPartAsIs("_xpack", "watcher", "watch") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/common/XContentSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/common/XContentSource.java new file mode 100644 index 0000000000000..689f82f18b9a6 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/common/XContentSource.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.common; + +import org.elasticsearch.common.xcontent.ObjectPath; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentUtils; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Encapsulates the xcontent source + */ +public class XContentSource { + + private final Object data; + + /** + * Constructs a new XContentSource out of the given parser + */ + public XContentSource(XContentParser parser) throws IOException { + this.data = XContentUtils.readValue(parser, parser.nextToken()); + } + + /** + * @return true if the top level value of the source is a map + */ + public boolean isMap() { + return data instanceof Map; + } + + /** + * @return The source as a map + */ + @SuppressWarnings("unchecked") + public Map getAsMap() { + return (Map) data; + } + + /** + * @return true if the top level value of the source is a list + */ + public boolean isList() { + return data instanceof List; + } + + /** + * @return The source as a list + */ + @SuppressWarnings("unchecked") + public List getAsList() { + return (List) data; + } + + /** + * Extracts a value identified by the given path in the source. + * + * @param path a dot notation path to the requested value + * @return The extracted value or {@code null} if no value is associated with the given path + */ + @SuppressWarnings("unchecked") + public T getValue(String path) { + return (T) ObjectPath.eval(path, data); + } + +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/ExecuteWatchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/ExecuteWatchRequest.java new file mode 100644 index 0000000000000..ac96fcc519287 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/ExecuteWatchRequest.java @@ -0,0 +1,175 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.watcher; + +import org.elasticsearch.client.Validatable; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * An execute watch request to execute a watch by id or inline + */ +public class ExecuteWatchRequest implements Validatable, ToXContentObject { + + public enum ActionExecutionMode { + SIMULATE, FORCE_SIMULATE, EXECUTE, FORCE_EXECUTE, SKIP + } + + private final String id; + private final BytesReference watchContent; + + private boolean ignoreCondition = false; + private boolean recordExecution = false; + private boolean debug = false; + + @Nullable + private BytesReference triggerData = null; + + @Nullable + private BytesReference alternativeInput = null; + + private Map actionModes = new HashMap<>(); + + /** + * Execute an existing watch on the cluster + * + * @param id the id of the watch to execute + */ + public static ExecuteWatchRequest byId(String id) { + return new ExecuteWatchRequest(Objects.requireNonNull(id, "Watch id cannot be null"), null); + } + + /** + * Execute an inline watch + * @param watchContent the JSON definition of the watch + */ + public static ExecuteWatchRequest inline(String watchContent) { + return new ExecuteWatchRequest(null, Objects.requireNonNull(watchContent, "Watch content cannot be null")); + } + + private ExecuteWatchRequest(String id, String watchContent) { + this.id = id; + this.watchContent = watchContent == null ? null : new BytesArray(watchContent); + } + + public String getId() { + return this.id; + } + + /** + * @param ignoreCondition set if the condition for this execution be ignored + */ + public void setIgnoreCondition(boolean ignoreCondition) { + this.ignoreCondition = ignoreCondition; + } + + public boolean ignoreCondition() { + return ignoreCondition; + } + + /** + * @param recordExecution Sets if this execution be recorded in the history index + */ + public void setRecordExecution(boolean recordExecution) { + if (watchContent != null && recordExecution) { + throw new IllegalArgumentException("The execution of an inline watch cannot be recorded"); + } + this.recordExecution = recordExecution; + } + + public boolean recordExecution() { + return recordExecution; + } + + /** + * @param alternativeInput Sets the alternative input + */ + public void setAlternativeInput(String alternativeInput) { + this.alternativeInput = new BytesArray(alternativeInput); + } + + /** + * @param data A JSON string representing the data that should be associated with the trigger event. + */ + public void setTriggerData(String data) { + this.triggerData = new BytesArray(data); + } + + /** + * Sets the action execution mode for the give action (identified by its id). + * + * @param actionId the action id. + * @param actionMode the execution mode of the action. + */ + public void setActionMode(String actionId, ActionExecutionMode actionMode) { + Objects.requireNonNull(actionId, "actionId cannot be null"); + actionModes.put(actionId, actionMode); + } + + public Map getActionModes() { + return this.actionModes; + } + + /** + * @param debug indicates whether the watch should execute in debug mode. In debug mode the + * returned watch record will hold the execution {@code vars} + */ + public void setDebug(boolean debug) { + this.debug = debug; + } + + public boolean isDebug() { + return debug; + } + + @Override + public String toString() { + return "execute[" + id + "]"; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (triggerData != null) { + builder.rawField("trigger_data", triggerData.streamInput(), XContentType.JSON); + } + if (alternativeInput != null) { + builder.rawField("alternative_input", alternativeInput.streamInput(), XContentType.JSON); + } + if (actionModes.size() > 0) { + builder.field("action_modes", actionModes); + } + if (watchContent != null) { + builder.rawField("watch", watchContent.streamInput(), XContentType.JSON); + } + builder.endObject(); + return builder; + } +} + diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/ExecuteWatchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/ExecuteWatchResponse.java new file mode 100644 index 0000000000000..cf5313d56ae57 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/ExecuteWatchResponse.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.watcher; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.XContentUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +public class ExecuteWatchResponse { + + public static final ParseField ID_FIELD = new ParseField("_id"); + public static final ParseField WATCH_FIELD = new ParseField("watch_record"); + + private String recordId; + private BytesReference contentSource; + + private Map data; + + public ExecuteWatchResponse() { + } + + public ExecuteWatchResponse(String recordId, BytesReference contentSource) { + this.recordId = recordId; + this.contentSource = contentSource; + } + + /** + * @return The id of the watch record holding the watch execution result. + */ + public String getRecordId() { + return recordId; + } + + /** + * @return The watch record source + */ + public BytesReference getRecord() { + return contentSource; + } + + /** + * Returns the watch record as a map + * + * Use {@link org.elasticsearch.common.xcontent.ObjectPath} to navigate through the data + */ + @SuppressWarnings("unchecked") + public Map getRecordAsMap() { + if (data == null) { + // EMPTY is safe here because we never use namedObject + try (InputStream stream = contentSource.streamInput(); + XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, null, stream)) { + data = (Map) XContentUtils.readValue(parser, parser.nextToken()); + } catch (IOException ex) { + throw new ElasticsearchException("failed to read value", ex); + } + } + return data; + } + + private static final ConstructingObjectParser PARSER + = new ConstructingObjectParser<>("x_pack_execute_watch_response", false, + (fields) -> new ExecuteWatchResponse((String)fields[0], (BytesReference) fields[1])); + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> readBytesReference(p), WATCH_FIELD); + } + + public static ExecuteWatchResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private static BytesReference readBytesReference(XContentParser parser) throws IOException { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.copyCurrentStructure(parser); + return BytesReference.bytes(builder); + } + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherIT.java index 8d1429d319bb5..cb9da0af66995 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherIT.java @@ -19,10 +19,6 @@ package org.elasticsearch.client; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.client.watcher.DeactivateWatchRequest; -import org.elasticsearch.client.watcher.DeactivateWatchResponse; -import org.elasticsearch.client.watcher.ActivateWatchRequest; -import org.elasticsearch.client.watcher.ActivateWatchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.watcher.AckWatchRequest; import org.elasticsearch.client.watcher.AckWatchResponse; @@ -30,6 +26,14 @@ import org.elasticsearch.client.watcher.ActionStatus.AckStatus; import org.elasticsearch.client.watcher.ActivateWatchRequest; import org.elasticsearch.client.watcher.ActivateWatchResponse; +import org.elasticsearch.client.watcher.DeactivateWatchRequest; +import org.elasticsearch.client.watcher.DeactivateWatchResponse; +import org.elasticsearch.client.watcher.DeleteWatchRequest; +import org.elasticsearch.client.watcher.DeleteWatchResponse; +import org.elasticsearch.client.watcher.ExecuteWatchRequest; +import org.elasticsearch.client.watcher.ExecuteWatchResponse; +import org.elasticsearch.client.watcher.PutWatchRequest; +import org.elasticsearch.client.watcher.PutWatchResponse; import org.elasticsearch.client.watcher.StartWatchServiceRequest; import org.elasticsearch.client.watcher.StopWatchServiceRequest; import org.elasticsearch.client.watcher.WatcherState; @@ -37,13 +41,13 @@ import org.elasticsearch.client.watcher.WatcherStatsResponse; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.client.watcher.DeleteWatchRequest; -import org.elasticsearch.client.watcher.DeleteWatchResponse; -import org.elasticsearch.client.watcher.PutWatchRequest; -import org.elasticsearch.client.watcher.PutWatchResponse; import org.elasticsearch.rest.RestStatus; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -81,13 +85,14 @@ public void testPutWatch() throws Exception { assertThat(putWatchResponse.getVersion(), is(1L)); } + private static final String WATCH_JSON = "{ \n" + + " \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" + + " \"input\": { \"none\": {} },\n" + + " \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" + + "}"; + private PutWatchResponse createWatch(String watchId) throws Exception { - String json = "{ \n" + - " \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" + - " \"input\": { \"none\": {} },\n" + - " \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" + - "}"; - BytesReference bytesReference = new BytesArray(json); + BytesReference bytesReference = new BytesArray(WATCH_JSON); PutWatchRequest putWatchRequest = new PutWatchRequest(watchId, bytesReference, XContentType.JSON); return highLevelClient().watcher().putWatch(putWatchRequest, RequestOptions.DEFAULT); } @@ -185,6 +190,37 @@ public void testActivateWatchThatDoesNotExist() throws Exception { assertEquals(RestStatus.NOT_FOUND, exception.status()); } + + public void testExecuteWatchById() throws Exception { + String watchId = randomAlphaOfLength(10); + createWatch(watchId); + + ExecuteWatchResponse response = highLevelClient().watcher() + .executeWatch(ExecuteWatchRequest.byId(watchId), RequestOptions.DEFAULT); + assertThat(response.getRecordId(), containsString(watchId)); + + Map source = response.getRecordAsMap(); + assertThat(ObjectPath.eval("trigger_event.type", source), is("manual")); + + } + + public void testExecuteWatchThatDoesNotExist() throws Exception { + String watchId = randomAlphaOfLength(10); + // exception when activating a not existing watcher + ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> + highLevelClient().watcher().executeWatch(ExecuteWatchRequest.byId(watchId), RequestOptions.DEFAULT)); + assertEquals(RestStatus.NOT_FOUND, exception.status()); + } + + public void testExecuteInlineWatch() throws Exception { + ExecuteWatchResponse response = highLevelClient().watcher() + .executeWatch(ExecuteWatchRequest.inline(WATCH_JSON), RequestOptions.DEFAULT); + assertThat(response.getRecordId(), containsString("_inlined_")); + + Map source = response.getRecordAsMap(); + assertThat(ObjectPath.eval("trigger_event.type", source), is("manual")); + } + public void testWatcherStatsMetrics() throws Exception { boolean includeCurrent = randomBoolean(); boolean includeQueued = randomBoolean(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java index ff7050fd67ce7..3b7d533a18140 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.client; +import org.apache.http.HttpEntity; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; @@ -27,6 +28,7 @@ import org.elasticsearch.client.watcher.ActivateWatchRequest; import org.elasticsearch.client.watcher.DeactivateWatchRequest; import org.elasticsearch.client.watcher.DeleteWatchRequest; +import org.elasticsearch.client.watcher.ExecuteWatchRequest; import org.elasticsearch.client.watcher.PutWatchRequest; import org.elasticsearch.client.watcher.GetWatchRequest; import org.elasticsearch.client.watcher.StartWatchServiceRequest; @@ -38,12 +40,15 @@ import org.elasticsearch.test.ESTestCase; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.StringJoiner; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; @@ -53,6 +58,12 @@ public class WatcherRequestConvertersTests extends ESTestCase { + private static String toString(HttpEntity entity) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + entity.writeTo(baos); + return baos.toString(StandardCharsets.UTF_8.name()); + } + public void testStartWatchService() { Request request = WatcherRequestConverters.startWatchService(new StartWatchServiceRequest()); assertEquals(HttpPost.METHOD_NAME, request.getMethod()); @@ -177,4 +188,138 @@ public void testWatcherStatsRequest() { } assertThat(request.getEntity(), nullValue()); } + + public void testExecuteWatchByIdRequest() throws IOException { + + boolean ignoreCondition = randomBoolean(); + boolean recordExecution = randomBoolean(); + boolean debug = randomBoolean(); + + ExecuteWatchRequest request = ExecuteWatchRequest.byId("my_id"); + request.setIgnoreCondition(ignoreCondition); + request.setRecordExecution(recordExecution); + request.setDebug(debug); + + boolean setActionMode = randomBoolean(); + if (setActionMode) { + request.setActionMode("action1", ExecuteWatchRequest.ActionExecutionMode.SIMULATE); + } + + boolean useTriggerData = randomBoolean(); + String triggerData = "{ \"entry1\" : \"blah\", \"entry2\" : \"blah\" }"; + if (useTriggerData) { + request.setTriggerData(triggerData); + } + + boolean useAlternativeInput = randomBoolean(); + String alternativeInput = "{ \"foo\" : \"bar\" }"; + if (useAlternativeInput) { + request.setAlternativeInput(alternativeInput); + } + + Request req = WatcherRequestConverters.executeWatch(request); + assertThat(req.getEndpoint(), equalTo("/_xpack/watcher/watch/my_id/_execute")); + assertThat(req.getMethod(), equalTo(HttpPost.METHOD_NAME)); + + if (ignoreCondition) { + assertThat(req.getParameters(), hasKey("ignore_condition")); + assertThat(req.getParameters().get("ignore_condition"), is("true")); + } + + if (recordExecution) { + assertThat(req.getParameters(), hasKey("record_execution")); + assertThat(req.getParameters().get("record_execution"), is("true")); + } + + if (debug) { + assertThat(req.getParameters(), hasKey("debug")); + assertThat(req.getParameters().get("debug"), is("true")); + } + + String body = toString(req.getEntity()); + if (setActionMode) { + assertThat(body, containsString("\"action_modes\":{\"action1\":\"SIMULATE\"}")); + } + else { + assertThat(body, not(containsString("action_modes"))); + } + if (useTriggerData) { + assertThat(body, containsString("\"trigger_data\":" + triggerData)); + } + else { + assertThat(body, not(containsString("trigger_data"))); + } + if (useAlternativeInput) { + assertThat(body, containsString("\"alternative_input\":" + alternativeInput)); + } + else { + assertThat(body, not(containsString("alternative_input"))); + } + assertThat(body, not(containsString("\"watch\":"))); + + } + + private static final String WATCH_JSON = "{ \n" + + " \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" + + " \"input\": { \"none\": {} },\n" + + " \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" + + "}"; + + public void testExecuteInlineWatchRequest() throws IOException { + boolean ignoreCondition = randomBoolean(); + + ExecuteWatchRequest request = ExecuteWatchRequest.inline(WATCH_JSON); + request.setIgnoreCondition(ignoreCondition); + + expectThrows(IllegalArgumentException.class, () -> { + request.setRecordExecution(true); + }); + + boolean setActionMode = randomBoolean(); + if (setActionMode) { + request.setActionMode("action1", ExecuteWatchRequest.ActionExecutionMode.SIMULATE); + } + + boolean useTriggerData = randomBoolean(); + String triggerData = "{ \"entry1\" : \"blah\", \"entry2\" : \"blah\" }"; + if (useTriggerData) { + request.setTriggerData(triggerData); + } + + boolean useAlternativeInput = randomBoolean(); + String alternativeInput = "{ \"foo\" : \"bar\" }"; + if (useAlternativeInput) { + request.setAlternativeInput(alternativeInput); + } + + Request req = WatcherRequestConverters.executeWatch(request); + assertThat(req.getEndpoint(), equalTo("/_xpack/watcher/watch/_execute")); + assertThat(req.getMethod(), equalTo(HttpPost.METHOD_NAME)); + + if (ignoreCondition) { + assertThat(req.getParameters(), hasKey("ignore_condition")); + assertThat(req.getParameters().get("ignore_condition"), is("true")); + } + + String body = toString(req.getEntity()); + if (setActionMode) { + assertThat(body, containsString("\"action_modes\":{\"action1\":\"SIMULATE\"}")); + } + else { + assertThat(body, not(containsString("action_modes"))); + } + if (useTriggerData) { + assertThat(body, containsString("\"trigger_data\":" + triggerData)); + } + else { + assertThat(body, not(containsString("trigger_data"))); + } + if (useAlternativeInput) { + assertThat(body, containsString("\"alternative_input\":" + alternativeInput)); + } + else { + assertThat(body, not(containsString("alternative_input"))); + } + assertThat(body, containsString("\"watch\":" + WATCH_JSON)); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/WatcherDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/WatcherDocumentationIT.java index fc6dba9c8fc61..d990710dd9a87 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/WatcherDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/WatcherDocumentationIT.java @@ -34,6 +34,8 @@ import org.elasticsearch.client.watcher.ActionStatus.AckStatus; import org.elasticsearch.client.watcher.DeactivateWatchRequest; import org.elasticsearch.client.watcher.DeactivateWatchResponse; +import org.elasticsearch.client.watcher.ExecuteWatchRequest; +import org.elasticsearch.client.watcher.ExecuteWatchResponse; import org.elasticsearch.client.watcher.GetWatchRequest; import org.elasticsearch.client.watcher.GetWatchResponse; import org.elasticsearch.client.watcher.StartWatchServiceRequest; @@ -43,6 +45,7 @@ import org.elasticsearch.client.watcher.WatcherStatsResponse; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.client.watcher.DeleteWatchRequest; import org.elasticsearch.client.watcher.DeleteWatchResponse; @@ -51,6 +54,7 @@ import org.elasticsearch.rest.RestStatus; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -199,6 +203,52 @@ public void onFailure(Exception e) { assertTrue(latch.await(30L, TimeUnit.SECONDS)); } + { + // tag::x-pack-execute-watch-by-id + ExecuteWatchRequest request = ExecuteWatchRequest.byId("my_watch_id"); + request.setAlternativeInput("{ \"foo\" : \"bar\" }"); // <1> + request.setActionMode("action1", ExecuteWatchRequest.ActionExecutionMode.SIMULATE); // <2> + request.setRecordExecution(true); // <3> + request.setIgnoreCondition(true); // <4> + request.setTriggerData("{\"triggered_time\":\"now\"}"); // <5> + request.setDebug(true); // <6> + ExecuteWatchResponse response = client.watcher().executeWatch(request, RequestOptions.DEFAULT); + // end::x-pack-execute-watch-by-id + + // tag::x-pack-execute-watch-by-id-response + String id = response.getRecordId(); // <1> + Map watch = response.getRecordAsMap(); // <2> + String watch_id = ObjectPath.eval("watch_record.watch_id", watch); // <3> + // end::x-pack-execute-watch-by-id-response + } + + { + ExecuteWatchRequest request = ExecuteWatchRequest.byId("my_watch_id"); + // tag::x-pack-execute-watch-by-id-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ExecuteWatchResponse response) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::x-pack-execute-watch-by-id-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::x-pack-execute-watch-by-id-execute-async + client.watcher().executeWatchAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::x-pack-execute-watch-by-id-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + { //tag::get-watch-request GetWatchRequest request = new GetWatchRequest("my_watch_id"); @@ -285,6 +335,65 @@ public void onFailure(Exception e) { } } + public void testExecuteInlineWatch() throws Exception { + RestHighLevelClient client = highLevelClient(); + + { + // tag::x-pack-execute-inline-watch + String watchJson = "{ \n" + + " \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" + + " \"input\": { \"none\": {} },\n" + + " \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" + + "}"; + ExecuteWatchRequest request = ExecuteWatchRequest.inline(watchJson); + request.setAlternativeInput("{ \"foo\" : \"bar\" }"); // <1> + request.setActionMode("action1", ExecuteWatchRequest.ActionExecutionMode.SIMULATE); // <2> + request.setIgnoreCondition(true); // <3> + request.setTriggerData("{\"triggered_time\":\"now\"}"); // <4> + request.setDebug(true); // <5> + ExecuteWatchResponse response = client.watcher().executeWatch(request, RequestOptions.DEFAULT); + // end::x-pack-execute-inline-watch + + // tag::x-pack-execute-watch-by-id-response + String id = response.getRecordId(); // <1> + Map watch = response.getRecordAsMap(); // <2> + String watch_id = ObjectPath.eval("watch_record.watch_id", watch); // <3> + // end::x-pack-execute-watch-by-id-response + } + + { + String watchJson = "{ \n" + + " \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" + + " \"input\": { \"none\": {} },\n" + + " \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" + + "}"; + ExecuteWatchRequest request = ExecuteWatchRequest.inline(watchJson); + // tag::x-pack-execute-inline-watch-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ExecuteWatchResponse response) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::x-pack-execute-inline-watch-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::x-pack-execute-inline-watch-execute-async + client.watcher().executeWatchAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::x-pack-execute-inline-watch-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testAckWatch() throws Exception { RestHighLevelClient client = highLevelClient(); diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 661ce78fe80a5..b5ce819a0e2e2 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -427,6 +427,7 @@ The Java High Level REST Client supports the following Watcher APIs: * <> * <<{upid}-ack-watch>> * <<{upid}-activate-watch>> +* <<{upid}-execute-watch>> * <<{upid}-watcher-stats>> include::watcher/start-watch-service.asciidoc[] @@ -437,6 +438,7 @@ include::watcher/delete-watch.asciidoc[] include::watcher/ack-watch.asciidoc[] include::watcher/deactivate-watch.asciidoc[] include::watcher/activate-watch.asciidoc[] +include::watcher/execute-watch.asciidoc[] include::watcher/watcher-stats.asciidoc[] == Graph APIs diff --git a/docs/java-rest/high-level/watcher/execute-watch.asciidoc b/docs/java-rest/high-level/watcher/execute-watch.asciidoc new file mode 100644 index 0000000000000..1af93f0cd6c9b --- /dev/null +++ b/docs/java-rest/high-level/watcher/execute-watch.asciidoc @@ -0,0 +1,88 @@ +-- +:api: execute-watch +:request: ExecuteWatchRequest +:response: ExecuteWatchResponse +-- +[id="{upid}-{api}"] +=== Execute Watch API + +The execute watch API allows clients to immediately execute a watch, either +one that has been previously added via the +{ref}/put-watch.html[Put Watch API] or inline as part of the request. + +[id="{upid}-{api}-request-by-id"] +==== Execute by id + +Submit the following request to execute a previously added watch: + +["source","java",subs="attributes,callouts,macros"] +--------------------------------------------------- +include-tagged::{doc-tests-file}[x-pack-execute-watch-by-id] +--------------------------------------------------- +<1> Alternative input for the watch to use in json format +<2> Set the mode for action "action1" to SIMULATE +<3> Record this execution in watcher history +<4> Execute the watch regardless of the watch's condition +<5> Set the trigger data for the watch in json format +<6> Enable debug mode + +[id="{upid}-{api}-response-by-id"] +==== Execute by id Response + +The returned `Response` contains details of the execution: + +["source","java",subs="attributes,callouts,macros"] +--------------------------------------------------- +include-tagged::{doc-tests-file}[x-pack-execute-watch-by-id-response] +--------------------------------------------------- +<1> The record ID for this execution +<2> The execution response as a java `Map` +<3> Extract information from the response map using `ObjectPath` + +[id="{upid}-{api}-response-by-id-async"] +==== Asynchronous execution by id + +This request can be executed asynchronously: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[x-pack-execute-watch-by-id-execute-async] +-------------------------------------------------- +<1> The `ExecuteWatchRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `ExecuteWatchResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[x-pack-execute-watch-by-id-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of failure. The raised exception is provided as an argument + + +[id="{upid}-{api}-request-inline"] +==== Execute inline + +Submit the following request to execute a watch defined as part of the request: + +["source","java",subs="attributes,callouts,macros"] +--------------------------------------------------- +include-tagged::{doc-tests-file}[x-pack-execute-watch-inline] +--------------------------------------------------- +<1> Alternative input for the watch to use in json format +<2> Set the mode for action "action1" to SIMULATE +<3> Execute the watch regardless of the watch's condition +<4> Set the trigger data for the watch in json format +<5> Enable debug mode + +Note that inline watches cannot be recorded. + +The response format and asynchronous execution methods are the same as for the +Execute Watch by ID API. \ No newline at end of file diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/XContentSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/XContentSource.java index b54acc441e74a..55362390216a3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/XContentSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/XContentSource.java @@ -146,12 +146,16 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; XContentSource that = (XContentSource) o; - return Objects.equals(bytes, that.bytes) && - contentType == that.contentType; + return Objects.equals(data(), that.data()); } @Override public int hashCode() { - return Objects.hash(bytes, contentType); + return Objects.hash(data()); + } + + @Override + public String toString() { + return bytes.utf8ToString(); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/execute/ExecuteWatchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/execute/ExecuteWatchResponse.java index c0eac9b3c61d2..251b994f76e58 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/execute/ExecuteWatchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/execute/ExecuteWatchResponse.java @@ -6,18 +6,28 @@ package org.elasticsearch.xpack.core.watcher.transport.actions.execute; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import java.io.IOException; +import java.util.Objects; /** * This class contains the WatchHistory generated by running the watch */ -public class ExecuteWatchResponse extends ActionResponse { +public class ExecuteWatchResponse extends ActionResponse implements ToXContentObject { + + public static final ParseField ID_FIELD = new ParseField("_id"); + public static final ParseField WATCH_FIELD = new ParseField("watch_record"); private String recordId; private XContentSource recordSource; @@ -30,6 +40,25 @@ public ExecuteWatchResponse(String recordId, BytesReference recordSource, XConte this.recordSource = new XContentSource(recordSource, contentType); } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ExecuteWatchResponse that = (ExecuteWatchResponse) o; + return Objects.equals(recordId, that.recordId) && + Objects.equals(recordSource, that.recordSource); + } + + @Override + public int hashCode() { + return Objects.hash(recordId, recordSource); + } + + @Override + public String toString() { + return recordId + ":" + recordSource; + } + /** * @return The id of the watch record holding the watch execution result. */ @@ -57,4 +86,33 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(recordId); XContentSource.writeTo(recordSource, out); } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("_id", recordId); + builder.field("watch_record"); + recordSource.toXContent(builder, params); + builder.endObject(); + return builder; + } + + private static final ConstructingObjectParser PARSER + = new ConstructingObjectParser<>("x_pack_execute_watch_response", false, + (fields) -> new ExecuteWatchResponse((String)fields[0], (BytesReference) fields[1], XContentType.JSON)); + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> readBytesReference(p), WATCH_FIELD); + } + + public static ExecuteWatchResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private static BytesReference readBytesReference(XContentParser parser) throws IOException { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.copyCurrentStructure(parser); + return BytesReference.bytes(builder); + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/ExecuteWatchResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/ExecuteWatchResponseTests.java new file mode 100644 index 0000000000000..47fd4a33dc955 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/ExecuteWatchResponseTests.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.protocol.xpack.watcher; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.protocol.AbstractHlrcXContentTestCase; +import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchResponse; + +import java.io.IOException; + +public class ExecuteWatchResponseTests + extends AbstractHlrcXContentTestCase { + + @Override + public org.elasticsearch.client.watcher.ExecuteWatchResponse doHlrcParseInstance(XContentParser parser) throws IOException { + return org.elasticsearch.client.watcher.ExecuteWatchResponse.fromXContent(parser); + } + + @Override + public ExecuteWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.ExecuteWatchResponse instance) { + return new ExecuteWatchResponse(instance.getRecordId(), instance.getRecord(), XContentType.JSON); + } + + @Override + protected ExecuteWatchResponse createTestInstance() { + String id = "my_watch_0-2015-06-02T23:17:55.124Z"; + try { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + builder.field("watch_id", "my_watch"); + builder.field("node", "my_node"); + builder.startArray("messages"); + builder.endArray(); + builder.startObject("trigger_event"); + builder.field("type", "manual"); + builder.endObject(); + builder.field("state", "executed"); + builder.endObject(); + BytesReference bytes = BytesReference.bytes(builder); + return new ExecuteWatchResponse(id, bytes, XContentType.JSON); + } + catch (IOException e) { + throw new AssertionError(e); + } + } + + @Override + protected ExecuteWatchResponse doParseInstance(XContentParser parser) throws IOException { + return ExecuteWatchResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +}