@@ -119,6 +119,13 @@ TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRu
119
119
pb.CombineCore (stream, keyExtractor, init, update, finishLambda, 64ul << 20 );
120
120
}
121
121
122
+ template <bool SPILLING>
123
+ TRuntimeNode WideLastCombiner (TProgramBuilder& pb, TRuntimeNode flow, const TProgramBuilder::TWideLambda& extractor, const TProgramBuilder::TBinaryWideLambda& init, const TProgramBuilder::TTernaryWideLambda& update, const TProgramBuilder::TBinaryWideLambda& finish) {
124
+ return SPILLING ?
125
+ pb.WideLastCombinerWithSpilling (flow, extractor, init, update, finish):
126
+ pb.WideLastCombiner (flow, extractor, init, update, finish);
127
+ }
128
+
122
129
} // unnamed
123
130
124
131
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
@@ -1042,12 +1049,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
1042
1049
1043
1050
const auto list = pb.NewList (tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
1044
1051
1045
- auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1046
- if (SPILLING) {
1047
- wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1048
- }
1049
-
1050
- const auto pgmReturn = pb.Collect (pb.NarrowMap ((pb.*wideLastCombinerCollable)(pb.ExpandMap (pb.ToFlow (list),
1052
+ const auto pgmReturn = pb.Collect (pb.NarrowMap (WideLastCombiner<SPILLING>(pb, pb.ExpandMap (pb.ToFlow (list),
1051
1053
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth (item, 0U ), pb.Nth (item, 1U )}; }),
1052
1054
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front ()}; },
1053
1055
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1138,11 +1140,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
1138
1140
1139
1141
const auto list = pb.NewList (tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
1140
1142
1141
- auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1142
- if (SPILLING) {
1143
- wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1144
- }
1145
- const auto pgmReturn = pb.Collect (pb.NarrowMap ((pb.*wideLastCombinerCollable)(pb.ExpandMap (pb.ToFlow (list),
1143
+ const auto pgmReturn = pb.Collect (pb.NarrowMap (WideLastCombiner<SPILLING>(pb, pb.ExpandMap (pb.ToFlow (list),
1146
1144
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth (item, 0U ), pb.Nth (item, 1U )}; }),
1147
1145
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front ()}; },
1148
1146
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1232,11 +1230,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
1232
1230
1233
1231
const auto landmine = pb.NewDataLiteral <NUdf::EDataSlot::String>(" ACHTUNG MINEN!" );
1234
1232
1235
- auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1236
- if (SPILLING) {
1237
- wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1238
- }
1239
- const auto pgmReturn = pb.Collect (pb.NarrowMap ((pb.*wideLastCombinerCollable)(pb.ExpandMap (pb.ToFlow (list),
1233
+ const auto pgmReturn = pb.Collect (pb.NarrowMap (WideLastCombiner<SPILLING>(pb, pb.ExpandMap (pb.ToFlow (list),
1240
1234
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth (item, 0U ), pb.Unwrap (pb.Nth (item, 1U ), landmine, __FILE__, __LINE__, 0 ), pb.Nth (item, 2U )}; }),
1241
1235
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front ()}; },
1242
1236
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1321,11 +1315,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
1321
1315
1322
1316
const auto landmine = pb.NewDataLiteral <NUdf::EDataSlot::String>(" ACHTUNG MINEN!" );
1323
1317
1324
- auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1325
- if (SPILLING) {
1326
- wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1327
- }
1328
- const auto pgmReturn = pb.Collect (pb.NarrowMap ((pb.*wideLastCombinerCollable)(pb.ExpandMap (pb.ToFlow (list),
1318
+ const auto pgmReturn = pb.Collect (pb.NarrowMap (WideLastCombiner<SPILLING>(pb, pb.ExpandMap (pb.ToFlow (list),
1329
1319
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth (item, 0U ), pb.Nth (item, 1U ), pb.Nth (item, 2U )}; }),
1330
1320
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front ()}; },
1331
1321
[&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1376,11 +1366,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
1376
1366
1377
1367
const auto list = pb.NewList (tupleType, {data, data, data, data});
1378
1368
1379
- auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1380
- if (SPILLING) {
1381
- wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1382
- }
1383
- const auto pgmReturn = pb.Collect (pb.NarrowMap ((pb.*wideLastCombinerCollable)(pb.ExpandMap (pb.ToFlow (list),
1369
+ const auto pgmReturn = pb.Collect (pb.NarrowMap (WideLastCombiner<SPILLING>(pb, pb.ExpandMap (pb.ToFlow (list),
1384
1370
[](TRuntimeNode) -> TRuntimeNode::TList { return {}; }),
1385
1371
[](TRuntimeNode::TList items) { return items; },
1386
1372
[](TRuntimeNode::TList, TRuntimeNode::TList items) { return items; },
0 commit comments