[ML][Inference] adds lazy model loader and inference #47410

Merged
@@ -98,6 +98,7 @@
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.core.ml.action.GetOverallBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.InferModelAction;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.MlInfoAction;
@@ -139,7 +140,14 @@
import org.elasticsearch.xpack.core.ml.dataframe.evaluation.softclassification.Recall;
import org.elasticsearch.xpack.core.ml.dataframe.evaluation.softclassification.ScoreByThresholdResult;
import org.elasticsearch.xpack.core.ml.dataframe.evaluation.softclassification.SoftClassificationMetric;
import org.elasticsearch.xpack.core.ml.inference.results.ClassificationInferenceResults;
import org.elasticsearch.xpack.core.ml.inference.results.InferenceResults;
import org.elasticsearch.xpack.core.ml.inference.results.RegressionInferenceResults;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.TrainedModel;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ensemble.Ensemble;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ensemble.OutputAggregator;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ensemble.WeightedMode;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ensemble.WeightedSum;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.tree.Tree;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.FrequencyEncoding;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.OneHotEncoding;
@@ -323,6 +331,7 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {
StartDataFrameAnalyticsAction.INSTANCE,
EvaluateDataFrameAction.INSTANCE,
EstimateMemoryUsageAction.INSTANCE,
InferModelAction.INSTANCE,
// security
ClearRealmCacheAction.INSTANCE,
ClearRolesCacheAction.INSTANCE,
@@ -451,6 +460,17 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(PreProcessor.class, TargetMeanEncoding.NAME.getPreferredName(), TargetMeanEncoding::new),
// ML - Inference models
new NamedWriteableRegistry.Entry(TrainedModel.class, Tree.NAME.getPreferredName(), Tree::new),
new NamedWriteableRegistry.Entry(TrainedModel.class, Ensemble.NAME.getPreferredName(), Ensemble::new),
// ML - Inference aggregators
new NamedWriteableRegistry.Entry(OutputAggregator.class, WeightedSum.NAME.getPreferredName(), WeightedSum::new),
new NamedWriteableRegistry.Entry(OutputAggregator.class, WeightedMode.NAME.getPreferredName(), WeightedMode::new),
// ML - Inference Results
new NamedWriteableRegistry.Entry(InferenceResults.class,
ClassificationInferenceResults.NAME,
ClassificationInferenceResults::new),
new NamedWriteableRegistry.Entry(InferenceResults.class,
RegressionInferenceResults.NAME,
RegressionInferenceResults::new),

// monitoring
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.MONITORING, MonitoringFeatureSetUsage::new),
@@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.core.ml.inference.results.InferenceResults;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceParams;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class InferModelAction extends ActionType<InferModelAction.Response> {

public static final InferModelAction INSTANCE = new InferModelAction();
public static final String NAME = "cluster:admin/xpack/ml/infer";

private InferModelAction() {
super(NAME, Response::new);
}

public static class Request extends ActionRequest {

private final String modelId;
private final long modelVersion;
private final List<Map<String, Object>> objectsToInfer;
private final InferenceParams params;

public Request(String modelId, long modelVersion) {
this(modelId, modelVersion, Collections.emptyList(), InferenceParams.EMPTY_PARAMS);
}

public Request(String modelId, long modelVersion, List<Map<String, Object>> objectsToInfer, InferenceParams inferenceParams) {
this.modelId = ExceptionsHelper.requireNonNull(modelId, TrainedModelConfig.MODEL_ID);
this.modelVersion = modelVersion;
this.objectsToInfer = Collections.unmodifiableList(ExceptionsHelper.requireNonNull(objectsToInfer, "objects_to_infer"));
this.params = inferenceParams == null ? InferenceParams.EMPTY_PARAMS : inferenceParams;
}

public Request(String modelId, long modelVersion, Map<String, Object> objectToInfer, InferenceParams params) {
this(modelId,
modelVersion,
Arrays.asList(ExceptionsHelper.requireNonNull(objectToInfer, "objects_to_infer")),
params);
}

public Request(StreamInput in) throws IOException {
super(in);
this.modelId = in.readString();
this.modelVersion = in.readVLong();
this.objectsToInfer = Collections.unmodifiableList(in.readList(StreamInput::readMap));
this.params = new InferenceParams(in);
}

public String getModelId() {
return modelId;
}

public long getModelVersion() {
return modelVersion;
}

public List<Map<String, Object>> getObjectsToInfer() {
return objectsToInfer;
}

public InferenceParams getParams() {
return params;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(modelId);
out.writeVLong(modelVersion);
out.writeCollection(objectsToInfer, StreamOutput::writeMap);
params.writeTo(out);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InferModelAction.Request that = (InferModelAction.Request) o;
return Objects.equals(modelId, that.modelId)
&& Objects.equals(modelVersion, that.modelVersion)
&& Objects.equals(params, that.params)
&& Objects.equals(objectsToInfer, that.objectsToInfer);
}

@Override
public int hashCode() {
return Objects.hash(modelId, modelVersion, objectsToInfer, params);
}

}

public static class Response extends ActionResponse {

private final List<InferenceResults> inferenceResults;

public Response(List<InferenceResults> inferenceResults) {
super();
this.inferenceResults = Collections.unmodifiableList(ExceptionsHelper.requireNonNull(inferenceResults, "inferenceResults"));
}

public Response(StreamInput in) throws IOException {
super(in);
this.inferenceResults = Collections.unmodifiableList(in.readNamedWriteableList(InferenceResults.class));
}

public List<InferenceResults> getInferenceResults() {
return inferenceResults;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteableList(inferenceResults);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InferModelAction.Response that = (InferModelAction.Response) o;
return Objects.equals(inferenceResults, that.inferenceResults);
}

@Override
public int hashCode() {
return Objects.hash(inferenceResults);
}

}
}
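
For orientation, here is a minimal caller-side sketch of how this action might be invoked once registered. The model id, version, field values, and output handling are illustrative assumptions and not part of this change:

// Hypothetical caller-side sketch (not part of this diff): one document is
// sent for inference against version 1 of a model named "my_model".
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ml.action.InferModelAction;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceParams;

public class InferModelExample {

    // The Client would normally be provided by the calling component.
    static void inferOneDocument(Client client) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("field_1", 0.42);          // example numeric feature
        doc.put("field_2", "category_a");  // example categorical feature

        InferModelAction.Request request =
            new InferModelAction.Request("my_model", 1L, doc, InferenceParams.EMPTY_PARAMS);

        client.execute(InferModelAction.INSTANCE, request, ActionListener.wrap(
            response -> response.getInferenceResults()
                .forEach(result -> System.out.println("inference result: " + result)),
            e -> { throw new RuntimeException("inference failed", e); }
        ));
    }
}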
@@ -8,6 +8,9 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.plugins.spi.NamedXContentProvider;
import org.elasticsearch.xpack.core.ml.inference.results.ClassificationInferenceResults;
import org.elasticsearch.xpack.core.ml.inference.results.InferenceResults;
import org.elasticsearch.xpack.core.ml.inference.results.RegressionInferenceResults;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.LenientlyParsedTrainedModel;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.TrainedModel;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.StrictlyParsedTrainedModel;
@@ -100,6 +103,14 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
WeightedMode.NAME.getPreferredName(),
WeightedMode::new));

// Inference Results
namedWriteables.add(new NamedWriteableRegistry.Entry(InferenceResults.class,
ClassificationInferenceResults.NAME,
ClassificationInferenceResults::new));
namedWriteables.add(new NamedWriteableRegistry.Entry(InferenceResults.class,
RegressionInferenceResults.NAME,
RegressionInferenceResults::new));

return namedWriteables;
}
}
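
These registry entries are what let the transport layer rebuild the concrete result type from its wire name. A rough sketch of the round trip, assuming a registry populated from this provider; the provider class name and the single-argument RegressionInferenceResults constructor are assumptions, not confirmed by this diff:

// Illustrative round trip only.
import java.util.Collections;
import java.util.List;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.core.ml.inference.MlInferenceNamedXContentProvider;
import org.elasticsearch.xpack.core.ml.inference.results.InferenceResults;
import org.elasticsearch.xpack.core.ml.inference.results.RegressionInferenceResults;

public class InferenceResultsWireExample {
    static List<InferenceResults> roundTrip() throws Exception {
        // Writing records each entry's registered NAME ahead of its body.
        BytesStreamOutput out = new BytesStreamOutput();
        out.writeNamedWriteableList(Collections.singletonList(new RegressionInferenceResults(0.7)));

        // Reading resolves that NAME back to the reader registered above, so the
        // caller gets the concrete subclass without switching on types itself.
        NamedWriteableRegistry registry =
            new NamedWriteableRegistry(new MlInferenceNamedXContentProvider().getNamedWriteables());
        StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry);
        return in.readNamedWriteableList(InferenceResults.class);
    }
}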
@@ -18,6 +18,8 @@
import org.elasticsearch.xpack.core.ml.inference.preprocessing.LenientlyParsedPreProcessor;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.StrictlyParsedPreProcessor;
import org.elasticsearch.xpack.core.ml.inference.results.InferenceResults;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceParams;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.LenientlyParsedTrainedModel;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.StrictlyParsedTrainedModel;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.TrainedModel;
@@ -27,6 +29,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class TrainedModelDefinition implements ToXContentObject, Writeable {
@@ -118,6 +121,15 @@ public Input getInput() {
return input;
}

private void preProcess(Map<String, Object> fields) {
preProcessors.forEach(preProcessor -> preProcessor.process(fields));
}

public InferenceResults infer(Map<String, Object> fields, InferenceParams params) {
preProcess(fields);
return trainedModel.infer(fields, params);
}
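
A sketch of what this new flow looks like from a caller's point of view; the field names, the encoder behaviour mentioned in the comments, and the already-parsed definition instance are assumptions for illustration:

// Illustrative only: `definition` stands in for an already-parsed
// TrainedModelDefinition; field names and values are made up.
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
import org.elasticsearch.xpack.core.ml.inference.results.InferenceResults;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceParams;

public class DefinitionInferExample {
    static InferenceResults inferOnce(TrainedModelDefinition definition) {
        Map<String, Object> fields = new HashMap<>();
        fields.put("animal", "cat"); // a categorical field an encoder might expand
        fields.put("weight", 4.2);   // a numeric field passed through untouched

        // infer(..) first runs every PreProcessor in order, mutating the map in
        // place (e.g. one-hot encoding "animal"), then hands the encoded fields
        // to the wrapped TrainedModel, which returns a classification or
        // regression result.
        return definition.infer(fields, InferenceParams.EMPTY_PARAMS);
    }
}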

@Override
public String toString() {
return Strings.toString(this);