Skip to content

Commit 7c13681

Browse files
authored
[ML] Improve autodetect logic for persistence (#437)
Changed the logic surrounding persistence of both state and quantiles on graceful shutdown so that persistence only occurs if and only if at least one input record has been processed or time has been advanced. closes #393
1 parent c63e47a commit 7c13681

17 files changed

+184
-4
lines changed

docs/CHANGELOG.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ to the model. (See {pull}214[#214].)
4444
4545
* Handle NaNs when detrending seasonal components. {ml-pull}408[#408]
4646
47+
* Improve autodetect logic for persistence. {ml-pull}437[#437]
48+
4749
== {es} version 7.0.0-alpha1
4850
4951
== {es} version 6.7.0

include/api/CAnomalyJob.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
182182
//! How many records did we handle?
183183
virtual uint64_t numRecordsHandled() const;
184184

185+
//! Is persistence needed?
186+
virtual bool isPersistenceNeeded(const std::string& description) const;
187+
185188
//! Log a list of the detectors and keys
186189
void description() const;
187190

@@ -454,6 +457,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
454457
//! The hierarchical results normalizer.
455458
model::CHierarchicalResultsNormalizer m_Normalizer;
456459

460+
//! Flag indicating whether or not time has been advanced.
461+
bool m_TimeAdvanced{false};
462+
457463
friend class ::CBackgroundPersisterTest;
458464
friend class ::CAnomalyJobTest;
459465
};

include/api/CDataProcessor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ class API_EXPORT CDataProcessor : private core::CNonCopyable {
8282
//! Access the output handler
8383
virtual COutputHandler& outputHandler() = 0;
8484

85+
//! Is persistence needed?
86+
virtual bool isPersistenceNeeded(const std::string& description) const = 0;
87+
8588
//! Create debug for a record. This is expensive so should NOT be
8689
//! called for every record as a matter of course.
8790
static std::string debugPrintRecord(const TStrStrUMap& dataRowFields);

include/api/CFieldDataTyper.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ class API_EXPORT CFieldDataTyper : public CDataProcessor {
9898
virtual bool restoreState(core::CDataSearcher& restoreSearcher,
9999
core_t::TTime& completeToTime);
100100

101+
//! Is persistence needed?
102+
virtual bool isPersistenceNeeded(const std::string& description) const;
103+
101104
//! Persist current state
102105
virtual bool persistState(core::CDataAdder& persister);
103106

include/api/COutputChainer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ class API_EXPORT COutputChainer : public COutputHandler {
7878
//! Persist current state due to the periodic persistence being triggered.
7979
virtual bool periodicPersistState(CBackgroundPersister& persister);
8080

81+
//! Is persistence needed?
82+
virtual bool isPersistenceNeeded(const std::string& description) const;
83+
8184
//! The chainer does consume control messages, because it passes them on
8285
//! to whatever processor it's chained to.
8386
virtual bool consumesControlMessages();

include/api/COutputHandler.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ class API_EXPORT COutputHandler : private core::CNonCopyable {
9191
//! Persist current state due to the periodic persistence being triggered.
9292
virtual bool periodicPersistState(CBackgroundPersister& persister);
9393

94+
//! Is persistence needed?
95+
virtual bool isPersistenceNeeded(const std::string& description) const;
96+
9497
//! Does this handler deal with control messages?
9598
virtual bool consumesControlMessages();
9699

include/config/CAutoconfigurer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ class CONFIG_EXPORT CAutoconfigurer : public api::CDataProcessor {
4646
//! Generate the report.
4747
virtual void finalise();
4848

49+
//! Is persistence needed?
50+
virtual bool isPersistenceNeeded(const std::string& description) const;
51+
4952
//! No-op.
5053
virtual bool restoreState(core::CDataSearcher& restoreSearcher,
5154
core_t::TTime& completeToTime);

lib/api/CAnomalyJob.cc

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,10 @@ bool CAnomalyJob::handleRecord(const TStrStrUMap& dataRowFields) {
220220
}
221221

222222
void CAnomalyJob::finalise() {
223-
// Persist final state of normalizer
224-
m_JsonOutputWriter.persistNormalizer(m_Normalizer, m_LastNormalizerPersistTime);
223+
// Persist final state of normalizer iff an input record has been handled or time has been advanced.
224+
if (this->isPersistenceNeeded("quantiles state")) {
225+
m_JsonOutputWriter.persistNormalizer(m_Normalizer, m_LastNormalizerPersistTime);
226+
}
225227

226228
// Prune the models so that the final persisted state is as neat as possible
227229
this->pruneAllModels();
@@ -396,11 +398,23 @@ void CAnomalyJob::advanceTime(const std::string& time_) {
396398
LOG_TRACE(<< "Received request to advance time to " << time);
397399
}
398400

401+
m_TimeAdvanced = true;
402+
399403
this->outputBucketResultsUntil(time);
400404

401405
this->timeNow(time);
402406
}
403407

408+
bool CAnomalyJob::isPersistenceNeeded(const std::string& description) const {
409+
if ((m_NumRecordsHandled == 0) && (m_TimeAdvanced == false)) {
410+
LOG_DEBUG(<< "Will not attempt to persist " << description
411+
<< ". Zero records were handled and time has not been advanced.");
412+
return false;
413+
}
414+
415+
return true;
416+
}
417+
404418
void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) {
405419
// If the bucket time has increased, output results for all field names
406420
core_t::TTime bucketLength = m_ModelConfig.bucketLength();

lib/api/CCmdSkeleton.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ bool CCmdSkeleton::persistState() {
5656
return true;
5757
}
5858

59-
if (m_Processor.numRecordsHandled() == 0) {
60-
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist state");
59+
if (m_Processor.isPersistenceNeeded("state") == false) {
6160
return true;
6261
}
6362

lib/api/CFieldDataTyper.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,20 @@ bool CFieldDataTyper::persistState(core::CDataAdder& persister) {
335335
return this->doPersistState(m_DataTyper->makePersistFunc(), m_ExamplesCollector, persister);
336336
}
337337

338+
bool CFieldDataTyper::isPersistenceNeeded(const std::string& description) const {
339+
// Pass on the request in case we're chained
340+
if (m_OutputHandler.isPersistenceNeeded(description)) {
341+
return true;
342+
}
343+
344+
if (m_NumRecordsHandled == 0) {
345+
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist "
346+
<< description << ".");
347+
return false;
348+
}
349+
return true;
350+
}
351+
338352
bool CFieldDataTyper::doPersistState(const CDataTyper::TPersistFunc& dataTyperPersistFunc,
339353
const CCategoryExamplesCollector& examplesCollector,
340354
core::CDataAdder& persister) {

lib/api/COutputChainer.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ bool COutputChainer::periodicPersistState(CBackgroundPersister& persister) {
113113
return m_DataProcessor.periodicPersistState(persister);
114114
}
115115

116+
bool COutputChainer::isPersistenceNeeded(const std::string& description) const {
117+
return m_DataProcessor.isPersistenceNeeded(description);
118+
}
119+
116120
bool COutputChainer::consumesControlMessages() {
117121
return true;
118122
}

lib/api/COutputHandler.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ bool COutputHandler::periodicPersistState(CBackgroundPersister& /* persister */)
4646
return true;
4747
}
4848

49+
bool COutputHandler::isPersistenceNeeded(const std::string& /*description*/) const {
50+
// NOOP unless overridden
51+
return false;
52+
}
53+
4954
COutputHandler::CPreComputedHash::CPreComputedHash(size_t hash) : m_Hash(hash) {
5055
}
5156

lib/api/unittest/CAnomalyJobTest.cc

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,109 @@ void CAnomalyJobTest::testSkipTimeControlMessage() {
483483
CPPUNIT_ASSERT_EQUAL(std::size_t(11), countBuckets("bucket", outputStrm.str() + "]"));
484484
}
485485

486+
void CAnomalyJobTest::testIsPersistenceNeeded() {
487+
488+
model::CLimits limits;
489+
api::CFieldConfig fieldConfig;
490+
api::CFieldConfig::TStrVec clauses;
491+
clauses.push_back("count");
492+
fieldConfig.initFromClause(clauses);
493+
model::CAnomalyDetectorModelConfig modelConfig =
494+
model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE);
495+
496+
{
497+
// check that persistence is not needed if no input records have been handled
498+
// and the time has not been advanced
499+
500+
std::stringstream outputStrm;
501+
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
502+
503+
api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);
504+
505+
CPPUNIT_ASSERT_EQUAL(false, job.isPersistenceNeeded("test state"));
506+
507+
job.finalise();
508+
wrappedOutputStream.syncFlush();
509+
510+
std::string output = outputStrm.str();
511+
LOG_DEBUG(<< "Output has yielded: " << output);
512+
513+
// check that no quantile state was persisted
514+
core::CRegex regex;
515+
regex.init("\n");
516+
core::CRegex::TStrVec lines;
517+
regex.split(output, lines);
518+
CPPUNIT_ASSERT_EQUAL(false, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
519+
lines));
520+
}
521+
522+
core_t::TTime time = 3600;
523+
{
524+
// check that persistence is needed if an input record has been handled
525+
526+
std::stringstream outputStrm;
527+
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
528+
529+
api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);
530+
531+
api::CAnomalyJob::TStrStrUMap dataRows;
532+
533+
std::ostringstream ss;
534+
ss << time;
535+
dataRows["time"] = ss.str();
536+
CPPUNIT_ASSERT(job.handleRecord(dataRows));
537+
538+
CPPUNIT_ASSERT_EQUAL(true, job.isPersistenceNeeded("test state"));
539+
540+
job.finalise();
541+
wrappedOutputStream.syncFlush();
542+
543+
std::string output = outputStrm.str();
544+
LOG_DEBUG(<< "Output has yielded: " << output);
545+
546+
// check that the quantile state has actually been persisted
547+
core::CRegex regex;
548+
regex.init("\n");
549+
core::CRegex::TStrVec lines;
550+
regex.split(output, lines);
551+
CPPUNIT_ASSERT_EQUAL(true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
552+
lines));
553+
}
554+
555+
{
556+
// check that persistence is needed if time has been advanced (via a control message)
557+
// even if no input data has been handled
558+
559+
std::stringstream outputStrm;
560+
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
561+
562+
api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);
563+
564+
api::CAnomalyJob::TStrStrUMap dataRows;
565+
566+
time = 39600;
567+
dataRows["."] = "t39600";
568+
CPPUNIT_ASSERT(job.handleRecord(dataRows));
569+
CPPUNIT_ASSERT(job.isPersistenceNeeded("test state"));
570+
571+
CPPUNIT_ASSERT_EQUAL(true, job.isPersistenceNeeded("test state"));
572+
573+
job.finalise();
574+
wrappedOutputStream.syncFlush();
575+
576+
std::string output = outputStrm.str();
577+
LOG_DEBUG(<< "Output has yielded: " << output);
578+
579+
// check that the quantile state has actually been persisted
580+
core::CRegex regex;
581+
regex.init("\n");
582+
core::CRegex::TStrVec lines;
583+
regex.split(output, lines);
584+
CPPUNIT_ASSERT_EQUAL(true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
585+
lines));
586+
}
587+
}
588+
486589
void CAnomalyJobTest::testModelPlot() {
487590
core_t::TTime bucketSize = 10000;
488591
model::CLimits limits;
@@ -651,6 +754,8 @@ CppUnit::Test* CAnomalyJobTest::suite() {
651754
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
652755
"CAnomalyJobTest::testSkipTimeControlMessage",
653756
&CAnomalyJobTest::testSkipTimeControlMessage));
757+
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
758+
"CAnomalyJobTest::testIsPersistenceNeeded", &CAnomalyJobTest::testIsPersistenceNeeded));
654759
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
655760
"CAnomalyJobTest::testModelPlot", &CAnomalyJobTest::testModelPlot));
656761
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(

lib/api/unittest/CAnomalyJobTest.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class CAnomalyJobTest : public CppUnit::TestFixture {
1717
void testOutOfSequence();
1818
void testControlMessages();
1919
void testSkipTimeControlMessage();
20+
void testIsPersistenceNeeded();
2021
void testModelPlot();
2122
void testInterimResultEdgeCases();
2223
void testRestoreFailsWithEmptyStream();

lib/api/unittest/CMockDataProcessor.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ bool CMockDataProcessor::handleRecord(const TStrStrUMap& dataRowFields) {
4747
void CMockDataProcessor::finalise() {
4848
}
4949

50+
bool CMockDataProcessor::isPersistenceNeeded(const std::string& description) const {
51+
if (m_NumRecordsHandled == 0) {
52+
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist "
53+
<< description << ".");
54+
return false;
55+
}
56+
return true;
57+
}
58+
5059
bool CMockDataProcessor::restoreState(ml::core::CDataSearcher& restoreSearcher,
5160
ml::core_t::TTime& completeToTime) {
5261
// Pass on the request in case we're chained

lib/api/unittest/CMockDataProcessor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class CMockDataProcessor : public ml::api::CDataProcessor {
4040

4141
virtual void finalise();
4242

43+
virtual bool isPersistenceNeeded(const std::string& description) const;
44+
4345
//! Restore previously saved state
4446
virtual bool restoreState(ml::core::CDataSearcher& restoreSearcher,
4547
ml::core_t::TTime& completeToTime);

lib/config/CAutoconfigurer.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ void CAutoconfigurer::finalise() {
165165
m_Impl->finalise();
166166
}
167167

168+
bool CAutoconfigurer::isPersistenceNeeded(const std::string& /*description*/) const {
169+
return false;
170+
}
171+
168172
bool CAutoconfigurer::restoreState(core::CDataSearcher& /*restoreSearcher*/,
169173
core_t::TTime& /*completeToTime*/) {
170174
return true;

0 commit comments

Comments
 (0)