Skip to content

Commit 4428c15

Browse files
committed
[7.x][ML] Make controller send responses for each command received
This change makes the controller process respond to each command it receives with a document indicating whether that command was successfully executed or not. This response will be used by the Java side of the connection to determine when it is appropriate to move on to the next phase of the action that the controller command was part of. For example, when starting a process and connecting named pipes to it it is best that the named pipe connections are not attempted until the process is confirmed to be started. Backport of elastic#1520
1 parent c6200ea commit 4428c15

13 files changed

+416
-152
lines changed

bin/autodetect/CCmdLineParser.cc

+28-28
Original file line numberDiff line numberDiff line change
@@ -63,66 +63,66 @@ bool CCmdLineParser::parse(int argc,
6363
("help", "Display this information and exit")
6464
("version", "Display version information and exit")
6565
("limitconfig", boost::program_options::value<std::string>(),
66-
"Optional limit config file")
66+
"Optional limit config file")
6767
("modelconfig", boost::program_options::value<std::string>(),
68-
"Optional model config file")
68+
"Optional model config file")
6969
("fieldconfig", boost::program_options::value<std::string>(),
70-
"Optional field config file")
70+
"Optional field config file")
7171
("modelplotconfig", boost::program_options::value<std::string>(),
72-
"Optional model plot config file")
72+
"Optional model plot config file")
7373
("jobid", boost::program_options::value<std::string>(),
74-
"ID of the job this process is associated with")
74+
"ID of the job this process is associated with")
7575
("logProperties", boost::program_options::value<std::string>(),
76-
"Optional logger properties file")
76+
"Optional logger properties file")
7777
("logPipe", boost::program_options::value<std::string>(),
78-
"Optional log to named pipe")
78+
"Optional log to named pipe")
7979
("bucketspan", boost::program_options::value<core_t::TTime>(),
80-
"Optional aggregation bucket span (in seconds) - default is 300")
80+
"Optional aggregation bucket span (in seconds) - default is 300")
8181
("latency", boost::program_options::value<core_t::TTime>(),
82-
"Optional maximum delay for out-of-order records (in seconds) - default is 0")
82+
"Optional maximum delay for out-of-order records (in seconds) - default is 0")
8383
("summarycountfield", boost::program_options::value<std::string>(),
84-
"Optional field to that contains counts for pre-summarized input - default is none")
84+
"Optional field to that contains counts for pre-summarized input - default is none")
8585
("delimiter", boost::program_options::value<char>(),
86-
"Optional delimiter character for delimited data formats - default is '\t' (tab separated)")
86+
"Optional delimiter character for delimited data formats - default is '\t' (tab separated)")
8787
("lengthEncodedInput",
88-
"Take input in length encoded binary format - default is delimited")
88+
"Take input in length encoded binary format - default is delimited")
8989
("timefield", boost::program_options::value<std::string>(),
90-
"Optional name of the field containing the timestamp - default is 'time'")
90+
"Optional name of the field containing the timestamp - default is 'time'")
9191
("timeformat", boost::program_options::value<std::string>(),
92-
"Optional format of the date in the time field in strptime code - default is the epoch time in seconds")
92+
"Optional format of the date in the time field in strptime code - default is the epoch time in seconds")
9393
("quantilesState", boost::program_options::value<std::string>(),
94-
"Optional file to quantiles for normalization")
94+
"Optional file to quantiles for normalization")
9595
("deleteStateFiles",
96-
"If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read")
96+
"If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read")
9797
("namedPipeConnectTimeout", boost::program_options::value<core_t::TTime>(),
98-
"Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds")
98+
"Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds")
9999
("input", boost::program_options::value<std::string>(),
100-
"Optional file to read input from - not present means read from STDIN")
100+
"Optional file to read input from - not present means read from STDIN")
101101
("inputIsPipe", "Specified input file is a named pipe")
102102
("output", boost::program_options::value<std::string>(),
103-
"Optional file to write output to - not present means write to STDOUT")
103+
"Optional file to write output to - not present means write to STDOUT")
104104
("outputIsPipe", "Specified output file is a named pipe")
105105
("restore", boost::program_options::value<std::string>(),
106-
"Optional file to restore state from - not present means no state restoration")
106+
"Optional file to restore state from - not present means no state restoration")
107107
("restoreIsPipe", "Specified restore file is a named pipe")
108108
("persist", boost::program_options::value<std::string>(),
109-
"Optional file to persist state to - not present means no state persistence")
109+
"Optional file to persist state to - not present means no state persistence")
110110
("persistIsPipe", "Specified persist file is a named pipe")
111111
("persistInterval", boost::program_options::value<core_t::TTime>(),
112-
"Optional time interval at which to periodically persist model state (Mutually exclusive with bucketPersistInterval)")
112+
"Optional time interval at which to periodically persist model state (Mutually exclusive with bucketPersistInterval)")
113113
("persistInForeground", "Persistence occurs in the foreground. Defaults to background persistence.")
114114
("bucketPersistInterval", boost::program_options::value<std::size_t>(),
115-
"Optional number of buckets after which to periodically persist model state (Mutually exclusive with persistInterval)")
115+
"Optional number of buckets after which to periodically persist model state (Mutually exclusive with persistInterval)")
116116
("maxQuantileInterval", boost::program_options::value<core_t::TTime>(),
117-
"Optional interval at which to periodically output quantiles if they have not been output due to an anomaly - if not specified then quantiles will only be output following a big anomaly")
117+
"Optional interval at which to periodically output quantiles if they have not been output due to an anomaly - if not specified then quantiles will only be output following a big anomaly")
118118
("maxAnomalyRecords", boost::program_options::value<std::size_t>(),
119-
"The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.")
119+
"The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.")
120120
("memoryUsage",
121-
"Log the model memory usage at the end of the job")
121+
"Log the model memory usage at the end of the job")
122122
("multivariateByFields",
123-
"Optional flag to enable multi-variate analysis of correlated by fields")
123+
"Optional flag to enable multi-variate analysis of correlated by fields")
124124
("stopCategorizationOnWarnStatus",
125-
"Optional flag to stop categorization for partitions where the status is 'warn'.")
125+
"Optional flag to stop categorization for partitions where the status is 'warn'.")
126126
;
127127
// clang-format on
128128

bin/controller/CCmdLineParser.cc

+10-4
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,22 @@ bool CCmdLineParser::parse(int argc,
2121
const char* const* argv,
2222
std::string& jvmPidStr,
2323
std::string& logPipe,
24-
std::string& commandPipe) {
24+
std::string& commandPipe,
25+
std::string& outputPipe) {
2526
try {
2627
boost::program_options::options_description desc(DESCRIPTION);
2728
// clang-format off
2829
desc.add_options()
2930
("help", "Display this information and exit")
3031
("version", "Display version information and exit")
3132
("jvmPid", boost::program_options::value<std::string>(),
32-
"Process ID of the JVM to communicate with - default is parent process PID")
33+
"Process ID of the JVM to communicate with - default is parent process PID")
3334
("logPipe", boost::program_options::value<std::string>(),
34-
"Named pipe to log to - default is controller_log_<JVM PID>")
35+
"Named pipe to log to - default is controller_log_<JVM PID>")
3536
("commandPipe", boost::program_options::value<std::string>(),
36-
"Named pipe to accept commands from - default is controller_command_<JVM PID>")
37+
"Named pipe to accept commands from - default is controller_command_<JVM PID>")
38+
("outputPipe", boost::program_options::value<std::string>(),
39+
"Named pipe to output responses to - default is controller_output_<JVM PID>")
3740
;
3841
// clang-format on
3942

@@ -59,6 +62,9 @@ bool CCmdLineParser::parse(int argc,
5962
if (vm.count("commandPipe") > 0) {
6063
commandPipe = vm["commandPipe"].as<std::string>();
6164
}
65+
if (vm.count("outputPipe") > 0) {
66+
outputPipe = vm["outputPipe"].as<std::string>();
67+
}
6268
} catch (std::exception& e) {
6369
std::cerr << "Error processing command line: " << e.what() << std::endl;
6470
return false;

bin/controller/CCmdLineParser.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ class CCmdLineParser {
3333
const char* const* argv,
3434
std::string& jvmPidStr,
3535
std::string& logPipe,
36-
std::string& commandPipe);
36+
std::string& commandPipe,
37+
std::string& outputPipe);
3738

3839
private:
3940
static const std::string DESCRIPTION;

bin/controller/CCommandProcessor.cc

+51-29
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,18 @@ namespace ml {
2222
namespace controller {
2323

2424
// Initialise statics
25-
const std::string CCommandProcessor::START("start");
26-
const std::string CCommandProcessor::KILL("kill");
25+
const std::string CCommandProcessor::START{"start"};
26+
const std::string CCommandProcessor::KILL{"kill"};
2727

28-
CCommandProcessor::CCommandProcessor(const TStrVec& permittedProcessPaths)
29-
: m_Spawner(permittedProcessPaths) {
28+
CCommandProcessor::CCommandProcessor(const TStrVec& permittedProcessPaths,
29+
std::ostream& responseStream)
30+
: m_Spawner{permittedProcessPaths}, m_ResponseWriter{responseStream} {
3031
}
3132

32-
void CCommandProcessor::processCommands(std::istream& stream) {
33+
void CCommandProcessor::processCommands(std::istream& commandStream) {
3334
std::string command;
34-
while (std::getline(stream, command)) {
35-
if (!command.empty()) {
35+
while (std::getline(commandStream, command)) {
36+
if (command.empty() == false) {
3637
this->handleCommand(command);
3738
}
3839
}
@@ -41,61 +42,82 @@ void CCommandProcessor::processCommands(std::istream& stream) {
4142
bool CCommandProcessor::handleCommand(const std::string& command) {
4243
// Command lines must be tab-separated
4344
TStrVec tokens;
44-
std::string remainder;
45-
core::CStringUtils::tokenise(TAB, command, tokens, remainder);
46-
if (!remainder.empty()) {
47-
tokens.push_back(remainder);
45+
{
46+
std::string remainder;
47+
core::CStringUtils::tokenise(TAB, command, tokens, remainder);
48+
if (remainder.empty() == false) {
49+
tokens.emplace_back(std::move(remainder));
50+
}
4851
}
4952

5053
// Multiple consecutive tabs might have caused empty tokens
5154
tokens.erase(std::remove(tokens.begin(), tokens.end(), EMPTY_STRING), tokens.end());
5255

53-
if (tokens.empty()) {
54-
LOG_DEBUG(<< "Ignoring empty command");
56+
if (tokens.size() < 3) {
57+
if (tokens.empty() == false) {
58+
LOG_ERROR(<< "Ignoring command with only " << tokens.size()
59+
<< ((tokens.size() == 1) ? " token" : " tokens"));
60+
}
5561
return false;
5662
}
5763

58-
// Split into verb and other tokens
59-
std::string verb(tokens[0]);
60-
tokens.erase(tokens.begin());
64+
// Split into ID, verb and other tokens
65+
std::uint32_t id{0};
66+
if (core::CStringUtils::stringToType(tokens[0], id) == false || id == 0) {
67+
LOG_ERROR(<< "Invalid command ID in " << core::CContainerPrinter::print(tokens));
68+
return false;
69+
}
70+
71+
std::string verb{std::move(tokens[1])};
72+
tokens.erase(tokens.begin(), tokens.begin() + 2);
6173

6274
if (verb == START) {
63-
return this->handleStart(tokens);
75+
return this->handleStart(id, std::move(tokens));
6476
}
6577
if (verb == KILL) {
66-
return this->handleKill(tokens);
78+
return this->handleKill(id, std::move(tokens));
6779
}
6880

69-
LOG_ERROR(<< "Did not understand verb '" << verb << '\'');
81+
std::string error{"Did not understand verb '" + verb + '\''};
82+
LOG_ERROR(<< error << " in command with ID " << id);
83+
m_ResponseWriter.writeResponse(id, false, error);
7084
return false;
7185
}
7286

73-
bool CCommandProcessor::handleStart(TStrVec& tokens) {
74-
std::string processPath;
75-
processPath.swap(tokens[0]);
87+
bool CCommandProcessor::handleStart(std::uint32_t id, TStrVec tokens) {
88+
std::string processPath{std::move(tokens[0])};
7689
tokens.erase(tokens.begin());
7790

7891
if (m_Spawner.spawn(processPath, tokens) == false) {
79-
LOG_ERROR(<< "Failed to start process '" << processPath << '\'');
92+
std::string error{"Failed to start process '" + processPath + '\''};
93+
LOG_ERROR(<< error << " in command with ID " << id);
94+
m_ResponseWriter.writeResponse(id, false, error);
8095
return false;
8196
}
8297

98+
m_ResponseWriter.writeResponse(id, true, "Process '" + processPath + "' started");
8399
return true;
84100
}
85101

86-
bool CCommandProcessor::handleKill(TStrVec& tokens) {
87-
core::CProcess::TPid pid = 0;
88-
if (tokens.size() != 1 || core::CStringUtils::stringToType(tokens[0], pid) == false) {
89-
LOG_ERROR(<< "Unexpected arguments for kill command: "
90-
<< core::CContainerPrinter::print(tokens));
102+
bool CCommandProcessor::handleKill(std::uint32_t id, TStrVec tokens) {
103+
core::CProcess::TPid pid{0};
104+
if (tokens.size() != 1 ||
105+
core::CStringUtils::stringToType(tokens[0], pid) == false || pid == 0) {
106+
std::string error{"Unexpected arguments for kill command: " +
107+
core::CContainerPrinter::print(tokens)};
108+
LOG_ERROR(<< error << " in command with ID " << id);
109+
m_ResponseWriter.writeResponse(id, false, error);
91110
return false;
92111
}
93112

94113
if (m_Spawner.terminateChild(pid) == false) {
95-
LOG_ERROR(<< "Failed to kill process with PID " << pid);
114+
std::string error{"Failed to kill process with PID " + tokens[0]};
115+
LOG_ERROR(<< error << " in command with ID " << id);
116+
m_ResponseWriter.writeResponse(id, false, error);
96117
return false;
97118
}
98119

120+
m_ResponseWriter.writeResponse(id, true, "Process with PID " + tokens[0] + " killed");
99121
return true;
100122
}
101123
}

bin/controller/CCommandProcessor.h

+20-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
#include <core/CDetachedProcessSpawner.h>
1010

11+
#include "CResponseJsonWriter.h"
12+
13+
#include <cstdint>
1114
#include <iosfwd>
1215
#include <string>
1316
#include <vector>
@@ -25,7 +28,11 @@ namespace controller {
2528
//! command to be executed.
2629
//!
2730
//! Each command has the following format:
28-
//! verb arguments...
31+
//! ID verb arguments...
32+
//!
33+
//! The ID is expected to be a unique positive integer. This is reported
34+
//! in error messages and in the response objects that are sent when the
35+
//! command is complete.
2936
//!
3037
//! Available verbs are:
3138
//! 1) start - in this case the arguments consist of the process name
@@ -51,30 +58,33 @@ class CCommandProcessor {
5158
static const std::string KILL;
5259

5360
public:
54-
CCommandProcessor(const TStrVec& permittedProcessPaths);
61+
CCommandProcessor(const TStrVec& permittedProcessPaths, std::ostream& responseStream);
5562

56-
//! Action commands read from the supplied \p stream until end-of-file
57-
//! is reached.
58-
void processCommands(std::istream& stream);
63+
//! Action commands read from the supplied \p commandStream until
64+
//! end-of-file is reached.
65+
void processCommands(std::istream& commandStream);
5966

6067
//! Parse and handle a single command.
6168
bool handleCommand(const std::string& command);
6269

6370
private:
6471
//! Handle a start command.
65-
//! \param tokens Tokens to the command excluding the verb. Passed
66-
//! non-const so that this method can manipulate the
67-
//! tokens without having to copy.
68-
bool handleStart(TStrVec& tokens);
72+
//! \param id The command ID.
73+
//! \param tokens Tokens to the command excluding the command ID and verb.
74+
bool handleStart(std::uint32_t id, TStrVec tokens);
6975

7076
//! Handle a kill command.
77+
//! \param id The command ID.
7178
//! \param tokens Expected to contain one element, namely the process
7279
//! ID of the process to be killed.
73-
bool handleKill(TStrVec& tokens);
80+
bool handleKill(std::uint32_t id, TStrVec tokens);
7481

7582
private:
7683
//! Used to spawn/kill the requested processes.
7784
core::CDetachedProcessSpawner m_Spawner;
85+
86+
//! Used to write responses in JSON format to the response stream.
87+
CResponseJsonWriter m_ResponseWriter;
7888
};
7989
}
8090
}

bin/controller/CResponseJsonWriter.cc

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
#include "CResponseJsonWriter.h"
8+
9+
#include <core/CLogger.h>
10+
11+
#include <ios>
12+
13+
namespace ml {
14+
namespace controller {
15+
namespace {
16+
17+
// JSON field names
18+
const std::string ID{"id"};
19+
const std::string SUCCESS{"success"};
20+
const std::string REASON{"reason"};
21+
}
22+
23+
CResponseJsonWriter::CResponseJsonWriter(std::ostream& responseStream)
24+
: m_WrappedOutputStream(responseStream), m_Writer{m_WrappedOutputStream} {
25+
}
26+
27+
void CResponseJsonWriter::writeResponse(std::uint32_t id, bool success, const std::string& reason) {
28+
m_Writer.StartObject();
29+
m_Writer.Key(ID);
30+
m_Writer.Uint(id);
31+
m_Writer.Key(SUCCESS);
32+
m_Writer.Bool(success);
33+
m_Writer.Key(REASON);
34+
m_Writer.String(reason);
35+
m_Writer.EndObject();
36+
m_Writer.flush();
37+
LOG_DEBUG(<< "Wrote controller response - id: " << id
38+
<< " success: " << std::boolalpha << success << " reason: " << reason);
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)