Skip to content

[ML] Improvements to upfront memory estimation for data frame analyses #1003

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ progress, memory usage, etc. (See {ml-pull}906[#906].)
and classification. (See {ml-pull}948[#948].)
* Add new model_size_stats fields to instrument categorization. (See {ml-pull}948[#948]
and {pull}51879[#51879], issue: {issue}50794[#50749].)
* Improve upfront memory estimation for all data frame analyses, which were higher than
necessary. This will improve the allocation of data frame analyses to cluster nodes.
(See {ml-pull}1003[#1003].)

=== Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion include/api/CDataFrameAnalysisInstrumentation.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
private:
void writeProgress(std::uint32_t step);
void writeMemory(std::uint32_t step);
void writeState(uint32_t step);
void writeState(std::uint32_t step);

private:
std::atomic_bool m_Finished;
Expand Down
5 changes: 5 additions & 0 deletions include/maths/CDataFrameAnalysisInstrumentationInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class MATHS_EXPORT CDataFrameAnalysisInstrumentationInterface {
public:
using TProgressCallback = std::function<void(double)>;
using TMemoryUsageCallback = std::function<void(std::int64_t)>;
using TStepCallback = std::function<void(std::uint32_t)>;

public:
virtual ~CDataFrameAnalysisInstrumentationInterface() = default;
Expand Down Expand Up @@ -47,6 +48,10 @@ class MATHS_EXPORT CDataFrameAnalysisInstrumentationInterface {
TMemoryUsageCallback memoryUsageCallback() {
return [this](std::int64_t delta) { this->updateMemoryUsage(delta); };
}
//! Factory for the nextStep() callback function object.
TStepCallback stepCallback() {
return [this](std::uint32_t step) { this->nextStep(step); };
}
};

//! \brief Dummies out all instrumentation.
Expand Down
9 changes: 5 additions & 4 deletions lib/api/CDataFrameAnalysisRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TBoolVec CDataFrameAnalysisRunner::columnsForWhichEmptyIsMissing(const TStrVec&

void CDataFrameAnalysisRunner::estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const {
std::size_t numberRows{m_Spec.numberRows()};
std::size_t numberColumns{m_Spec.numberColumns() + this->numberExtraColumns()};
std::size_t numberColumns{m_Spec.numberColumns()};
std::size_t maxNumberPartitions{maximumNumberPartitions(m_Spec)};
if (maxNumberPartitions == 0) {
writer.write("0", "0");
Expand All @@ -68,7 +68,7 @@ void CDataFrameAnalysisRunner::estimateMemoryUsage(CMemoryUsageEstimationResultJ
void CDataFrameAnalysisRunner::computeAndSaveExecutionStrategy() {

std::size_t numberRows{m_Spec.numberRows()};
std::size_t numberColumns{m_Spec.numberColumns() + this->numberExtraColumns()};
std::size_t numberColumns{m_Spec.numberColumns()};
std::size_t memoryLimit{m_Spec.memoryLimit()};

LOG_TRACE(<< "memory limit = " << memoryLimit);
Expand Down Expand Up @@ -163,8 +163,9 @@ const CDataFrameAnalysisSpecification& CDataFrameAnalysisRunner::spec() const {
std::size_t CDataFrameAnalysisRunner::estimateMemoryUsage(std::size_t totalNumberRows,
std::size_t partitionNumberRows,
std::size_t numberColumns) const {
return core::CDataFrame::estimateMemoryUsage(this->storeDataFrameInMainMemory(),
totalNumberRows, numberColumns) +
return core::CDataFrame::estimateMemoryUsage(
this->storeDataFrameInMainMemory(), totalNumberRows,
numberColumns + this->numberExtraColumns()) +
this->estimateBookkeepingMemoryUsage(m_NumberPartitions, totalNumberRows,
partitionNumberRows, numberColumns);
}
Expand Down
14 changes: 8 additions & 6 deletions lib/api/unittest/CDataFrameAnalysisRunnerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ BOOST_AUTO_TEST_CASE(testComputeAndSaveExecutionStrategyDiskUsageFlag) {
}
}

namespace {
void testEstimateMemoryUsage(std::int64_t numberRows,
const std::string& expectedExpectedMemoryWithoutDisk,
const std::string& expectedExpectedMemoryWithDisk,
Expand Down Expand Up @@ -152,32 +153,33 @@ void testEstimateMemoryUsage(std::int64_t numberRows,

BOOST_TEST_REQUIRE(result.HasMember("expected_memory_without_disk"));
BOOST_REQUIRE_EQUAL(expectedExpectedMemoryWithoutDisk,
std::string(result["expected_memory_without_disk"].GetString()));
result["expected_memory_without_disk"].GetString());
BOOST_TEST_REQUIRE(result.HasMember("expected_memory_with_disk"));
BOOST_REQUIRE_EQUAL(expectedExpectedMemoryWithDisk,
std::string(result["expected_memory_with_disk"].GetString()));
result["expected_memory_with_disk"].GetString());

BOOST_REQUIRE_EQUAL(expectedNumberErrors, static_cast<int>(errors.size()));
}
}

BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor0Rows) {
testEstimateMemoryUsage(0, "0", "0", 1);
}

BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor1Row) {
testEstimateMemoryUsage(1, "6kB", "6kB", 0);
testEstimateMemoryUsage(1, "4kB", "4kB", 0);
}

BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor10Rows) {
testEstimateMemoryUsage(10, "15kB", "13kB", 0);
testEstimateMemoryUsage(10, "12kB", "10kB", 0);
}

BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor100Rows) {
testEstimateMemoryUsage(100, "62kB", "35kB", 0);
testEstimateMemoryUsage(100, "57kB", "35kB", 0);
}

BOOST_AUTO_TEST_CASE(testEstimateMemoryUsageFor1000Rows) {
testEstimateMemoryUsage(1000, "450kB", "143kB", 0);
testEstimateMemoryUsage(1000, "403kB", "142kB", 0);
}

void testColumnsForWhichEmptyIsMissing(const std::string& analysis,
Expand Down
33 changes: 33 additions & 0 deletions lib/api/unittest/CDataFrameAnalyzerFeatureImportanceTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ struct SFixture {

analyzer.handleRecord(fieldNames, {"", "", "", "", "", "", "$"});

LOG_DEBUG(<< "estimated memory usage = "
<< core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage));
LOG_DEBUG(<< "peak memory = "
<< core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage));
LOG_DEBUG(<< "time to train = " << core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain)
<< "ms");

BOOST_TEST_REQUIRE(
core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) <
core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage));

rapidjson::Document results;
rapidjson::ParseResult ok(results.Parse(s_Output.str()));
BOOST_TEST_REQUIRE(static_cast<bool>(ok) == true);
Expand Down Expand Up @@ -184,6 +195,17 @@ struct SFixture {

analyzer.handleRecord(fieldNames, {"", "", "", "", "", "", "$"});

LOG_DEBUG(<< "estimated memory usage = "
<< core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage));
LOG_DEBUG(<< "peak memory = "
<< core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage));
LOG_DEBUG(<< "time to train = " << core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain)
<< "ms");

BOOST_TEST_REQUIRE(
core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) <
core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage));

rapidjson::Document results;
rapidjson::ParseResult ok(results.Parse(s_Output.str()));
BOOST_TEST_REQUIRE(static_cast<bool>(ok) == true);
Expand All @@ -208,6 +230,17 @@ struct SFixture {

analyzer.handleRecord(fieldNames, {"", "", "", "", "", "", "$"});

LOG_DEBUG(<< "estimated memory usage = "
<< core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage));
LOG_DEBUG(<< "peak memory = "
<< core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage));
LOG_DEBUG(<< "time to train = " << core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain)
<< "ms");

BOOST_TEST_REQUIRE(
core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) <
core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage));

rapidjson::Document results;
rapidjson::ParseResult ok(results.Parse(s_Output.str()));
BOOST_TEST_REQUIRE(static_cast<bool>(ok) == true);
Expand Down
27 changes: 19 additions & 8 deletions lib/api/unittest/CDataFrameAnalyzerOutlierTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/

#include "CDataFrameMockAnalysisRunner.h"

#include <core/CContainerPrinter.h>
#include <core/CJsonOutputStreamWrapper.h>
#include <core/CProgramCounters.h>
Expand All @@ -17,11 +15,10 @@
#include <api/CDataFrameAnalysisSpecification.h>
#include <api/CDataFrameAnalyzer.h>

#include <test/BoostTestCloseAbsolute.h>
#include <test/CDataFrameAnalysisSpecificationFactory.h>
#include <test/CRandomNumbers.h>

#include <test/BoostTestCloseAbsolute.h>

#include <boost/test/unit_test.hpp>

#include <memory>
Expand Down Expand Up @@ -87,9 +84,10 @@ void addOutlierTestData(TStrVec fieldNames,
}

frame->finishWritingRows();
CDataFrameMockAnalysisState state;
maths::CDataFrameAnalysisInstrumentationStub instrumentation;
maths::COutliers::compute(
{1, 1, true, method, numberNeighbours, computeFeatureInfluence, 0.05}, *frame, state);
{1, 1, true, method, numberNeighbours, computeFeatureInfluence, 0.05},
*frame, instrumentation);

expectedScores.resize(numberInliers + numberOutliers);
expectedFeatureInfluences.resize(numberInliers + numberOutliers, TDoubleVec(5));
Expand Down Expand Up @@ -202,10 +200,17 @@ BOOST_AUTO_TEST_CASE(testRunOutlierDetection) {

LOG_DEBUG(<< "number partitions = "
<< core::CProgramCounters::counter(counter_t::E_DFONumberPartitions));
LOG_DEBUG(<< "estimated memory usage = "
<< core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage));
LOG_DEBUG(<< "peak memory = "
<< core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage));

BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFONumberPartitions) == 1);
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) < 100000);
// Allow a 20% margin
BOOST_TEST_REQUIRE(
core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) <
(120 * core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage)) / 100);
}

BOOST_AUTO_TEST_CASE(testRunOutlierDetectionPartitioned) {
Expand Down Expand Up @@ -249,11 +254,17 @@ BOOST_AUTO_TEST_CASE(testRunOutlierDetectionPartitioned) {

LOG_DEBUG(<< "number partitions = "
<< core::CProgramCounters::counter(counter_t::E_DFONumberPartitions));
LOG_DEBUG(<< "estimated memory usage = "
<< core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage));
LOG_DEBUG(<< "peak memory = "
<< core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage));

BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFONumberPartitions) > 1);
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) <
300000); // + 16%
// Allow a 20% margin
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) < 120000);
BOOST_TEST_REQUIRE(
core::CProgramCounters::counter(counter_t::E_DFOPeakMemoryUsage) <
(120 * core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage)) / 100);
}

BOOST_AUTO_TEST_CASE(testRunOutlierFeatureInfluences) {
Expand Down
7 changes: 7 additions & 0 deletions lib/api/unittest/CDataFrameAnalyzerTrainingTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,9 @@ BOOST_AUTO_TEST_CASE(testRunBoostedTreeRegressionTraining) {
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(
counter_t::E_DFTPMEstimatedPeakMemoryUsage) < 6000000);
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < 1500000);
BOOST_TEST_REQUIRE(
core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) <
core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage));
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) > 0);
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) <= duration);
}
Expand Down Expand Up @@ -722,9 +725,13 @@ BOOST_AUTO_TEST_CASE(testRunBoostedTreeClassifierTraining) {
<< core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage));
LOG_DEBUG(<< "time to train = " << core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain)
<< "ms");

BOOST_TEST_REQUIRE(core::CProgramCounters::counter(
counter_t::E_DFTPMEstimatedPeakMemoryUsage) < 6000000);
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) < 1500000);
BOOST_TEST_REQUIRE(
core::CProgramCounters::counter(counter_t::E_DFTPMPeakMemoryUsage) <
core::CProgramCounters::counter(counter_t::E_DFTPMEstimatedPeakMemoryUsage));
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) > 0);
BOOST_TEST_REQUIRE(core::CProgramCounters::counter(counter_t::E_DFTPMTimeToTrain) <= duration);
}
Expand Down
4 changes: 2 additions & 2 deletions lib/api/unittest/CDataFrameMockAnalysisRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ std::size_t CDataFrameMockAnalysisRunner::estimateBookkeepingMemoryUsage(std::si

const ml::api::CDataFrameAnalysisInstrumentation&
CDataFrameMockAnalysisRunner::instrumentation() const {
return m_State;
return m_Instrumentation;
}

ml::api::CDataFrameAnalysisInstrumentation& CDataFrameMockAnalysisRunner::instrumentation() {
return m_State;
return m_Instrumentation;
}

ml::test::CRandomNumbers CDataFrameMockAnalysisRunner::ms_Rng;
Expand Down
3 changes: 1 addition & 2 deletions lib/api/unittest/CDataFrameMockAnalysisRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class CDataFrameMockAnalysisRunner final : public ml::api::CDataFrameAnalysisRun
ml::core::CRapidJsonConcurrentLineWriter&) const override;

const ml::api::CDataFrameAnalysisInstrumentation& instrumentation() const override;

ml::api::CDataFrameAnalysisInstrumentation& instrumentation() override;

private:
Expand All @@ -42,7 +41,7 @@ class CDataFrameMockAnalysisRunner final : public ml::api::CDataFrameAnalysisRun

private:
static ml::test::CRandomNumbers ms_Rng;
CDataFrameMockAnalysisState m_State;
CDataFrameMockAnalysisState m_Instrumentation;
};

class CDataFrameMockAnalysisRunnerFactory final : public ml::api::CDataFrameAnalysisRunnerFactory {
Expand Down
6 changes: 2 additions & 4 deletions lib/maths/CBoostedTreeImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,6 @@ std::size_t CBoostedTreeImpl::estimateMemoryUsage(std::size_t numberRows,
std::size_t forestMemoryUsage{
m_MaximumNumberTrees *
(sizeof(TNodeVec) + maximumNumberNodes * sizeof(CBoostedTreeNode))};
std::size_t extraColumnsMemoryUsage{numberExtraColumnsForTrain(m_Loss->numberParameters()) *
numberRows * sizeof(CFloatStorage)};
std::size_t foldRoundLossMemoryUsage{m_NumberFolds * m_NumberRounds *
sizeof(TOptionalDouble)};
std::size_t hyperparametersMemoryUsage{numberColumns * sizeof(double)};
Expand All @@ -304,8 +302,8 @@ std::size_t CBoostedTreeImpl::estimateMemoryUsage(std::size_t numberRows,
this->numberHyperparametersToTune(), m_NumberRounds)};
std::size_t shapMemoryUsage{
m_TopShapValues > 0 ? numberRows * numberColumns * sizeof(CFloatStorage) : 0};
return sizeof(*this) + forestMemoryUsage + extraColumnsMemoryUsage +
foldRoundLossMemoryUsage + hyperparametersMemoryUsage +
return sizeof(*this) + forestMemoryUsage + foldRoundLossMemoryUsage +
hyperparametersMemoryUsage +
std::max(leafNodeStatisticsMemoryUsage, shapMemoryUsage) + // not concurrent
dataTypeMemoryUsage + featureSampleProbabilities + missingFeatureMaskMemoryUsage +
trainTestMaskMemoryUsage + bayesianOptimisationMemoryUsage;
Expand Down
Loading