Skip to content

Commit 6d0d6c4

Browse files
[ML] Remove out-of-phase buckets feature (#318)
This feature was never fully completed and in fact we no longer need it as the multibucket feature covers the benefits from supporting out-of-phase buckets.
1 parent 9014b2a commit 6d0d6c4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+186
-2548
lines changed

bin/autodetect/CCmdLineParser.cc

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ bool CCmdLineParser::parse(int argc,
5050
bool& isPersistFileNamedPipe,
5151
size_t& maxAnomalyRecords,
5252
bool& memoryUsage,
53-
std::size_t& bucketResultsDelay,
5453
bool& multivariateByFields,
5554
TStrVec& clauseTokens) {
5655
try {
@@ -111,8 +110,6 @@ bool CCmdLineParser::parse(int argc,
111110
"The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.")
112111
("memoryUsage",
113112
"Log the model memory usage at the end of the job")
114-
("resultFinalizationWindow", boost::program_options::value<std::size_t>(),
115-
"The numer of half buckets to store before choosing which overlapping bucket has the biggest anomaly")
116113
("multivariateByFields",
117114
"Optional flag to enable multi-variate analysis of correlated by fields")
118115
;
@@ -222,9 +219,6 @@ bool CCmdLineParser::parse(int argc,
222219
if (vm.count("memoryUsage") > 0) {
223220
memoryUsage = true;
224221
}
225-
if (vm.count("resultFinalizationWindow") > 0) {
226-
bucketResultsDelay = vm["resultFinalizationWindow"].as<std::size_t>();
227-
}
228222
if (vm.count("multivariateByFields") > 0) {
229223
multivariateByFields = true;
230224
}

bin/autodetect/CCmdLineParser.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ class CCmdLineParser {
6262
bool& isPersistFileNamedPipe,
6363
size_t& maxAnomalyRecords,
6464
bool& memoryUsage,
65-
std::size_t& bucketResultsDelay,
6665
bool& multivariateByFields,
6766
TStrVec& clauseTokens);
6867

bin/autodetect/Main.cc

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ int main(int argc, char** argv) {
8686
bool isPersistFileNamedPipe(false);
8787
size_t maxAnomalyRecords(100u);
8888
bool memoryUsage(false);
89-
std::size_t bucketResultsDelay(0);
9089
bool multivariateByFields(false);
9190
TStrVec clauseTokens;
9291
if (ml::autodetect::CCmdLineParser::parse(
@@ -96,8 +95,8 @@ int main(int argc, char** argv) {
9695
timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
9796
maxQuantileInterval, inputFileName, isInputFileNamedPipe, outputFileName,
9897
isOutputFileNamedPipe, restoreFileName, isRestoreFileNamedPipe,
99-
persistFileName, isPersistFileNamedPipe, maxAnomalyRecords, memoryUsage,
100-
bucketResultsDelay, multivariateByFields, clauseTokens) == false) {
98+
persistFileName, isPersistFileNamedPipe, maxAnomalyRecords,
99+
memoryUsage, multivariateByFields, clauseTokens) == false) {
101100
return EXIT_FAILURE;
102101
}
103102

@@ -143,8 +142,7 @@ int main(int argc, char** argv) {
143142
summaryCountFieldName.empty() ? ml::model_t::E_None : ml::model_t::E_Manual);
144143
ml::model::CAnomalyDetectorModelConfig modelConfig =
145144
ml::model::CAnomalyDetectorModelConfig::defaultConfig(
146-
bucketSpan, summaryMode, summaryCountFieldName, latency,
147-
bucketResultsDelay, multivariateByFields);
145+
bucketSpan, summaryMode, summaryCountFieldName, latency, multivariateByFields);
148146
modelConfig.detectionRules(ml::model::CAnomalyDetectorModelConfig::TIntDetectionRuleVecUMapCRef(
149147
fieldConfig.detectionRules()));
150148
modelConfig.scheduledEvents(ml::model::CAnomalyDetectorModelConfig::TStrDetectionRulePrVecCRef(

include/api/CAnomalyJob.h

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@
1212

1313
#include <model/CAnomalyDetector.h>
1414
#include <model/CAnomalyDetectorModelConfig.h>
15-
#include <model/CBucketQueue.h>
1615
#include <model/CHierarchicalResults.h>
1716
#include <model/CHierarchicalResultsAggregator.h>
1817
#include <model/CHierarchicalResultsNormalizer.h>
1918
#include <model/CInterimBucketCorrector.h>
2019
#include <model/CResourceMonitor.h>
21-
#include <model/CResultsQueue.h>
2220
#include <model/CSearchKey.h>
2321

2422
#include <api/CDataProcessor.h>
@@ -117,26 +115,20 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
117115
std::pair<model::CSearchKey::TStrCRefKeyCRefPr, TAnomalyDetectorPtr>;
118116
using TKeyCRefAnomalyDetectorPtrPrVec = std::vector<TKeyCRefAnomalyDetectorPtrPr>;
119117
using TModelPlotDataVec = model::CAnomalyDetector::TModelPlotDataVec;
120-
using TModelPlotDataVecCItr = TModelPlotDataVec::const_iterator;
121-
using TModelPlotDataVecQueue = model::CBucketQueue<TModelPlotDataVec>;
122118

123119
struct API_EXPORT SRestoredStateDetail {
124120
ERestoreStateStatus s_RestoredStateStatus;
125121
boost::optional<std::string> s_Extra;
126122
};
127123

128124
struct SBackgroundPersistArgs {
129-
SBackgroundPersistArgs(const model::CResultsQueue& resultsQueue,
130-
const TModelPlotDataVecQueue& modelPlotQueue,
131-
core_t::TTime time,
125+
SBackgroundPersistArgs(core_t::TTime time,
132126
const model::CResourceMonitor::SResults& modelSizeStats,
133127
const model::CInterimBucketCorrector& interimBucketCorrector,
134128
const model::CHierarchicalResultsAggregator& aggregator,
135129
core_t::TTime latestRecordTime,
136130
core_t::TTime lastResultsTime);
137131

138-
model::CResultsQueue s_ResultsQueue;
139-
TModelPlotDataVecQueue s_ModelPlotQueue;
140132
core_t::TTime s_Time;
141133
model::CResourceMonitor::SResults s_ModelSizeStats;
142134
model::CInterimBucketCorrector s_InterimBucketCorrector;
@@ -224,14 +216,11 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
224216
void outputInterimResults(core_t::TTime bucketStartTime);
225217

226218
//! Helper function for outputResults.
227-
//! \p processingTimer is the processing time can be written to the bucket
228-
//! \p sumPastProcessingTime is the total time previously spent processing
229-
//! but resulted in no bucket being outputted.
219+
//! \p processingTime is the processing time of the bucket
230220
void writeOutResults(bool interim,
231221
model::CHierarchicalResults& results,
232222
core_t::TTime bucketTime,
233-
uint64_t processingTime,
234-
uint64_t sumPastProcessingTime);
223+
uint64_t processingTime);
235224

236225
//! Reset buckets in the range specified by the control message.
237226
void resetBuckets(const std::string& controlMessage);
@@ -259,8 +248,6 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
259248

260249
//! Persist the detectors to a stream.
261250
bool persistState(const std::string& descriptionPrefix,
262-
const model::CResultsQueue& resultsQueue,
263-
const TModelPlotDataVecQueue& modelPlotQueue,
264251
core_t::TTime time,
265252
const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
266253
const model::CResourceMonitor::SResults& modelSizeStats,
@@ -296,16 +283,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
296283
//! \param[in] endTime The end of the time interval to skip sampling.
297284
void skipSampling(core_t::TTime endTime);
298285

299-
//! Outputs queued results and resets the queue to the given \p startTime
300-
void flushAndResetResultsQueue(core_t::TTime startTime);
301-
302286
//! Roll time forward to \p time
303287
void timeNow(core_t::TTime time);
304288

305-
//! Get the bucketLength, or half the bucketLength if
306-
//! out-of-phase buckets are active
307-
core_t::TTime effectiveBucketLength() const;
308-
309289
//! Update configuration
310290
void updateConfig(const std::string& config);
311291

@@ -333,15 +313,12 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
333313
//! specified time range.
334314
void generateModelPlot(core_t::TTime startTime,
335315
core_t::TTime endTime,
336-
const model::CAnomalyDetector& detector);
316+
const model::CAnomalyDetector& detector,
317+
TModelPlotDataVec& modelPlotData);
337318

338319
//! Write the pre-generated model plot to the output stream of the user's
339320
//! choosing: either file or streamed to the API
340-
void writeOutModelPlot(core_t::TTime resultsTime);
341-
342-
//! Write the pre-generated model plot to the output stream of the user's
343-
//! choosing: either file or streamed to the API
344-
void writeOutModelPlot(core_t::TTime, CModelPlotDataJsonWriter& writer);
321+
void writeOutModelPlot(const TModelPlotDataVec& modelPlotData);
345322

346323
//! Persist one detector to a stream.
347324
//! This method is static so that there is no danger of it accessing
@@ -477,15 +454,6 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
477454
//! The hierarchical results normalizer.
478455
model::CHierarchicalResultsNormalizer m_Normalizer;
479456

480-
//! Store the last N half-buckets' results in order
481-
//! to choose the best result
482-
model::CResultsQueue m_ResultsQueue;
483-
484-
//! Also store the model plot for the buckets for each
485-
//! result time - these will be output when the corresponding
486-
//! result is output
487-
TModelPlotDataVecQueue m_ModelPlotQueue;
488-
489457
friend class ::CBackgroundPersisterTest;
490458
friend class ::CAnomalyJobTest;
491459
};

include/model/CAnomalyDetectorModel.h

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -352,17 +352,6 @@ class MODEL_EXPORT CAnomalyDetectorModel {
352352
core_t::TTime endTime,
353353
CResourceMonitor& resourceMonitor) = 0;
354354

355-
//! This samples the bucket statistics, and any state needed
356-
//! by computeProbablity, in the time interval [\p startTime,
357-
//! \p endTime], but does not update the model. This is needed
358-
//! by the results preview.
359-
//!
360-
//! \param[in] startTime The start of the time interval to sample.
361-
//! \param[in] endTime The end of the time interval to sample.
362-
virtual void sampleOutOfPhase(core_t::TTime startTime,
363-
core_t::TTime endTime,
364-
CResourceMonitor& resourceMonitor) = 0;
365-
366355
//! Rolls time to \p endTime while skipping sampling the models for
367356
//! buckets within the gap.
368357
//!

include/model/CAnomalyDetectorModelConfig.h

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,6 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
114114

115115
//! Bucket length corresponding to the default decay and learn rates.
116116
static const core_t::TTime STANDARD_BUCKET_LENGTH;
117-
118-
//! The default number of half buckets to store before choosing which
119-
//! overlapping bucket has the biggest anomaly
120-
static const std::size_t DEFAULT_BUCKET_RESULTS_DELAY;
121117
//@}
122118

123119
//! \name Modelling
@@ -239,15 +235,12 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
239235
//! then this is the name of the field holding the summary count.
240236
//! \param[in] latency The amount of time records are buffered for, to
241237
//! allow out-of-order records to be seen by the models in order.
242-
//! \param[in] bucketResultsDelay The number of half-bucket results
243-
//! to sit on before giving a definitive result.
244238
//! \param[in] multivariateByFields Should multivariate analysis of
245239
//! correlated 'by' fields be performed?
246240
static CAnomalyDetectorModelConfig defaultConfig(core_t::TTime bucketLength,
247241
model_t::ESummaryMode summaryMode,
248242
const std::string& summaryCountFieldName,
249243
core_t::TTime latency,
250-
std::size_t bucketResultsDelay,
251244
bool multivariateByFields);
252245

253246
//! Overload using defaults.
@@ -256,8 +249,7 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
256249
model_t::ESummaryMode summaryMode = model_t::E_None,
257250
const std::string& summaryCountFieldName = "") {
258251
return defaultConfig(bucketLength, summaryMode, summaryCountFieldName,
259-
DEFAULT_LATENCY_BUCKETS * bucketLength,
260-
DEFAULT_BUCKET_RESULTS_DELAY, false);
252+
DEFAULT_LATENCY_BUCKETS * bucketLength, false);
261253
}
262254

263255
//! Get the factor to normalize all bucket lengths to the default
@@ -273,8 +265,6 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
273265

274266
//! Set the data bucketing interval.
275267
void bucketLength(core_t::TTime length);
276-
//! Set the number of buckets to delay finalizing out-of-phase buckets.
277-
void bucketResultsDelay(std::size_t delay);
278268
//! Set the single interim bucket correction calculator.
279269
void interimBucketCorrector(const TInterimBucketCorrectorPtr& interimBucketCorrector);
280270
//! Set whether to model multibucket features.
@@ -358,9 +348,6 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
358348
//! numbers of buckets.
359349
std::size_t latencyBuckets() const;
360350

361-
//! Get the bucket result delay window.
362-
std::size_t bucketResultsDelay() const;
363-
364351
//! Get the single interim bucket correction calculator.
365352
const CInterimBucketCorrector& interimBucketCorrector() const;
366353

@@ -444,10 +431,6 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
444431
//! Bucket length.
445432
core_t::TTime m_BucketLength;
446433

447-
//! Get the bucket result delay window: The numer of half buckets to
448-
//! store before choosing which overlapping bucket has the biggest anomaly
449-
std::size_t m_BucketResultsDelay;
450-
451434
//! Should multivariate analysis of correlated 'by' fields be performed?
452435
bool m_MultivariateByFields;
453436

include/model/CBucketGatherer.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ class CResourceMonitor;
5656
//! IMPLEMENTATION:\n
5757
//! This functionality has been separated from the CDataGatherer in order
5858
//! to allow the CDataGatherer to support multiple overlapping buckets and
59-
//! buckets with different time spans.
59+
//! buckets with different time spans. However, the overlapping feature
60+
//! has been removed but this class is kept to avoid BWC issues.
6061
class MODEL_EXPORT CBucketGatherer {
6162
public:
6263
using TDoubleVec = std::vector<double>;
@@ -394,6 +395,9 @@ class MODEL_EXPORT CBucketGatherer {
394395
//! Create samples if possible for the bucket pointed out by \p time.
395396
virtual void sample(core_t::TTime time) = 0;
396397

398+
//! Persist state by passing information \p inserter.
399+
virtual void acceptPersistInserter(core::CStatePersistInserter& inserter) const = 0;
400+
397401
private:
398402
//! Resize the necessary data structures so they can hold values
399403
//! for the person and/or attribute identified by \p pid and \p cid,

include/model/CCountingModel.h

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -160,17 +160,6 @@ class MODEL_EXPORT CCountingModel : public CAnomalyDetectorModel {
160160
core_t::TTime endTime,
161161
CResourceMonitor& resourceMonitor);
162162

163-
//! This samples the bucket statistics, and any state needed
164-
//! by computeProbablity, in the time interval [\p startTime,
165-
//! \p endTime], but does not update the model. This is needed
166-
//! by the results preview.
167-
//!
168-
//! \param[in] startTime The start of the time interval to sample.
169-
//! \param[in] endTime The end of the time interval to sample.
170-
virtual void sampleOutOfPhase(core_t::TTime startTime,
171-
core_t::TTime endTime,
172-
CResourceMonitor& resourceMonitor);
173-
174163
//! This samples the bucket statistics, in the time interval
175164
//! [\p startTime, \p endTime].
176165
//!

include/model/CCountingModelFactory.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory {
117117

118118
//! Set the features which will be modeled.
119119
virtual void features(const TFeatureVec& features);
120-
121-
//! Set the bucket results delay
122-
virtual void bucketResultsDelay(std::size_t bucketResultsDelay);
123120
//@}
124121

125122
//! Get the minimum seasonal variance scale
@@ -153,9 +150,6 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory {
153150
//! The count features which will be modeled.
154151
TFeatureVec m_Features;
155152

156-
//! The bucket results delay.
157-
std::size_t m_BucketResultsDelay;
158-
159153
//! A cached search key.
160154
mutable TOptionalSearchKey m_SearchKeyCache;
161155
};

include/model/CDataGatherer.h

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ class MODEL_EXPORT CDataGatherer {
120120
CBucketQueue<TSizeSizePrStoredStringPtrPrUInt64UMapVec>;
121121
using TSearchKeyCRef = boost::reference_wrapper<const CSearchKey>;
122122
using TBucketGathererPtr = std::unique_ptr<CBucketGatherer>;
123-
using TBucketGathererPtrVec = std::vector<TBucketGathererPtr>;
124123
using TFeatureAnyPr = std::pair<model_t::EFeature, boost::any>;
125124
using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>;
126125
using TMetricCategoryVec = std::vector<model_t::EMetricCategory>;
@@ -338,7 +337,7 @@ class MODEL_EXPORT CDataGatherer {
338337
core_t::TTime bucketLength,
339338
std::vector<std::pair<model_t::EFeature, T>>& result) const {
340339
TFeatureAnyPrVec rawFeatureData;
341-
this->chooseBucketGatherer(time).featureData(time, bucketLength, rawFeatureData);
340+
m_BucketGatherer->featureData(time, bucketLength, rawFeatureData);
342341

343342
bool succeeded = true;
344343

@@ -558,7 +557,7 @@ class MODEL_EXPORT CDataGatherer {
558557
void timeNow(core_t::TTime time);
559558

560559
//! Print the current bucket.
561-
std::string printCurrentBucket(core_t::TTime time) const;
560+
std::string printCurrentBucket() const;
562561

563562
//! Record a attribute called \p attribute.
564563
std::size_t addAttribute(const std::string& attribute,
@@ -591,9 +590,6 @@ class MODEL_EXPORT CDataGatherer {
591590

592591
//! Reset bucket and return true if bucket was successfully
593592
//! reset or false otherwise.
594-
//! Note that this should not be used in conjunction with out-of-phase buckets
595-
//! where the concept of resetting a specific bucketed period of time is
596-
//! not valid.
597593
bool resetBucket(core_t::TTime bucketStart);
598594

599595
//! Release memory that is no longer needed
@@ -693,14 +689,6 @@ class MODEL_EXPORT CDataGatherer {
693689
using TModelParamsCRef = boost::reference_wrapper<const SModelParams>;
694690

695691
private:
696-
//! Select the correct bucket gatherer based on the time: if we have
697-
//! out-of-phase buckets, select either in-phase or out-of-phase.
698-
const CBucketGatherer& chooseBucketGatherer(core_t::TTime time) const;
699-
700-
//! Select the correct bucket gatherer based on the time: if we have
701-
//! out-of-phase buckets, select either in-phase or out-of-phase.
702-
CBucketGatherer& chooseBucketGatherer(core_t::TTime time);
703-
704692
//! Restore state from supplied traverser.
705693
bool acceptRestoreTraverser(const std::string& summaryCountFieldName,
706694
const std::string& personFieldName,
@@ -738,9 +726,9 @@ class MODEL_EXPORT CDataGatherer {
738726
//! The collection of features on which to gather data.
739727
TFeatureVec m_Features;
740728

741-
//! The collection of bucket gatherers which contain the bucket-specific
729+
//! The bucket gatherer which contains the bucket-specific
742730
//! metrics and counts.
743-
TBucketGathererPtrVec m_Gatherers;
731+
TBucketGathererPtr m_BucketGatherer;
744732

745733
//! Indicates whether the data being gathered are already summarized
746734
//! by an external aggregation process.

include/model/CEventRateBucketGatherer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class MODEL_EXPORT CEventRateBucketGatherer final : public CBucketGatherer {
155155
bool acceptRestoreTraverser(core::CStateRestoreTraverser& traverser);
156156

157157
//! Persist state by passing information to the supplied inserter
158-
void acceptPersistInserter(core::CStatePersistInserter& inserter) const;
158+
virtual void acceptPersistInserter(core::CStatePersistInserter& inserter) const;
159159

160160
//! Create a clone of this data gatherer that will result in the same
161161
//! persisted state. The clone may be incomplete in ways that do not

0 commit comments

Comments
 (0)