Skip to content

Commit 16f6aff

Browse files
committed
Introduce memory usage estimation mode in data_frame_analyzer (elastic#584)
1 parent c06dc8f commit 16f6aff

16 files changed

+327
-19
lines changed

bin/data_frame_analyzer/CCmdLineParser.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const std::string CCmdLineParser::DESCRIPTION = "Usage: data_frame_analyzer [opt
2020
bool CCmdLineParser::parse(int argc,
2121
const char* const* argv,
2222
std::string& configFile,
23+
bool& memoryUsageEstimationOnly,
2324
std::string& logProperties,
2425
std::string& logPipe,
2526
bool& lengthEncodedInput,
@@ -35,6 +36,7 @@ bool CCmdLineParser::parse(int argc,
3536
("version", "Display version information and exit")
3637
("config", boost::program_options::value<std::string>(),
3738
"The configuration file")
39+
("memoryUsageEstimationOnly", "Whether to perform memory usage estimation only")
3840
("logProperties", boost::program_options::value<std::string>(),
3941
"Optional logger properties file")
4042
("logPipe", boost::program_options::value<std::string>(),
@@ -66,6 +68,9 @@ bool CCmdLineParser::parse(int argc,
6668
if (vm.count("config") > 0) {
6769
configFile = vm["config"].as<std::string>();
6870
}
71+
if (vm.count("memoryUsageEstimationOnly") > 0) {
72+
memoryUsageEstimationOnly = true;
73+
}
6974
if (vm.count("logProperties") > 0) {
7075
logProperties = vm["logProperties"].as<std::string>();
7176
}

bin/data_frame_analyzer/CCmdLineParser.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class CCmdLineParser {
2727
static bool parse(int argc,
2828
const char* const* argv,
2929
std::string& configFile,
30+
bool& memoryUsageEstimationOnly,
3031
std::string& logProperties,
3132
std::string& logPipe,
3233
bool& lengthEncodedInput,

bin/data_frame_analyzer/Main.cc

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <api/CDataFrameOutliersRunner.h>
3131
#include <api/CIoManager.h>
3232
#include <api/CLengthEncodedInputParser.h>
33+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
3334

3435
#include "CCmdLineParser.h"
3536

@@ -86,6 +87,7 @@ int main(int argc, char** argv) {
8687

8788
// Read command line options
8889
std::string configFile;
90+
bool memoryUsageEstimationOnly(false);
8991
std::string logProperties;
9092
std::string logPipe;
9193
bool lengthEncodedInput(false);
@@ -94,8 +96,9 @@ int main(int argc, char** argv) {
9496
std::string outputFileName;
9597
bool isOutputFileNamedPipe(false);
9698
if (ml::data_frame_analyzer::CCmdLineParser::parse(
97-
argc, argv, configFile, logProperties, logPipe, lengthEncodedInput, inputFileName,
98-
isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe) == false) {
99+
argc, argv, configFile, memoryUsageEstimationOnly, logProperties,
100+
logPipe, lengthEncodedInput, inputFileName, isInputFileNamedPipe,
101+
outputFileName, isOutputFileNamedPipe) == false) {
99102
return EXIT_FAILURE;
100103
}
101104

@@ -127,13 +130,6 @@ int main(int argc, char** argv) {
127130
}
128131

129132
using TInputParserUPtr = std::unique_ptr<ml::api::CInputParser>;
130-
auto inputParser{[lengthEncodedInput, &ioMgr]() -> TInputParserUPtr {
131-
if (lengthEncodedInput) {
132-
return std::make_unique<ml::api::CLengthEncodedInputParser>(ioMgr.inputStream());
133-
}
134-
return std::make_unique<ml::api::CCsvInputParser>(
135-
ioMgr.inputStream(), ml::api::CCsvInputParser::COMMA);
136-
}()};
137133

138134
std::string analysisSpecificationJson;
139135
bool couldReadConfigFile;
@@ -145,6 +141,16 @@ int main(int argc, char** argv) {
145141

146142
auto analysisSpecification =
147143
std::make_unique<ml::api::CDataFrameAnalysisSpecification>(analysisSpecificationJson);
144+
145+
if (memoryUsageEstimationOnly) {
146+
auto outStream = [&ioMgr]() {
147+
return std::make_unique<ml::core::CJsonOutputStreamWrapper>(ioMgr.outputStream());
148+
}();
149+
ml::api::CMemoryUsageEstimationResultJsonWriter writer(*outStream);
150+
analysisSpecification->estimateMemoryUsage(writer);
151+
return EXIT_SUCCESS;
152+
}
153+
148154
if (analysisSpecification->numberThreads() > 1) {
149155
ml::core::startDefaultAsyncExecutor(analysisSpecification->numberThreads());
150156
}
@@ -156,6 +162,13 @@ int main(int argc, char** argv) {
156162

157163
CCleanUpOnExit::add(dataFrameAnalyzer.dataFrameDirectory());
158164

165+
auto inputParser{[lengthEncodedInput, &ioMgr]() -> TInputParserUPtr {
166+
if (lengthEncodedInput) {
167+
return std::make_unique<ml::api::CLengthEncodedInputParser>(ioMgr.inputStream());
168+
}
169+
return std::make_unique<ml::api::CCsvInputParser>(
170+
ioMgr.inputStream(), ml::api::CCsvInputParser::COMMA);
171+
}()};
159172
if (inputParser->readStreamIntoVecs(
160173
[&dataFrameAnalyzer](const auto& fieldNames, const auto& fieldValues) {
161174
return dataFrameAnalyzer.handleRecord(fieldNames, fieldValues);

include/api/CDataFrameAnalysisRunner.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class CRowRef;
3131
}
3232
namespace api {
3333
class CDataFrameAnalysisSpecification;
34+
class CMemoryUsageEstimationResultJsonWriter;
3435

3536
//! \brief Hierarchy for running a specific core::CDataFrame analyses.
3637
//!
@@ -75,6 +76,11 @@ class API_EXPORT CDataFrameAnalysisRunner {
7576
//! number of rows per subset.
7677
void computeAndSaveExecutionStrategy();
7778

79+
//! Estimates memory usage in two cases: one partition (the whole data frame
80+
//! fits in main memory) and maximum tolerable number of partitions (only
81+
//! one partition needs to be loaded to main memory).
82+
void estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const;
83+
7884
//! Check if the data frame for this analysis should use in or out of core
7985
//! storage.
8086
bool storeDataFrameInMainMemory() const;

include/api/CDataFrameAnalysisSpecification.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ class API_EXPORT CDataFrameAnalysisSpecification {
151151
//! calling thread until the runner has finished.
152152
CDataFrameAnalysisRunner* run(const TStrVec& featureNames, core::CDataFrame& frame) const;
153153

154+
//! Estimates memory usage in two cases: one partition (the whole data frame
155+
//! fits in main memory) and maximum tolerable number of partitions (only
156+
//! one partition needs to be loaded to main memory).
157+
void estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const;
158+
154159
private:
155160
void initializeRunner(const rapidjson::Value& jsonAnalysis);
156161

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
#ifndef INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h
7+
#define INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h
8+
9+
#include <core/CJsonOutputStreamWrapper.h>
10+
#include <core/CNonCopyable.h>
11+
#include <core/CRapidJsonConcurrentLineWriter.h>
12+
13+
#include <api/ImportExport.h>
14+
15+
#include <string>
16+
17+
namespace ml {
18+
namespace api {
19+
20+
//! \brief
21+
//! Write memory usage estimation result in JSON format
22+
//!
23+
//! DESCRIPTION:\n
24+
//! Outputs the memory usage estimation result.
25+
//!
26+
class API_EXPORT CMemoryUsageEstimationResultJsonWriter : private core::CNonCopyable {
public:
    //! Construct a writer which emits to \p strmOut.
    //!
    //! The constructor is explicit so that a stream wrapper is never
    //! silently converted into a writer at a call site.
    //!
    //! \param[in] strmOut The wrapped stream to which to write output.
    explicit CMemoryUsageEstimationResultJsonWriter(core::CJsonOutputStreamWrapper& strmOut);

    //! Writes the given memory usage estimation result in JSON format.
    //!
    //! \param[in] expectedMemoryUsageWithOnePartition The estimate, already
    //! formatted as a string, for the case that the whole data frame is
    //! processed as a single partition.
    //! \param[in] expectedMemoryUsageWithMaxPartitions The estimate, already
    //! formatted as a string, for the case that the maximum number of
    //! partitions is used.
    void write(const std::string& expectedMemoryUsageWithOnePartition,
               const std::string& expectedMemoryUsageWithMaxPartitions);

private:
    //! JSON line writer
    core::CRapidJsonConcurrentLineWriter m_Writer;
};
39+
}
40+
}
41+
42+
#endif // INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h

lib/api/CDataFrameAnalysisRunner.cc

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <core/CScopedFastLock.h>
1212

1313
#include <api/CDataFrameAnalysisSpecification.h>
14+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
1415

1516
#include <boost/iterator/counting_iterator.hpp>
1617

@@ -24,6 +25,13 @@ std::size_t memoryLimitWithSafetyMargin(const CDataFrameAnalysisSpecification& s
2425
return static_cast<std::size_t>(0.9 * static_cast<double>(spec.memoryLimit()) + 0.5);
2526
}
2627

28+
std::size_t maximumNumberPartitions(const CDataFrameAnalysisSpecification& spec) {
29+
// We limit the maximum number of partitions to rows^(1/2) because very
30+
// large numbers of partitions are going to be slow and it is better to tell
31+
// user to allocate more resources for the job in this case.
32+
return static_cast<std::size_t>(std::sqrt(static_cast<double>(spec.numberRows())) + 0.5);
33+
}
34+
2735
const std::size_t MAXIMUM_FRACTIONAL_PROGRESS{std::size_t{1}
2836
<< ((sizeof(std::size_t) - 2) * 8)};
2937
}
@@ -36,6 +44,25 @@ CDataFrameAnalysisRunner::~CDataFrameAnalysisRunner() {
3644
this->waitToFinish();
3745
}
3846

47+
// Estimate the memory the analysis needs in two configurations and pass both
// figures, formatted as strings, to the supplied writer:
//   - one partition, i.e. every row is held in a single partition,
//   - the maximum number of partitions, i.e. each partition holds
//     numberRows / maxNumberPartitions rows.
// Both figures are rounded up to a whole number of kilobytes and suffixed
// with "kB". If the maximum number of partitions is zero then "0" is written
// for both figures.
void CDataFrameAnalysisRunner::estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const {
    const std::size_t numberRows{m_Spec.numberRows()};
    const std::size_t numberColumns{m_Spec.numberColumns() + this->numberExtraColumns()};
    const std::size_t maxNumberPartitions{maximumNumberPartitions(m_Spec)};
    if (maxNumberPartitions == 0) {
        // Nothing to estimate, and returning here also protects the division
        // by maxNumberPartitions below.
        writer.write("0", "0");
        return;
    }

    const std::size_t expectedMemoryUsageWithOnePartition{
        this->estimateMemoryUsage(numberRows, numberRows, numberColumns)};
    const std::size_t expectedMemoryUsageWithMaxPartitions{this->estimateMemoryUsage(
        numberRows, numberRows / maxNumberPartitions, numberColumns)};

    auto roundUpToNearestKilobyte = [](std::size_t bytes) {
        // Divide first and then account for any remainder: adding 1023 before
        // dividing would wrap around for values close to the maximum
        // representable std::size_t.
        const std::size_t kilobytes{bytes / 1024 + (bytes % 1024 != 0 ? 1 : 0)};
        return std::to_string(kilobytes) + "kB";
    };
    writer.write(roundUpToNearestKilobyte(expectedMemoryUsageWithOnePartition),
                 roundUpToNearestKilobyte(expectedMemoryUsageWithMaxPartitions));
}
65+
3966
void CDataFrameAnalysisRunner::computeAndSaveExecutionStrategy() {
4067

4168
std::size_t numberRows{m_Spec.numberRows()};
@@ -45,17 +72,12 @@ void CDataFrameAnalysisRunner::computeAndSaveExecutionStrategy() {
4572
LOG_TRACE(<< "memory limit = " << memoryLimit);
4673

4774
// Find the smallest number of partitions such that the size per partition
48-
// is less than the memory limit. We limit this to rows^(1/2) because very
49-
// large numbers of partitions are going to be slow and it is better to tell
50-
// user to allocate more resources for the job in this case.
51-
52-
std::size_t maximumNumberPartitions{
53-
static_cast<std::size_t>(std::sqrt(static_cast<double>(numberRows)) + 0.5)};
75+
// is less than the memory limit.
5476

77+
std::size_t maxNumberPartitions{maximumNumberPartitions(m_Spec)};
5578
std::size_t memoryUsage{0};
5679

57-
for (m_NumberPartitions = 1; m_NumberPartitions < maximumNumberPartitions;
58-
++m_NumberPartitions) {
80+
for (m_NumberPartitions = 1; m_NumberPartitions < maxNumberPartitions; ++m_NumberPartitions) {
5981
std::size_t partitionNumberRows{numberRows / m_NumberPartitions};
6082
memoryUsage = this->estimateMemoryUsage(numberRows, partitionNumberRows, numberColumns);
6183
LOG_TRACE(<< "partition number rows = " << partitionNumberRows);

lib/api/CDataFrameAnalysisSpecification.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <api/CDataFrameAnalysisConfigReader.h>
1313
#include <api/CDataFrameBoostedTreeRunner.h>
1414
#include <api/CDataFrameOutliersRunner.h>
15+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
1516

1617
#include <rapidjson/document.h>
1718
#include <rapidjson/ostreamwrapper.h>
@@ -184,6 +185,14 @@ CDataFrameAnalysisRunner* CDataFrameAnalysisSpecification::run(const TStrVec& fe
184185
return nullptr;
185186
}
186187

188+
// Delegate memory usage estimation to the analysis runner, which writes its
// result via the supplied writer. If no runner is available a fatal error is
// reported and nothing is written.
void CDataFrameAnalysisSpecification::estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const {
    if (m_Runner != nullptr) {
        m_Runner->estimateMemoryUsage(writer);
        return;
    }
    HANDLE_FATAL(<< "Internal error: no runner available so can't estimate memory. Please report this problem.");
}
195+
187196
void CDataFrameAnalysisSpecification::initializeRunner(const rapidjson::Value& jsonAnalysis) {
188197
// We pass off the interpretation of the parameters object to the appropriate
189198
// analysis runner.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
8+
9+
namespace ml {
10+
namespace api {
11+
namespace {
12+
13+
// JSON field names
14+
const std::string EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION("expected_memory_usage_with_one_partition");
15+
const std::string EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS("expected_memory_usage_with_max_partitions");
16+
}
17+
18+
CMemoryUsageEstimationResultJsonWriter::CMemoryUsageEstimationResultJsonWriter(
    core::CJsonOutputStreamWrapper& strmOut)
    : m_Writer(strmOut) {
    // Deliberately produce no output here: as things currently work the
    // output stream might still be redirected after this object has been
    // constructed.
}
23+
24+
void CMemoryUsageEstimationResultJsonWriter::write(const std::string& expectedMemoryUsageWithOnePartition,
25+
const std::string& expectedMemoryUsageWithMaxPartitions) {
26+
m_Writer.StartObject();
27+
m_Writer.Key(EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION);
28+
m_Writer.String(expectedMemoryUsageWithOnePartition);
29+
m_Writer.Key(EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS);
30+
m_Writer.String(expectedMemoryUsageWithMaxPartitions);
31+
m_Writer.EndObject();
32+
m_Writer.flush();
33+
}
34+
}
35+
}

lib/api/Makefile.first

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ CInputParser.cc \
4343
CIoManager.cc \
4444
CJsonOutputWriter.cc \
4545
CLengthEncodedInputParser.cc \
46+
CMemoryUsageEstimationResultJsonWriter.cc \
4647
CModelPlotDataJsonWriter.cc \
4748
CModelSizeStatsJsonWriter.cc \
4849
CModelSnapshotJsonWriter.cc \

0 commit comments

Comments
 (0)