[FEATURE][ML] Write data frame configuration to process #35914

Merged
@@ -406,7 +406,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
new BlackHoleAutodetectProcess(job.getId());
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
analyticsProcessFactory = (jobId, executorService) -> null;
analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService) -> null;
}
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
@@ -10,6 +10,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -25,6 +27,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
@@ -114,18 +117,23 @@ private boolean isMlNode(DiscoveryNode node) {
private void reindexDataframeAndStartAnalysis(String index, ActionListener<AcknowledgedResponse> listener) {
final String destinationIndex = index + "_copy";

ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
bulkResponse -> {
client.execute(RefreshAction.INSTANCE, new RefreshRequest(destinationIndex), ActionListener.wrap(
refreshResponse -> {
runPipelineAnalytics(destinationIndex, listener);
}, listener::onFailure
));
}, listener::onFailure
);

ActionListener<CreateIndexResponse> copyIndexCreatedListener = ActionListener.wrap(
createIndexResponse -> {
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(index);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setScript(new Script("ctx._source." + DataFrameFields.ID + " = ctx._id"));
client.execute(ReindexAction.INSTANCE, reindexRequest, ActionListener.wrap(
bulkResponse -> {
runPipelineAnalytics(destinationIndex, listener);
},
listener::onFailure
));
client.execute(ReindexAction.INSTANCE, reindexRequest, reindexCompletedListener);
}, listener::onFailure
);

@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.analytics;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;

public class DataFrameAnalysis implements ToXContentObject {

private static final ParseField NAME = new ParseField("name");

private final String name;

public DataFrameAnalysis(String name) {
this.name = ExceptionsHelper.requireNonNull(name, NAME.getPreferredName());
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NAME.getPreferredName(), name);
builder.endObject();
return builder;
}
}
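
Note: a hedged sketch of what the new DataFrameAnalysis serializes to; the helper method is illustrative only, and the "outliers" name matches what AnalyticsProcessManager passes further down in this PR:

// Illustrative only: serialize a DataFrameAnalysis to JSON, using classes from the
// same x-pack packages as above (JsonXContent, ToXContent, Strings).
static String analysisAsJson() throws IOException {
    try (XContentBuilder builder = JsonXContent.contentBuilder()) {
        new DataFrameAnalysis("outliers").toXContent(builder, ToXContent.EMPTY_PARAMS);
        return Strings.toString(builder); // -> {"name":"outliers"}
    }
}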
@@ -189,4 +189,25 @@ public String[] getFieldNamesArray() {
List<String> fieldNames = getFieldNames();
return fieldNames.toArray(new String[fieldNames.size()]);
}

public DataSummary collectDataSummary() {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setIndices(context.indices)
.setSize(0)
.setQuery(context.query);

SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
return new DataSummary(searchResponse.getHits().getTotalHits(), context.extractedFields.getAllFields().size());
}

public static class DataSummary {

public final long rows;
public final long cols;

public DataSummary(long rows, long cols) {
this.rows = rows;
this.cols = cols;
}
}
}
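
Note: a hedged usage sketch of the new summary method; the dataExtractor instance is assumed to already be configured with its indices, query and extracted fields:

// Illustrative only: rows is the total hit count for context.query,
// cols is the number of extracted fields.
DataFrameDataExtractor.DataSummary summary = dataExtractor.collectDataSummary();
long rows = summary.rows;
long cols = summary.cols;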
@@ -5,10 +5,19 @@
*/
package org.elasticsearch.xpack.ml.analytics.process;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -19,13 +28,21 @@ public class AnalyticsBuilder {
private static final String ANALYTICS_PATH = "./" + ANALYTICS;

private static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput";
private static final String CONFIG_ARG = "--config=";

private final Environment env;
private final NativeController nativeController;
private final ProcessPipes processPipes;
private final AnalyticsProcessConfig config;
private final List<Path> filesToDelete;

public AnalyticsBuilder(NativeController nativeController, ProcessPipes processPipes) {
public AnalyticsBuilder(Environment env, NativeController nativeController, ProcessPipes processPipes, AnalyticsProcessConfig config,
List<Path> filesToDelete) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
this.processPipes = Objects.requireNonNull(processPipes);
this.config = Objects.requireNonNull(config);
this.filesToDelete = Objects.requireNonNull(filesToDelete);
}

public void build() throws IOException {
@@ -34,10 +51,24 @@ public void build() throws IOException {
nativeController.startProcess(command);
}

List<String> buildAnalyticsCommand() {
List<String> buildAnalyticsCommand() throws IOException {
List<String> command = new ArrayList<>();
command.add(ANALYTICS_PATH);
command.add(LENGTH_ENCODED_INPUT_ARG);
addConfigFile(command);
return command;
}

private void addConfigFile(List<String> command) throws IOException {
Path configFile = Files.createTempFile(env.tmpFile(), "analysis", ".conf");
filesToDelete.add(configFile);
try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(configFile), StandardCharsets.UTF_8);
XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {

config.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
osw.write(Strings.toString(jsonBuilder));
}

command.add(CONFIG_ARG + configFile.toString());
}
}
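
Note: with this change the analytics process command gains a --config argument pointing at the temporary config file, which is also added to filesToDelete for later cleanup. A hedged sketch of the resulting command; the temp directory and generated file name are hypothetical:

// Illustrative only: what buildAnalyticsCommand() is expected to return, assuming
// env.tmpFile() resolves to /tmp and Files.createTempFile picks the suffix shown.
List<String> command = analyticsBuilder.buildAnalyticsCommand();
// command ~ ["./analytics", "--lengthEncodedInput", "--config=/tmp/analysis1234567890.conf"]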
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.analytics.process;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.analytics.DataFrameAnalysis;

import java.io.IOException;
import java.util.Objects;

public class AnalyticsProcessConfig implements ToXContentObject {

private static final String ROWS = "rows";
private static final String COLS = "cols";
private static final String MEMORY_LIMIT = "memory_limit";
private static final String THREADS = "threads";
private static final String ANALYSIS = "analysis";

private final long rows;
private final long cols;
private final ByteSizeValue memoryLimit;
private final int threads;
private final DataFrameAnalysis analysis;


public AnalyticsProcessConfig(long rows, long cols, ByteSizeValue memoryLimit, int threads, DataFrameAnalysis analysis) {
this.rows = rows;
this.cols = cols;
this.memoryLimit = Objects.requireNonNull(memoryLimit);
this.threads = threads;
this.analysis = Objects.requireNonNull(analysis);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ROWS, rows);
builder.field(COLS, cols);
builder.field(MEMORY_LIMIT, memoryLimit.getBytes());
builder.field(THREADS, threads);
builder.field(ANALYSIS, analysis);
builder.endObject();
return builder;
}
}
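
Note: for reference, a hedged sketch of the JSON written to the config file; the values are illustrative, and memory_limit is serialized as raw bytes via getBytes():

// Illustrative only: mirrors the defaults AnalyticsProcessManager uses below
// (1 GB memory limit, 1 thread, "outliers" analysis); rows/cols come from DataSummary.
AnalyticsProcessConfig config = new AnalyticsProcessConfig(
        100000, 5, new ByteSizeValue(1, ByteSizeUnit.GB), 1, new DataFrameAnalysis("outliers"));
// Serialized form:
// {"rows":100000,"cols":5,"memory_limit":1073741824,"threads":1,"analysis":{"name":"outliers"}}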
@@ -13,8 +13,9 @@ public interface AnalyticsProcessFactory {
* Create an implementation of {@link AnalyticsProcess}
*
* @param jobId The job id
* @param analyticsProcessConfig The process configuration
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @return The process
*/
AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService executorService);
AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService);
}
@@ -8,10 +8,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.analytics.DataFrameAnalysis;
import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor;

import java.io.IOException;
@@ -39,7 +42,7 @@ public AnalyticsProcessManager(Client client, Environment environment, ThreadPoo

public void processData(String jobId, DataFrameDataExtractor dataExtractor) {
threadPool.generic().execute(() -> {
AnalyticsProcess process = createProcess(jobId);
AnalyticsProcess process = createProcess(jobId, dataExtractor);
try {
// Fake header
process.writeRecord(dataExtractor.getFieldNamesArray());
@@ -69,13 +72,20 @@ public void processData(String jobId, DataFrameDataExtractor dataExtractor) {
});
}

private AnalyticsProcess createProcess(String jobId) {
private AnalyticsProcess createProcess(String jobId, DataFrameDataExtractor dataExtractor) {
// TODO We should rename the thread pool to reflect its more general use now, e.g. JOB_PROCESS_THREAD_POOL_NAME
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, executorService);
AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, createProcessConfig(dataExtractor), executorService);
if (process.isProcessAlive() == false) {
throw ExceptionsHelper.serverError("Failed to start analytics process");
}
return process;
}

private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataExtractor) {
DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
AnalyticsProcessConfig config = new AnalyticsProcessConfig(dataSummary.rows, dataSummary.cols,
new ByteSizeValue(1, ByteSizeUnit.GB), 1, new DataFrameAnalysis("outliers"));
return config;
}
}
@@ -16,8 +16,10 @@
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

Expand All @@ -37,15 +39,17 @@ public NativeAnalyticsProcessFactory(Environment env, NativeController nativeCon
}

@Override
public AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService executorService) {
public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig,
ExecutorService executorService) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
true, false, true, true, false, false);
true, false, true, true, false, false);

createNativeProcess(jobId, processPipes);
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);

NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, 0,
Collections.emptyList(), () -> {});
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, 0,
filesToDelete, () -> {});


try {
@@ -61,8 +65,10 @@ public AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService exe
}
}

private void createNativeProcess(String jobId, ProcessPipes processPipes) {
AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(nativeController, processPipes);
private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List<Path> filesToDelete,
ProcessPipes processPipes) {
AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(env, nativeController, processPipes, analyticsProcessConfig,
filesToDelete);
try {
analyticsBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);