Skip to content

Commit 6123ecb

Browse files
Tests for wide combiner with spilling (#6880)
1 parent 70eed72 commit 6123ecb

File tree

4 files changed

+159
-51
lines changed

4 files changed

+159
-51
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

+18-10
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
521521

522522
if (finishedCount != SpilledBuckets.size()) return true;
523523

524-
YQL_LOG(INFO) << "switching to ProcessSpilled";
525524
SwitchMode(EOperatingMode::ProcessSpilled);
526525

527526
return ProcessSpilledDataAndWait();
@@ -551,11 +550,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
551550

552551
bool CheckMemoryAndSwitchToSpilling() {
553552
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
554-
const auto used = TlsAllocState->GetUsed();
555-
const auto limit = TlsAllocState->GetLimit();
556-
557-
YQL_LOG(INFO) << "yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit;
558-
YQL_LOG(INFO) << "switching Memory mode to Spilling";
553+
LogMemoryUsage();
559554

560555
SwitchMode(EOperatingMode::Spilling);
561556
return true;
@@ -564,6 +559,19 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
564559
return false;
565560
}
566561

562+
void LogMemoryUsage() const {
563+
const auto used = TlsAllocState->GetUsed();
564+
const auto limit = TlsAllocState->GetLimit();
565+
TStringBuilder logmsg;
566+
logmsg << "Memory usage: ";
567+
if (limit) {
568+
logmsg << (used*100/limit) << "%=";
569+
}
570+
logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB";
571+
572+
YQL_LOG(INFO) << logmsg;
573+
}
574+
567575
void SpillMoreStateFromBucket(TSpilledBucket& bucket) {
568576
MKQL_ENSURE(!bucket.AsyncWriteOperation.has_value(), "Internal logic error");
569577

@@ -688,10 +696,12 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
688696
void SwitchMode(EOperatingMode mode) {
689697
switch(mode) {
690698
case EOperatingMode::InMemory: {
699+
YQL_LOG(INFO) << "switching Memory mode to InMemory";
691700
MKQL_ENSURE(false, "Internal logic error");
692701
break;
693702
}
694703
case EOperatingMode::Spilling: {
704+
YQL_LOG(INFO) << "switching Memory mode to Spilling";
695705
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
696706
SpilledBuckets.resize(SpilledBucketCount);
697707
auto spiller = Ctx.SpillerFactory->CreateSpiller();
@@ -707,6 +717,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
707717
break;
708718
}
709719
case EOperatingMode::ProcessSpilled: {
720+
YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled";
710721
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
711722
MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
712723
BufferForKeyAndState.resize(0);
@@ -722,9 +733,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
722733
}
723734

724735
bool IsSwitchToSpillingModeCondition() const {
725-
return false;
726-
// TODO: YQL-18033
727-
// return !HasMemoryForProcessing();
736+
return !HasMemoryForProcessing();
728737
}
729738

730739
public:
@@ -1250,7 +1259,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12501259
, AllowSpilling(allowSpilling)
12511260
{}
12521261

1253-
// MARK: DoCAlculate
12541262
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
12551263
if (!state.HasValue()) {
12561264
MakeState(ctx, state);

ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
void N(NUnitTest::TTestContext&)
4949

5050
#define Y_UNIT_TEST_LLVM(N) Y_UNIT_TEST_TWIN(N, LLVM)
51+
#define Y_UNIT_TEST_LLVM_SPILLING(N) Y_UNIT_TEST_QUAD(N, LLVM, SPILLING)
5152

5253
#define Y_UNIT_TEST_QUAD(N, OPT1, OPT2) \
5354
template<bool OPT1, bool OPT2> void N(NUnitTest::TTestContext&); \
@@ -79,7 +80,7 @@ struct TUdfModuleInfo {
7980
NUdf::TUniquePtr<NUdf::IUdfModule> Module;
8081
};
8182

82-
template<bool UseLLVM>
83+
template<bool UseLLVM, bool EnableSpilling = false>
8384
struct TSetup {
8485
explicit TSetup(TComputationNodeFactory nodeFactory = GetTestFactory(), TVector<TUdfModuleInfo>&& modules = {})
8586
: Alloc(__LOCATION__)
@@ -96,6 +97,8 @@ struct TSetup {
9697
FunctionRegistry = mutableRegistry;
9798
}
9899

100+
Alloc.Ref().ForcefullySetMemoryYellowZone(EnableSpilling);
101+
99102
RandomProvider = CreateDeterministicRandomProvider(1);
100103
TimeProvider = CreateDeterministicTimeProvider(10000000);
101104

ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp

+119-40
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
#include <ydb/library/yql/minikql/mkql_string_util.h>
66

77
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
8+
#include <ydb/library/yql/minikql/computation/mock_spiller_factory_ut.h>
89

910
#include <cstring>
10-
#include <random>
1111
#include <algorithm>
1212

1313
namespace NKikimr {
@@ -30,6 +30,7 @@ using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
3030
{}
3131
private:
3232
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
33+
3334
constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData);
3435
if (Index == size) {
3536
return NUdf::EFetchStatus::Finish;
@@ -47,6 +48,7 @@ using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
4748
items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val)));
4849

4950
++Index;
51+
5052
return NUdf::EFetchStatus::Ok;
5153
}
5254

@@ -117,6 +119,13 @@ TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRu
117119
pb.CombineCore(stream, keyExtractor, init, update, finishLambda, 64ul << 20);
118120
}
119121

122+
template<bool SPILLING>
123+
TRuntimeNode WideLastCombiner(TProgramBuilder& pb, TRuntimeNode flow, const TProgramBuilder::TWideLambda& extractor, const TProgramBuilder::TBinaryWideLambda& init, const TProgramBuilder::TTernaryWideLambda& update, const TProgramBuilder::TBinaryWideLambda& finish) {
124+
return SPILLING ?
125+
pb.WideLastCombinerWithSpilling(flow, extractor, init, update, finish):
126+
pb.WideLastCombiner(flow, extractor, init, update, finish);
127+
}
128+
120129
} // unnamed
121130

122131
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
@@ -996,8 +1005,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) {
9961005
#endif
9971006
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 29u
9981007
Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
999-
Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) {
1000-
TSetup<LLVM> setup;
1008+
Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsRefCounting) {
1009+
// Currently LLVM version doesn't support spilling.
1010+
if (LLVM && SPILLING) return;
1011+
// The WideLastCombinerWithSpilling callable was introduced in runtime version 49.
1012+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1013+
1014+
TSetup<LLVM, SPILLING> setup;
10011015
TProgramBuilder& pb = *setup.PgmBuilder;
10021016

10031017
const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
@@ -1035,7 +1049,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
10351049

10361050
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
10371051

1038-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1052+
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
10391053
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
10401054
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
10411055
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1059,22 +1073,37 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
10591073
));
10601074

10611075
const auto graph = setup.BuildGraph(pgmReturn);
1076+
if (SPILLING) {
1077+
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
1078+
}
10621079
const auto iterator = graph->GetValue().GetListIterator();
1080+
1081+
std::unordered_set<TString> expected {
1082+
"key one",
1083+
"very long value 2 / key two",
1084+
"very long key one",
1085+
"very long value 8 / very long value 7 / very long value 6"
1086+
};
1087+
10631088
NUdf::TUnboxedValue item;
1064-
UNIT_ASSERT(iterator.Next(item));
1065-
UNBOXED_VALUE_STR_EQUAL(item, "key one");
1066-
UNIT_ASSERT(iterator.Next(item));
1067-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 2 / key two");
1068-
UNIT_ASSERT(iterator.Next(item));
1069-
UNBOXED_VALUE_STR_EQUAL(item, "very long key one");
1070-
UNIT_ASSERT(iterator.Next(item));
1071-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 8 / very long value 7 / very long value 6");
1089+
while (!expected.empty()) {
1090+
UNIT_ASSERT(iterator.Next(item));
1091+
const auto actual = TString(item.AsStringRef());
1092+
1093+
auto it = expected.find(actual);
1094+
UNIT_ASSERT(it != expected.end());
1095+
expected.erase(it);
1096+
}
10721097
UNIT_ASSERT(!iterator.Next(item));
10731098
UNIT_ASSERT(!iterator.Next(item));
10741099
}
10751100

1076-
Y_UNIT_TEST_LLVM(TestLongStringsPasstroughtRefCounting) {
1077-
TSetup<LLVM> setup;
1101+
Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsPasstroughtRefCounting) {
1102+
// Currently LLVM version doesn't support spilling.
1103+
if (LLVM && SPILLING) return;
1104+
// The WideLastCombinerWithSpilling callable was introduced in runtime version 49.
1105+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1106+
TSetup<LLVM, SPILLING> setup;
10781107
TProgramBuilder& pb = *setup.PgmBuilder;
10791108

10801109
const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
@@ -1111,7 +1140,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
11111140

11121141
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
11131142

1114-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1143+
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
11151144
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
11161145
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
11171146
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1134,22 +1163,40 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
11341163
));
11351164

11361165
const auto graph = setup.BuildGraph(pgmReturn);
1166+
if (SPILLING) {
1167+
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
1168+
}
11371169
const auto iterator = graph->GetValue().GetListIterator();
1170+
1171+
std::unordered_set<TString> expected {
1172+
"very long value 1 / key one / very long value 1 / key one",
1173+
"very long value 3 / key two / very long value 2 / key two",
1174+
"very long value 4 / very long key one / very long value 4 / very long key one",
1175+
"very long value 9 / very long key two / very long value 5 / very long key two"
1176+
};
1177+
11381178
NUdf::TUnboxedValue item;
1139-
UNIT_ASSERT(iterator.Next(item));
1140-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 1 / key one / very long value 1 / key one");
1141-
UNIT_ASSERT(iterator.Next(item));
1142-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 3 / key two / very long value 2 / key two");
1143-
UNIT_ASSERT(iterator.Next(item));
1144-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 4 / very long key one / very long value 4 / very long key one");
1145-
UNIT_ASSERT(iterator.Next(item));
1146-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 9 / very long key two / very long value 5 / very long key two");
1179+
while (!expected.empty()) {
1180+
UNIT_ASSERT(iterator.Next(item));
1181+
const auto actual = TString(item.AsStringRef());
1182+
1183+
auto it = expected.find(actual);
1184+
UNIT_ASSERT(it != expected.end());
1185+
expected.erase(it);
1186+
}
11471187
UNIT_ASSERT(!iterator.Next(item));
11481188
UNIT_ASSERT(!iterator.Next(item));
11491189
}
11501190

1151-
Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedInput) {
1152-
TSetup<LLVM> setup;
1191+
Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedInput) {
1192+
// Test is broken when SPILLING is enabled. Remove this `if` statement once YQL-18808 is resolved.
1193+
if (SPILLING) return;
1194+
1195+
// Currently LLVM version doesn't support spilling.
1196+
if (LLVM && SPILLING) return;
1197+
// The WideLastCombinerWithSpilling callable was introduced in runtime version 49.
1198+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1199+
TSetup<LLVM, SPILLING> setup;
11531200
TProgramBuilder& pb = *setup.PgmBuilder;
11541201

11551202
const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
@@ -1183,7 +1230,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
11831230

11841231
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");
11851232

1186-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1233+
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
11871234
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }),
11881235
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
11891236
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1207,18 +1254,34 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
12071254
));
12081255

12091256
const auto graph = setup.BuildGraph(pgmReturn);
1257+
if (SPILLING) {
1258+
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
1259+
}
1260+
std::unordered_set<TString> expected {
1261+
"key one / value 2 / value 1 / value 5 / value 4",
1262+
"key two / value 4 / value 3 / value 3 / value 2"
1263+
};
1264+
12101265
const auto iterator = graph->GetValue().GetListIterator();
12111266
NUdf::TUnboxedValue item;
1212-
UNIT_ASSERT(iterator.Next(item));
1213-
UNBOXED_VALUE_STR_EQUAL(item, "key one / value 2 / value 1 / value 5 / value 4");
1214-
UNIT_ASSERT(iterator.Next(item));
1215-
UNBOXED_VALUE_STR_EQUAL(item, "key two / value 4 / value 3 / value 3 / value 2");
1267+
while (!expected.empty()) {
1268+
UNIT_ASSERT(iterator.Next(item));
1269+
const auto actual = TString(item.AsStringRef());
1270+
1271+
auto it = expected.find(actual);
1272+
UNIT_ASSERT(it != expected.end());
1273+
expected.erase(it);
1274+
}
12161275
UNIT_ASSERT(!iterator.Next(item));
12171276
UNIT_ASSERT(!iterator.Next(item));
12181277
}
12191278

1220-
Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedOutput) {
1221-
TSetup<LLVM> setup;
1279+
Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedOutput) {
1280+
// Currently LLVM version doesn't support spilling.
1281+
if (LLVM && SPILLING) return;
1282+
// The WideLastCombinerWithSpilling callable was introduced in runtime version 49.
1283+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1284+
TSetup<LLVM, SPILLING> setup;
12221285
TProgramBuilder& pb = *setup.PgmBuilder;
12231286

12241287
const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
@@ -1252,7 +1315,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
12521315

12531316
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");
12541317

1255-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1318+
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
12561319
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }),
12571320
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
12581321
[&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1268,26 +1331,42 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
12681331
));
12691332

12701333
const auto graph = setup.BuildGraph(pgmReturn);
1334+
if (SPILLING) {
1335+
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
1336+
}
1337+
std::unordered_set<TString> expected {
1338+
"key one: value 1, value 4, value 5, value 1, value 2",
1339+
"key two: value 2, value 3, value 3, value 4"
1340+
};
1341+
12711342
const auto iterator = graph->GetValue().GetListIterator();
12721343
NUdf::TUnboxedValue item;
1273-
UNIT_ASSERT(iterator.Next(item));
1274-
UNBOXED_VALUE_STR_EQUAL(item, "key one: value 1, value 4, value 5, value 1, value 2");
1275-
UNIT_ASSERT(iterator.Next(item));
1276-
UNBOXED_VALUE_STR_EQUAL(item, "key two: value 2, value 3, value 3, value 4");
1344+
while (!expected.empty()) {
1345+
UNIT_ASSERT(iterator.Next(item));
1346+
const auto actual = TString(item.AsStringRef());
1347+
1348+
auto it = expected.find(actual);
1349+
UNIT_ASSERT(it != expected.end());
1350+
expected.erase(it);
1351+
}
12771352
UNIT_ASSERT(!iterator.Next(item));
12781353
UNIT_ASSERT(!iterator.Next(item));
12791354
}
12801355

1281-
Y_UNIT_TEST_LLVM(TestThinAllLambdas) {
1282-
TSetup<LLVM> setup;
1356+
Y_UNIT_TEST_LLVM_SPILLING(TestThinAllLambdas) {
1357+
// Currently LLVM version doesn't support spilling.
1358+
if (LLVM && SPILLING) return;
1359+
// The WideLastCombinerWithSpilling callable was introduced in runtime version 49.
1360+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1361+
TSetup<LLVM, SPILLING> setup;
12831362
TProgramBuilder& pb = *setup.PgmBuilder;
12841363

12851364
const auto tupleType = pb.NewTupleType({});
12861365
const auto data = pb.NewTuple({});
12871366

12881367
const auto list = pb.NewList(tupleType, {data, data, data, data});
12891368

1290-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1369+
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
12911370
[](TRuntimeNode) -> TRuntimeNode::TList { return {}; }),
12921371
[](TRuntimeNode::TList items) { return items; },
12931372
[](TRuntimeNode::TList, TRuntimeNode::TList items) { return items; },

0 commit comments

Comments
 (0)