diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 3696f71751..7a878e63e4 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -43,6 +43,9 @@ and classification. (See {ml-pull}948[#948].) (See {ml-pull}991[#991].) * Add new model_size_stats fields to instrument categorization. (See {ml-pull}948[#948] and {pull}51879[#51879], issue: {issue}50794[#50749].) +* Improve upfront memory estimation for all data frame analyses, which were higher than +necessary. This will improve the allocation of data frame analyses to cluster nodes. +(See {ml-pull}1003[#1003].) === Bug Fixes diff --git a/include/api/CDataFrameAnalysisInstrumentation.h b/include/api/CDataFrameAnalysisInstrumentation.h index 76ab6cecc9..4a0ccdedd4 100644 --- a/include/api/CDataFrameAnalysisInstrumentation.h +++ b/include/api/CDataFrameAnalysisInstrumentation.h @@ -74,7 +74,7 @@ class API_EXPORT CDataFrameAnalysisInstrumentation private: void writeProgress(std::uint32_t step); void writeMemory(std::uint32_t step); - void writeState(uint32_t step); + void writeState(std::uint32_t step); private: std::atomic_bool m_Finished; diff --git a/include/maths/CDataFrameAnalysisInstrumentationInterface.h b/include/maths/CDataFrameAnalysisInstrumentationInterface.h index 509cb209ba..416882d424 100644 --- a/include/maths/CDataFrameAnalysisInstrumentationInterface.h +++ b/include/maths/CDataFrameAnalysisInstrumentationInterface.h @@ -20,6 +20,7 @@ class MATHS_EXPORT CDataFrameAnalysisInstrumentationInterface { public: using TProgressCallback = std::function; using TMemoryUsageCallback = std::function; + using TStepCallback = std::function; public: virtual ~CDataFrameAnalysisInstrumentationInterface() = default; @@ -47,6 +48,10 @@ class MATHS_EXPORT CDataFrameAnalysisInstrumentationInterface { TMemoryUsageCallback memoryUsageCallback() { return [this](std::int64_t delta) { this->updateMemoryUsage(delta); }; } + //! Factory for the nextStep() callback function object. + TStepCallback stepCallback() { + return [this](std::uint32_t step) { this->nextStep(step); }; + } }; //! \brief Dummies out all instrumentation. diff --git a/lib/api/CDataFrameAnalysisRunner.cc b/lib/api/CDataFrameAnalysisRunner.cc index 52abaf389d..893742d422 100644 --- a/lib/api/CDataFrameAnalysisRunner.cc +++ b/lib/api/CDataFrameAnalysisRunner.cc @@ -48,7 +48,7 @@ TBoolVec CDataFrameAnalysisRunner::columnsForWhichEmptyIsMissing(const TStrVec& void CDataFrameAnalysisRunner::estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const { std::size_t numberRows{m_Spec.numberRows()}; - std::size_t numberColumns{m_Spec.numberColumns() + this->numberExtraColumns()}; + std::size_t numberColumns{m_Spec.numberColumns()}; std::size_t maxNumberPartitions{maximumNumberPartitions(m_Spec)}; if (maxNumberPartitions == 0) { writer.write("0", "0"); @@ -68,7 +68,7 @@ void CDataFrameAnalysisRunner::estimateMemoryUsage(CMemoryUsageEstimationResultJ void CDataFrameAnalysisRunner::computeAndSaveExecutionStrategy() { std::size_t numberRows{m_Spec.numberRows()}; - std::size_t numberColumns{m_Spec.numberColumns() + this->numberExtraColumns()}; + std::size_t numberColumns{m_Spec.numberColumns()}; std::size_t memoryLimit{m_Spec.memoryLimit()}; LOG_TRACE(<< "memory limit = " << memoryLimit); @@ -163,8 +163,9 @@ const CDataFrameAnalysisSpecification& CDataFrameAnalysisRunner::spec() const { std::size_t CDataFrameAnalysisRunner::estimateMemoryUsage(std::size_t totalNumberRows, std::size_t partitionNumberRows, std::size_t numberColumns) const { - return core::CDataFrame::estimateMemoryUsage(this->storeDataFrameInMainMemory(), - totalNumberRows, numberColumns) + + return core::CDataFrame::estimateMemoryUsage( + this->storeDataFrameInMainMemory(), totalNumberRows, + numberColumns + this->numberExtraColumns()) + this->estimateBookkeepingMemoryUsage(m_NumberPartitions, totalNumberRows, partitionNumberRows, numberColumns); } diff --git a/lib/api/unittest/CDataFrameAnalysisRunnerTest.cc b/lib/api/unittest/CDataFrameAnalysisRunnerTest.cc index f0ad108361..4f74941bf9 100644 --- a/lib/api/unittest/CDataFrameAnalysisRunnerTest.cc +++ b/lib/api/unittest/CDataFrameAnalysisRunnerTest.cc @@ -115,6 +115,7 @@ BOOST_AUTO_TEST_CASE(testComputeAndSaveExecutionStrategyDiskUsageFlag) { } } +namespace { void testEstimateMemoryUsage(std::int64_t numberRows, const std::string& expectedExpectedMemoryWithoutDisk, const std::string& expectedExpectedMemoryWithDisk, @@ -152,32 +153,33 @@ void testEstimateMemoryUsage(std::int64_t numberRows, BOOST_TEST_REQUIRE(result.HasMember("expected_memory_without_disk")); BOOST_REQUIRE_EQUAL(expectedExpectedMemoryWithoutDisk, - std::string(result["expected_memory_without_disk"].GetString())); + result["expected_memory_without_disk"].GetString()); BOOST_TEST_REQUIRE(result.HasMember("expected_memory_with_disk")); BOOST_REQUIRE_EQUAL(expectedExpectedMemoryWithDisk, - std::string(result["expected_memory_with_disk"].GetString())); + result["expected_memory_with_disk"].GetString()); BOOST_REQUIRE_EQUAL(expectedNumberErrors, static_cast(errors.size())); } +} BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor0Rows) { testEstimateMemoryUsage(0, "0", "0", 1); } BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor1Row) { - testEstimateMemoryUsage(1, "6kB", "6kB", 0); + testEstimateMemoryUsage(1, "4kB", "4kB", 0); } BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor10Rows) { - testEstimateMemoryUsage(10, "15kB", "13kB", 0); + testEstimateMemoryUsage(10, "12kB", "10kB", 0); } BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor100Rows) { - testEstimateMemoryUsage(100, "62kB", "35kB", 0); + testEstimateMemoryUsage(100, "57kB", "35kB", 0); } BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor1000Rows) { - testEstimateMemoryUsage(1000, "450kB", "143kB", 0); + testEstimateMemoryUsage(1000, "403kB", "142kB", 0); } void testColumnsForWhichEmptyIsMissing(const std::string& analysis, diff --git a/lib/api/unittest/CDataFrameAnalyzerFeatureImportanceTest.cc b/lib/api/unittest/CDataFrameAnalyzerFeatureImportanceTest.cc index 645f5199d3..8efca5179e 100644 --- a/lib/api/unittest/CDataFrameAnalyzerFeatureImportanceTest.cc +++ b/lib/api/unittest/CDataFrameAnalyzerFeatureImportanceTest.cc @@ -156,6 +156,17 @@ struct SFixture { analyzer.handleRecord(fieldNames, {"", "", "", "", "", "", "$"}); + LOG_DEBUG(<< "estimated memory usage = " + << core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage)); + LOG_DEBUG(<< "peak memory = " + << core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage)); + LOG_DEBUG(<< "time to train = " << core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) + << "ms"); + + BOOST_TEST_REQUIRE( + core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < + core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage)); + rapidjson::Document results; rapidjson::ParseResult ok(results.Parse(s_Output.str())); BOOST_TEST_REQUIRE(static_cast(ok) == true); @@ -184,6 +195,17 @@ struct SFixture { analyzer.handleRecord(fieldNames, {"", "", "", "", "", "", "$"}); + LOG_DEBUG(<< "estimated memory usage = " + << core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage)); + LOG_DEBUG(<< "peak memory = " + << core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage)); + LOG_DEBUG(<< "time to train = " << core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) + << "ms"); + + BOOST_TEST_REQUIRE( + core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < + core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage)); + rapidjson::Document results; rapidjson::ParseResult ok(results.Parse(s_Output.str())); BOOST_TEST_REQUIRE(static_cast(ok) == true); @@ -208,6 +230,17 @@ struct SFixture { analyzer.handleRecord(fieldNames, {"", "", "", "", "", "", "$"}); + LOG_DEBUG(<< "estimated memory usage = " + << core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage)); + LOG_DEBUG(<< "peak memory = " + << core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage)); + LOG_DEBUG(<< "time to train = " << core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) + << "ms"); + + BOOST_TEST_REQUIRE( + core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < + core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage)); + rapidjson::Document results; rapidjson::ParseResult ok(results.Parse(s_Output.str())); BOOST_TEST_REQUIRE(static_cast(ok) == true); diff --git a/lib/api/unittest/CDataFrameAnalyzerOutlierTest.cc b/lib/api/unittest/CDataFrameAnalyzerOutlierTest.cc index 61fcce5e1f..7bfaa8a4cc 100644 --- a/lib/api/unittest/CDataFrameAnalyzerOutlierTest.cc +++ b/lib/api/unittest/CDataFrameAnalyzerOutlierTest.cc @@ -4,8 +4,6 @@ * you may not use this file except in compliance with the Elastic License. */ -#include "CDataFrameMockAnalysisRunner.h" - #include #include #include @@ -17,11 +15,10 @@ #include #include +#include #include #include -#include - #include #include @@ -87,9 +84,10 @@ void addOutlierTestData(TStrVec fieldNames, } frame->finishWritingRows(); - CDataFrameMockAnalysisState state; + maths::CDataFrameAnalysisInstrumentationStub instrumentation; maths::COutliers::compute( - {1, 1, true, method, numberNeighbours, computeFeatureInfluence, 0.05}, *frame, state); + {1, 1, true, method, numberNeighbours, computeFeatureInfluence, 0.05}, + *frame, instrumentation); expectedScores.resize(numberInliers + numberOutliers); expectedFeatureInfluences.resize(numberInliers + numberOutliers, TDoubleVec(5)); @@ -202,10 +200,17 @@ BOOST_AUTO_TEST_CASE(testRunOutlierDetection) { LOG_DEBUG(<< "number partitions = " << core::CProgramCounters::counter(counter_t::E_DFONumberPartitions)); + LOG_DEBUG(<< "estimated memory usage = " + << core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage)); LOG_DEBUG(<< "peak memory = " << core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage)); + BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFONumberPartitions) == 1); BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) < 100000); + // Allow a 20% margin + BOOST_TEST_REQUIRE( + core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) < + (120 * core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage)) / 100); } BOOST_AUTO_TEST_CASE(testRunOutlierDetectionPartitioned) { @@ -249,11 +254,17 @@ BOOST_AUTO_TEST_CASE(testRunOutlierDetectionPartitioned) { LOG_DEBUG(<< "number partitions = " << core::CProgramCounters::counter(counter_t::E_DFONumberPartitions)); + LOG_DEBUG(<< "estimated memory usage = " + << core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage)); LOG_DEBUG(<< "peak memory = " << core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage)); + BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFONumberPartitions) > 1); - BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) < - 300000); // + 16% + // Allow a 20% margin + BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) < 120000); + BOOST_TEST_REQUIRE( + core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) < + (120 * core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage)) / 100); } BOOST_AUTO_TEST_CASE(testRunOutlierFeatureInfluences) { diff --git a/lib/api/unittest/CDataFrameAnalyzerTrainingTest.cc b/lib/api/unittest/CDataFrameAnalyzerTrainingTest.cc index 0bfeeec148..b80144af63 100644 --- a/lib/api/unittest/CDataFrameAnalyzerTrainingTest.cc +++ b/lib/api/unittest/CDataFrameAnalyzerTrainingTest.cc @@ -427,6 +427,9 @@ BOOST_AUTO_TEST_CASE(testRunBoostedTreeRegressionTraining) { BOOST_TEST_REQUIRE(core::CProgramCounters::counter( counter_t::E_DFTPMEstimatedPeakMemoryUsage) < 6000000); BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < 1500000); + BOOST_TEST_REQUIRE( + core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < + core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage)); BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) > 0); BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) <= duration); } @@ -722,9 +725,13 @@ BOOST_AUTO_TEST_CASE(testRunBoostedTreeClassifierTraining) { << core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage)); LOG_DEBUG(<< "time to train = " << core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) << "ms"); + BOOST_TEST_REQUIRE(core::CProgramCounters::counter( counter_t::E_DFTPMEstimatedPeakMemoryUsage) < 6000000); BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < 1500000); + BOOST_TEST_REQUIRE( + core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < + core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage)); BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) > 0); BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) <= duration); } diff --git a/lib/api/unittest/CDataFrameMockAnalysisRunner.cc b/lib/api/unittest/CDataFrameMockAnalysisRunner.cc index f25468b0ee..b573e8299f 100644 --- a/lib/api/unittest/CDataFrameMockAnalysisRunner.cc +++ b/lib/api/unittest/CDataFrameMockAnalysisRunner.cc @@ -40,11 +40,11 @@ std::size_t CDataFrameMockAnalysisRunner::estimateBookkeepingMemoryUsage(std::si const ml::api::CDataFrameAnalysisInstrumentation& CDataFrameMockAnalysisRunner::instrumentation() const { - return m_State; + return m_Instrumentation; } ml::api::CDataFrameAnalysisInstrumentation& CDataFrameMockAnalysisRunner::instrumentation() { - return m_State; + return m_Instrumentation; } ml::test::CRandomNumbers CDataFrameMockAnalysisRunner::ms_Rng; diff --git a/lib/api/unittest/CDataFrameMockAnalysisRunner.h b/lib/api/unittest/CDataFrameMockAnalysisRunner.h index ee90146b56..f74d24985a 100644 --- a/lib/api/unittest/CDataFrameMockAnalysisRunner.h +++ b/lib/api/unittest/CDataFrameMockAnalysisRunner.h @@ -30,7 +30,6 @@ class CDataFrameMockAnalysisRunner final : public ml::api::CDataFrameAnalysisRun ml::core::CRapidJsonConcurrentLineWriter&) const override; const ml::api::CDataFrameAnalysisInstrumentation& instrumentation() const override; - ml::api::CDataFrameAnalysisInstrumentation& instrumentation() override; private: @@ -42,7 +41,7 @@ class CDataFrameMockAnalysisRunner final : public ml::api::CDataFrameAnalysisRun private: static ml::test::CRandomNumbers ms_Rng; - CDataFrameMockAnalysisState m_State; + CDataFrameMockAnalysisState m_Instrumentation; }; class CDataFrameMockAnalysisRunnerFactory final : public ml::api::CDataFrameAnalysisRunnerFactory { diff --git a/lib/maths/CBoostedTreeImpl.cc b/lib/maths/CBoostedTreeImpl.cc index 1a5280e2f5..4ca4a71523 100644 --- a/lib/maths/CBoostedTreeImpl.cc +++ b/lib/maths/CBoostedTreeImpl.cc @@ -330,8 +330,6 @@ std::size_t CBoostedTreeImpl::estimateMemoryUsage(std::size_t numberRows, std::size_t forestMemoryUsage{ m_MaximumNumberTrees * (sizeof(TNodeVec) + maximumNumberNodes * sizeof(CBoostedTreeNode))}; - std::size_t extraColumnsMemoryUsage{numberExtraColumnsForTrain(m_Loss->numberParameters()) * - numberRows * sizeof(CFloatStorage)}; std::size_t foldRoundLossMemoryUsage{m_NumberFolds * m_NumberRounds * sizeof(TOptionalDouble)}; std::size_t hyperparametersMemoryUsage{numberColumns * sizeof(double)}; @@ -349,8 +347,8 @@ std::size_t CBoostedTreeImpl::estimateMemoryUsage(std::size_t numberRows, this->numberHyperparametersToTune(), m_NumberRounds)}; std::size_t shapMemoryUsage{ m_TopShapValues > 0 ? numberRows * numberColumns * sizeof(CFloatStorage) : 0}; - return sizeof(*this) + forestMemoryUsage + extraColumnsMemoryUsage + - foldRoundLossMemoryUsage + hyperparametersMemoryUsage + + return sizeof(*this) + forestMemoryUsage + foldRoundLossMemoryUsage + + hyperparametersMemoryUsage + std::max(leafNodeStatisticsMemoryUsage, shapMemoryUsage) + // not concurrent dataTypeMemoryUsage + featureSampleProbabilities + missingFeatureMaskMemoryUsage + trainTestMaskMemoryUsage + bayesianOptimisationMemoryUsage; diff --git a/lib/maths/COutliers.cc b/lib/maths/COutliers.cc index 9b889e035e..b5bc28acb9 100644 --- a/lib/maths/COutliers.cc +++ b/lib/maths/COutliers.cc @@ -29,6 +29,7 @@ using namespace outliers_detail; namespace { using TRowItr = core::CDataFrame::TRowItr; +using TStepCallback = std::function; double shift(double score) { return std::exp(-2.0) + score; @@ -103,7 +104,6 @@ class CEnsemble { TSampler m_Sampler; TMatrix m_Projection; TPointVec m_SampledProjectedPoints; - TMemoryUsageCallback m_RecordMemoryUsage; }; using TModelBuilderVec = std::vector; @@ -123,8 +123,8 @@ class CEnsemble { return core::CMemory::dynamicSize(m_State); } - static std::size_t estimateMemoryUsage(std::size_t dimension) { - return (dimension + 2) * sizeof(CFloatStorage); + static std::size_t estimateMemoryUsage(std::size_t numberInfluences) { + return sizeof(CScorer) + (numberInfluences + 2) * sizeof(CFloatStorage); } private: @@ -156,10 +156,9 @@ class CEnsemble { public: CEnsemble(const TMethodFactoryVec& methodFactories, TModelBuilderVec builders, - TMemoryUsageCallback recordMemoryUsage); - ~CEnsemble() { - m_RecordMemoryUsage(-std::int64_t(core::CMemory::dynamicSize(m_Models))); - } + TMemoryUsageCallback recordMemoryUsage, + TStepCallback recordStep); + ~CEnsemble() { m_RecordMemoryUsage(-signedMemoryUsage(m_Models)); } CEnsemble(const CEnsemble&) = delete; CEnsemble& operator=(const CEnsemble&) = delete; @@ -175,17 +174,15 @@ class CEnsemble { CPRNG::CXorOShiro128Plus rng = CPRNG::CXorOShiro128Plus{}); //! Compute the outlier scores for \p points. - TScorerVec computeOutlierScores(const std::vector& points, - CDataFrameAnalysisInstrumentationInterface& state) const; + TScorerVec computeOutlierScores(const std::vector& points) const; //! Estimate the amount of memory that will be used by the ensemble. - static std::size_t - estimateMemoryUsedToComputeOutlierScores(TMethodSize methodSize, - std::size_t numberMethodsPerModel, - bool computeFeatureInfluence, - std::size_t totalNumberPoints, - std::size_t partitionNumberPoints, - std::size_t dimension); + static std::size_t estimateMemoryUsage(TMethodSize methodSize, + std::size_t numberMethodsPerModel, + bool computeFeatureInfluence, + std::size_t totalNumberPoints, + std::size_t partitionNumberPoints, + std::size_t dimension); //! Get a human readable description of the ensemble. std::string print() const; @@ -291,6 +288,7 @@ class CEnsemble { private: TModelVec m_Models; TMemoryUsageCallback m_RecordMemoryUsage; + TStepCallback m_RecordStep; }; template @@ -301,8 +299,9 @@ const double CEnsemble::NEIGHBOURHOOD_FRACTION{0.01}; template CEnsemble::CEnsemble(const TMethodFactoryVec& methodFactories, TModelBuilderVec builders, - TMemoryUsageCallback recordMemoryUsage) - : m_RecordMemoryUsage{std::move(recordMemoryUsage)} { + TMemoryUsageCallback recordMemoryUsage, + TStepCallback recordStep) + : m_RecordMemoryUsage{std::move(recordMemoryUsage)}, m_RecordStep{std::move(recordStep)} { m_Models.reserve(builders.size()); for (auto& builder : builders) { @@ -368,8 +367,7 @@ CEnsemble::makeBuilders(const TSizeVecVec& methods, template typename CEnsemble::TScorerVec -CEnsemble::computeOutlierScores(const std::vector& points, - CDataFrameAnalysisInstrumentationInterface& state) const { +CEnsemble::computeOutlierScores(const std::vector& points) const { if (points.empty()) { return {}; } @@ -379,22 +377,21 @@ CEnsemble::computeOutlierScores(const std::vector& points, TScorerVec scores(points.size()); m_RecordMemoryUsage(core::CMemory::dynamicSize(scores)); - std::uint32_t step{0ul}; + std::uint32_t step{0}; for (const auto& model : m_Models) { model.addOutlierScores(points, scores, m_RecordMemoryUsage); - state.nextStep(step++); + m_RecordStep(step++); } return scores; } template -std::size_t -CEnsemble::estimateMemoryUsedToComputeOutlierScores(TMethodSize methodSize, - std::size_t numberMethodsPerModel, - bool computeFeatureInfluence, - std::size_t totalNumberPoints, - std::size_t partitionNumberPoints, - std::size_t dimension) { +std::size_t CEnsemble::estimateMemoryUsage(TMethodSize methodSize, + std::size_t numberMethodsPerModel, + bool computeFeatureInfluence, + std::size_t totalNumberPoints, + std::size_t partitionNumberPoints, + std::size_t dimension) { std::size_t ensembleSize{computeEnsembleSize(numberMethodsPerModel, totalNumberPoints, dimension)}; std::size_t sampleSize{computeSampleSize(totalNumberPoints)}; @@ -403,30 +400,21 @@ CEnsemble::estimateMemoryUsedToComputeOutlierScores(TMethodSize methodSiz std::size_t projectionDimension{computeProjectionDimension(sampleSize, dimension)}; std::size_t numberNeighbours{(3 + maxNumberNeighbours) / 2}; - auto pointsMemory = [&] { - return partitionNumberPoints * - (sizeof(TPoint) + las::estimateMemoryUsage(dimension)); - }; - auto scorersMemory = [&] { - return partitionNumberPoints * - (sizeof(CScorer) + - CScorer::estimateMemoryUsage(computeFeatureInfluence ? dimension : 0)); - }; - auto modelMemory = [&] { - return CModel::estimateMemoryUsage(methodSize, sampleSize, numberNeighbours, - projectionDimension, dimension); - }; - auto partitionScoringMemory = [&] { - // The scores for a single method plus bookkeeping overhead - // for a single partition. - return numberMethodsPerModel * partitionNumberPoints * - (sizeof(TDouble1Vec) + - (computeFeatureInfluence ? projectionDimension * sizeof(double) : 0)) + - methodSize(numberNeighbours, partitionNumberPoints, projectionDimension); - }; - - return pointsMemory() + scorersMemory() + numberModels * modelMemory() + - partitionScoringMemory(); + std::size_t pointsMemory{partitionNumberPoints * + (sizeof(TPoint) + las::estimateMemoryUsage(dimension))}; + std::size_t scorersMemory{ + partitionNumberPoints * + CScorer::estimateMemoryUsage(computeFeatureInfluence ? dimension : 0)}; + std::size_t modelMemory{CModel::estimateMemoryUsage( + methodSize, sampleSize, numberNeighbours, projectionDimension, dimension)}; + // The scores for a single method plus bookkeeping overhead for a single partition. + std::size_t partitionScoringMemory{ + numberMethodsPerModel * partitionNumberPoints * + (sizeof(TDouble1Vec) + + (computeFeatureInfluence ? projectionDimension * sizeof(double) : 0)) + + methodSize(numberNeighbours, partitionNumberPoints, projectionDimension)}; + + return pointsMemory + scorersMemory + numberModels * modelMemory + partitionScoringMemory; } template @@ -803,13 +791,10 @@ std::size_t CEnsemble::CModel::estimateMemoryUsage(TMethodSize methodSize std::size_t numberNeighbours, std::size_t projectionDimension, std::size_t dimension) { - auto lookupMemory = [&] { - return TKdTree::estimateMemoryUsage(sampleSize, projectionDimension); - }; - auto projectionMemory = [&] { - return projectionDimension * dimension * sizeof(typename SCoordinate::Type); - }; - return lookupMemory() + 2 * projectionMemory() + + std::size_t lookupMemory{TKdTree::estimateMemoryUsage(sampleSize, projectionDimension)}; + std::size_t projectionMemory{projectionDimension * dimension * + sizeof(typename SCoordinate::Type)}; + return sizeof(CModel) + lookupMemory + 2 * projectionMemory + methodSize(numberNeighbours, sampleSize, projectionDimension); } @@ -854,7 +839,8 @@ template CEnsemble buildEnsemble(const COutliers::SComputeParameters& params, core::CDataFrame& frame, TProgressCallback recordProgress, - TMemoryUsageCallback recordMemoryUsage) { + TMemoryUsageCallback recordMemoryUsage, + TStepCallback recordStep) { using TSizeVec = typename CEnsemble::TSizeVec; using TSizeVecVec = typename CEnsemble::TSizeVecVec; @@ -881,21 +867,20 @@ CEnsemble buildEnsemble(const COutliers::SComputeParameters& params, } }); - return {methodFactories(params.s_ComputeFeatureInfluence, std::move(recordProgress)), - std::move(builders), std::move(recordMemoryUsage)}; + return CEnsemble{ + methodFactories(params.s_ComputeFeatureInfluence, std::move(recordProgress)), + std::move(builders), std::move(recordMemoryUsage), std::move(recordStep)}; } bool computeOutliersNoPartitions(const COutliers::SComputeParameters& params, core::CDataFrame& frame, - CDataFrameAnalysisInstrumentationInterface& state, - TProgressCallback recordProgress, - TMemoryUsageCallback recordMemoryUsage) { + CDataFrameAnalysisInstrumentationInterface& instrumentation) { using TPoint = CMemoryMappedDenseVector; using TPointVec = std::vector; std::int64_t frameMemory{signedMemoryUsage(frame)}; - recordMemoryUsage(frameMemory); + instrumentation.updateMemoryUsage(frameMemory); CEnsemble::TScorerVec scores; @@ -903,7 +888,8 @@ bool computeOutliersNoPartitions(const COutliers::SComputeParameters& params, { core::CStopWatch watch{true}; CEnsemble ensemble{buildEnsemble( - params, frame, std::move(recordProgress), recordMemoryUsage)}; + params, frame, instrumentation.progressCallback(), + instrumentation.memoryUsageCallback(), instrumentation.stepCallback())}; LOG_TRACE(<< "Ensemble = " << ensemble.print()); core::CProgramCounters::counter(counter_t::E_DFOTimeToCreateEnsemble) = watch.stop(); @@ -914,7 +900,7 @@ bool computeOutliersNoPartitions(const COutliers::SComputeParameters& params, // is thread safe. TPointVec points(frame.numberRows(), TPoint{nullptr, 1}); std::int64_t pointsMemory{signedMemoryUsage(points)}; - recordMemoryUsage(pointsMemory); + instrumentation.updateMemoryUsage(pointsMemory); auto rowsToPoints = [&points](TRowItr beginRows, TRowItr endRows) { for (auto row = beginRows; row != endRows; ++row) { @@ -933,11 +919,11 @@ bool computeOutliersNoPartitions(const COutliers::SComputeParameters& params, } watch.reset(true); - scores = ensemble.computeOutlierScores(points, state); + scores = ensemble.computeOutlierScores(points); core::CProgramCounters::counter(counter_t::E_DFOTimeToComputeScores) = watch.stop(); - recordMemoryUsage(-pointsMemory); + instrumentation.updateMemoryUsage(-pointsMemory); // This is a sanity check against CEnsemble accidentally writing to the data // frame via one of the memory mapped vectors. All bets are off as to whether @@ -961,7 +947,7 @@ bool computeOutliersNoPartitions(const COutliers::SComputeParameters& params, frame.resizeColumns(params.s_NumberThreads, (params.s_ComputeFeatureInfluence ? 2 : 1) * dimension + 1); - recordMemoryUsage(signedMemoryUsage(frame) - frameMemory); + instrumentation.updateMemoryUsage(signedMemoryUsage(frame) - frameMemory); bool successful; std::tie(std::ignore, successful) = frame.writeColumns(params.s_NumberThreads, writeScores); @@ -974,9 +960,7 @@ bool computeOutliersNoPartitions(const COutliers::SComputeParameters& params, bool computeOutliersPartitioned(const COutliers::SComputeParameters& params, core::CDataFrame& frame, - CDataFrameAnalysisInstrumentationInterface& state, - TProgressCallback recordProgress, - TMemoryUsageCallback recordMemoryUsage) { + CDataFrameAnalysisInstrumentationInterface& instrumentation) { using TPoint = CDenseVector; using TPointVec = std::vector; @@ -984,10 +968,11 @@ bool computeOutliersPartitioned(const COutliers::SComputeParameters& params, core::CStopWatch watch{true}; CEnsemble ensemble{buildEnsemble( params, frame, - [=](double progress) { - recordProgress(progress / static_cast(params.s_NumberPartitions)); + [&](double progress) { + instrumentation.updateProgress( + progress / static_cast(params.s_NumberPartitions)); }, - recordMemoryUsage)}; + instrumentation.memoryUsageCallback(), instrumentation.stepCallback())}; core::CProgramCounters::counter(counter_t::E_DFOTimeToCreateEnsemble) = watch.stop(); LOG_TRACE(<< "Ensemble = " << ensemble.print()); @@ -996,7 +981,7 @@ bool computeOutliersPartitioned(const COutliers::SComputeParameters& params, frame.resizeColumns(params.s_NumberThreads, (params.s_ComputeFeatureInfluence ? 2 : 1) * dimension + 1); - recordMemoryUsage(signedMemoryUsage(frame)); + instrumentation.updateMemoryUsage(signedMemoryUsage(frame)); std::size_t rowsPerPartition{(frame.numberRows() + params.s_NumberPartitions - 1) / params.s_NumberPartitions}; @@ -1006,8 +991,8 @@ bool computeOutliersPartitioned(const COutliers::SComputeParameters& params, // This is presized so that rowsToPoints only needs to access and write to // each element. Since it does this once per element it is thread safe. - TPointVec points(rowsPerPartition, SConstant::get(dimension, 0.0)); - recordMemoryUsage(signedMemoryUsage(points)); + TPointVec points(rowsPerPartition, SConstant::get(dimension, 0)); + instrumentation.updateMemoryUsage(signedMemoryUsage(points)); for (std::size_t i = 0, beginPartitionRows = 0; i < params.s_NumberPartitions; ++i, beginPartitionRows += rowsPerPartition) { @@ -1035,7 +1020,7 @@ bool computeOutliersPartitioned(const COutliers::SComputeParameters& params, } watch.reset(true); - auto scores = ensemble.computeOutlierScores(points, state); + auto scores = ensemble.computeOutlierScores(points); core::CProgramCounters::counter(counter_t::E_DFOTimeToComputeScores) += watch.stop(); @@ -1056,7 +1041,7 @@ bool computeOutliersPartitioned(const COutliers::SComputeParameters& params, return false; } - recordMemoryUsage(-signedMemoryUsage(scores)); + instrumentation.updateMemoryUsage(-signedMemoryUsage(scores)); } return true; @@ -1071,16 +1056,9 @@ void COutliers::compute(const SComputeParameters& params, CDataFrameUtils::standardizeColumns(params.s_NumberThreads, frame); } - auto recordProgress{instrumentation.progressCallback()}; - auto recordMemoryUsage{instrumentation.memoryUsageCallback()}; - bool successful{frame.inMainMemory() && params.s_NumberPartitions == 1 - ? computeOutliersNoPartitions(params, frame, instrumentation, - std::move(recordProgress), - std::move(recordMemoryUsage)) - : computeOutliersPartitioned(params, frame, instrumentation, - std::move(recordProgress), - std::move(recordMemoryUsage))}; + ? computeOutliersNoPartitions(params, frame, instrumentation) + : computeOutliersPartitioned(params, frame, instrumentation)}; if (successful == false) { HANDLE_FATAL(<< "Internal error: computing outliers for data frame. There " @@ -1123,7 +1101,7 @@ std::size_t COutliers::estimateMemoryUsedByCompute(const SComputeParameters& par } return std::size_t{0}; }; - return CEnsemble::estimateMemoryUsedToComputeOutlierScores( + return CEnsemble::estimateMemoryUsage( methodSize, params.s_Method == E_Ensemble ? 2 : 1 /*number methods*/, params.s_ComputeFeatureInfluence, totalNumberPoints, partitionNumberPoints, dimension);