Skip to content

Commit 462023d

Browse files
committed
Introduce a mode in which data_frame_analyzer binary only outputs memory estimations but does not perform any analysis
1 parent 6d67786 commit 462023d

16 files changed

+296
-10
lines changed

bin/data_frame_analyzer/CCmdLineParser.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const std::string CCmdLineParser::DESCRIPTION = "Usage: data_frame_analyzer [opt
2020
bool CCmdLineParser::parse(int argc,
2121
const char* const* argv,
2222
std::string& configFile,
23+
bool& memoryUsageEstimationOnly,
2324
std::string& logProperties,
2425
std::string& logPipe,
2526
bool& lengthEncodedInput,
@@ -35,6 +36,7 @@ bool CCmdLineParser::parse(int argc,
3536
("version", "Display version information and exit")
3637
("config", boost::program_options::value<std::string>(),
3738
"The configuration file")
39+
("memoryUsageEstimationOnly", "Whether to perform memory usage estimation only")
3840
("logProperties", boost::program_options::value<std::string>(),
3941
"Optional logger properties file")
4042
("logPipe", boost::program_options::value<std::string>(),
@@ -66,6 +68,9 @@ bool CCmdLineParser::parse(int argc,
6668
if (vm.count("config") > 0) {
6769
configFile = vm["config"].as<std::string>();
6870
}
71+
if (vm.count("memoryUsageEstimationOnly") > 0) {
72+
memoryUsageEstimationOnly = true;
73+
}
6974
if (vm.count("logProperties") > 0) {
7075
logProperties = vm["logProperties"].as<std::string>();
7176
}

bin/data_frame_analyzer/CCmdLineParser.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class CCmdLineParser {
2727
static bool parse(int argc,
2828
const char* const* argv,
2929
std::string& configFile,
30+
bool& memoryUsageEstimationOnly,
3031
std::string& logProperties,
3132
std::string& logPipe,
3233
bool& lengthEncodedInput,

bin/data_frame_analyzer/Main.cc

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <api/CDataFrameOutliersRunner.h>
3131
#include <api/CIoManager.h>
3232
#include <api/CLengthEncodedInputParser.h>
33+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
3334

3435
#include "CCmdLineParser.h"
3536

@@ -86,6 +87,7 @@ int main(int argc, char** argv) {
8687

8788
// Read command line options
8889
std::string configFile;
90+
bool memoryUsageEstimationOnly(false);
8991
std::string logProperties;
9092
std::string logPipe;
9193
bool lengthEncodedInput(false);
@@ -94,7 +96,7 @@ int main(int argc, char** argv) {
9496
std::string outputFileName;
9597
bool isOutputFileNamedPipe(false);
9698
if (ml::data_frame_analyzer::CCmdLineParser::parse(
97-
argc, argv, configFile, logProperties, logPipe, lengthEncodedInput, inputFileName,
99+
argc, argv, configFile, memoryUsageEstimationOnly, logProperties, logPipe, lengthEncodedInput, inputFileName,
98100
isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe) == false) {
99101
return EXIT_FAILURE;
100102
}
@@ -127,13 +129,6 @@ int main(int argc, char** argv) {
127129
}
128130

129131
using TInputParserUPtr = std::unique_ptr<ml::api::CInputParser>;
130-
auto inputParser{[lengthEncodedInput, &ioMgr]() -> TInputParserUPtr {
131-
if (lengthEncodedInput) {
132-
return std::make_unique<ml::api::CLengthEncodedInputParser>(ioMgr.inputStream());
133-
}
134-
return std::make_unique<ml::api::CCsvInputParser>(
135-
ioMgr.inputStream(), ml::api::CCsvInputParser::COMMA);
136-
}()};
137132

138133
std::string analysisSpecificationJson;
139134
bool couldReadConfigFile;
@@ -145,6 +140,17 @@ int main(int argc, char** argv) {
145140

146141
auto analysisSpecification =
147142
std::make_unique<ml::api::CDataFrameAnalysisSpecification>(analysisSpecificationJson);
143+
144+
if (memoryUsageEstimationOnly) {
145+
const auto result = analysisSpecification->estimateMemoryUsage();
146+
auto outStream = [&ioMgr]() {
147+
return std::make_unique<ml::core::CJsonOutputStreamWrapper>(ioMgr.outputStream());
148+
}();
149+
ml::api::CMemoryUsageEstimationResultJsonWriter jsonWriter(*outStream);
150+
jsonWriter.write(result);
151+
return EXIT_SUCCESS;
152+
}
153+
148154
if (analysisSpecification->numberThreads() > 1) {
149155
ml::core::startDefaultAsyncExecutor(analysisSpecification->numberThreads());
150156
}
@@ -156,6 +162,13 @@ int main(int argc, char** argv) {
156162

157163
CCleanUpOnExit::add(dataFrameAnalyzer.dataFrameDirectory());
158164

165+
auto inputParser{[lengthEncodedInput, &ioMgr]() -> TInputParserUPtr {
166+
if (lengthEncodedInput) {
167+
return std::make_unique<ml::api::CLengthEncodedInputParser>(ioMgr.inputStream());
168+
}
169+
return std::make_unique<ml::api::CCsvInputParser>(
170+
ioMgr.inputStream(), ml::api::CCsvInputParser::COMMA);
171+
}()};
159172
if (inputParser->readStreamIntoVecs(
160173
[&dataFrameAnalyzer](const auto& fieldNames, const auto& fieldValues) {
161174
return dataFrameAnalyzer.handleRecord(fieldNames, fieldValues);

include/api/CDataFrameAnalysisRunner.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class CRowRef;
3131
}
3232
namespace api {
3333
class CDataFrameAnalysisSpecification;
34+
struct SMemoryUsageEstimationResult;
3435

3536
//! \brief Hierarchy for running a specific core::CDataFrame analyses.
3637
//!
@@ -75,6 +76,11 @@ class API_EXPORT CDataFrameAnalysisRunner {
7576
//! number of rows per subset.
7677
void computeAndSaveExecutionStrategy();
7778

79+
//! Estimates memory usage in two cases: one partition (the whole data frame
80+
//! fits in main memory) and maximum tolerable number of partitions (only
81+
//! one partition needs to be loaded to main memory.
82+
SMemoryUsageEstimationResult estimateMemoryUsage() const;
83+
7884
//! Check if the data frame for this analysis should use in or out of core
7985
//! storage.
8086
bool storeDataFrameInMainMemory() const;

include/api/CDataFrameAnalysisSpecification.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ class API_EXPORT CDataFrameAnalysisSpecification {
151151
//! calling thread until the runner has finished.
152152
CDataFrameAnalysisRunner* run(core::CDataFrame& frame) const;
153153

154+
//! Estimates memory usage in two cases: one partition (the whole data frame
155+
//! fits in main memory) and maximum tolerable number of partitions (only
156+
//! one partition needs to be loaded to main memory.
157+
SMemoryUsageEstimationResult estimateMemoryUsage() const;
158+
154159
private:
155160
void initializeRunner(const rapidjson::Value& jsonAnalysis);
156161

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
#ifndef INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h
7+
#define INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h
8+
9+
#include <core/CJsonOutputStreamWrapper.h>
10+
#include <core/CNonCopyable.h>
11+
#include <core/CRapidJsonConcurrentLineWriter.h>
12+
13+
#include <api/ImportExport.h>
14+
15+
#include <string>
16+
17+
namespace ml {
18+
namespace api {
19+
20+
//! Structure to store the memory usage estimation result
21+
struct API_EXPORT SMemoryUsageEstimationResult {
22+
SMemoryUsageEstimationResult(size_t memoryUsageWithOnePartition, size_t memoryUsageWithMaxPartitions);
23+
SMemoryUsageEstimationResult(const SMemoryUsageEstimationResult& result);
24+
25+
const size_t s_MemoryUsageWithOnePartition;
26+
const size_t s_MemoryUsageWithMaxPartitions;
27+
};
28+
29+
//! \brief
30+
//! Write memory usage estimation result in JSON format
31+
//!
32+
//! DESCRIPTION:\n
33+
//! Outputs the memory usage estimation result.
34+
//!
35+
class API_EXPORT CMemoryUsageEstimationResultJsonWriter : private core::CNonCopyable {
36+
public:
37+
//! Constructor that causes output to be written to the specified wrapped stream
38+
CMemoryUsageEstimationResultJsonWriter(core::CJsonOutputStreamWrapper& strmOut);
39+
40+
//! Writes the given memory usage estimation result in JSON format.
41+
void write(const SMemoryUsageEstimationResult& result);
42+
43+
private:
44+
//! JSON line writer
45+
core::CRapidJsonConcurrentLineWriter m_Writer;
46+
};
47+
}
48+
}
49+
50+
#endif // INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h

lib/api/CDataFrameAnalysisRunner.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <core/CScopedFastLock.h>
1212

1313
#include <api/CDataFrameAnalysisSpecification.h>
14+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
1415

1516
#include <boost/iterator/counting_iterator.hpp>
1617

@@ -36,6 +37,23 @@ CDataFrameAnalysisRunner::~CDataFrameAnalysisRunner() {
3637
this->waitToFinish();
3738
}
3839

40+
SMemoryUsageEstimationResult CDataFrameAnalysisRunner::estimateMemoryUsage() const {
41+
std::size_t numberRows{m_Spec.numberRows()};
42+
std::size_t numberColumns{m_Spec.numberColumns() + this->numberExtraColumns()};
43+
std::size_t maximumNumberPartitions{
44+
static_cast<std::size_t>(std::sqrt(static_cast<double>(numberRows)) + 0.5)};
45+
if (maximumNumberPartitions == 0) {
46+
return SMemoryUsageEstimationResult(0, 0);
47+
}
48+
std::size_t memoryUsageWithOnePartition{
49+
this->estimateMemoryUsage(numberRows, numberRows, numberColumns)};
50+
std::size_t memoryUsageWithMaxPartitions{
51+
this->estimateMemoryUsage(numberRows, numberRows / maximumNumberPartitions, numberColumns)};
52+
return SMemoryUsageEstimationResult(
53+
std::ceil(roundMb(memoryUsageWithOnePartition)),
54+
std::ceil(roundMb(memoryUsageWithMaxPartitions)));
55+
}
56+
3957
void CDataFrameAnalysisRunner::computeAndSaveExecutionStrategy() {
4058

4159
std::size_t numberRows{m_Spec.numberRows()};

lib/api/CDataFrameAnalysisSpecification.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <api/CDataFrameAnalysisConfigReader.h>
1313
#include <api/CDataFrameBoostedTreeRunner.h>
1414
#include <api/CDataFrameOutliersRunner.h>
15+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
1516

1617
#include <rapidjson/document.h>
1718
#include <rapidjson/ostreamwrapper.h>
@@ -183,6 +184,11 @@ CDataFrameAnalysisRunner* CDataFrameAnalysisSpecification::run(core::CDataFrame&
183184
return nullptr;
184185
}
185186

187+
SMemoryUsageEstimationResult CDataFrameAnalysisSpecification::estimateMemoryUsage() const {
188+
// TODO: What to do if m_Runner is nullptr?
189+
return m_Runner->estimateMemoryUsage();
190+
}
191+
186192
void CDataFrameAnalysisSpecification::initializeRunner(const rapidjson::Value& jsonAnalysis) {
187193
// We pass of the interpretation of the parameters object to the appropriate
188194
// analysis runner.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
8+
9+
namespace ml {
10+
namespace api {
11+
namespace {
12+
13+
// JSON field names
14+
const std::string MEMORY_USAGE_WITH_ONE_PARTITION("memory_usage_with_one_partition");
15+
const std::string MEMORY_USAGE_WITH_MAX_PARTITIONS("memory_usage_with_max_partitions");
16+
}
17+
18+
SMemoryUsageEstimationResult::SMemoryUsageEstimationResult(
19+
std::size_t memoryUsageWithOnePartition, std::size_t memoryUsageWithMaxPartitions):
20+
s_MemoryUsageWithOnePartition(memoryUsageWithOnePartition), s_MemoryUsageWithMaxPartitions(memoryUsageWithMaxPartitions) {}
21+
22+
SMemoryUsageEstimationResult::SMemoryUsageEstimationResult(
23+
const SMemoryUsageEstimationResult& result):
24+
SMemoryUsageEstimationResult(result.s_MemoryUsageWithOnePartition, result.s_MemoryUsageWithMaxPartitions) {}
25+
26+
CMemoryUsageEstimationResultJsonWriter::CMemoryUsageEstimationResultJsonWriter(core::CJsonOutputStreamWrapper& strmOut) : m_Writer(strmOut) {
27+
// Don't write any output in the constructor because, the way things work at
28+
// the moment, the output stream might be redirected after construction
29+
}
30+
31+
void CMemoryUsageEstimationResultJsonWriter::write(const SMemoryUsageEstimationResult& result) {
32+
m_Writer.StartObject();
33+
m_Writer.Key(MEMORY_USAGE_WITH_ONE_PARTITION);
34+
m_Writer.Uint64(result.s_MemoryUsageWithOnePartition);
35+
m_Writer.Key(MEMORY_USAGE_WITH_MAX_PARTITIONS);
36+
m_Writer.Uint64(result.s_MemoryUsageWithMaxPartitions);
37+
m_Writer.EndObject();
38+
m_Writer.flush();
39+
}
40+
}
41+
}

lib/api/Makefile.first

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ CInputParser.cc \
4343
CIoManager.cc \
4444
CJsonOutputWriter.cc \
4545
CLengthEncodedInputParser.cc \
46+
CMemoryUsageEstimationResultJsonWriter.cc \
4647
CModelPlotDataJsonWriter.cc \
4748
CModelSizeStatsJsonWriter.cc \
4849
CModelSnapshotJsonWriter.cc \

lib/api/unittest/CDataFrameAnalysisRunnerTest.cc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <api/CDataFrameAnalysisSpecification.h>
1414
#include <api/CDataFrameAnalysisSpecificationJsonWriter.h>
1515
#include <api/CDataFrameOutliersRunner.h>
16+
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
1617

1718
#include <test/CTestTmpDir.h>
1819

@@ -121,6 +122,67 @@ void CDataFrameAnalysisRunnerTest::testComputeAndSaveExecutionStrategyDiskUsageF
121122
}
122123
}
123124

125+
void CDataFrameAnalysisRunnerTest::testEstimateMemoryUsage() {
126+
127+
std::vector<std::string> errors;
128+
std::mutex errorsMutex;
129+
auto errorHandler = [&errors, &errorsMutex](std::string error) {
130+
std::lock_guard<std::mutex> lock{errorsMutex};
131+
errors.push_back(error);
132+
};
133+
134+
core::CLogger::CScopeSetFatalErrorHandler scope{errorHandler};
135+
api::CDataFrameOutliersRunnerFactory factory;
136+
137+
// Test estimation for empty data frame
138+
{
139+
errors.clear();
140+
std::string jsonSpec{api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
141+
0, 5, 100000000, 1, {}, true, test::CTestTmpDir::tmpDir(), "", "outlier_detection", "")};
142+
api::CDataFrameAnalysisSpecification spec{jsonSpec};
143+
144+
// Check memory estimation result
145+
api::SMemoryUsageEstimationResult result = spec.estimateMemoryUsage();
146+
CPPUNIT_ASSERT_EQUAL(0, static_cast<int>(result.s_MemoryUsageWithOnePartition));
147+
CPPUNIT_ASSERT_EQUAL(0, static_cast<int>(result.s_MemoryUsageWithMaxPartitions));
148+
149+
// no error should be registered
150+
CPPUNIT_ASSERT_EQUAL(1, static_cast<int>(errors.size()));
151+
}
152+
153+
// Test estimation for data frame with 1 row
154+
{
155+
errors.clear();
156+
std::string jsonSpec{api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
157+
1, 5, 100000000, 1, {}, true, test::CTestTmpDir::tmpDir(), "", "outlier_detection", "")};
158+
api::CDataFrameAnalysisSpecification spec{jsonSpec};
159+
160+
// Check memory estimation result
161+
api::SMemoryUsageEstimationResult result = spec.estimateMemoryUsage();
162+
CPPUNIT_ASSERT_EQUAL(6050, static_cast<int>(result.s_MemoryUsageWithOnePartition));
163+
CPPUNIT_ASSERT_EQUAL(6050, static_cast<int>(result.s_MemoryUsageWithMaxPartitions));
164+
165+
// no error should be registered
166+
CPPUNIT_ASSERT_EQUAL(0, static_cast<int>(errors.size()));
167+
}
168+
169+
// Test estimation for data frame with 4 rows
170+
{
171+
errors.clear();
172+
std::string jsonSpec{api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
173+
4, 5, 100000000, 1, {}, true, test::CTestTmpDir::tmpDir(), "", "outlier_detection", "")};
174+
api::CDataFrameAnalysisSpecification spec{jsonSpec};
175+
176+
// Check memory estimation result
177+
api::SMemoryUsageEstimationResult result = spec.estimateMemoryUsage();
178+
CPPUNIT_ASSERT_EQUAL(9104, static_cast<int>(result.s_MemoryUsageWithOnePartition));
179+
CPPUNIT_ASSERT_EQUAL(8528, static_cast<int>(result.s_MemoryUsageWithMaxPartitions));
180+
181+
// no error should be registered
182+
CPPUNIT_ASSERT_EQUAL(0, static_cast<int>(errors.size()));
183+
}
184+
}
185+
124186
CppUnit::Test* CDataFrameAnalysisRunnerTest::suite() {
125187
CppUnit::TestSuite* suiteOfTests = new CppUnit::TestSuite("CDataFrameAnalysisRunnerTest");
126188

@@ -130,6 +192,9 @@ CppUnit::Test* CDataFrameAnalysisRunnerTest::suite() {
130192
suiteOfTests->addTest(new CppUnit::TestCaller<CDataFrameAnalysisRunnerTest>(
131193
"CDataFrameAnalysisRunnerTest::testComputeAndSaveExecutionStrategyDiskUsageFlag",
132194
&CDataFrameAnalysisRunnerTest::testComputeAndSaveExecutionStrategyDiskUsageFlag));
195+
suiteOfTests->addTest(new CppUnit::TestCaller<CDataFrameAnalysisRunnerTest>(
196+
"CDataFrameAnalysisRunnerTest::testEstimateMemoryUsage",
197+
&CDataFrameAnalysisRunnerTest::testEstimateMemoryUsage));
133198

134199
return suiteOfTests;
135200
}

lib/api/unittest/CDataFrameAnalysisRunnerTest.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class CDataFrameAnalysisRunnerTest : public CppUnit::TestFixture {
1313
public:
1414
void testComputeExecutionStrategyForOutliers();
1515
void testComputeAndSaveExecutionStrategyDiskUsageFlag();
16+
void testEstimateMemoryUsage();
1617

1718
static CppUnit::Test* suite();
1819

0 commit comments

Comments
 (0)