Skip to content

Commit ae64ec6

Browse files
authored
[ML] Ensure program counters cache always cleared (#1774)
The collection of static program counters is cached prior to persistence. This gives the background persistence thread access to a consistent set of counters while they are being written. Because the program counters should be persisted only once for each model state snapshot, their persistence and the clearing of the cache are coupled to the persistence of the simple count detector, which is assumed to always exist. However, there is a scenario where persistence operates on an empty collection of detectors. This occurs when no data has been seen but time has advanced (see #393 for more details). In this situation, the program counter cache is populated but not cleared. A subsequent persistence operation will then lead to a warning that the counter cache is being overwritten. To avoid the warning message, we ensure that the program counter cache is always cleared at the end of the persistence operation, regardless of whether it succeeds.
1 parent 0614b01 commit ae64ec6

File tree

6 files changed

+59
-13
lines changed

6 files changed

+59
-13
lines changed

include/api/CAnomalyJob.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,18 +165,22 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
165165
bool restoreState(core::CDataSearcher& restoreSearcher,
166166
core_t::TTime& completeToTime) override;
167167

168-
//! Persist current state
168+
//! Persist state in the foreground. As this blocks the current thread of execution
169+
//! it should only be called in special circumstances, e.g. at job close, where it won't impact job analysis.
169170
bool persistStateInForeground(core::CDataAdder& persister,
170171
const std::string& descriptionPrefix) override;
171172

172-
//! Persist the current model state regardless of whether
173+
//! Persist the current model state in the foreground regardless of whether
173174
//! any results have been output.
174175
bool doPersistStateInForeground(core::CDataAdder& persister,
175176
const std::string& description,
176177
const std::string& snapshotId,
177178
core_t::TTime snapshotTimestamp);
178179

179-
//! Persist state of the residual models only
180+
//! Persist state of the residual models only.
181+
//! This method is not intended to be called in production code
182+
//! as it only persists a very small subset of model state with longer,
183+
//! human readable tags.
180184
bool persistModelsState(core::CDataAdder& persister,
181185
core_t::TTime timestamp,
182186
const std::string& outputFormat);
@@ -275,7 +279,10 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
275279
bool periodicPersistStateInBackground() override;
276280
bool periodicPersistStateInForeground() override;
277281

278-
//! Persist state of the residual models only
282+
//! Persist state of the residual models only.
283+
//! This method is not intended to be called in production code.
284+
//! \p outputFormat specifies the format of the output document and may
285+
//! either be JSON or XML.
279286
bool persistModelsState(const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
280287
core::CDataAdder& persister,
281288
core_t::TTime timestamp,

include/core/CProgramCounters.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,14 @@ struct SCounterDefinition {
179179
//! A singleton class: there should only be one collection of global counters
180180
//!
181181
class CORE_EXPORT CProgramCounters {
182+
public:
183+
//! \brief
184+
//! The cache of program counters is cleared upon destruction of an instance of this class.
185+
class CORE_EXPORT CCacheManager {
186+
public:
187+
~CCacheManager();
188+
};
189+
182190
private:
183191
//! \brief
184192
//! An atomic counter object
@@ -258,6 +266,9 @@ class CORE_EXPORT CProgramCounters {
258266
//! Copy the collection of live counters to a cache
259267
static void cacheCounters();
260268

269+
//! Clear the collection of cached counters
270+
static void clearCachedCounters();
271+
261272
//! \name Persistence
262273
//@{
263274
//! Restore the static members of this class from persisted state

lib/api/CAnomalyJob.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,9 +1099,10 @@ bool CAnomalyJob::doPersistStateInForeground(core::CDataAdder& persister,
10991099
std::string normaliserState;
11001100
m_Normalizer.toJson(m_LastResultsTime, "api", normaliserState, true);
11011101

1102-
// Persistence operates on a cached collection of counters rather than on the live counters directly.
1103-
// This is in order that background persistence operates on a consistent set of counters however we
1104-
// also must ensure that foreground persistence has access to an up-to-date cache of counters as well.
1102+
// Persistence of static counters is expected to operate on a cached collection of counters rather
1103+
// than on the live counters directly. This is in order that the more frequently used background persistence
1104+
// operates on a consistent set of counters. Hence, to avoid an error regarding the cache not existing, we
1105+
// also must ensure that foreground persistence has access to an up-to-date cache of counters.
11051106
core::CProgramCounters::cacheCounters();
11061107

11071108
return this->persistCopiedState(
@@ -1254,6 +1255,12 @@ bool CAnomalyJob::persistCopiedState(const std::string& description,
12541255
core_t::TTime latestRecordTime,
12551256
core_t::TTime lastResultsTime,
12561257
core::CDataAdder& persister) {
1258+
// Ensure that the cache of program counters is cleared upon exiting the current scope.
1259+
// As the cache is cleared when the simple count detector is persisted this may seem
1260+
// unnecessary at first, but there are occasions when the simple count detector does not exist,
1261+
// e.g. when no data is seen but time is advanced.
1262+
core::CProgramCounters::CCacheManager cacheMgr;
1263+
12571264
// Persist state for each detector separately by streaming
12581265
try {
12591266
core::CStateCompressor compressor(persister);
@@ -1597,7 +1604,7 @@ CAnomalyJob::makeDetector(const model::CAnomalyDetectorModelConfig& modelConfig,
15971604
void CAnomalyJob::populateDetectorKeys(const CAnomalyJobConfig& jobConfig, TKeyVec& keys) {
15981605
keys.clear();
15991606

1600-
// Add a key for the simple count detector.
1607+
// Always add a key for the simple count detector.
16011608
keys.push_back(model::CSearchKey::simpleCountKey());
16021609

16031610
for (const auto& fieldOptions : jobConfig.analysisConfig().detectorsConfig()) {

lib/api/CFieldDataCategorizer.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ bool CFieldDataCategorizer::persistStateInForeground(core::CDataAdder& persister
425425
return false;
426426
}
427427

428-
LOG_DEBUG(<< "Persist categorizer state");
428+
LOG_DEBUG(<< "Persist categorizer state in foreground");
429429

430430
TStrVec partitionFieldValues;
431431
TPersistFuncVec dataCategorizerPersistFuncs;
@@ -469,6 +469,13 @@ bool CFieldDataCategorizer::doPersistState(const TStrVec& partitionFieldValues,
469469
const TPersistFuncVec& dataCategorizerPersistFuncs,
470470
std::size_t categorizerAllocationFailures,
471471
core::CDataAdder& persister) {
472+
473+
// TODO: if the standalone categorize program is ever progressed, a mechanism needs
474+
// to be added that does the following:
475+
// 1. Caches program counters in the foreground before starting background persistence
476+
// 2. Calls core::CProgramCounters::staticsAcceptPersistInserter once and only once per persist
477+
// 3. Clears the program counter cache after persistence is complete
478+
472479
// The two input vectors should have the same size _unless_ we are not
473480
// doing per-partition categorization, in which case partition field values
474481
// should be empty and there should be exactly one categorizer
@@ -537,7 +544,7 @@ bool CFieldDataCategorizer::doPersistState(const TStrVec& partitionFieldValues,
537544
}
538545

539546
bool CFieldDataCategorizer::periodicPersistStateInBackground() {
540-
LOG_DEBUG(<< "Periodic persist categorizer state");
547+
LOG_DEBUG(<< "Periodic persist categorizer state in background");
541548

542549
// Make sure that the model size stats are up to date
543550
for (auto& dataCategorizerEntry : m_DataCategorizers) {
@@ -598,7 +605,7 @@ bool CFieldDataCategorizer::periodicPersistStateInBackground() {
598605
}
599606

600607
bool CFieldDataCategorizer::periodicPersistStateInForeground() {
601-
LOG_DEBUG(<< "Periodic persist categorizer state");
608+
LOG_DEBUG(<< "Periodic persist categorizer state in foreground");
602609

603610
if (m_PersistenceManager == nullptr) {
604611
return false;

lib/api/unittest/CPersistenceManagerTest.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class CTestFixture {
8181
foregroundStream = new std::ostringstream()};
8282
ml::api::CSingleStreamDataAdder foregroundDataAdder{foregroundStreamPtr};
8383

84-
// The 30000 second persist interval is set large enough that the timer will
84+
// The 30000 second persist interval is set large enough that the timer
8585
// will not trigger during the test - we bypass the timer in this test
8686
// and kick off the background persistence chain explicitly
8787
ml::api::CPersistenceManager persistenceManager{

lib/core/CProgramCounters.cc

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ void addStringInt(TGenericLineWriter& writer,
5353
}
5454
}
5555

56+
CProgramCounters::CCacheManager::~CCacheManager() {
57+
CProgramCounters::clearCachedCounters();
58+
}
59+
5660
CProgramCounters& CProgramCounters::instance() {
5761
return ms_Instance;
5862
}
@@ -88,6 +92,15 @@ void CProgramCounters::cacheCounters() {
8892
}
8993
ms_Instance.m_Cache.assign(ms_Instance.m_Counters.begin(),
9094
ms_Instance.m_Counters.end());
95+
LOG_TRACE(<< "Cached " << ms_Instance.m_Cache.size() << " counters.");
96+
}
97+
98+
void CProgramCounters::clearCachedCounters() {
99+
if (ms_Instance.m_Cache.empty() == false) {
100+
LOG_TRACE(<< "Clearing cache of " << ms_Instance.m_Cache.size() << " counters.");
101+
// clear the cache
102+
TUInt64Vec().swap(ms_Instance.m_Cache);
103+
}
91104
}
92105

93106
void CProgramCounters::staticsAcceptPersistInserter(CStatePersistInserter& inserter) {
@@ -125,10 +138,11 @@ void CProgramCounters::staticsAcceptPersistInserter(CStatePersistInserter& inser
125138

126139
staticsAcceptPersistInserter(ms_Instance.m_Counters);
127140
} else {
141+
LOG_TRACE(<< "Persisting " << ms_Instance.m_Cache.size() << " cached counters.");
128142
staticsAcceptPersistInserter(ms_Instance.m_Cache);
129143

130144
// clear the cache
131-
TUInt64Vec().swap(ms_Instance.m_Cache);
145+
clearCachedCounters();
132146
}
133147
}
134148

0 commit comments

Comments
 (0)