[ML][Inference] Adding inference ingest processor #47859
Conversation
Pinging @elastic/ml-core (:ml)
@@ -416,7 +429,9 @@ public static boolean isMlNode(DiscoveryNode node) {
        AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
        MAX_OPEN_JOBS_PER_NODE,
        MIN_DISK_SPACE_OFF_HEAP,
-       MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION);
+       MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION,
+       InferenceProcessor.MAX_INFERENCE_PROCESSORS
There may be one more setting we should add: "maximum loaded models". But I don't think we need to add that setting until we support loading models outside of processors.
public class InferenceProcessor extends AbstractProcessor {

    // How many total inference processors are allowed to be used in the cluster.
    public static final Setting<Integer> MAX_INFERENCE_PROCESSORS = Setting.intSetting("xpack.ml.max_inference_processors",
        50,
this is a "magic" number. Given we don't have real data around our typical model size and performance, I just picked a number that is not too small.
    String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD);
    Map<String, String> fieldMapping = ConfigurationUtils.readOptionalMap(TYPE, tag, config, FIELD_MAPPINGS);
    InferenceConfig inferenceConfig = inferenceConfigFromMap(ConfigurationUtils.readMap(TYPE, tag, config, INFERENCE_CONFIG));
    String modelInfoField = ConfigurationUtils.readStringProperty(TYPE, tag, config, MODEL_INFO_FIELD, "_model_info");
By default, I think we should add a field that includes the model ID used in the inference step. This should probably append the tag to the end of the default info field to protect against multiple processors in the same pipeline...
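A minimal sketch of what that default could look like in the factory, assuming the tag is simply appended to the default field name (this is the suggestion above, not necessarily the merged behavior):

    String modelInfoField = ConfigurationUtils.readStringProperty(TYPE, tag, config, MODEL_INFO_FIELD, "_model_info");
    // Append the processor tag so two inference processors in the same pipeline
    // do not write their model metadata to the same field.
    if (tag != null) {
        modelInfoField += "." + tag;
    }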
    }
}

void checkSupportedVersion(InferenceConfig config) {
Initially, this factory builds the pipeline on the master node before storing it in cluster state. So, if we always check the minimum supported version against the minimum node version, we can guarantee that PUT pipeline will fail if a user tries to create a pipeline when the specific processor setting is not supported.
I think this explanation should go into a code comment for the method getMinimalSupportedVersion.
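A minimal sketch of the check being described, assuming the factory keeps the cluster's minimum node version in a field (here called minNodeVersion, an assumed name):

    void checkSupportedVersion(InferenceConfig config) {
        // Compare against the oldest node in the cluster so that PUT pipeline fails fast
        // instead of storing a processor configuration some nodes cannot run.
        if (config.getMinimalSupportedVersion().after(minNodeVersion)) {
            throw ExceptionsHelper.badRequestException("Configuration requires minimum node version ["
                + config.getMinimalSupportedVersion() + "] but the cluster's minimum node version is ["
                + minNodeVersion + "]");
        }
    }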
}

void mutateDocument(InferModelAction.Response response, IngestDocument ingestDocument) {
    response.getInferenceResults().get(0).writeResult(ingestDocument, this.targetField);
Is it guaranteed that response.getInferenceResults() is non-empty?
Good catch, I will put in a check to protect us from funkiness.
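One way the guard could look; the exception type and message here are assumptions:

    void mutateDocument(InferModelAction.Response response, IngestDocument ingestDocument) {
        // Protect against an unexpectedly empty response before dereferencing the first result.
        if (response.getInferenceResults().isEmpty()) {
            throw new ElasticsearchStatusException("Unexpected empty inference response", RestStatus.INTERNAL_SERVER_ERROR);
        }
        response.getInferenceResults().get(0).writeResult(ingestDocument, this.targetField);
    }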
    XContentType.JSON).get().isAcknowledged(), is(true));

client().prepareIndex("index_for_inference_test", "_doc")
    .setSource(new HashMap<>(){{
I would put source doc generation into a method, say, "generateDocSource".
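Something along these lines; generateDocSource is the suggested name from above, and the field names and values below are placeholders rather than the actual test data:

    private static Map<String, Object> generateDocSource() {
        Map<String, Object> source = new HashMap<>();
        source.put("col1", "female");  // placeholder feature values, not the PR's real test data
        source.put("col2", 10);
        return source;
    }

The index call in the test then collapses to client().prepareIndex("index_for_inference_test", "_doc").setSource(generateDocSource()).get().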
}

public void testSimulate() {
    String source = "{\n" +
[non-actionable] Waiting for multiline raw string literals to be introduced to Java...
No joke! """ cannot be added soon enough!
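Purely illustrative: with text blocks (standardized in Java 15), the escaped, concatenated JSON in testSimulate could read roughly like this; the request body shown is a placeholder, not the actual simulate source from the test:

    String source = """
        {
          "pipeline": {
            "processors": [
              { "inference": { "model_id": "my_model", "target_field": "result", "inference_config": { "regression": {} } } }
            ]
          },
          "docs": [ { "_source": { "col1": "female", "col2": 10 } } ]
        }
        """;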
InferenceConfig inferenceConfigFromMap(Map<String, Object> inferenceConfig) throws IOException {
    ExceptionsHelper.requireNonNull(inferenceConfig, INFERENCE_CONFIG);

    if (inferenceConfig.keySet().size() != 1) {
Is it equivalent to inferenceConfig.size()? It should be, right?
Definitely, I can simplify.
    }
}

private static ClusterState buildState(MetaData metaData) {
Rename to buildClusterState for consistency with the methods below?
LGTM
@@ -13,4 +14,5 @@

    boolean isTargetTypeSupported(TargetType targetType);

+   Version getMinimalSupportedVersion();
Could you add a comment explaining the need for this?
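For example, a Javadoc along the lines being requested; the wording is only a suggestion:

    /**
     * The minimum node version that understands this inference configuration.
     * The inference processor factory checks this against the minimum node version
     * in the cluster at pipeline creation time, so that PUT pipeline fails fast if
     * any node would be unable to run the processor.
     */
    Version getMinimalSupportedVersion();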
* [ML] ML Model Inference Ingest Processor (#49052)

* [ML][Inference] adds lazy model loader and inference (#47410)

  This adds a couple of things:
  - A model loader service that is accessible via transport calls. This service will load in models and cache them. They will stay loaded until a processor no longer references them.
  - A Model class and its first sub-class LocalModel, used to cache model information and run inference.
  - Transport action and handler for requests to infer against a local model.

  Related Feature PRs:
  * [ML][Inference] Adjust inference configuration option API (#47812)
  * [ML][Inference] adds logistic_regression output aggregator (#48075)
  * [ML][Inference] Adding read/del trained models (#47882)
  * [ML][Inference] Adding inference ingest processor (#47859)
  * [ML][Inference] fixing classification inference for ensemble (#48463)
  * [ML][Inference] Adding model memory estimations (#48323)
  * [ML][Inference] adding more options to inference processor (#48545)
  * [ML][Inference] handle string values better in feature extraction (#48584)
  * [ML][Inference] Adding _stats endpoint for inference (#48492)
  * [ML][Inference] add inference processors and trained models to usage (#47869)
  * [ML][Inference] add new flag for optionally including model definition (#48718)
  * [ML][Inference] adding license checks (#49056)
  * [ML][Inference] Adding memory and compute estimates to inference (#48955)

* fixing version of indexed docs for model inference
This adds a new ingest processor that performs inference against a previously trained and stored model.