Skip to content

Tests for wide combiner with spilling #6880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

28 changes: 18 additions & 10 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {

if (finishedCount != SpilledBuckets.size()) return true;

YQL_LOG(INFO) << "switching to ProcessSpilled";
SwitchMode(EOperatingMode::ProcessSpilled);

return ProcessSpilledDataAndWait();
Expand Down Expand Up @@ -551,11 +550,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {

bool CheckMemoryAndSwitchToSpilling() {
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
const auto used = TlsAllocState->GetUsed();
const auto limit = TlsAllocState->GetLimit();

YQL_LOG(INFO) << "yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit;
YQL_LOG(INFO) << "switching Memory mode to Spilling";
LogMemoryUsage();

SwitchMode(EOperatingMode::Spilling);
return true;
Expand All @@ -564,6 +559,19 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
return false;
}

void LogMemoryUsage() const {
const auto used = TlsAllocState->GetUsed();
const auto limit = TlsAllocState->GetLimit();
TStringBuilder logmsg;
logmsg << "Memory usage: ";
if (limit) {
logmsg << (used*100/limit) << "%=";
}
logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB";

YQL_LOG(INFO) << logmsg;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we want to print it on Debug level?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed that this log would be written once per task, so we decided to leave the INFO level.

}

void SpillMoreStateFromBucket(TSpilledBucket& bucket) {
MKQL_ENSURE(!bucket.AsyncWriteOperation.has_value(), "Internal logic error");

Expand Down Expand Up @@ -688,10 +696,12 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
void SwitchMode(EOperatingMode mode) {
switch(mode) {
case EOperatingMode::InMemory: {
YQL_LOG(INFO) << "switching Memory mode to InMemory";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here also in Debug?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed that this log would be written once per task, so we decided to leave the INFO level.

MKQL_ENSURE(false, "Internal logic error");
break;
}
case EOperatingMode::Spilling: {
YQL_LOG(INFO) << "switching Memory mode to Spilling";
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
SpilledBuckets.resize(SpilledBucketCount);
auto spiller = Ctx.SpillerFactory->CreateSpiller();
Expand All @@ -707,6 +717,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
break;
}
case EOperatingMode::ProcessSpilled: {
YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled";
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
BufferForKeyAndState.resize(0);
Expand All @@ -722,9 +733,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

bool IsSwitchToSpillingModeCondition() const {
return false;
// TODO: YQL-18033
// return !HasMemoryForProcessing();
return !HasMemoryForProcessing();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note that the hardcoded value is removed here

}

public:
Expand Down Expand Up @@ -1250,7 +1259,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
, AllowSpilling(allowSpilling)
{}

// MARK: DoCAlculate
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (!state.HasValue()) {
MakeState(ctx, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
void N(NUnitTest::TTestContext&)

#define Y_UNIT_TEST_LLVM(N) Y_UNIT_TEST_TWIN(N, LLVM)
#define Y_UNIT_TEST_LLVM_SPILLING(N) Y_UNIT_TEST_QUAD(N, LLVM, SPILLING)

#define Y_UNIT_TEST_QUAD(N, OPT1, OPT2) \
template<bool OPT1, bool OPT2> void N(NUnitTest::TTestContext&); \
Expand Down Expand Up @@ -79,7 +80,7 @@ struct TUdfModuleInfo {
NUdf::TUniquePtr<NUdf::IUdfModule> Module;
};

template<bool UseLLVM>
template<bool UseLLVM, bool EnableSpilling = false>
struct TSetup {
explicit TSetup(TComputationNodeFactory nodeFactory = GetTestFactory(), TVector<TUdfModuleInfo>&& modules = {})
: Alloc(__LOCATION__)
Expand All @@ -96,6 +97,8 @@ struct TSetup {
FunctionRegistry = mutableRegistry;
}

Alloc.Ref().ForcefullySetMemoryYellowZone(EnableSpilling);

RandomProvider = CreateDeterministicRandomProvider(1);
TimeProvider = CreateDeterministicTimeProvider(10000000);

Expand Down
159 changes: 119 additions & 40 deletions ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
#include <ydb/library/yql/minikql/mkql_string_util.h>

#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mock_spiller_factory_ut.h>

#include <cstring>
#include <random>
#include <algorithm>

namespace NKikimr {
Expand All @@ -30,6 +30,7 @@ using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
{}
private:
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {

constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData);
if (Index == size) {
return NUdf::EFetchStatus::Finish;
Expand All @@ -47,6 +48,7 @@ using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val)));

++Index;

return NUdf::EFetchStatus::Ok;
}

Expand Down Expand Up @@ -117,6 +119,13 @@ TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRu
pb.CombineCore(stream, keyExtractor, init, update, finishLambda, 64ul << 20);
}

template<bool SPILLING>
TRuntimeNode WideLastCombiner(TProgramBuilder& pb, TRuntimeNode flow, const TProgramBuilder::TWideLambda& extractor, const TProgramBuilder::TBinaryWideLambda& init, const TProgramBuilder::TTernaryWideLambda& update, const TProgramBuilder::TBinaryWideLambda& finish) {
return SPILLING ?
pb.WideLastCombinerWithSpilling(flow, extractor, init, update, finish):
pb.WideLastCombiner(flow, extractor, init, update, finish);
}

} // unnamed

#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
Expand Down Expand Up @@ -996,8 +1005,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) {
#endif
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 29u
Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) {
TSetup<LLVM> setup;
Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsRefCounting) {
// Currently LLVM version doesn't support spilling.
if (LLVM && SPILLING) return;
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;

TSetup<LLVM, SPILLING> setup;
TProgramBuilder& pb = *setup.PgmBuilder;

const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
Expand Down Expand Up @@ -1035,7 +1049,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {

const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});

const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
Expand All @@ -1059,22 +1073,37 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
));

const auto graph = setup.BuildGraph(pgmReturn);
if (SPILLING) {
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
}
const auto iterator = graph->GetValue().GetListIterator();

std::unordered_set<TString> expected {
"key one",
"very long value 2 / key two",
"very long key one",
"very long value 8 / very long value 7 / very long value 6"
};

NUdf::TUnboxedValue item;
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "key one");
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "very long value 2 / key two");
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "very long key one");
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "very long value 8 / very long value 7 / very long value 6");
while (!expected.empty()) {
UNIT_ASSERT(iterator.Next(item));
const auto actual = TString(item.AsStringRef());

auto it = expected.find(actual);
UNIT_ASSERT(it != expected.end());
expected.erase(it);
}
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}

Y_UNIT_TEST_LLVM(TestLongStringsPasstroughtRefCounting) {
TSetup<LLVM> setup;
Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsPasstroughtRefCounting) {
// Currently LLVM version doesn't support spilling.
if (LLVM && SPILLING) return;
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
TSetup<LLVM, SPILLING> setup;
TProgramBuilder& pb = *setup.PgmBuilder;

const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
Expand Down Expand Up @@ -1111,7 +1140,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {

const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});

const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
Expand All @@ -1134,22 +1163,40 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
));

const auto graph = setup.BuildGraph(pgmReturn);
if (SPILLING) {
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
}
const auto iterator = graph->GetValue().GetListIterator();

std::unordered_set<TString> expected {
"very long value 1 / key one / very long value 1 / key one",
"very long value 3 / key two / very long value 2 / key two",
"very long value 4 / very long key one / very long value 4 / very long key one",
"very long value 9 / very long key two / very long value 5 / very long key two"
};

NUdf::TUnboxedValue item;
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "very long value 1 / key one / very long value 1 / key one");
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "very long value 3 / key two / very long value 2 / key two");
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "very long value 4 / very long key one / very long value 4 / very long key one");
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "very long value 9 / very long key two / very long value 5 / very long key two");
while (!expected.empty()) {
UNIT_ASSERT(iterator.Next(item));
const auto actual = TString(item.AsStringRef());

auto it = expected.find(actual);
UNIT_ASSERT(it != expected.end());
expected.erase(it);
}
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}

Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedInput) {
TSetup<LLVM> setup;
Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedInput) {
// Test is broken. Remove this if after YQL-18808.
if (SPILLING) return;

// Currently LLVM version doesn't support spilling.
if (LLVM && SPILLING) return;
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
TSetup<LLVM, SPILLING> setup;
TProgramBuilder& pb = *setup.PgmBuilder;

const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
Expand Down Expand Up @@ -1183,7 +1230,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {

const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");

const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }),
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
Expand All @@ -1207,18 +1254,34 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
));

const auto graph = setup.BuildGraph(pgmReturn);
if (SPILLING) {
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
}
std::unordered_set<TString> expected {
"key one / value 2 / value 1 / value 5 / value 4",
"key two / value 4 / value 3 / value 3 / value 2"
};

const auto iterator = graph->GetValue().GetListIterator();
NUdf::TUnboxedValue item;
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "key one / value 2 / value 1 / value 5 / value 4");
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "key two / value 4 / value 3 / value 3 / value 2");
while (!expected.empty()) {
UNIT_ASSERT(iterator.Next(item));
const auto actual = TString(item.AsStringRef());

auto it = expected.find(actual);
UNIT_ASSERT(it != expected.end());
expected.erase(it);
}
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}

Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedOutput) {
TSetup<LLVM> setup;
Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedOutput) {
// Currently LLVM version doesn't support spilling.
if (LLVM && SPILLING) return;
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
TSetup<LLVM, SPILLING> setup;
TProgramBuilder& pb = *setup.PgmBuilder;

const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
Expand Down Expand Up @@ -1252,7 +1315,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {

const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");

const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }),
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
[&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
Expand All @@ -1268,26 +1331,42 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
));

const auto graph = setup.BuildGraph(pgmReturn);
if (SPILLING) {
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
}
std::unordered_set<TString> expected {
"key one: value 1, value 4, value 5, value 1, value 2",
"key two: value 2, value 3, value 3, value 4"
};

const auto iterator = graph->GetValue().GetListIterator();
NUdf::TUnboxedValue item;
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "key one: value 1, value 4, value 5, value 1, value 2");
UNIT_ASSERT(iterator.Next(item));
UNBOXED_VALUE_STR_EQUAL(item, "key two: value 2, value 3, value 3, value 4");
while (!expected.empty()) {
UNIT_ASSERT(iterator.Next(item));
const auto actual = TString(item.AsStringRef());

auto it = expected.find(actual);
UNIT_ASSERT(it != expected.end());
expected.erase(it);
}
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}

Y_UNIT_TEST_LLVM(TestThinAllLambdas) {
TSetup<LLVM> setup;
Y_UNIT_TEST_LLVM_SPILLING(TestThinAllLambdas) {
// Currently LLVM version doesn't support spilling.
if (LLVM && SPILLING) return;
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
TSetup<LLVM, SPILLING> setup;
TProgramBuilder& pb = *setup.PgmBuilder;

const auto tupleType = pb.NewTupleType({});
const auto data = pb.NewTuple({});

const auto list = pb.NewList(tupleType, {data, data, data, data});

const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
[](TRuntimeNode) -> TRuntimeNode::TList { return {}; }),
[](TRuntimeNode::TList items) { return items; },
[](TRuntimeNode::TList, TRuntimeNode::TList items) { return items; },
Expand Down
Loading
Loading