Skip to content

Commit 34284b1

Browse files
committed
[ML] Fix job ID in C++ logs for normalize and memory estimation (#63874)
The changes of #54636 and #60395 were incorrect in their assertion that "the job ID passed to the process pipes is only used to make the file names unique". In fact it is also passed to the C++ log handler and gets logged with every message logged by the C++ processes. This PR splits the job ID and unique IDs (added in #54636 and #60395) so that the correct job ID is passed to the log handler. Nothing really bad happened as a result of this problem - it was just cosmetic.
1 parent db9f008 commit 34284b1

File tree

7 files changed

+18
-16
lines changed

7 files changed

+18
-16
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
7373
String jobId = config.getId();
7474
List<Path> filesToDelete = new ArrayList<>();
7575
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId,
76-
false, true, true, hasState, config.getAnalysis().persistsState());
76+
null, false, true, true, hasState, config.getAnalysis().persistsState());
7777

7878
// The extra 2 are for the checksum and the control field
7979
int numberOfFields = analyticsProcessConfig.cols() + 2;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,11 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
6262
ExecutorService executorService,
6363
Consumer<String> onProcessCrash) {
6464
List<Path> filesToDelete = new ArrayList<>();
65-
// The config ID passed to the process pipes is only used to make the file names unique. Since memory estimation can be
66-
// called many times in quick succession for the same config the config ID alone is not sufficient to guarantee that the
67-
// memory estimation process pipe names are unique. Therefore an increasing counter value is appended to the config ID
68-
// to ensure uniqueness between calls.
65+
// Since memory estimation can be called many times in quick succession for the same config the config ID alone is not
66+
// sufficient to guarantee that the memory estimation process pipe names are unique. Therefore an increasing counter
67+
// value is passed as well as the config ID to ensure uniqueness between calls.
6968
ProcessPipes processPipes = new ProcessPipes(
70-
env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
69+
env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId(), counter.incrementAndGet(),
7170
false, false, true, false, false);
7271

7372
createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public AutodetectProcess createAutodetectProcess(Job job,
7777
Consumer<String> onProcessCrash) {
7878
List<Path> filesToDelete = new ArrayList<>();
7979
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AutodetectBuilder.AUTODETECT,
80-
job.getId(), false, true, true, params.modelSnapshot() != null,
80+
job.getId(), null, false, true, true, params.modelSnapshot() != null,
8181
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings) == false);
8282
createNativeProcess(job, params, processPipes, filesToDelete);
8383
boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ void setProcessConnectTimeout(TimeValue processConnectTimeout) {
5151
@Override
5252
public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan,
5353
ExecutorService executorService) {
54-
// The job ID passed to the process pipes is only used to make the file names unique. Since normalize can get run many times
55-
// in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names
56-
// are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls.
54+
// Since normalize can get run many times in quick succession for the same job the job ID alone is not sufficient to
55+
// guarantee that the normalizer process pipe names are unique. Therefore an increasing counter value is passed as
56+
// well as the job ID to ensure uniqueness between calls.
5757
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, NormalizerBuilder.NORMALIZE,
58-
jobId + "_" + counter.incrementAndGet(), false, true, true, false, false);
58+
jobId, counter.incrementAndGet(), false, true, true, false, false);
5959
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
6060

6161
NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class NativeController {
7575
NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper, NamedXContentRegistry xContentRegistry)
7676
throws IOException {
7777
this.localNodeName = localNodeName;
78-
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null,
78+
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, null,
7979
true, false, true, false, false);
8080
processPipes.connectLogStream();
8181
this.cppLogHandler = processPipes.getLogStreamHandler();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class ProcessPipes {
7676
* May be null or empty for processes not associated with a specific job.
7777
*/
7878
public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration timeout, String processName, String jobId,
79-
boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
79+
Long uniqueId, boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
8080
boolean wantRestorePipe, boolean wantPersistPipe) {
8181
this.namedPipeHelper = namedPipeHelper;
8282
this.jobId = jobId;
@@ -91,6 +91,9 @@ public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration t
9191
if (!Strings.isNullOrEmpty(jobId)) {
9292
prefixBuilder.append(jobId).append('_');
9393
}
94+
if (uniqueId != null) {
95+
prefixBuilder.append(uniqueId).append('_');
96+
}
9497
String prefix = prefixBuilder.toString();
9598
String suffix = String.format(Locale.ROOT, "_%d", JvmInfo.jvmInfo().getPid());
9699
logPipeName = String.format(Locale.ROOT, "%slog%s", prefix, suffix);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void testProcessPipes() throws Exception {
6262

6363
int timeoutSeconds = randomIntBetween(5, 100);
6464
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(timeoutSeconds), AutodetectBuilder.AUTODETECT,
65-
"my_job", false, true, true, true, true);
65+
"my_job", null, false, true, true, true, true);
6666

6767
List<String> command = new ArrayList<>();
6868
processPipes.addArgs(command);
@@ -110,7 +110,7 @@ public void testCloseUnusedPipes_notConnected() {
110110
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
111111
Environment env = TestEnvironment.newEnvironment(settings);
112112

113-
new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
113+
new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", null,
114114
true, true, true, true, true);
115115
}
116116

@@ -138,7 +138,7 @@ public void testCloseOpenedPipesOnError() throws IOException {
138138
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
139139
Environment env = TestEnvironment.newEnvironment(settings);
140140
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
141-
true, true, true, true, true);
141+
null, true, true, true, true, true);
142142

143143
processPipes.connectLogStream();
144144
expectThrows(IOException.class, processPipes::connectOtherStreams);

0 commit comments

Comments
 (0)