Skip to content

Commit fccb7a2

Browse files
[ML] Include node name when native controller cannot start process (elastic#42225)
This adds the node name where we fail to start a process via the native controller to facilitate debugging as otherwise it might not be known to which node the job was allocated.
1 parent 610230f commit fccb7a2

File tree

6 files changed

+24
-14
lines changed

6 files changed

+24
-14
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
437437
NormalizerProcessFactory normalizerProcessFactory;
438438
if (MachineLearningField.AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) {
439439
try {
440-
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
440+
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
441441
if (nativeController == null) {
442442
// This will only only happen when path.home is not set, which is disallowed in production
443443
throw new ElasticsearchException("Failed to create native process controller for Machine Learning");

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public MachineLearningFeatureSet(Environment environment, ClusterService cluster
7979
if (enabled && XPackPlugin.transportClientMode(environment.settings()) == false) {
8080
try {
8181
if (isRunningOnMlPlatform(true)) {
82-
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
82+
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(),
83+
environment);
8384
if (nativeController != null) {
8485
nativeCodeInfo = nativeController.getNativeCodeInfo();
8586
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
public class MlLifeCycleService {
2020

2121
private final Environment environment;
22+
private final ClusterService clusterService;
2223
private final DatafeedManager datafeedManager;
2324
private final AutodetectProcessManager autodetectProcessManager;
2425
private final MlMemoryTracker memoryTracker;
2526

2627
public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager,
2728
AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) {
2829
this.environment = environment;
30+
this.clusterService = clusterService;
2931
this.datafeedManager = datafeedManager;
3032
this.autodetectProcessManager = autodetectProcessManager;
3133
this.memoryTracker = memoryTracker;
@@ -46,7 +48,7 @@ public synchronized void stop() {
4648
if (datafeedManager != null) {
4749
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
4850
}
49-
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
51+
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
5052
if (nativeController != null) {
5153
// This kills autodetect processes WITHOUT closing the jobs, so they get reallocated.
5254
if (autodetectProcessManager != null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,17 @@ public class NativeController {
4343

4444
public static final Map<String, Object> UNKNOWN_NATIVE_CODE_INFO = Map.of("version", "N/A", "build_hash", "N/A");
4545

46+
private final String localNodeName;
4647
private final CppLogMessageHandler cppLogHandler;
4748
private final OutputStream commandStream;
4849

49-
NativeController(Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
50+
NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
5051
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null,
5152
true, true, false, false, false, false);
5253
processPipes.connectStreams(CONTROLLER_CONNECT_TIMEOUT);
53-
cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get());
54-
commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());
54+
this.localNodeName = localNodeName;
55+
this.cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get());
56+
this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());
5557
}
5658

5759
void tailLogsInThread() {
@@ -98,7 +100,8 @@ public void startProcess(List<String> command) throws IOException {
98100
}
99101

100102
if (cppLogHandler.hasLogStreamEnded()) {
101-
String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped";
103+
String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped on node ["
104+
+ localNodeName + "]";
102105
LOGGER.error(msg);
103106
throw new ElasticsearchException(msg);
104107
}
@@ -124,7 +127,8 @@ public void killProcess(long pid) throws TimeoutException, IOException {
124127
}
125128

126129
if (cppLogHandler.hasLogStreamEnded()) {
127-
String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped";
130+
String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped on node ["
131+
+ localNodeName + "]";
128132
LOGGER.error(msg);
129133
throw new ElasticsearchException(msg);
130134
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ private NativeControllerHolder() {
3232
*
3333
* Calls may throw an exception if initial connection to the C++ process fails.
3434
*/
35-
public static NativeController getNativeController(Environment environment) throws IOException {
35+
public static NativeController getNativeController(String localNodeName, Environment environment) throws IOException {
3636

3737
if (MachineLearningField.AUTODETECT_PROCESS.get(environment.settings())) {
3838
synchronized (lock) {
3939
if (nativeController == null) {
40-
nativeController = new NativeController(environment, new NamedPipeHelper());
40+
nativeController = new NativeController(localNodeName, environment, new NamedPipeHelper());
4141
nativeController.tailLogsInThread();
4242
}
4343
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
public class NativeControllerTests extends ESTestCase {
3232

33+
private static final String NODE_NAME = "native-controller-tests-node";
34+
3335
private static final String TEST_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
3436
+ "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) "
3537
+ "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n";
@@ -50,7 +52,7 @@ public void testStartProcessCommand() throws IOException {
5052
command.add("--arg2=42");
5153
command.add("--arg3=something with spaces");
5254

53-
NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
55+
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
5456
nativeController.startProcess(command);
5557

5658
assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n",
@@ -65,7 +67,7 @@ public void testGetNativeCodeInfo() throws IOException, TimeoutException {
6567
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
6668
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);
6769

68-
NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
70+
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
6971
nativeController.tailLogsInThread();
7072
Map<String, Object> nativeCodeInfo = nativeController.getNativeCodeInfo();
7173

@@ -83,15 +85,16 @@ public void testControllerDeath() throws Exception {
8385
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
8486
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);
8587

86-
NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
88+
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
8789
nativeController.tailLogsInThread();
8890

8991
// As soon as the log stream ends startProcess should think the native controller has died
9092
assertBusy(() -> {
9193
ElasticsearchException e = expectThrows(ElasticsearchException.class,
9294
() -> nativeController.startProcess(Collections.singletonList("my process")));
9395

94-
assertEquals("Cannot start process [my process]: native controller process has stopped", e.getMessage());
96+
assertEquals("Cannot start process [my process]: native controller process has stopped on node " +
97+
"[native-controller-tests-node]", e.getMessage());
9598
});
9699
}
97100
}

0 commit comments

Comments
 (0)