Skip to content

Commit 339f4ac

Browse files
committed
Change the output format
By writing results in an array it's possible to reuse the Java ProcessResultsParser class to read them. Also, making the output thread safe will make implementing elastic#1503 easier if that's ever done.
1 parent 42d2ee6 commit 339f4ac

File tree

5 files changed

+150
-115
lines changed

5 files changed

+150
-115
lines changed

bin/autodetect/CCmdLineParser.cc

+28-28
Original file line numberDiff line numberDiff line change
@@ -63,66 +63,66 @@ bool CCmdLineParser::parse(int argc,
6363
("help", "Display this information and exit")
6464
("version", "Display version information and exit")
6565
("limitconfig", boost::program_options::value<std::string>(),
66-
"Optional limit config file")
66+
"Optional limit config file")
6767
("modelconfig", boost::program_options::value<std::string>(),
68-
"Optional model config file")
68+
"Optional model config file")
6969
("fieldconfig", boost::program_options::value<std::string>(),
70-
"Optional field config file")
70+
"Optional field config file")
7171
("modelplotconfig", boost::program_options::value<std::string>(),
72-
"Optional model plot config file")
72+
"Optional model plot config file")
7373
("jobid", boost::program_options::value<std::string>(),
74-
"ID of the job this process is associated with")
74+
"ID of the job this process is associated with")
7575
("logProperties", boost::program_options::value<std::string>(),
76-
"Optional logger properties file")
76+
"Optional logger properties file")
7777
("logPipe", boost::program_options::value<std::string>(),
78-
"Optional log to named pipe")
78+
"Optional log to named pipe")
7979
("bucketspan", boost::program_options::value<core_t::TTime>(),
80-
"Optional aggregation bucket span (in seconds) - default is 300")
80+
"Optional aggregation bucket span (in seconds) - default is 300")
8181
("latency", boost::program_options::value<core_t::TTime>(),
82-
"Optional maximum delay for out-of-order records (in seconds) - default is 0")
82+
"Optional maximum delay for out-of-order records (in seconds) - default is 0")
8383
("summarycountfield", boost::program_options::value<std::string>(),
84-
"Optional field to that contains counts for pre-summarized input - default is none")
84+
"Optional field to that contains counts for pre-summarized input - default is none")
8585
("delimiter", boost::program_options::value<char>(),
86-
"Optional delimiter character for delimited data formats - default is '\t' (tab separated)")
86+
"Optional delimiter character for delimited data formats - default is '\t' (tab separated)")
8787
("lengthEncodedInput",
88-
"Take input in length encoded binary format - default is delimited")
88+
"Take input in length encoded binary format - default is delimited")
8989
("timefield", boost::program_options::value<std::string>(),
90-
"Optional name of the field containing the timestamp - default is 'time'")
90+
"Optional name of the field containing the timestamp - default is 'time'")
9191
("timeformat", boost::program_options::value<std::string>(),
92-
"Optional format of the date in the time field in strptime code - default is the epoch time in seconds")
92+
"Optional format of the date in the time field in strptime code - default is the epoch time in seconds")
9393
("quantilesState", boost::program_options::value<std::string>(),
94-
"Optional file to quantiles for normalization")
94+
"Optional file to quantiles for normalization")
9595
("deleteStateFiles",
96-
"If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read")
96+
"If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read")
9797
("namedPipeConnectTimeout", boost::program_options::value<core_t::TTime>(),
98-
"Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds")
98+
"Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds")
9999
("input", boost::program_options::value<std::string>(),
100-
"Optional file to read input from - not present means read from STDIN")
100+
"Optional file to read input from - not present means read from STDIN")
101101
("inputIsPipe", "Specified input file is a named pipe")
102102
("output", boost::program_options::value<std::string>(),
103-
"Optional file to write output to - not present means write to STDOUT")
103+
"Optional file to write output to - not present means write to STDOUT")
104104
("outputIsPipe", "Specified output file is a named pipe")
105105
("restore", boost::program_options::value<std::string>(),
106-
"Optional file to restore state from - not present means no state restoration")
106+
"Optional file to restore state from - not present means no state restoration")
107107
("restoreIsPipe", "Specified restore file is a named pipe")
108108
("persist", boost::program_options::value<std::string>(),
109-
"Optional file to persist state to - not present means no state persistence")
109+
"Optional file to persist state to - not present means no state persistence")
110110
("persistIsPipe", "Specified persist file is a named pipe")
111111
("persistInterval", boost::program_options::value<core_t::TTime>(),
112-
"Optional time interval at which to periodically persist model state (Mutually exclusive with bucketPersistInterval)")
112+
"Optional time interval at which to periodically persist model state (Mutually exclusive with bucketPersistInterval)")
113113
("persistInForeground", "Persistence occurs in the foreground. Defaults to background persistence.")
114114
("bucketPersistInterval", boost::program_options::value<std::size_t>(),
115-
"Optional number of buckets after which to periodically persist model state (Mutually exclusive with persistInterval)")
115+
"Optional number of buckets after which to periodically persist model state (Mutually exclusive with persistInterval)")
116116
("maxQuantileInterval", boost::program_options::value<core_t::TTime>(),
117-
"Optional interval at which to periodically output quantiles if they have not been output due to an anomaly - if not specified then quantiles will only be output following a big anomaly")
117+
"Optional interval at which to periodically output quantiles if they have not been output due to an anomaly - if not specified then quantiles will only be output following a big anomaly")
118118
("maxAnomalyRecords", boost::program_options::value<std::size_t>(),
119-
"The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.")
119+
"The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.")
120120
("memoryUsage",
121-
"Log the model memory usage at the end of the job")
121+
"Log the model memory usage at the end of the job")
122122
("multivariateByFields",
123-
"Optional flag to enable multi-variate analysis of correlated by fields")
123+
"Optional flag to enable multi-variate analysis of correlated by fields")
124124
("stopCategorizationOnWarnStatus",
125-
"Optional flag to stop categorization for partitions where the status is 'warn'.")
125+
"Optional flag to stop categorization for partitions where the status is 'warn'.")
126126
;
127127
// clang-format on
128128

bin/controller/CResponseJsonWriter.cc

+8-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
#include "CResponseJsonWriter.h"
88

9+
#include <core/CLogger.h>
10+
11+
#include <ios>
12+
913
namespace ml {
1014
namespace controller {
1115
namespace {
@@ -17,7 +21,7 @@ const std::string REASON{"reason"};
1721
}
1822

1923
CResponseJsonWriter::CResponseJsonWriter(std::ostream& responseStream)
20-
: m_WriteStream{responseStream}, m_Writer{m_WriteStream} {
24+
: m_WrappedOutputStream(responseStream), m_Writer{m_WrappedOutputStream} {
2125
}
2226

2327
void CResponseJsonWriter::writeResponse(std::uint32_t id, bool success, const std::string& reason) {
@@ -29,7 +33,9 @@ void CResponseJsonWriter::writeResponse(std::uint32_t id, bool success, const st
2933
m_Writer.Key(REASON);
3034
m_Writer.String(reason);
3135
m_Writer.EndObject();
32-
m_Writer.Flush();
36+
m_Writer.flush();
37+
LOG_DEBUG(<< "Wrote controller response - id: " << id
38+
<< " success: " << std::boolalpha << success << " reason: " << reason);
3339
}
3440
}
3541
}

bin/controller/CResponseJsonWriter.h

+16-12
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
#ifndef INCLUDED_ml_controller_CResponseJsonWriter_h
77
#define INCLUDED_ml_controller_CResponseJsonWriter_h
88

9-
#include <core/CRapidJsonLineWriter.h>
10-
11-
#include <rapidjson/ostreamwrapper.h>
9+
#include <core/CJsonOutputStreamWrapper.h>
10+
#include <core/CRapidJsonConcurrentLineWriter.h>
1211

1312
#include <iosfwd>
1413
#include <string>
@@ -24,11 +23,18 @@ namespace controller {
2423
//!
2524
//! { "id" : 123, "success" : true, "reason" : "message explaining success/failure" }
2625
//!
27-
//! A newline is written after each document, i.e. the output is ND-JSON.
26+
//! They are written into a JSON array, i.e. the overall output looks
27+
//! something like this:
28+
//!
29+
//! [{ "id" : 1, "success" : true, "reason" : "all ok" }
30+
//! ,{ "id" : 2, "success" : false, "reason" : "something went wrong" }
31+
//! ,{ "id" : 3, "success" : true, "reason" : "ok again" }
32+
//! ]
2833
//!
2934
//! IMPLEMENTATION DECISIONS:\n
30-
//! Not using the concurrent line writer, as there's no need for thread
31-
//! safety.
35+
//! Uses the concurrent line writer. There's no need for thread safety
36+
//! with the current design, but in future commands might be processed
37+
//! concurrently.
3238
//!
3339
class CResponseJsonWriter {
3440
public:
@@ -39,13 +45,11 @@ class CResponseJsonWriter {
3945
void writeResponse(std::uint32_t id, bool success, const std::string& reason);
4046

4147
private:
42-
//! JSON writer ostream wrapper
43-
rapidjson::OStreamWrapper m_WriteStream;
44-
45-
using TGenericLineWriter = core::CRapidJsonLineWriter<rapidjson::OStreamWrapper>;
48+
//! Wrapped output stream
49+
core::CJsonOutputStreamWrapper m_WrappedOutputStream;
4650

47-
//! JSON writer
48-
TGenericLineWriter m_Writer;
51+
//! JSON line writer
52+
core::CRapidJsonConcurrentLineWriter m_Writer;
4953
};
5054
}
5155
}

0 commit comments

Comments
 (0)