Skip to content

HLRC: Add get watch API #35531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Nov 30, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.client.watcher.ActivateWatchResponse;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.AckWatchResponse;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.GetWatchResponse;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
Expand Down Expand Up @@ -129,6 +131,34 @@ public void putWatchAsync(PutWatchRequest request, RequestOptions options,
PutWatchResponse::fromXContent, listener, emptySet());
}

/**
* Gets a watch from the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetWatchResponse getWatch(GetWatchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, WatcherRequestConverters::getWatch, options,
GetWatchResponse::fromXContent, emptySet());
}

/**
* Asynchronously gets a watch into the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void getWatchAsync(GetWatchRequest request, RequestOptions options,
ActionListener<GetWatchResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, WatcherRequestConverters::getWatch, options,
GetWatchResponse::fromXContent, listener, emptySet());
}

/**
* Deactivate an existing watch
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-deactivate-watch.html">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.ActivateWatchRequest;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;

final class WatcherRequestConverters {

Expand Down Expand Up @@ -76,6 +77,16 @@ static Request putWatch(PutWatchRequest putWatchRequest) {
return request;
}


static Request getWatch(GetWatchRequest getWatchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack", "watcher", "watch")
.addPathPart(getWatchRequest.getId())
.build();

return new Request(HttpGet.METHOD_NAME, endpoint);
}

static Request deactivateWatch(DeactivateWatchRequest deactivateWatchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public static Execution failure(DateTime timestamp, String reason) {
private final boolean successful;
private final String reason;

private Execution(DateTime timestamp, boolean successful, String reason) {
Execution(DateTime timestamp, boolean successful, String reason) {
this.timestamp = timestamp.toDateTime(DateTimeZone.UTC);
this.successful = successful;
this.reason = reason;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.watcher;

import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;

/**
* The request to get the watch by name (id)
*/
public final class GetWatchRequest implements Validatable {

private final String id;

public GetWatchRequest(String watchId) {
validateId(watchId);
this.id = watchId;
}

private void validateId(String id) {
ValidationException exception = new ValidationException();
if (id == null) {
exception.addValidationError("watch id is missing");
} else if (PutWatchRequest.isValidId(id) == false) {
exception.addValidationError("watch id contains whitespace");
}
if (exception.validationErrors().isEmpty() == false) {
throw exception;
}
}

/**
* @return The name of the watch to retrieve
*/
public String getId() {
return id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.watcher;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;

public class GetWatchResponse {
private final String id;
private final long version;
private final WatchStatus status;

private final BytesReference source;
private transient Map<String, Object> sourceAsMap;

/**
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small comment: it would be nice to add a comment to this constructor explaining that it is for a 'missing watch', as opposed to the one below.

this(id, Versions.NOT_FOUND, null, null);
}

public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
}

public String getId() {
return id;
}

public long getVersion() {
return version;
}

public boolean isFound() {
return version != Versions.NOT_FOUND;
}

public WatchStatus getStatus() {
return status;
}

/**
* Returns the {@link XContentType} of the source
*/
public XContentType getContentType() {
return XContentType.JSON;
}

/**
* Returns the serialized watch
*/
public BytesReference getSource() {
return source;
}

/**
* Returns the source as a map
*/
public Map<String, Object> getSourceAsMap() throws IOException {
if (sourceAsMap == null && source != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple questions:

  • Sorry if I'm missing something, but why don't we use XContentHelper#convertToMap here?
  • For me it'd be better to avoid introducing the cached sourceAsMap variable, and just do the conversion every time as we do in GetFieldMappingsResponse and IndexRequest. That way this object can remain immutable, and we don't need to reason about thread safety at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, I changed it to return XContentHelper#convertToMap all the time

// EMPTY is safe here because we never use namedObject
try (InputStream stream = source.streamInput();
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
sourceAsMap = parser.map();
}
}
return sourceAsMap;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetWatchResponse that = (GetWatchResponse) o;
return version == that.version &&
Objects.equals(id, that.id) &&
Objects.equals(status, that.status) &&
Objects.equals(source, that.source);
}

@Override
public int hashCode() {
return Objects.hash(id, status, source, version);
}

private static final ParseField ID_FIELD = new ParseField("_id");
private static final ParseField FOUND_FIELD = new ParseField("found");
private static final ParseField VERSION_FIELD = new ParseField("_version");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField WATCH_FIELD = new ParseField("watch");

private static ConstructingObjectParser<GetWatchResponse, Void> PARSER =
new ConstructingObjectParser<>("get_watch_response", true,
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
BytesReference source = (BytesReference) a[4];
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source);
} else {
return new GetWatchResponse((String) a[0]);
}
});

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> {
try (XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent())) {
builder.copyCurrentStructure(parser);
return BytesReference.bytes(builder);
}
}, WATCH_FIELD);
}

public static GetWatchResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.watcher;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.joda.time.DateTime;
Expand All @@ -44,19 +45,22 @@ public class WatchStatus {
private final DateTime lastMetCondition;
private final long version;
private final Map<String, ActionStatus> actions;
@Nullable private Map<String, String> headers;

public WatchStatus(long version,
State state,
ExecutionState executionState,
DateTime lastChecked,
DateTime lastMetCondition,
Map<String, ActionStatus> actions) {
Map<String, ActionStatus> actions,
Map<String, String> headers) {
this.version = version;
this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition;
this.actions = actions;
this.state = state;
this.executionState = executionState;
this.headers = headers;
}

public State state() {
Expand All @@ -79,6 +83,10 @@ public ActionStatus actionStatus(String actionId) {
return actions.get(actionId);
}

public Map<String, ActionStatus> getActions() {
return actions;
}

public long version() {
return version;
}
Expand Down Expand Up @@ -112,6 +120,7 @@ public static WatchStatus parse(XContentParser parser) throws IOException {
DateTime lastChecked = null;
DateTime lastMetCondition = null;
Map<String, ActionStatus> actions = null;
Map<String, String> headers = null;
long version = -1;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
Expand Down Expand Up @@ -172,13 +181,17 @@ public static WatchStatus parse(XContentParser parser) throws IOException {
throw new ElasticsearchParseException("could not parse watch status. expecting field [{}] to be an object, " +
"found [{}] instead", currentFieldName, token);
}
} else if (Field.HEADERS.match(currentFieldName, parser.getDeprecationHandler())) {
if (token == XContentParser.Token.START_OBJECT) {
headers = parser.mapStrings();
}
} else {
parser.skipChildren();
}
}

actions = actions == null ? emptyMap() : unmodifiableMap(actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions, headers);
}

public static class State {
Expand Down Expand Up @@ -229,5 +242,6 @@ public interface Field {
ParseField ACTIONS = new ParseField("actions");
ParseField VERSION = new ParseField("version");
ParseField EXECUTION_STATE = new ParseField("execution_state");
ParseField HEADERS = new ParseField("headers");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
Expand Down Expand Up @@ -91,6 +92,16 @@ public void testPutWatch() throws Exception {
assertThat(bos.toString("UTF-8"), is(body));
}

public void testGetWatch() throws Exception {
String watchId = randomAlphaOfLength(10);
GetWatchRequest getWatchRequest = new GetWatchRequest(watchId);

Request request = WatcherRequestConverters.getWatch(getWatchRequest);
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/watcher/watch/" + watchId, request.getEndpoint());
assertThat(request.getEntity(), nullValue());
}

public void testDeactivateWatch() {
String watchId = randomAlphaOfLength(10);
DeactivateWatchRequest deactivateWatchRequest = new DeactivateWatchRequest(watchId);
Expand Down
Loading