diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index ff20307d634d..cf7662ed7843 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -521,7 +521,6 @@ class TSpillingSupportState : public TComputationValue { if (finishedCount != SpilledBuckets.size()) return true; - YQL_LOG(INFO) << "switching to ProcessSpilled"; SwitchMode(EOperatingMode::ProcessSpilled); return ProcessSpilledDataAndWait(); @@ -551,11 +550,7 @@ class TSpillingSupportState : public TComputationValue { bool CheckMemoryAndSwitchToSpilling() { if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) { - const auto used = TlsAllocState->GetUsed(); - const auto limit = TlsAllocState->GetLimit(); - - YQL_LOG(INFO) << "yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit; - YQL_LOG(INFO) << "switching Memory mode to Spilling"; + LogMemoryUsage(); SwitchMode(EOperatingMode::Spilling); return true; @@ -564,6 +559,19 @@ class TSpillingSupportState : public TComputationValue { return false; } + void LogMemoryUsage() const { + const auto used = TlsAllocState->GetUsed(); + const auto limit = TlsAllocState->GetLimit(); + TStringBuilder logmsg; + logmsg << "Memory usage: "; + if (limit) { + logmsg << (used*100/limit) << "%="; + } + logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB"; + + YQL_LOG(INFO) << logmsg; + } + void SpillMoreStateFromBucket(TSpilledBucket& bucket) { MKQL_ENSURE(!bucket.AsyncWriteOperation.has_value(), "Internal logic error"); @@ -688,10 +696,12 @@ class TSpillingSupportState : public TComputationValue { void SwitchMode(EOperatingMode mode) { switch(mode) { case EOperatingMode::InMemory: { + YQL_LOG(INFO) << "switching Memory mode to InMemory"; MKQL_ENSURE(false, "Internal logic error"); break; } case EOperatingMode::Spilling: { + YQL_LOG(INFO) << "switching Memory mode to Spilling"; MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error"); SpilledBuckets.resize(SpilledBucketCount); auto spiller = Ctx.SpillerFactory->CreateSpiller(); @@ -707,6 +717,7 @@ class TSpillingSupportState : public TComputationValue { break; } case EOperatingMode::ProcessSpilled: { + YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled"; MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error"); MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); BufferForKeyAndState.resize(0); @@ -722,9 +733,7 @@ class TSpillingSupportState : public TComputationValue { } bool IsSwitchToSpillingModeCondition() const { - return false; - // TODO: YQL-18033 - // return !HasMemoryForProcessing(); + return !HasMemoryForProcessing(); } public: @@ -1250,7 +1259,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode void N(NUnitTest::TTestContext&); \ @@ -79,7 +80,7 @@ struct TUdfModuleInfo { NUdf::TUniquePtr Module; }; -template +template struct TSetup { explicit TSetup(TComputationNodeFactory nodeFactory = GetTestFactory(), TVector&& modules = {}) : Alloc(__LOCATION__) @@ -96,6 +97,8 @@ struct TSetup { FunctionRegistry = mutableRegistry; } + Alloc.Ref().ForcefullySetMemoryYellowZone(EnableSpilling); + RandomProvider = CreateDeterministicRandomProvider(1); TimeProvider = CreateDeterministicTimeProvider(10000000); diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp index 1984d7bde28e..ca32c6a29117 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp @@ -5,9 +5,9 @@ #include #include +#include #include -#include #include namespace NKikimr { @@ -30,6 +30,7 @@ using TBaseComputation = TMutableComputationNode; {} private: NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override { + constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData); if (Index == size) { return NUdf::EFetchStatus::Finish; @@ -47,6 +48,7 @@ using TBaseComputation = TMutableComputationNode; items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val))); ++Index; + return NUdf::EFetchStatus::Ok; } @@ -117,6 +119,13 @@ TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function +TRuntimeNode WideLastCombiner(TProgramBuilder& pb, TRuntimeNode flow, const TProgramBuilder::TWideLambda& extractor, const TProgramBuilder::TBinaryWideLambda& init, const TProgramBuilder::TTernaryWideLambda& update, const TProgramBuilder::TBinaryWideLambda& finish) { + return SPILLING ? + pb.WideLastCombinerWithSpilling(flow, extractor, init, update, finish): + pb.WideLastCombiner(flow, extractor, init, update, finish); +} + } // unnamed #if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u @@ -996,8 +1005,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) { #endif #if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 29u Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { - Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) { - TSetup setup; + Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsRefCounting) { + // Currently LLVM version doesn't support spilling. + if (LLVM && SPILLING) return; + // callable WideLastCombinerWithSpilling was introduced in 49 version of runtime + if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return; + + TSetup setup; TProgramBuilder& pb = *setup.PgmBuilder; const auto dataType = pb.NewDataType(NUdf::TDataType::Id); @@ -1035,7 +1049,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list), + const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner(pb, pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList { @@ -1059,22 +1073,37 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { )); const auto graph = setup.BuildGraph(pgmReturn); + if (SPILLING) { + graph->GetContext().SpillerFactory = std::make_shared(); + } const auto iterator = graph->GetValue().GetListIterator(); + + std::unordered_set expected { + "key one", + "very long value 2 / key two", + "very long key one", + "very long value 8 / very long value 7 / very long value 6" + }; + NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "key one"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 2 / key two"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long key one"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 8 / very long value 7 / very long value 6"); + while (!expected.empty()) { + UNIT_ASSERT(iterator.Next(item)); + const auto actual = TString(item.AsStringRef()); + + auto it = expected.find(actual); + UNIT_ASSERT(it != expected.end()); + expected.erase(it); + } UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } - Y_UNIT_TEST_LLVM(TestLongStringsPasstroughtRefCounting) { - TSetup setup; + Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsPasstroughtRefCounting) { + // Currently LLVM version doesn't support spilling. + if (LLVM && SPILLING) return; + // callable WideLastCombinerWithSpilling was introduced in 49 version of runtime + if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return; + TSetup setup; TProgramBuilder& pb = *setup.PgmBuilder; const auto dataType = pb.NewDataType(NUdf::TDataType::Id); @@ -1111,7 +1140,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list), + const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner(pb, pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList { @@ -1134,22 +1163,40 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { )); const auto graph = setup.BuildGraph(pgmReturn); + if (SPILLING) { + graph->GetContext().SpillerFactory = std::make_shared(); + } const auto iterator = graph->GetValue().GetListIterator(); + + std::unordered_set expected { + "very long value 1 / key one / very long value 1 / key one", + "very long value 3 / key two / very long value 2 / key two", + "very long value 4 / very long key one / very long value 4 / very long key one", + "very long value 9 / very long key two / very long value 5 / very long key two" + }; + NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 1 / key one / very long value 1 / key one"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 3 / key two / very long value 2 / key two"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 4 / very long key one / very long value 4 / very long key one"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 9 / very long key two / very long value 5 / very long key two"); + while (!expected.empty()) { + UNIT_ASSERT(iterator.Next(item)); + const auto actual = TString(item.AsStringRef()); + + auto it = expected.find(actual); + UNIT_ASSERT(it != expected.end()); + expected.erase(it); + } UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } - Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedInput) { - TSetup setup; + Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedInput) { + // Test is broken. Remove this if after YQL-18808. + if (SPILLING) return; + + // Currently LLVM version doesn't support spilling. + if (LLVM && SPILLING) return; + // callable WideLastCombinerWithSpilling was introduced in 49 version of runtime + if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return; + TSetup setup; TProgramBuilder& pb = *setup.PgmBuilder; const auto dataType = pb.NewDataType(NUdf::TDataType::Id); @@ -1183,7 +1230,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto landmine = pb.NewDataLiteral("ACHTUNG MINEN!"); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list), + const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner(pb, pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }), [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList { @@ -1207,18 +1254,34 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { )); const auto graph = setup.BuildGraph(pgmReturn); + if (SPILLING) { + graph->GetContext().SpillerFactory = std::make_shared(); + } + std::unordered_set expected { + "key one / value 2 / value 1 / value 5 / value 4", + "key two / value 4 / value 3 / value 3 / value 2" + }; + const auto iterator = graph->GetValue().GetListIterator(); NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "key one / value 2 / value 1 / value 5 / value 4"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "key two / value 4 / value 3 / value 3 / value 2"); + while (!expected.empty()) { + UNIT_ASSERT(iterator.Next(item)); + const auto actual = TString(item.AsStringRef()); + + auto it = expected.find(actual); + UNIT_ASSERT(it != expected.end()); + expected.erase(it); + } UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } - Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedOutput) { - TSetup setup; + Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedOutput) { + // Currently LLVM version doesn't support spilling. + if (LLVM && SPILLING) return; + // callable WideLastCombinerWithSpilling was introduced in 49 version of runtime + if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return; + TSetup setup; TProgramBuilder& pb = *setup.PgmBuilder; const auto dataType = pb.NewDataType(NUdf::TDataType::Id); @@ -1252,7 +1315,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto landmine = pb.NewDataLiteral("ACHTUNG MINEN!"); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list), + const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner(pb, pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }), [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { @@ -1268,18 +1331,34 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { )); const auto graph = setup.BuildGraph(pgmReturn); + if (SPILLING) { + graph->GetContext().SpillerFactory = std::make_shared(); + } + std::unordered_set expected { + "key one: value 1, value 4, value 5, value 1, value 2", + "key two: value 2, value 3, value 3, value 4" + }; + const auto iterator = graph->GetValue().GetListIterator(); NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "key one: value 1, value 4, value 5, value 1, value 2"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "key two: value 2, value 3, value 3, value 4"); + while (!expected.empty()) { + UNIT_ASSERT(iterator.Next(item)); + const auto actual = TString(item.AsStringRef()); + + auto it = expected.find(actual); + UNIT_ASSERT(it != expected.end()); + expected.erase(it); + } UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } - Y_UNIT_TEST_LLVM(TestThinAllLambdas) { - TSetup setup; + Y_UNIT_TEST_LLVM_SPILLING(TestThinAllLambdas) { + // Currently LLVM version doesn't support spilling. + if (LLVM && SPILLING) return; + // callable WideLastCombinerWithSpilling was introduced in 49 version of runtime + if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return; + TSetup setup; TProgramBuilder& pb = *setup.PgmBuilder; const auto tupleType = pb.NewTupleType({}); @@ -1287,7 +1366,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto list = pb.NewList(tupleType, {data, data, data, data}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list), + const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner(pb, pb.ExpandMap(pb.ToFlow(list), [](TRuntimeNode) -> TRuntimeNode::TList { return {}; }), [](TRuntimeNode::TList items) { return items; }, [](TRuntimeNode::TList, TRuntimeNode::TList items) { return items; }, diff --git a/ydb/library/yql/minikql/computation/mock_spiller_factory_ut.h b/ydb/library/yql/minikql/computation/mock_spiller_factory_ut.h new file mode 100644 index 000000000000..f4ac240d26a8 --- /dev/null +++ b/ydb/library/yql/minikql/computation/mock_spiller_factory_ut.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +namespace NKikimr::NMiniKQL { + +using namespace NActors; + +class TMockSpillerFactory : public ISpillerFactory +{ +public: + ISpiller::TPtr CreateSpiller() override { + return CreateMockSpiller(); + } +}; + +} // namespace NKikimr::NMiniKQL \ No newline at end of file