Skip to content

[7.x][ML] Ensure program counters cache always cleared (#1774) #1778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,22 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
bool restoreState(core::CDataSearcher& restoreSearcher,
core_t::TTime& completeToTime) override;

//! Persist current state
//! Persist state in the foreground. As this blocks the current thread of execution
//! it should only be called in special circumstances, e.g. at job close, where it won't impact job analysis.
bool persistStateInForeground(core::CDataAdder& persister,
const std::string& descriptionPrefix) override;

//! Persist the current model state regardless of whether
//! Persist the current model state in the foreground regardless of whether
//! any results have been output.
bool doPersistStateInForeground(core::CDataAdder& persister,
const std::string& description,
const std::string& snapshotId,
core_t::TTime snapshotTimestamp);

//! Persist state of the residual models only
//! Persist state of the residual models only.
//! This method is not intended to be called in production code
//! as it only persists a very small subset of model state with longer,
//! human readable tags.
bool persistModelsState(core::CDataAdder& persister,
core_t::TTime timestamp,
const std::string& outputFormat);
Expand Down Expand Up @@ -275,7 +279,10 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
bool periodicPersistStateInBackground() override;
bool periodicPersistStateInForeground() override;

//! Persist state of the residual models only
//! Persist state of the residual models only.
//! This method is not intended to be called in production code.
//! \p outputFormat specifies the format of the output document and may
//! either be JSON or XML.
bool persistModelsState(const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
core::CDataAdder& persister,
core_t::TTime timestamp,
Expand Down
11 changes: 11 additions & 0 deletions include/core/CProgramCounters.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ struct SCounterDefinition {
//! A singleton class: there should only be one collection of global counters
//!
class CORE_EXPORT CProgramCounters {
public:
//! \brief
//! The cache of program counters is cleared upon destruction of an instance of this class.
class CORE_EXPORT CCacheManager {
public:
~CCacheManager();
};

private:
//! \brief
//! An atomic counter object
Expand Down Expand Up @@ -258,6 +266,9 @@ class CORE_EXPORT CProgramCounters {
//! Copy the collection of live counters to a cache
static void cacheCounters();

//! Clear the collection of cached counters
static void clearCachedCounters();

//! \name Persistence
//@{
//! Restore the static members of this class from persisted state
Expand Down
15 changes: 11 additions & 4 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1099,9 +1099,10 @@ bool CAnomalyJob::doPersistStateInForeground(core::CDataAdder& persister,
std::string normaliserState;
m_Normalizer.toJson(m_LastResultsTime, "api", normaliserState, true);

// Persistence operates on a cached collection of counters rather than on the live counters directly.
// This is in order that background persistence operates on a consistent set of counters however we
// also must ensure that foreground persistence has access to an up-to-date cache of counters as well.
// Persistence of static counters is expected to operate on a cached collection of counters rather
// than on the live counters directly. This is in order that the more frequently used background persistence
// operates on a consistent set of counters. Hence, to avoid an error regarding the cache not existing, we
// also must ensure that foreground persistence has access to an up-to-date cache of counters.
core::CProgramCounters::cacheCounters();

return this->persistCopiedState(
Expand Down Expand Up @@ -1254,6 +1255,12 @@ bool CAnomalyJob::persistCopiedState(const std::string& description,
core_t::TTime latestRecordTime,
core_t::TTime lastResultsTime,
core::CDataAdder& persister) {
// Ensure that the cache of program counters is cleared upon exiting the current scope.
// As the cache is cleared when the simple count detector is persisted this may seem
// unnecessary at first, but there are occasions when the simple count detector does not exist,
// e.g. when no data is seen but time is advanced.
core::CProgramCounters::CCacheManager cacheMgr;

// Persist state for each detector separately by streaming
try {
core::CStateCompressor compressor(persister);
Expand Down Expand Up @@ -1597,7 +1604,7 @@ CAnomalyJob::makeDetector(const model::CAnomalyDetectorModelConfig& modelConfig,
void CAnomalyJob::populateDetectorKeys(const CAnomalyJobConfig& jobConfig, TKeyVec& keys) {
keys.clear();

// Add a key for the simple count detector.
// Always add a key for the simple count detector.
keys.push_back(model::CSearchKey::simpleCountKey());

for (const auto& fieldOptions : jobConfig.analysisConfig().detectorsConfig()) {
Expand Down
13 changes: 10 additions & 3 deletions lib/api/CFieldDataCategorizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ bool CFieldDataCategorizer::persistStateInForeground(core::CDataAdder& persister
return false;
}

LOG_DEBUG(<< "Persist categorizer state");
LOG_DEBUG(<< "Persist categorizer state in foreground");

TStrVec partitionFieldValues;
TPersistFuncVec dataCategorizerPersistFuncs;
Expand Down Expand Up @@ -469,6 +469,13 @@ bool CFieldDataCategorizer::doPersistState(const TStrVec& partitionFieldValues,
const TPersistFuncVec& dataCategorizerPersistFuncs,
std::size_t categorizerAllocationFailures,
core::CDataAdder& persister) {

// TODO: if the standalone categorize program is ever progressed, a mechanism needs
// to be added that does the following:
// 1. Caches program counters in the foreground before starting background persistence
// 2. Calls core::CProgramCounters::staticsAcceptPersistInserter once and only once per persist
// 3. Clears the program counter cache after persistence is complete

// The two input vectors should have the same size _unless_ we are not
// doing per-partition categorization, in which case partition field values
// should be empty and there should be exactly one categorizer
Expand Down Expand Up @@ -537,7 +544,7 @@ bool CFieldDataCategorizer::doPersistState(const TStrVec& partitionFieldValues,
}

bool CFieldDataCategorizer::periodicPersistStateInBackground() {
LOG_DEBUG(<< "Periodic persist categorizer state");
LOG_DEBUG(<< "Periodic persist categorizer state in background");

// Make sure that the model size stats are up to date
for (auto& dataCategorizerEntry : m_DataCategorizers) {
Expand Down Expand Up @@ -598,7 +605,7 @@ bool CFieldDataCategorizer::periodicPersistStateInBackground() {
}

bool CFieldDataCategorizer::periodicPersistStateInForeground() {
LOG_DEBUG(<< "Periodic persist categorizer state");
LOG_DEBUG(<< "Periodic persist categorizer state in foreground");

if (m_PersistenceManager == nullptr) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion lib/api/unittest/CPersistenceManagerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class CTestFixture {
foregroundStream = new std::ostringstream()};
ml::api::CSingleStreamDataAdder foregroundDataAdder{foregroundStreamPtr};

// The 30000 second persist interval is set large enough that the timer will
// The 30000 second persist interval is set large enough that the timer
// will not trigger during the test - we bypass the timer in this test
// and kick off the background persistence chain explicitly
ml::api::CPersistenceManager persistenceManager{
Expand Down
16 changes: 15 additions & 1 deletion lib/core/CProgramCounters.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ void addStringInt(TGenericLineWriter& writer,
}
}

CProgramCounters::CCacheManager::~CCacheManager() {
CProgramCounters::clearCachedCounters();
}

CProgramCounters& CProgramCounters::instance() {
return ms_Instance;
}
Expand Down Expand Up @@ -88,6 +92,15 @@ void CProgramCounters::cacheCounters() {
}
ms_Instance.m_Cache.assign(ms_Instance.m_Counters.begin(),
ms_Instance.m_Counters.end());
LOG_TRACE(<< "Cached " << ms_Instance.m_Cache.size() << " counters.");
}

void CProgramCounters::clearCachedCounters() {
if (ms_Instance.m_Cache.empty() == false) {
LOG_TRACE(<< "Clearing cache of " << ms_Instance.m_Cache.size() << " counters.");
// clear the cache
TUInt64Vec().swap(ms_Instance.m_Cache);
}
}

void CProgramCounters::staticsAcceptPersistInserter(CStatePersistInserter& inserter) {
Expand Down Expand Up @@ -125,10 +138,11 @@ void CProgramCounters::staticsAcceptPersistInserter(CStatePersistInserter& inser

staticsAcceptPersistInserter(ms_Instance.m_Counters);
} else {
LOG_TRACE(<< "Persisting " << ms_Instance.m_Cache.size() << " cached counters.");
staticsAcceptPersistInserter(ms_Instance.m_Cache);

// clear the cache
TUInt64Vec().swap(ms_Instance.m_Cache);
clearCachedCounters();
}
}

Expand Down