Skip to content

HLRC: Add ML Get Buckets API #33056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.protocol.xpack.ml.CloseJobRequest;
import org.elasticsearch.protocol.xpack.ml.DeleteJobRequest;
import org.elasticsearch.protocol.xpack.ml.GetJobRequest;
import org.elasticsearch.protocol.xpack.ml.GetBucketsRequest;
import org.elasticsearch.protocol.xpack.ml.OpenJobRequest;
import org.elasticsearch.protocol.xpack.ml.PutJobRequest;

Expand Down Expand Up @@ -69,7 +70,7 @@ static Request getJob(GetJobRequest getJobRequest) {
return request;
}

static Request openJob(OpenJobRequest openJobRequest) throws IOException {
static Request openJob(OpenJobRequest openJobRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
Expand Down Expand Up @@ -120,4 +121,18 @@ static Request deleteJob(DeleteJobRequest deleteJobRequest) {

return request;
}

static Request getBuckets(GetBucketsRequest getBucketsRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(getBucketsRequest.getJobId())
.addPathPartAsIs("results")
.addPathPartAsIs("buckets")
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
request.setEntity(createEntity(getBucketsRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.protocol.xpack.ml.CloseJobResponse;
import org.elasticsearch.protocol.xpack.ml.DeleteJobRequest;
import org.elasticsearch.protocol.xpack.ml.DeleteJobResponse;
import org.elasticsearch.protocol.xpack.ml.GetBucketsRequest;
import org.elasticsearch.protocol.xpack.ml.GetBucketsResponse;
import org.elasticsearch.protocol.xpack.ml.GetJobRequest;
import org.elasticsearch.protocol.xpack.ml.GetJobResponse;
import org.elasticsearch.protocol.xpack.ml.OpenJobRequest;
Expand Down Expand Up @@ -247,4 +249,40 @@ public void closeJobAsync(CloseJobRequest request, RequestOptions options, Actio
listener,
Collections.emptySet());
}

/**
* Gets the buckets for a Machine Learning Job.
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-bucket.html">ML GET buckets documentation</a>
*
* @param request the request
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: capitalise 'The'. Same in the comment for getBucketsAsync

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually reviewing further down the lower case start of the parameter description is more consistent. If that is the preferred style change 'Additional' on the line below and in the async version comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's just vastly inconsistent :-(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like capitalized as it separates better from the param name. I'll capitalize and cross my fingers.

* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
*/
public GetBucketsResponse getBuckets(GetBucketsRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::getBuckets,
options,
GetBucketsResponse::fromXContent,
Collections.emptySet());
}

    /**
     * Gets the buckets for a Machine Learning Job, notifies listener once the requested buckets are retrieved.
     * <p>
     * For additional info
     * see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-bucket.html">ML GET buckets documentation</a>
     *
     * @param request The request
     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
     * @param listener Listener to be notified upon request completion
     */
    public void getBucketsAsync(GetBucketsRequest request, RequestOptions options, ActionListener<GetBucketsResponse> listener) {
        restHighLevelClient.performRequestAsyncAndParseEntity(request,
                MLRequestConverters::getBuckets,
                options,
                GetBucketsResponse::fromXContent,
                listener,
                Collections.emptySet());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.protocol.xpack.ml.CloseJobRequest;
import org.elasticsearch.protocol.xpack.ml.DeleteJobRequest;
import org.elasticsearch.protocol.xpack.ml.GetBucketsRequest;
import org.elasticsearch.protocol.xpack.ml.GetJobRequest;
import org.elasticsearch.protocol.xpack.ml.OpenJobRequest;
import org.elasticsearch.protocol.xpack.ml.PutJobRequest;
import org.elasticsearch.protocol.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.protocol.xpack.ml.job.config.Detector;
import org.elasticsearch.protocol.xpack.ml.job.config.Job;
import org.elasticsearch.protocol.xpack.ml.job.util.PageParams;
import org.elasticsearch.test.ESTestCase;

import java.io.ByteArrayOutputStream;
Expand All @@ -49,6 +52,7 @@ public void testPutJob() throws IOException {

Request request = MLRequestConverters.putJob(putJobRequest);

assertEquals(HttpPut.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_xpack/ml/anomaly_detectors/foo"));
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
Job parsedJob = Job.PARSER.apply(parser, null).build();
Expand Down Expand Up @@ -123,6 +127,23 @@ public void testDeleteJob() {
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
}

public void testGetBuckets() throws IOException {
String jobId = randomAlphaOfLength(10);
GetBucketsRequest getBucketsRequest = new GetBucketsRequest(jobId);
getBucketsRequest.setPageParams(new PageParams(100, 300));
getBucketsRequest.setAnomalyScore(75.0);
getBucketsRequest.setSort("anomaly_score");
getBucketsRequest.setDescending(true);

Request request = MLRequestConverters.getBuckets(getBucketsRequest);
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/results/buckets", request.getEndpoint());
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
GetBucketsRequest parsedRequest = GetBucketsRequest.PARSER.apply(parser, null);
assertThat(parsedRequest, equalTo(getBucketsRequest));
}
}

private static Job createValidJob(String jobId) {
AnalysisConfig.Builder analysisConfig = AnalysisConfig.builder(Collections.singletonList(
Detector.builder().setFunction("count").build()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.protocol.xpack.ml.GetBucketsRequest;
import org.elasticsearch.protocol.xpack.ml.GetBucketsResponse;
import org.elasticsearch.protocol.xpack.ml.PutJobRequest;
import org.elasticsearch.protocol.xpack.ml.job.config.Job;
import org.elasticsearch.protocol.xpack.ml.job.results.Bucket;
import org.elasticsearch.protocol.xpack.ml.job.util.PageParams;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class MachineLearningGetResultsIT extends ESRestHighLevelClientTestCase {

private static final String RESULTS_INDEX = ".ml-anomalies-shared";
private static final String DOC = "doc";

private static final String JOB_ID = "get-results-it-job";

private BucketStats bucketStats = new BucketStats();

@Before
public void createJobAndIndexResults() throws IOException {
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
Job job = MachineLearningIT.buildJob(JOB_ID);
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

long time = 1533081600000L; // 2018-08-01T00:00:00Z
long endTime = time + 3600000L * 24 * 10; // 10 days of hourly buckets
while (time < endTime) {
addBucketIndexRequest(time, false, bulkRequest);
addRecordIndexRequests(time, false, bulkRequest);
time += 3600000L;
}

// Also index an interim bucket
addBucketIndexRequest(time, true, bulkRequest);
addRecordIndexRequests(time, true, bulkRequest);

highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
}

private void addBucketIndexRequest(long timestamp, boolean isInterim, BulkRequest bulkRequest) {
IndexRequest indexRequest = new IndexRequest(RESULTS_INDEX, DOC);
double bucketScore = randomDoubleBetween(0.0, 100.0, true);
bucketStats.report(bucketScore);
indexRequest.source("{\"job_id\":\"" + JOB_ID + "\", \"result_type\":\"bucket\", \"timestamp\": " + timestamp + "," +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why you prefer to build the string representation rather than constructing a Bucket object converting it to a ByteReference and use the overload of indexRequest.source?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor and setter methods of Bucket are package protected. I like that as it means the users cannot modify a bucket.

"\"bucket_span\": 3600,\"is_interim\": " + isInterim + ", \"anomaly_score\": " + bucketScore +
", \"bucket_influencers\":[{\"job_id\": \"" + JOB_ID + "\", \"result_type\":\"bucket_influencer\", " +
"\"influencer_field_name\": \"bucket_time\", \"timestamp\": " + timestamp + ", \"bucket_span\": 3600, " +
"\"is_interim\": " + isInterim + "}]}", XContentType.JSON);
bulkRequest.add(indexRequest);
}

private void addRecordIndexRequests(long timestamp, boolean isInterim, BulkRequest bulkRequest) {
if (randomBoolean()) {
return;
}
int recordCount = randomIntBetween(1, 3);
for (int i = 0; i < recordCount; ++i) {
IndexRequest indexRequest = new IndexRequest(RESULTS_INDEX, DOC);
double recordScore = randomDoubleBetween(0.0, 100.0, true);
double p = randomDoubleBetween(0.0, 0.05, false);
indexRequest.source("{\"job_id\":\"" + JOB_ID + "\", \"result_type\":\"record\", \"timestamp\": " + timestamp + "," +
"\"bucket_span\": 3600,\"is_interim\": " + isInterim + ", \"record_score\": " + recordScore + ", \"probability\": "
+ p + "}", XContentType.JSON);
bulkRequest.add(indexRequest);
}
}

@After
public void deleteJob() throws IOException {
new MlRestTestStateCleaner(logger, client()).clearMlMetadata();
}

public void testGetBuckets() throws IOException {
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();

{
GetBucketsRequest request = new GetBucketsRequest(JOB_ID);

GetBucketsResponse response = execute(request, machineLearningClient::getBuckets, machineLearningClient::getBucketsAsync);

assertThat(response.count(), equalTo(241L));
assertThat(response.buckets().size(), equalTo(100));
assertThat(response.buckets().get(0).getTimestamp().getTime(), equalTo(1533081600000L));
}
{
GetBucketsRequest request = new GetBucketsRequest(JOB_ID);
request.setTimestamp("1533081600000");

GetBucketsResponse response = execute(request, machineLearningClient::getBuckets, machineLearningClient::getBucketsAsync);

assertThat(response.count(), equalTo(1L));
assertThat(response.buckets().size(), equalTo(1));
assertThat(response.buckets().get(0).getTimestamp().getTime(), equalTo(1533081600000L));
}
{
GetBucketsRequest request = new GetBucketsRequest(JOB_ID);
request.setAnomalyScore(75.0);

GetBucketsResponse response = execute(request, machineLearningClient::getBuckets, machineLearningClient::getBucketsAsync);

assertThat(response.count(), equalTo(bucketStats.criticalCount));
assertThat(response.buckets().size(), equalTo((int) Math.min(100, bucketStats.criticalCount)));
assertThat(response.buckets().stream().anyMatch(b -> b.getAnomalyScore() < 75.0), is(false));
}
{
GetBucketsRequest request = new GetBucketsRequest(JOB_ID);
request.setExcludeInterim(true);

GetBucketsResponse response = execute(request, machineLearningClient::getBuckets, machineLearningClient::getBucketsAsync);

assertThat(response.count(), equalTo(240L));
}
{
GetBucketsRequest request = new GetBucketsRequest(JOB_ID);
request.setStart("1533081600000");
request.setEnd("1533092400000");

GetBucketsResponse response = execute(request, machineLearningClient::getBuckets, machineLearningClient::getBucketsAsync);

assertThat(response.count(), equalTo(3L));
assertThat(response.buckets().get(0).getTimestamp().getTime(), equalTo(1533081600000L));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: extract 1533081600000L to a constant FIRST_BUCKET_TIME as the literal is used several times

assertThat(response.buckets().get(1).getTimestamp().getTime(), equalTo(1533081600000L + 3600000L));
assertThat(response.buckets().get(2).getTimestamp().getTime(), equalTo(1533081600000L + 2 * + 3600000L));
}
{
GetBucketsRequest request = new GetBucketsRequest(JOB_ID);
request.setPageParams(new PageParams(3, 3));

GetBucketsResponse response = execute(request, machineLearningClient::getBuckets, machineLearningClient::getBucketsAsync);

assertThat(response.buckets().size(), equalTo(3));
assertThat(response.buckets().get(0).getTimestamp().getTime(), equalTo(1533081600000L + 3 * 3600000L));
assertThat(response.buckets().get(1).getTimestamp().getTime(), equalTo(1533081600000L + 4 * 3600000L));
assertThat(response.buckets().get(2).getTimestamp().getTime(), equalTo(1533081600000L + 5 * + 3600000L));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra plus in + 5 * + 3600000L

}
{
GetBucketsRequest request = new GetBucketsRequest(JOB_ID);
request.setSort("anomaly_score");
request.setDescending(true);

GetBucketsResponse response = execute(request, machineLearningClient::getBuckets, machineLearningClient::getBucketsAsync);

double previousScore = 100.0;
for (Bucket bucket : response.buckets()) {
assertThat(bucket.getAnomalyScore(), lessThanOrEqualTo(previousScore));
previousScore = bucket.getAnomalyScore();
}
}
{
GetBucketsRequest request = new GetBucketsRequest(JOB_ID);
// Make sure we get all buckets
request.setPageParams(new PageParams(0, 10000));
request.setExpand(true);

GetBucketsResponse response = execute(request, machineLearningClient::getBuckets, machineLearningClient::getBucketsAsync);

assertThat(response.buckets().stream().anyMatch(b -> b.getRecords().size() > 0), is(true));
}
}

private static class BucketStats {
// score < 50.0
private long minorCount;

// score < 75.0
private long majorCount;

// score > 75.0
private long criticalCount;

private void report(double anomalyScore) {
if (anomalyScore < 50.0) {
minorCount++;
} else if (anomalyScore < 75.0) {
majorCount++;
} else {
criticalCount++;
}
}
}
}
Loading