From 132f54a8c54c2bcc27df57434017373991c072e6 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 17 Apr 2018 18:28:33 +0200 Subject: [PATCH 1/6] [ML-FC] Use disk storage for forecasting large models (#59) This implements the C++ side of forecast persistence. An additional parameter allows the forecast runner to persist models on disk for temporary purposes. Models are loaded back into memory one by one. For models smaller than the current limit of 20MB nothing changes. --- bin/autodetect/Makefile | 1 + include/api/CForecastRunner.h | 26 ++- include/core/RestoreMacros.h | 17 ++ include/model/CAnomalyDetector.h | 4 +- include/model/CCountingModelFactory.h | 3 + include/model/CEventRateModelFactory.h | 3 + .../model/CEventRatePopulationModelFactory.h | 3 + include/model/CForecastDataSink.h | 8 +- include/model/CForecastModelPersist.h | 108 ++++++++++++ include/model/CMetricModelFactory.h | 3 + include/model/CMetricPopulationModelFactory.h | 3 + include/model/CModelFactory.h | 3 + lib/api/CForecastRunner.cc | 111 +++++++++++- lib/api/dump_state/Makefile | 1 + lib/model/CAnomalyDetector.cc | 48 ++++-- lib/model/CCountingModelFactory.cc | 4 + lib/model/CEventRateModelFactory.cc | 7 +- lib/model/CEventRatePopulationModelFactory.cc | 7 +- lib/model/CForecastDataSink.cc | 12 +- lib/model/CForecastModelPersist.cc | 160 ++++++++++++++++++ lib/model/CMetricModelFactory.cc | 7 +- lib/model/CMetricPopulationModelFactory.cc | 7 +- lib/model/Makefile | 2 + .../unittest/CForecastModelPersistTest.cc | 158 +++++++++++++++++ .../unittest/CForecastModelPersistTest.h | 20 +++ lib/model/unittest/Main.cc | 2 + lib/model/unittest/Makefile | 2 + 27 files changed, 696 insertions(+), 34 deletions(-) create mode 100644 include/model/CForecastModelPersist.h create mode 100644 lib/model/CForecastModelPersist.cc create mode 100644 lib/model/unittest/CForecastModelPersistTest.cc create mode 100644 lib/model/unittest/CForecastModelPersistTest.h diff --git a/bin/autodetect/Makefile b/bin/autodetect/Makefile index 7531f80150..a4051da9cd 100644 --- a/bin/autodetect/Makefile +++ b/bin/autodetect/Makefile @@ -12,6 +12,7 @@ INSTALL_DIR=$(CPP_PLATFORM_HOME)/bin ML_LIBS=$(LIB_ML_CORE) $(LIB_ML_MATHS) $(LIB_ML_MODEL) $(LIB_ML_API) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_BOOST_PROGRAMOPTIONS_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 diff --git a/include/api/CForecastRunner.h b/include/api/CForecastRunner.h index 74f91a62f3..960a44ec5d 100644 --- a/include/api/CForecastRunner.h +++ b/include/api/CForecastRunner.h @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -68,10 +69,22 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { static const size_t DEFAULT_EXPIRY_TIME = 14 * core::constants::DAY; //! max memory allowed to use for forecast models - static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520; // 20MB + static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520ull; // 20MB + + // Note: This value measures the size in memory, not the size of the persistence, + // which is likely higher and would be hard to calculate upfront + //! max memory allowed to use for forecast models persisting to disk + static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB + + // Note: This value is lower than on X-pack side to prevent side-effects, + // if you change this value also change the limit on X-pack side. + // The purpose of this value is to guard the rest of the system regarding + // an out of disk space + //! minimum disk space required for disk persistence + static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB //! minimum time between stat updates to prevent to many updates in a short time - static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000; // 3s + static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s private: static const std::string ERROR_FORECAST_REQUEST_FAILED_TO_PARSE; @@ -82,6 +95,8 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { static const std::string ERROR_NO_CREATE_TIME; static const std::string ERROR_BAD_MEMORY_STATUS; static const std::string ERROR_MEMORY_LIMIT; + static const std::string ERROR_MEMORY_LIMIT_DISK; + static const std::string ERROR_MEMORY_LIMIT_DISKSPACE; static const std::string ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS; static const std::string ERROR_NO_SUPPORTED_FUNCTIONS; static const std::string WARNING_DURATION_LIMIT; @@ -100,6 +115,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper; using TForecastResultSeries = model::CForecastDataSink::SForecastResultSeries; using TForecastResultSeriesVec = std::vector; + using TMathsModelPtr = std::shared_ptr; using TStrUSet = boost::unordered_set; @@ -189,6 +205,9 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! A collection storing important messages from forecasting TStrUSet s_Messages; + + //! A directory to persist models on disk + std::string s_TemporaryFolder; }; private: @@ -202,6 +221,9 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! Check for new jobs, blocks while waiting bool tryGetJob(SForecast& forecastJob); + //! check for sufficient disk space + bool sufficientAvailableDiskSpace(const boost::filesystem::path& path); + //! pushes new jobs into the internal 'queue' (thread boundary) bool push(SForecast& forecastJob); diff --git a/include/core/RestoreMacros.h b/include/core/RestoreMacros.h index 427724dad7..3c0dc169d7 100644 --- a/include/core/RestoreMacros.h +++ b/include/core/RestoreMacros.h @@ -39,6 +39,23 @@ namespace core { continue; \ } +#define RESTORE_ENUM(tag, target, enumtype) \ + if (name == tag) { \ + int value; \ + if (core::CStringUtils::stringToType(traverser.value(), value) == false) { \ + LOG_ERROR(<< "Failed to restore " #tag ", got " << traverser.value()); \ + return false; \ + } \ + target = enumtype(value); \ + continue; \ + } + +#define RESTORE_ENUM_CHECKED(tag, target, enumtype, restoreSuccess) \ + if (name == tag) { \ + restoreSuccess = true; \ + RESTORE_ENUM(tag, target, enumtype) \ + } + #define RESTORE_SETUP_TEARDOWN(tag, setup, restore, teardown) \ if (name == tag) { \ setup; \ diff --git a/include/model/CAnomalyDetector.h b/include/model/CAnomalyDetector.h index 278c9db4a0..71c40ce92a 100644 --- a/include/model/CAnomalyDetector.h +++ b/include/model/CAnomalyDetector.h @@ -234,7 +234,9 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable { CForecastDataSink::SForecastModelPrerequisites getForecastPrerequisites() const; //! Generate maths models for forecasting - CForecastDataSink::SForecastResultSeries getForecastModels() const; + CForecastDataSink::SForecastResultSeries + getForecastModels(bool persistOnDisk = false, + const std::string& persistenceFolder = EMPTY_STRING) const; //! Remove dead models, i.e. those models that have more-or-less //! reverted back to their non-informative state. BE CAREFUL WHEN diff --git a/include/model/CCountingModelFactory.h b/include/model/CCountingModelFactory.h index 8d6da80523..87897a4754 100644 --- a/include/model/CCountingModelFactory.h +++ b/include/model/CCountingModelFactory.h @@ -121,6 +121,9 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CEventRateModelFactory.h b/include/model/CEventRateModelFactory.h index 3269d433f1..7adacd67ce 100644 --- a/include/model/CEventRateModelFactory.h +++ b/include/model/CEventRateModelFactory.h @@ -131,6 +131,9 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CEventRatePopulationModelFactory.h b/include/model/CEventRatePopulationModelFactory.h index 967399528d..1659696023 100644 --- a/include/model/CEventRatePopulationModelFactory.h +++ b/include/model/CEventRatePopulationModelFactory.h @@ -133,6 +133,9 @@ class MODEL_EXPORT CEventRatePopulationModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CForecastDataSink.h b/include/model/CForecastDataSink.h index 515732c9f6..158a4521a7 100644 --- a/include/model/CForecastDataSink.h +++ b/include/model/CForecastDataSink.h @@ -14,6 +14,7 @@ #include +#include #include #include @@ -38,7 +39,7 @@ namespace model { //! to change (e.g. the json writing should not happen in this class). class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable { public: - using TMathsModelPtr = std::unique_ptr; + using TMathsModelPtr = std::shared_ptr; using TStrUMap = boost::unordered_set; //! Wrapper for 1 timeseries model, its feature and by Field @@ -59,18 +60,21 @@ class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable { //! Everything that defines 1 series of forecasts struct MODEL_EXPORT SForecastResultSeries { - SForecastResultSeries(); + SForecastResultSeries(const SModelParams& modelParams); SForecastResultSeries(SForecastResultSeries&& other); SForecastResultSeries(const SForecastResultSeries& that) = delete; SForecastResultSeries& operator=(const SForecastResultSeries&) = delete; + SModelParams s_ModelParams; int s_DetectorIndex; std::vector s_ToForecast; + std::string s_ToForecastPersisted; std::string s_PartitionFieldName; std::string s_PartitionFieldValue; std::string s_ByFieldName; + double s_MinimumSeasonalVarianceScale; }; //! \brief Data describing prerequisites prior predictions diff --git a/include/model/CForecastModelPersist.h b/include/model/CForecastModelPersist.h new file mode 100644 index 0000000000..a5d1f0c71e --- /dev/null +++ b/include/model/CForecastModelPersist.h @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +#ifndef INCLUDED_ml_model_CForecastModelPersist_h +#define INCLUDED_ml_model_CForecastModelPersist_h + +#include +#include + +#include + +#include +#include +#include + +#include + +#include +#include + +namespace ml { +namespace model { + +//! \brief Persist/Restore CModel sub-classes to/from text representations for +//! the purpose of forecasting. +//! +//! DESCRIPTION:\n +//! Persists/Restores models to disk for the purpose of restoring and forecasting +//! on them. +//! +//! IMPLEMENTATION DECISIONS:\n +//! Only as complete as required for forecasting. +//! +//! Persist and Restore are only done to avoid heap memory usage using temporary disk space. +//! No need for backwards compatibility and version'ing as code will only be used +//! locally never leaving process/io boundaries. +class MODEL_EXPORT CForecastModelPersist final { +public: + using TMathsModelPtr = std::shared_ptr; + +public: + class CPersist final { + public: + explicit CPersist(const std::string& temporaryPath); + + //! add a model to the persistence + void addModel(const maths::CModel* model, + const model_t::EFeature feature, + const std::string& byFieldValue); + + //! close the outputStream + const std::string& finalizePersistAndGetFile(); + + private: + static void persistOneModel(core::CStatePersistInserter& inserter, + const maths::CModel* model, + const model_t::EFeature feature, + const std::string& byFieldValue); + + private: + //! the filename where to persist to + boost::filesystem::path m_FileName; + + //! the actual file where it models are persisted to + std::ofstream m_OutStream; + + //! number of models persisted + size_t m_ModelCount; + }; + + class CRestore final { + public: + explicit CRestore(const SModelParams& modelParams, + double minimumSeasonalVarianceScale, + const std::string& fileName); + + //! add a model to the persistence + bool nextModel(TMathsModelPtr& model, model_t::EFeature& feature, std::string& byFieldValue); + + private: + static bool restoreOneModel(core::CStateRestoreTraverser& traverser, + SModelParams modelParams, + double inimumSeasonalVarianceScale, + TMathsModelPtr& model, + model_t::EFeature& feature, + std::string& byFieldValue); + + private: + //! model parameters required in order to restore the model + SModelParams m_ModelParams; + + //! minimum seasonal variance scale specific to the model + double m_MinimumSeasonalVarianceScale; + + //! the actual file where it models are persisted to + std::ifstream m_InStream; + + //! the persist inserter + core::CJsonStateRestoreTraverser m_RestoreTraverser; + }; // class CRestore +}; // class CForecastModelPersist +} +} + +#endif // INCLUDED_ml_model_CForecastModelPersist_h diff --git a/include/model/CMetricModelFactory.h b/include/model/CMetricModelFactory.h index f17d21022f..4fe1ba2ce6 100644 --- a/include/model/CMetricModelFactory.h +++ b/include/model/CMetricModelFactory.h @@ -134,6 +134,9 @@ class MODEL_EXPORT CMetricModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CMetricPopulationModelFactory.h b/include/model/CMetricPopulationModelFactory.h index 5c66ef142d..cda61f4f5f 100644 --- a/include/model/CMetricPopulationModelFactory.h +++ b/include/model/CMetricPopulationModelFactory.h @@ -133,6 +133,9 @@ class MODEL_EXPORT CMetricPopulationModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CModelFactory.h b/include/model/CModelFactory.h index b9a3780bab..8d2228a7bd 100644 --- a/include/model/CModelFactory.h +++ b/include/model/CModelFactory.h @@ -351,6 +351,9 @@ class MODEL_EXPORT CModelFactory { //! component. std::size_t componentSize() const; + // Get the minimum seasonal variance scale, specific to the model + virtual double minimumSeasonalVarianceScale() const = 0; + protected: using TMultivariatePriorPtrVec = std::vector; using TOptionalSearchKey = boost::optional; diff --git a/lib/api/CForecastRunner.cc b/lib/api/CForecastRunner.cc index 60d81fd76e..be7776b955 100644 --- a/lib/api/CForecastRunner.cc +++ b/lib/api/CForecastRunner.cc @@ -11,12 +11,14 @@ #include #include +#include #include #include #include -#include +#include +#include #include namespace ml { @@ -34,7 +36,11 @@ const std::string CForecastRunner::ERROR_NO_DATA_PROCESSED( "Forecast cannot be executed as job requires data to have been processed and modeled"); const std::string CForecastRunner::ERROR_NO_CREATE_TIME("Forecast create time must be specified and non zero"); const std::string CForecastRunner::ERROR_BAD_MEMORY_STATUS("Forecast cannot be executed as model memory status is not OK"); -const std::string CForecastRunner::ERROR_MEMORY_LIMIT("Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT("Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB while disk space is exceeded"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISK( + "Forecast cannot be executed as forecast memory usage is predicted to exceed 500MB"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISKSPACE( + "Forecast cannot be executed as models exceed internal memory limit and available disk space is insufficient"); const std::string CForecastRunner::ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS("Forecast is not supported for population analysis"); const std::string CForecastRunner::ERROR_NO_SUPPORTED_FUNCTIONS("Forecast is not supported for the used functions"); const std::string CForecastRunner::WARNING_DURATION_LIMIT("Forecast duration exceeds internal limit, setting to 8 weeks"); @@ -47,7 +53,7 @@ CForecastRunner::SForecast::SForecast() : s_ForecastId(), s_ForecastAlias(), s_ForecastSeries(), s_CreateTime(0), s_StartTime(0), s_Duration(0), s_ExpiryTime(0), s_BoundsPercentile(0), s_NumberOfModels(0), s_NumberOfForecastableModels(0), s_MemoryUsage(0), - s_Messages() { + s_Messages(), s_TemporaryFolder() { } CForecastRunner::SForecast::SForecast(SForecast&& other) @@ -59,7 +65,8 @@ CForecastRunner::SForecast::SForecast(SForecast&& other) s_BoundsPercentile(other.s_BoundsPercentile), s_NumberOfModels(other.s_NumberOfModels), s_NumberOfForecastableModels(other.s_NumberOfForecastableModels), - s_MemoryUsage(other.s_MemoryUsage), s_Messages(other.s_Messages) { + s_MemoryUsage(other.s_MemoryUsage), s_Messages(other.s_Messages), + s_TemporaryFolder(std::move(other.s_TemporaryFolder)) { } CForecastRunner::SForecast& CForecastRunner::SForecast::operator=(SForecast&& other) { @@ -75,6 +82,7 @@ CForecastRunner::SForecast& CForecastRunner::SForecast::operator=(SForecast&& ot s_NumberOfForecastableModels = other.s_NumberOfForecastableModels; s_MemoryUsage = other.s_MemoryUsage; s_Messages = other.s_Messages; + s_TemporaryFolder = std::move(other.s_TemporaryFolder); return *this; } @@ -138,8 +146,32 @@ void CForecastRunner::forecastWorker() { // while loops allow us to free up memory for every model right after each forecast is done while (!forecastJob.s_ForecastSeries.empty()) { TForecastResultSeries& series = forecastJob.s_ForecastSeries.back(); + std::unique_ptr modelRestore; + + // initialize persistence restore exactly once + if (!series.s_ToForecastPersisted.empty()) { + modelRestore.reset(new model::CForecastModelPersist::CRestore( + series.s_ModelParams, series.s_MinimumSeasonalVarianceScale, + series.s_ToForecastPersisted)); + } + + while (series.s_ToForecast.empty() == false || modelRestore != nullptr) { + // check if we should backfill from persistence + if (series.s_ToForecast.empty()) { + TMathsModelPtr model; + model_t::EFeature feature; + std::string byFieldValue; + + if (modelRestore->nextModel(model, feature, byFieldValue)) { + series.s_ToForecast.emplace_back( + feature, std::move(model), byFieldValue); + } else // restorer exhausted, no need for further restoring + { + modelRestore.reset(); + break; + } + } - while (!series.s_ToForecast.empty()) { const TForecastModelWrapper& model = series.s_ToForecast.back(); model_t::TDouble1VecDouble1VecPr support = model_t::support(model.s_Feature); @@ -189,6 +221,20 @@ void CForecastRunner::forecastWorker() { // signal that job is done m_WorkCompleteCondition.notify_all(); + + // cleanup + if (!forecastJob.s_TemporaryFolder.empty()) { + boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder); + boost::system::error_code errorCode; + boost::filesystem::remove_all(temporaryFolder, errorCode); + if (errorCode) { + // not an error: there is also cleanup code on X-pack side + LOG_WARN(<< "Failed to cleanup temporary data from: " + << forecastJob.s_TemporaryFolder << " error " + << errorCode.message()); + return; + } + } } } @@ -259,13 +305,19 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, prerequisites.s_IsSupportedFunction; totalMemoryUsage += prerequisites.s_MemoryUsageForDetector; - if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) { + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY && + forecastJob.s_TemporaryFolder.empty()) { // note: for now MAX_FORECAST_MODEL_MEMORY is a static limit, a user can not change it this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT); return false; } } + if (totalMemoryUsage >= MAX_FORECAST_MODEL_PERSISTANCE_MEMORY) { + this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISK); + return false; + } + if (atLeastOneNonPopulationModel == false) { this->sendErrorMessage(forecastJob, ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS); return false; @@ -289,7 +341,33 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, this->sendScheduledMessage(forecastJob); // 2nd loop over the detectors to clone models for forecasting - TForecastResultSeriesVec s; + bool persistOnDisk = false; + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) { + boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder); + + if (this->sufficientAvailableDiskSpace(temporaryFolder) == false) { + this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISKSPACE); + return false; + } + + LOG_INFO(<< "Forecast of large model requested (requires " + << std::to_string(1 + (totalMemoryUsage >> 20)) << " MB), using disk."); + + boost::system::error_code errorCode; + boost::filesystem::create_directories(temporaryFolder, errorCode); + if (errorCode) { + this->sendErrorMessage( + forecastJob, + "Forecast internal error, failed to create temporary folder " + + temporaryFolder.string() + " error: " + errorCode.message()); + return false; + } + + LOG_DEBUG(<< "Persisting to: " << temporaryFolder.string()); + persistOnDisk = true; + } else { + forecastJob.s_TemporaryFolder.clear(); + } for (const auto& detector : detectors) { if (detector.get() == nullptr) { @@ -297,7 +375,8 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, continue; } - forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels()); + forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels( + persistOnDisk, forecastJob.s_TemporaryFolder)); } return this->push(forecastJob); @@ -341,6 +420,8 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control forecastJob.s_Duration = properties.get("duration", 0); forecastJob.s_CreateTime = properties.get("create_time", 0); + // tmp storage if available + forecastJob.s_TemporaryFolder = properties.get("tmp_storage", EMPTY_STRING); // use -1 as default to allow 0 as 'never expires' expiresIn = properties.get("expires_in", -1l); @@ -371,7 +452,6 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control // to be replaced by https://github.com/elastic/machine-learning-cpp/issues/443 // TODO this is a temporary fix to prevent the analysis blowing up // if you change this value, also change the log string - // todo: refactor validation out from here core_t::TTime maxDuration = 8 * core::constants::WEEK; if (forecastJob.s_Duration > maxDuration) { LOG_INFO(<< WARNING_DURATION_LIMIT); @@ -400,6 +480,19 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control return true; } +bool CForecastRunner::sufficientAvailableDiskSpace(const boost::filesystem::path& path) { + boost::system::error_code errorCode; + auto spaceInfo = boost::filesystem::space(path, errorCode); + + if (errorCode) { + LOG_ERROR(<< "Failed to retrieve disk information for " << path + << " error " << errorCode.message()); + return false; + } + + return spaceInfo.available > MIN_FORECAST_AVAILABLE_DISK_SPACE; +} + void CForecastRunner::sendScheduledMessage(const SForecast& forecastJob) const { LOG_DEBUG(<< "job passed forecast validation, scheduled for forecasting"); model::CForecastDataSink sink( diff --git a/lib/api/dump_state/Makefile b/lib/api/dump_state/Makefile index 894646433c..9165083ce0 100644 --- a/lib/api/dump_state/Makefile +++ b/lib/api/dump_state/Makefile @@ -10,6 +10,7 @@ TARGET=dump_state$(EXE_EXT) ML_LIBS=$(LIB_ML_CORE) $(LIB_ML_MATHS) $(LIB_ML_MODEL) $(LIB_ML_API) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 diff --git a/lib/model/CAnomalyDetector.cc b/lib/model/CAnomalyDetector.cc index 8173b6df39..35c2b46514 100644 --- a/lib/model/CAnomalyDetector.cc +++ b/lib/model/CAnomalyDetector.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -514,8 +515,10 @@ CAnomalyDetector::getForecastPrerequisites() const { return prerequisites; } -CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels() const { - CForecastDataSink::SForecastResultSeries series; +CForecastDataSink::SForecastResultSeries +CAnomalyDetector::getForecastModels(bool persistOnDisk, + const std::string& persistenceFolder) const { + CForecastDataSink::SForecastResultSeries series(m_ModelFactory->modelParams()); if (m_DataGatherer->isPopulation()) { return series; @@ -533,17 +536,38 @@ CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels() c series.s_DetectorIndex = m_DetectorIndex; series.s_PartitionFieldName = key.partitionFieldName(); series.s_PartitionFieldValue = m_DataGatherer->partitionFieldValue(); + series.s_MinimumSeasonalVarianceScale = m_ModelFactory->minimumSeasonalVarianceScale(); + + if (persistOnDisk) { + CForecastModelPersist::CPersist persister(persistenceFolder); + + for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); + pid < maxPid; ++pid) { + // todo: Add terms filtering here + if (m_DataGatherer->isPersonActive(pid)) { + for (auto feature : view->features()) { + const maths::CModel* model = view->model(feature, pid); + if (model != nullptr && model->isForecastPossible()) { + persister.addModel(model, feature, m_DataGatherer->personName(pid)); + } + } + } + } - for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) { - // todo: Add terms filtering here - if (m_DataGatherer->isPersonActive(pid)) { - for (auto feature : view->features()) { - const maths::CModel* model = view->model(feature, pid); - if (model != nullptr && model->isForecastPossible()) { - series.s_ToForecast.emplace_back( - feature, - CForecastDataSink::TMathsModelPtr(model->cloneForForecast()), - m_DataGatherer->personName(pid)); + series.s_ToForecastPersisted = persister.finalizePersistAndGetFile(); + } else { + for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); + pid < maxPid; ++pid) { + // todo: Add terms filtering here + if (m_DataGatherer->isPersonActive(pid)) { + for (auto feature : view->features()) { + const maths::CModel* model = view->model(feature, pid); + if (model != nullptr && model->isForecastPossible()) { + series.s_ToForecast.emplace_back( + feature, + CForecastDataSink::TMathsModelPtr(model->cloneForForecast()), + m_DataGatherer->personName(pid)); + } } } } diff --git a/lib/model/CCountingModelFactory.cc b/lib/model/CCountingModelFactory.cc index 31dce848e0..3b11fa1829 100644 --- a/lib/model/CCountingModelFactory.cc +++ b/lib/model/CCountingModelFactory.cc @@ -152,5 +152,9 @@ CCountingModelFactory::TStrCRefVec CCountingModelFactory::partitioningFields() c } return result; } +double CCountingModelFactory::minimumSeasonalVarianceScale() const { + // unused, return something + return 0.0; +} } } diff --git a/lib/model/CEventRateModelFactory.cc b/lib/model/CEventRateModelFactory.cc index e5c873d4f2..bbba2c8981 100644 --- a/lib/model/CEventRateModelFactory.cc +++ b/lib/model/CEventRateModelFactory.cc @@ -57,7 +57,8 @@ CEventRateModelFactory::makeModel(const SModelInitializationData& initData) cons return new CEventRateModel( this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 0.4, true), + this->defaultFeatureModels(features, dataGatherer->bucketLength(), + this->minimumSeasonalVarianceScale(), true), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), this->defaultCategoricalPrior(), influenceCalculators); } @@ -259,6 +260,10 @@ void CEventRateModelFactory::bucketResultsDelay(std::size_t bucketResultsDelay) m_BucketResultsDelay = bucketResultsDelay; } +double CEventRateModelFactory::minimumSeasonalVarianceScale() const { + return 0.4; +} + CEventRateModelFactory::TStrCRefVec CEventRateModelFactory::partitioningFields() const { TStrCRefVec result; result.reserve(2); diff --git a/lib/model/CEventRatePopulationModelFactory.cc b/lib/model/CEventRatePopulationModelFactory.cc index 98230a6750..9506c2fdeb 100644 --- a/lib/model/CEventRatePopulationModelFactory.cc +++ b/lib/model/CEventRatePopulationModelFactory.cc @@ -57,7 +57,8 @@ CEventRatePopulationModelFactory::makeModel(const SModelInitializationData& init return new CEventRatePopulationModel( this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 1.0, false), + this->defaultFeatureModels(features, dataGatherer->bucketLength(), + this->minimumSeasonalVarianceScale(), false), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), influenceCalculators); } @@ -274,5 +275,9 @@ CEventRatePopulationModelFactory::partitioningFields() const { } return result; } + +double CEventRatePopulationModelFactory::minimumSeasonalVarianceScale() const { + return 1.0; +} } } diff --git a/lib/model/CForecastDataSink.cc b/lib/model/CForecastDataSink.cc index a4e803ec22..7cfd735337 100644 --- a/lib/model/CForecastDataSink.cc +++ b/lib/model/CForecastDataSink.cc @@ -67,16 +67,20 @@ CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(SForecastModelWr s_ByFieldValue(std::move(other.s_ByFieldValue)) { } -CForecastDataSink::SForecastResultSeries::SForecastResultSeries() - : s_DetectorIndex(), s_ToForecast(), s_PartitionFieldValue(), s_ByFieldName() { +CForecastDataSink::SForecastResultSeries::SForecastResultSeries(const SModelParams& modelParams) + : s_ModelParams(modelParams), s_DetectorIndex(), s_ToForecastPersisted(), + s_ByFieldName(), s_MinimumSeasonalVarianceScale(0.0) { } CForecastDataSink::SForecastResultSeries::SForecastResultSeries(SForecastResultSeries&& other) - : s_DetectorIndex(other.s_DetectorIndex), + : s_ModelParams(std::move(other.s_ModelParams)), + s_DetectorIndex(other.s_DetectorIndex), s_ToForecast(std::move(other.s_ToForecast)), + s_ToForecastPersisted(std::move(other.s_ToForecastPersisted)), s_PartitionFieldName(std::move(other.s_PartitionFieldName)), s_PartitionFieldValue(std::move(other.s_PartitionFieldValue)), - s_ByFieldName(std::move(other.s_ByFieldName)) { + s_ByFieldName(std::move(other.s_ByFieldName)), + s_MinimumSeasonalVarianceScale(other.s_MinimumSeasonalVarianceScale) { } CForecastDataSink::CForecastDataSink(const std::string& jobId, diff --git a/lib/model/CForecastModelPersist.cc b/lib/model/CForecastModelPersist.cc new file mode 100644 index 0000000000..3a3a8c5520 --- /dev/null +++ b/lib/model/CForecastModelPersist.cc @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +#include + +#include +#include +#include + +#include +#include +#include + +#include + +#include + +namespace ml { +namespace model { + +namespace { +static const std::string FORECAST_MODEL_PERSIST_TAG("forecast_persist"); +static const std::string FEATURE_TAG("feature"); +static const std::string DATA_TYPE_TAG("datatype"); +static const std::string MODEL_TAG("model"); +static const std::string BY_FIELD_VALUE_TAG("by_field_value"); +} + +CForecastModelPersist::CPersist::CPersist(const std::string& temporaryPath) + : m_FileName(temporaryPath), m_OutStream(), m_ModelCount(0) { + m_FileName /= boost::filesystem::unique_path("forecast-persist-%%%%-%%%%-%%%%-%%%%"); + m_OutStream.open(m_FileName.string()); + m_OutStream << "["; +} + +void CForecastModelPersist::CPersist::addModel(const maths::CModel* model, + const model_t::EFeature feature, + const std::string& byFieldValue) { + if (m_ModelCount++ > 0) { + m_OutStream << ","; + } + + core::CJsonStatePersistInserter inserter(m_OutStream); + inserter.insertLevel(FORECAST_MODEL_PERSIST_TAG, + boost::bind(CForecastModelPersist::CPersist::persistOneModel, + _1, model, feature, byFieldValue)); +} + +void CForecastModelPersist::CPersist::persistOneModel(core::CStatePersistInserter& inserter, + const maths::CModel* model, + const model_t::EFeature feature, + const std::string& byFieldValue) { + inserter.insertValue(FEATURE_TAG, feature); + inserter.insertValue(DATA_TYPE_TAG, model->dataType()); + inserter.insertValue(BY_FIELD_VALUE_TAG, byFieldValue); + inserter.insertLevel(MODEL_TAG, boost::bind(maths::CModelStateSerialiser(), + boost::cref(*model), _1)); +} + +const std::string& CForecastModelPersist::CPersist::finalizePersistAndGetFile() { + m_OutStream << "]"; + m_OutStream.close(); + return m_FileName.string(); +} + +CForecastModelPersist::CRestore::CRestore(const SModelParams& modelParams, + double minimumSeasonalVarianceScale, + const std::string& fileName) + : m_ModelParams(modelParams), + m_MinimumSeasonalVarianceScale(minimumSeasonalVarianceScale), + m_InStream(fileName), m_RestoreTraverser(m_InStream) { +} + +bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr& model, + model_t::EFeature& feature, + std::string& byFieldValue) { + if (m_RestoreTraverser.isEof() || m_RestoreTraverser.name().empty()) { + return false; + } + + if (m_RestoreTraverser.name() != FORECAST_MODEL_PERSIST_TAG) { + LOG_ERROR(<< "Failed to restore forecast model, unexpected tag"); + return false; + } + + if (!m_RestoreTraverser.hasSubLevel()) { + LOG_ERROR(<< "Failed to restore forecast model, unexpected format"); + return false; + } + + TMathsModelPtr originalModel; + if (!m_RestoreTraverser.traverseSubLevel(boost::bind( + CForecastModelPersist::CRestore::restoreOneModel, _1, + boost::cref(m_ModelParams), m_MinimumSeasonalVarianceScale, + boost::ref(originalModel), boost::ref(feature), boost::ref(byFieldValue)))) { + LOG_ERROR(<< "Failed to restore forecast model, internal error"); + return false; + } + + model.reset(originalModel->cloneForForecast()); + m_RestoreTraverser.nextObject(); + + return true; +} + +bool CForecastModelPersist::CRestore::restoreOneModel(core::CStateRestoreTraverser& traverser, + const SModelParams modelParams, + double minimumSeasonalVarianceScale, + TMathsModelPtr& model, + model_t::EFeature& feature, + std::string& byFieldValue) { + // reset all + model.reset(); + bool restoredFeature = false; + bool restoredDataType = false; + byFieldValue.clear(); + maths_t::EDataType dataType; + + do { + const std::string& name = traverser.name(); + RESTORE_ENUM_CHECKED(FEATURE_TAG, feature, model_t::EFeature, restoredFeature) + RESTORE_ENUM_CHECKED(DATA_TYPE_TAG, dataType, maths_t::EDataType, restoredDataType) + RESTORE_BUILT_IN(BY_FIELD_VALUE_TAG, byFieldValue) + if (name == MODEL_TAG) { + if (!restoredDataType) { + LOG_ERROR(<< "Failed to restore forecast model, datatype missing"); + return false; + } + + maths::SModelRestoreParams params{ + maths::CModelParams(modelParams.s_BucketLength, modelParams.s_LearnRate, + modelParams.s_DecayRate, minimumSeasonalVarianceScale), + maths::STimeSeriesDecompositionRestoreParams{ + modelParams.s_DecayRate, modelParams.s_BucketLength, + modelParams.s_ComponentSize}, + modelParams.distributionRestoreParams(dataType)}; + + if (!traverser.traverseSubLevel( + boost::bind(maths::CModelStateSerialiser(), + boost::cref(params), boost::ref(model), _1))) { + LOG_ERROR(<< "Failed to restore forecast model, model missing"); + return false; + } + } + } while (traverser.next()); + + // only the by_field_value can be empty + if (!model || !restoredFeature || !restoredDataType) { + LOG_ERROR(<< "Failed to restore forecast model, data missing"); + return false; + } + + return true; +} + +} /* namespace model */ +} /* namespace ml */ diff --git a/lib/model/CMetricModelFactory.cc b/lib/model/CMetricModelFactory.cc index e23b2f44cd..5b0f3b922a 100644 --- a/lib/model/CMetricModelFactory.cc +++ b/lib/model/CMetricModelFactory.cc @@ -56,7 +56,8 @@ CMetricModelFactory::makeModel(const SModelInitializationData& initData) const { return new CMetricModel( this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 0.4, true), + this->defaultFeatureModels(features, dataGatherer->bucketLength(), + this->minimumSeasonalVarianceScale(), true), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), influenceCalculators); } @@ -260,6 +261,10 @@ void CMetricModelFactory::bucketResultsDelay(std::size_t bucketResultsDelay) { m_BucketResultsDelay = bucketResultsDelay; } +double CMetricModelFactory::minimumSeasonalVarianceScale() const { + return 0.4; +} + CMetricModelFactory::TStrCRefVec CMetricModelFactory::partitioningFields() const { TStrCRefVec result; result.reserve(2); diff --git a/lib/model/CMetricPopulationModelFactory.cc b/lib/model/CMetricPopulationModelFactory.cc index 5aef1b1786..8052acfc24 100644 --- a/lib/model/CMetricPopulationModelFactory.cc +++ b/lib/model/CMetricPopulationModelFactory.cc @@ -55,7 +55,8 @@ CMetricPopulationModelFactory::makeModel(const SModelInitializationData& initDat return new CMetricPopulationModel( this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 1.0, false), + this->defaultFeatureModels(features, dataGatherer->bucketLength(), + this->minimumSeasonalVarianceScale(), false), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), influenceCalculators); } @@ -256,6 +257,10 @@ void CMetricPopulationModelFactory::bucketResultsDelay(std::size_t bucketResults m_BucketResultsDelay = bucketResultsDelay; } +double CMetricPopulationModelFactory::minimumSeasonalVarianceScale() const { + return 1.0; +} + CMetricPopulationModelFactory::TStrCRefVec CMetricPopulationModelFactory::partitioningFields() const { TStrCRefVec result; diff --git a/lib/model/Makefile b/lib/model/Makefile index 6c12a68d7f..99a779ed15 100644 --- a/lib/model/Makefile +++ b/lib/model/Makefile @@ -8,6 +8,7 @@ include $(CPP_SRC_HOME)/mk/defines.mk TARGET=$(OBJS_DIR)/libMlModel$(DYNAMIC_LIB_EXT) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 @@ -38,6 +39,7 @@ CEventRatePopulationModel.cc \ CEventRatePopulationModelFactory.cc \ CFeatureData.cc \ CForecastDataSink.cc \ +CForecastModelPersist.cc \ CGathererTools.cc \ CHierarchicalResults.cc \ CHierarchicalResultsAggregator.cc \ diff --git a/lib/model/unittest/CForecastModelPersistTest.cc b/lib/model/unittest/CForecastModelPersistTest.cc new file mode 100644 index 0000000000..f46d6e1f65 --- /dev/null +++ b/lib/model/unittest/CForecastModelPersistTest.cc @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +#include "CForecastModelPersistTest.h" + +#include +#include + +#include +#include +#include + +#include + +#include + +#include + +using namespace ml; +using namespace model; + +void CForecastModelPersistTest::testPersistAndRestore() { + core_t::TTime bucketLength{1800}; + double minimumSeasonalVarianceScale = 0.2; + SModelParams params{bucketLength}; + params.s_DecayRate = 0.001; + params.s_LearnRate = 1.0; + maths::CTimeSeriesDecomposition trend(params.s_DecayRate, bucketLength); + maths::CNormalMeanPrecConjugate prior{maths::CNormalMeanPrecConjugate::nonInformativePrior( + maths_t::E_ContinuousData, params.s_DecayRate)}; + maths::CModelParams timeSeriesModelParams{bucketLength, params.s_LearnRate, + params.s_DecayRate, + minimumSeasonalVarianceScale}; + maths::CUnivariateTimeSeriesModel timeSeriesModel{timeSeriesModelParams, 1, trend, prior}; + + CForecastModelPersist::CPersist persister(ml::test::CTestTmpDir::tmpDir()); + persister.addModel(&timeSeriesModel, model_t::EFeature::E_IndividualCountByBucketAndPerson, + "some_by_field"); + + maths::CNormalMeanPrecConjugate otherPrior{maths::CNormalMeanPrecConjugate::nonInformativePrior( + maths_t::E_MixedData, params.s_DecayRate)}; + maths::CUnivariateTimeSeriesModel otherTimeSeriesModel{timeSeriesModelParams, + 2, trend, otherPrior}; + + persister.addModel(&otherTimeSeriesModel, model_t::EFeature::E_IndividualLowMeanByPerson, + "some_other_by_field"); + + maths::CNormalMeanPrecConjugate otherPriorEmptyByField{ + maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_DiscreteData, + params.s_DecayRate)}; + maths::CUnivariateTimeSeriesModel otherTimeSeriesModelEmptyByField{ + timeSeriesModelParams, 3, trend, otherPriorEmptyByField}; + + persister.addModel(&otherTimeSeriesModelEmptyByField, + model_t::EFeature::E_IndividualHighMedianByPerson, ""); + std::string persistedModels = persister.finalizePersistAndGetFile(); + + { + CForecastModelPersist::CRestore restorer(params, minimumSeasonalVarianceScale, + persistedModels); + CForecastModelPersist::TMathsModelPtr restoredModel; + std::string restoredByFieldValue; + model_t::EFeature restoredFeature; + + // test timeSeriesModel + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualCountByBucketAndPerson, + restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string("some_by_field"), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(1), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_ContinuousData, restoredModel->dataType()); + + CForecastModelPersist::TMathsModelPtr timeSeriesModelForForecast{ + timeSeriesModel.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->params().learnRate(), + restoredModel->params().learnRate()); + CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); + CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, + restoredModel->params().minimumSeasonalVarianceScale()); + + CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->checksum(42), + restoredModel->checksum(42)); + + // test otherTimeSeriesModel + restoredModel.reset(); + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualLowMeanByPerson, restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string("some_other_by_field"), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(2), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_MixedData, restoredModel->dataType()); + CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelForForecast{ + otherTimeSeriesModel.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->params().learnRate(), + restoredModel->params().learnRate()); + CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); + CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, + restoredModel->params().minimumSeasonalVarianceScale()); + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->checksum(42), + restoredModel->checksum(42)); + + // test otherTimeSeriesModelEmptyByField + restoredModel.reset(); + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualHighMedianByPerson, restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string(), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(3), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_DiscreteData, restoredModel->dataType()); + CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelEmptyByFieldForForecast{ + otherTimeSeriesModelEmptyByField.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelEmptyByFieldForForecast->checksum(42), + restoredModel->checksum(42)); + + CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + } + std::remove(persistedModels.c_str()); +} + +void CForecastModelPersistTest::testPersistAndRestoreEmpty() { + core_t::TTime bucketLength{1800}; + double minimumSeasonalVarianceScale = 0.2; + SModelParams params{bucketLength}; + + CForecastModelPersist::CPersist persister(ml::test::CTestTmpDir::tmpDir()); + std::string persistedModels = persister.finalizePersistAndGetFile(); + { + CForecastModelPersist::CRestore restorer(params, minimumSeasonalVarianceScale, + persistedModels); + CForecastModelPersist::TMathsModelPtr restoredModel; + std::string restoredByFieldValue; + model_t::EFeature restoredFeature; + + CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + } + std::remove(persistedModels.c_str()); +} + +CppUnit::Test* CForecastModelPersistTest::suite(void) { + CppUnit::TestSuite* suiteOfTests = new CppUnit::TestSuite("CForecastModelPersistTest"); + + suiteOfTests->addTest(new CppUnit::TestCaller( + "CForecastModelPersistTest::testPersistAndRestore", + &CForecastModelPersistTest::testPersistAndRestore)); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CForecastModelPersistTest::testPersistAndRestoreEmpty", + &CForecastModelPersistTest::testPersistAndRestoreEmpty)); + + return suiteOfTests; +} diff --git a/lib/model/unittest/CForecastModelPersistTest.h b/lib/model/unittest/CForecastModelPersistTest.h new file mode 100644 index 0000000000..8c3be93920 --- /dev/null +++ b/lib/model/unittest/CForecastModelPersistTest.h @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +#ifndef INCLUDED_CForecastModelPersistTest_h +#define INCLUDED_CForecastModelPersistTest_h + +#include + +class CForecastModelPersistTest : public CppUnit::TestFixture { +public: + void testPersistAndRestore(); + void testPersistAndRestoreEmpty(); + + static CppUnit::Test* suite(); +}; + +#endif // INCLUDED_CForecastModelPersistTest_h diff --git a/lib/model/unittest/Main.cc b/lib/model/unittest/Main.cc index 8a7309143d..c99befcb3f 100644 --- a/lib/model/unittest/Main.cc +++ b/lib/model/unittest/Main.cc @@ -18,6 +18,7 @@ #include "CEventRateModelTest.h" #include "CEventRatePopulationDataGathererTest.h" #include "CEventRatePopulationModelTest.h" +#include "CForecastModelPersistTest.h" #include "CFunctionTypesTest.h" #include "CGathererToolsTest.h" #include "CHierarchicalResultsLevelSetTest.h" @@ -58,6 +59,7 @@ int main(int argc, const char** argv) { runner.addTest(CEventRatePopulationDataGathererTest::suite()); runner.addTest(CEventRatePopulationModelTest::suite()); runner.addTest(CFunctionTypesTest::suite()); + runner.addTest(CForecastModelPersistTest::suite()); runner.addTest(CGathererToolsTest::suite()); runner.addTest(CHierarchicalResultsTest::suite()); runner.addTest(CHierarchicalResultsLevelSetTest::suite()); diff --git a/lib/model/unittest/Makefile b/lib/model/unittest/Makefile index ba17e80151..d831b4399f 100644 --- a/lib/model/unittest/Makefile +++ b/lib/model/unittest/Makefile @@ -8,6 +8,7 @@ include $(CPP_SRC_HOME)/mk/defines.mk TARGET=ml_test$(EXE_EXT) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 @@ -30,6 +31,7 @@ SRCS=\ CEventRateModelTest.cc \ CEventRatePopulationDataGathererTest.cc \ CEventRatePopulationModelTest.cc \ + CForecastModelPersistTest.cc \ CFunctionTypesTest.cc \ CGathererToolsTest.cc \ CHierarchicalResultsTest.cc \ From 54a5c429b4a7e2d0b0021484e9b2fa0603b5ed70 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 3 May 2018 20:56:19 +0200 Subject: [PATCH 2/6] fix windows build issue --- include/model/CForecastModelPersist.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/model/CForecastModelPersist.h b/include/model/CForecastModelPersist.h index a5d1f0c71e..03c53e5cb6 100644 --- a/include/model/CForecastModelPersist.h +++ b/include/model/CForecastModelPersist.h @@ -42,7 +42,7 @@ class MODEL_EXPORT CForecastModelPersist final { using TMathsModelPtr = std::shared_ptr; public: - class CPersist final { + class MODEL_EXPORT CPersist final { public: explicit CPersist(const std::string& temporaryPath); @@ -71,7 +71,7 @@ class MODEL_EXPORT CForecastModelPersist final { size_t m_ModelCount; }; - class CRestore final { + class MODEL_EXPORT CRestore final { public: explicit CRestore(const SModelParams& modelParams, double minimumSeasonalVarianceScale, From d85bd81cb0d3721807c537eb87bd923612fcd81a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 4 May 2018 16:57:58 +0200 Subject: [PATCH 3/6] add changelog entry --- docs/CHANGELOG.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 88d99f4e01..449ce55854 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -29,6 +29,8 @@ Improve and use periodic boundary condition for seasonal component modeling ({pull}84[#84]) +Allow forecasts to overflow model state to disk, allowing forecasts of big jobs ({pull}89[#89]) + === Bug Fixes === Regressions From 808c73767dcaf7455bc91fdbdfd663a5004351fe Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 4 May 2018 21:29:27 +0200 Subject: [PATCH 4/6] address review comments --- include/api/CForecastRunner.h | 12 ++++++------ include/model/CModelFactory.h | 2 +- lib/api/CForecastRunner.cc | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/include/api/CForecastRunner.h b/include/api/CForecastRunner.h index 960a44ec5d..7ee5c2f0c8 100644 --- a/include/api/CForecastRunner.h +++ b/include/api/CForecastRunner.h @@ -71,15 +71,15 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! max memory allowed to use for forecast models static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520ull; // 20MB - // Note: This value measures the size in memory, not the size of the persistence, - // which is likely higher and would be hard to calculate upfront + //! Note: This value measures the size in memory, not the size of the persistence, + //! which is likely higher and would be hard to calculate upfront //! max memory allowed to use for forecast models persisting to disk static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB - // Note: This value is lower than on X-pack side to prevent side-effects, - // if you change this value also change the limit on X-pack side. - // The purpose of this value is to guard the rest of the system regarding - // an out of disk space + //! Note: This value is lower than on X-pack side to prevent side-effects, + //! if you change this value also change the limit on X-pack side. + //! The purpose of this value is to guard the rest of the system regarding + //! an out of disk space //! minimum disk space required for disk persistence static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB diff --git a/include/model/CModelFactory.h b/include/model/CModelFactory.h index 8d2228a7bd..3f8aa00c6d 100644 --- a/include/model/CModelFactory.h +++ b/include/model/CModelFactory.h @@ -351,7 +351,7 @@ class MODEL_EXPORT CModelFactory { //! component. std::size_t componentSize() const; - // Get the minimum seasonal variance scale, specific to the model + //! Get the minimum seasonal variance scale, specific to the model virtual double minimumSeasonalVarianceScale() const = 0; protected: diff --git a/lib/api/CForecastRunner.cc b/lib/api/CForecastRunner.cc index be7776b955..c9df3f2ecd 100644 --- a/lib/api/CForecastRunner.cc +++ b/lib/api/CForecastRunner.cc @@ -165,8 +165,8 @@ void CForecastRunner::forecastWorker() { if (modelRestore->nextModel(model, feature, byFieldValue)) { series.s_ToForecast.emplace_back( feature, std::move(model), byFieldValue); - } else // restorer exhausted, no need for further restoring - { + } else { + // restorer exhausted, no need for further restoring modelRestore.reset(); break; } From 0a8bb6340c8ec6ed348cc5bd895e8533c1e1f3f4 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 7 May 2018 09:00:41 +0200 Subject: [PATCH 5/6] make changelog entry similar to X-pack one --- docs/CHANGELOG.asciidoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 449ce55854..ed5b77a812 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -29,7 +29,8 @@ Improve and use periodic boundary condition for seasonal component modeling ({pull}84[#84]) -Allow forecasts to overflow model state to disk, allowing forecasts of big jobs ({pull}89[#89]) +Forecasting of Machine Learning job time series is now supported for large jobs by temporary storing +model state on disk ({pull}89[#89]) === Bug Fixes From ce4b2b6ef9b90833e8bcc1e73a5378d0f4c52d57 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 9 May 2018 09:04:13 +0200 Subject: [PATCH 6/6] improve changelog --- docs/CHANGELOG.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index ed5b77a812..ad47b135cc 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -29,7 +29,7 @@ Improve and use periodic boundary condition for seasonal component modeling ({pull}84[#84]) -Forecasting of Machine Learning job time series is now supported for large jobs by temporary storing +Forecasting of Machine Learning job time series is now supported for large jobs by temporarily storing model state on disk ({pull}89[#89]) === Bug Fixes