Skip to content

Commit 833f929

Browse files
WideCombiner with spilling better buffer pass (#7022)
1 parent d3fa399 commit 833f929

File tree

1 file changed

+91
-88
lines changed

1 file changed

+91
-88
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

+91-88
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,15 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
364364
enum class ETasteResult: i8 {
365365
Init = -1,
366366
Update,
367-
ConsumeRawData,
368-
ExtractRawData
367+
ConsumeRawData
368+
};
369+
370+
enum class EUpdateResult: i8 {
371+
Yield = -1,
372+
ExtractRawData,
373+
ReadInput,
374+
Extract,
375+
Finish
369376
};
370377
TSpillingSupportState(
371378
TMemoryUsageInfo* memInfo,
@@ -398,28 +405,29 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
398405
bool IsProcessingRequired() const {
399406
if (InputStatus != EFetchResult::Finish) return true;
400407

401-
return HasRawDataToExtract || HasDataForProcessing;
408+
return !SpilledBuckets.empty() && SpilledBuckets.front().BucketState != TSpilledBucket::EBucketState::InMemory;
402409
}
403410

404-
bool UpdateAndWait() {
411+
EUpdateResult Update() {
412+
if (IsEverythingExtracted) return EUpdateResult::Finish;
413+
405414
switch (GetMode()) {
406415
case EOperatingMode::InMemory: {
407416
if (CheckMemoryAndSwitchToSpilling()) {
408-
return UpdateAndWait();
417+
return Update();
409418
}
410-
return false;
419+
if (InputStatus == EFetchResult::Finish) return EUpdateResult::Extract;
420+
421+
return EUpdateResult::ReadInput;
411422
}
412-
413-
case EOperatingMode::ProcessSpilled:
414-
return ProcessSpilledDataAndWait();
415423
case EOperatingMode::Spilling: {
416424
UpdateSpillingBuckets();
417425

418-
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return true;
426+
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return EUpdateResult::Yield;
419427

420428
if (BufferForUsedInputItems.size()) {
421429
auto& bucket = SpilledBuckets[BufferForUsedInputItemsBucketId];
422-
if (bucket.AsyncWriteOperation.has_value()) return true;
430+
if (bucket.AsyncWriteOperation.has_value()) return EUpdateResult::Yield;
423431

424432
bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
425433
BufferForUsedInputItems.resize(0); //for freeing allocated key value asap
@@ -429,8 +437,10 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
429437

430438
// Prepare buffer for reading new key
431439
BufferForKeyAndState.resize(KeyWidth);
432-
return false;
440+
return EUpdateResult::ReadInput;
433441
}
442+
case EOperatingMode::ProcessSpilled:
443+
return ProcessSpilledData();
434444
}
435445
}
436446

@@ -442,14 +452,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
442452
return isNew ? ETasteResult::Init : ETasteResult::Update;
443453
}
444454
if (GetMode() == EOperatingMode::ProcessSpilled) {
445-
if (HasRawDataToExtract) {
446-
// Tongue not used here.
447-
Throat = BufferForUsedInputItems.data();
448-
HasRawDataToExtract = false;
449-
HasDataForProcessing = true;
450-
return ETasteResult::ExtractRawData;
451-
}
452-
HasDataForProcessing = false;
453455
// while restoration we process buckets one by one starting from the first in a queue
454456
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
455457
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
@@ -476,20 +478,27 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
476478
MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
477479
BufferForUsedInputItems.resize(ItemNodesSize);
478480
BufferForUsedInputItemsBucketId = bucketId;
481+
479482
Throat = BufferForUsedInputItems.data();
480-
483+
481484
return ETasteResult::ConsumeRawData;
482485
}
483486

484487
NUdf::TUnboxedValuePod* Extract() {
485-
if (GetMode() == EOperatingMode::InMemory) return static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract());
488+
NUdf::TUnboxedValue* value = nullptr;
489+
if (GetMode() == EOperatingMode::InMemory) {
490+
value = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract());
491+
if (!value) IsEverythingExtracted = true;
492+
return value;
493+
}
486494

487495
MKQL_ENSURE(SpilledBuckets.front().BucketState == TSpilledBucket::EBucketState::InMemory, "Internal logic error");
488496
MKQL_ENSURE(SpilledBuckets.size() > 0, "Internal logic error");
489497

490-
auto value = static_cast<NUdf::TUnboxedValue*>(SpilledBuckets.front().InMemoryProcessingState->Extract());
498+
value = static_cast<NUdf::TUnboxedValue*>(SpilledBuckets.front().InMemoryProcessingState->Extract());
491499
if (!value) {
492500
SpilledBuckets.pop_front();
501+
if (SpilledBuckets.empty()) IsEverythingExtracted = true;
493502
}
494503

495504
return value;
@@ -503,7 +512,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
503512
BufferForKeyAndState.resize(0);
504513
}
505514

506-
bool FlushSpillingBuffersAndWait() {
515+
EUpdateResult FlushSpillingBuffersAndWait() {
507516
UpdateSpillingBuckets();
508517

509518
ui64 finishedCount = 0;
@@ -519,11 +528,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
519528
}
520529
}
521530

522-
if (finishedCount != SpilledBuckets.size()) return true;
531+
if (finishedCount != SpilledBuckets.size()) return EUpdateResult::Yield;
523532

524533
SwitchMode(EOperatingMode::ProcessSpilled);
525534

526-
return ProcessSpilledDataAndWait();
535+
return ProcessSpilledData();
527536
}
528537

529538
void SplitStateIntoBuckets() {
@@ -628,11 +637,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
628637
return false;
629638
}
630639

631-
bool ProcessSpilledDataAndWait() {
632-
if (SpilledBuckets.empty()) return false;
633-
640+
EUpdateResult ProcessSpilledData() {
634641
if (AsyncReadOperation) {
635-
if (!AsyncReadOperation->HasValue()) return true;
642+
if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield;
636643
if (RecoverState) {
637644
SpilledBuckets[0].SpilledState->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), Ctx.HolderFactory);
638645
} else {
@@ -642,20 +649,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
642649
}
643650

644651
auto& bucket = SpilledBuckets.front();
645-
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
646-
if (HasDataForProcessing) {
647-
Tongue = bucket.InMemoryProcessingState->Tongue;
648-
Throat = bucket.InMemoryProcessingState->Throat;
649-
return false;
650-
}
652+
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::Extract;
653+
651654
//recover spilled state
652655
while(!bucket.SpilledState->Empty()) {
653656
RecoverState = true;
654657
BufferForKeyAndState.resize(KeyAndStateType->GetElementsCount());
655658
AsyncReadOperation = bucket.SpilledState->ExtractWideItem(BufferForKeyAndState);
656659
if (AsyncReadOperation) {
657660
BufferForKeyAndState.resize(0);
658-
return true;
661+
return EUpdateResult::Yield;
659662
}
660663
for (size_t i = 0; i< KeyWidth; ++i) {
661664
//jumping into unsafe world, refusing ownership
@@ -675,18 +678,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
675678
BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
676679
AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems);
677680
if (AsyncReadOperation) {
678-
return true;
681+
return EUpdateResult::Yield;
679682
}
680683

684+
Throat = BufferForUsedInputItems.data();
681685
Tongue = bucket.InMemoryProcessingState->Tongue;
682-
Throat = bucket.InMemoryProcessingState->Throat;
683686

684-
HasRawDataToExtract = true;
685-
return false;
687+
return EUpdateResult::ExtractRawData;
686688
}
687689
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
688-
HasDataForProcessing = false;
689-
return false;
690+
return EUpdateResult::Extract;
690691
}
691692

692693
EOperatingMode GetMode() const {
@@ -744,9 +745,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
744745
private:
745746
ui64 NextBucketToSpill = 0;
746747

747-
bool HasDataForProcessing = false;
748-
749-
bool HasRawDataToExtract = false;
748+
bool IsEverythingExtracted = false;
750749

751750
TState InMemoryProcessingState;
752751
const TMultiType* const UsedInputItemType;
@@ -1268,50 +1267,49 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12681267
auto **fields = ctx.WideFields.data() + WideFieldsIndex;
12691268

12701269
while (true) {
1271-
if (ptr->UpdateAndWait()) {
1272-
return EFetchResult::Yield;
1273-
}
1274-
if (ptr->InputStatus != EFetchResult::Finish) {
1275-
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
1276-
fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
1277-
switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
1278-
case EFetchResult::One:
1279-
break;
1280-
case EFetchResult::Finish:
1281-
continue;
1282-
case EFetchResult::Yield:
1283-
return EFetchResult::Yield;
1270+
switch(ptr->Update()) {
1271+
case TSpillingSupportState::EUpdateResult::ReadInput: {
1272+
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
1273+
fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
1274+
switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
1275+
case EFetchResult::One:
1276+
break;
1277+
case EFetchResult::Finish:
1278+
continue;
1279+
case EFetchResult::Yield:
1280+
return EFetchResult::Yield;
1281+
}
1282+
break;
12841283
}
1284+
case TSpillingSupportState::EUpdateResult::Yield:
1285+
return EFetchResult::Yield;
1286+
case TSpillingSupportState::EUpdateResult::ExtractRawData:
1287+
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1288+
break;
1289+
case TSpillingSupportState::EUpdateResult::Extract:
1290+
if (const auto values = static_cast<NUdf::TUnboxedValue*>(ptr->Extract())) {
1291+
Nodes.FinishItem(ctx, values, output);
1292+
return EFetchResult::One;
1293+
}
1294+
continue;
1295+
case TSpillingSupportState::EUpdateResult::Finish:
1296+
return EFetchResult::Finish;
12851297
}
12861298

1287-
if (ptr->IsProcessingRequired()) {
1288-
Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
1299+
Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
12891300

1290-
switch(ptr->TasteIt()) {
1291-
case TSpillingSupportState::ETasteResult::Init:
1292-
Nodes.ProcessItem(ctx, nullptr, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1293-
break;
1294-
case TSpillingSupportState::ETasteResult::Update:
1295-
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1296-
break;
1297-
case TSpillingSupportState::ETasteResult::ConsumeRawData:
1298-
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1299-
break;
1300-
case TSpillingSupportState::ETasteResult::ExtractRawData:
1301-
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1302-
break;
1303-
}
1304-
continue;
1305-
}
1306-
1307-
if (const auto values = static_cast<NUdf::TUnboxedValue*>(ptr->Extract())) {
1308-
Nodes.FinishItem(ctx, values, output);
1309-
return EFetchResult::One;
1301+
switch(ptr->TasteIt()) {
1302+
case TSpillingSupportState::ETasteResult::Init:
1303+
Nodes.ProcessItem(ctx, nullptr, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1304+
break;
1305+
case TSpillingSupportState::ETasteResult::Update:
1306+
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1307+
break;
1308+
case TSpillingSupportState::ETasteResult::ConsumeRawData:
1309+
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1310+
break;
13101311
}
13111312

1312-
if (!ptr->HasAnyData()) {
1313-
return EFetchResult::Finish;
1314-
}
13151313
}
13161314
}
13171315
Y_UNREACHABLE();
@@ -1366,13 +1364,18 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
13661364

13671365
block = more;
13681366

1369-
const auto waitMoreFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSpillingSupportState::UpdateAndWait));
1370-
const auto waitMoreFuncPtr = CastInst::Create(Instruction::IntToPtr, waitMoreFunc, PointerType::getUnqual(boolFuncType), "wait_more_func", block);
1371-
const auto waitMore = CallInst::Create(boolFuncType, waitMoreFuncPtr, { stateArg }, "wait_more", block);
1367+
const auto updateFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSpillingSupportState::Update));
1368+
const auto updateType = FunctionType::get(wayType, {stateArg->getType()}, false);
1369+
const auto updateFuncPtr = CastInst::Create(Instruction::IntToPtr, updateFunc, PointerType::getUnqual(updateType), "update_func", block);
1370+
const auto update = CallInst::Create(updateType, updateFuncPtr, { stateArg }, "update", block);
13721371

13731372
result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), block);
13741373

1375-
BranchInst::Create(over, test, waitMore, block);
1374+
const auto updateWay = SwitchInst::Create(update, test, 3U, block);
1375+
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::Yield)), over);
1376+
// TODO add exctraction code and jmp there
1377+
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::ExtractRawData)), test);
1378+
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::Extract)), test);
13761379

13771380
block = test;
13781381

0 commit comments

Comments
 (0)