@@ -360,6 +360,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
360
360
361
361
enum class EOperatingMode {
362
362
InMemory,
363
+ SplittingState,
363
364
Spilling,
364
365
ProcessSpilled
365
366
};
@@ -378,6 +379,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
378
379
Extract,
379
380
Finish
380
381
};
382
+
381
383
TSpillingSupportState (
382
384
TMemoryUsageInfo* memInfo,
383
385
const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, size_t itemNodesSize,
@@ -403,7 +405,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
403
405
}
404
406
405
407
EUpdateResult Update () {
406
- if (IsEverythingExtracted) return EUpdateResult::Finish;
408
+ if (IsEverythingExtracted) {
409
+ return EUpdateResult::Finish;
410
+ }
407
411
408
412
switch (GetMode ()) {
409
413
case EOperatingMode::InMemory: {
@@ -415,11 +419,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
415
419
416
420
return EUpdateResult::ReadInput;
417
421
}
422
+ case EOperatingMode::SplittingState: {
423
+ if (SplitStateIntoBucketsAndWait ()) return EUpdateResult::Yield;
424
+ return Update ();
425
+ }
418
426
case EOperatingMode::Spilling: {
419
427
UpdateSpillingBuckets ();
420
428
429
+
421
430
if (!HasMemoryForProcessing () && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait ()) {
422
- return EUpdateResult::Yield;
431
+ return EUpdateResult::Yield;
423
432
}
424
433
425
434
if (BufferForUsedInputItems.size ()) {
@@ -522,13 +531,65 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
522
531
return ProcessSpilledData ();
523
532
}
524
533
525
- void SplitStateIntoBuckets () {
526
- while (const auto keyAndState = static_cast <NUdf::TUnboxedValue *>(InMemoryProcessingState.Extract ())) {
534
+ ui32 GetLargestInMemoryBucketNumber () const {
535
+ ui64 maxSize = 0 ;
536
+ ui32 largestInMemoryBucketNum = (ui32)-1 ;
537
+ for (ui64 i = 0 ; i < SpilledBucketCount; ++i) {
538
+ if (SpilledBuckets[i].BucketState == TSpilledBucket::EBucketState::InMemory) {
539
+ if (SpilledBuckets[i].LineCount >= maxSize) {
540
+ largestInMemoryBucketNum = i;
541
+ maxSize = SpilledBuckets[i].LineCount ;
542
+ }
543
+ }
544
+ }
545
+ return largestInMemoryBucketNum;
546
+ }
547
+
548
+ bool IsSpillingWhileStateSplitAllowed () const {
549
+ // TODO: Write better condition here. For example: InMemorybuckets > 64
550
+ return true ;
551
+ }
552
+
553
+ bool SplitStateIntoBucketsAndWait () {
554
+ if (SplitStateSpillingBucket != -1 ) {
555
+ auto & bucket = SpilledBuckets[SplitStateSpillingBucket];
556
+ MKQL_ENSURE (bucket.AsyncWriteOperation .has_value (), " Internal logic error" );
557
+ if (!bucket.AsyncWriteOperation ->HasValue ()) return true ;
558
+ bucket.SpilledState ->AsyncWriteCompleted (bucket.AsyncWriteOperation ->ExtractValue ());
559
+ bucket.AsyncWriteOperation = std::nullopt;
560
+
561
+ while (const auto keyAndState = static_cast <NUdf::TUnboxedValue*>(bucket.InMemoryProcessingState ->Extract ())) {
562
+ bucket.AsyncWriteOperation = bucket.SpilledState ->WriteWideItem ({keyAndState, KeyAndStateType->GetElementsCount ()});
563
+ for (size_t i = 0 ; i < KeyAndStateType->GetElementsCount (); ++i) {
564
+ // releasing values stored in unsafe TUnboxedValue buffer
565
+ keyAndState[i].UnRef ();
566
+ }
567
+ if (bucket.AsyncWriteOperation ) return true ;
568
+ }
569
+
570
+ SplitStateSpillingBucket = -1 ;
571
+ }
572
+ while (const auto keyAndState = static_cast <NUdf::TUnboxedValue *>(InMemoryProcessingState.Extract ())) {
527
573
auto hash = Hasher (keyAndState); // Hasher uses only key for hashing
528
574
auto bucketId = hash % SpilledBucketCount;
529
575
auto & bucket = SpilledBuckets[bucketId];
530
576
531
577
bucket.LineCount ++;
578
+
579
+ if (bucket.BucketState != TSpilledBucket::EBucketState::InMemory) {
580
+ bucket.BucketState = TSpilledBucket::EBucketState::SpillingState;
581
+ bucket.AsyncWriteOperation = bucket.SpilledState ->WriteWideItem ({keyAndState, KeyAndStateType->GetElementsCount ()});
582
+ for (size_t i = 0 ; i < KeyAndStateType->GetElementsCount (); ++i) {
583
+ // releasing values stored in unsafe TUnboxedValue buffer
584
+ keyAndState[i].UnRef ();
585
+ }
586
+ if (bucket.AsyncWriteOperation ) {
587
+ SplitStateSpillingBucket = bucketId;
588
+ return true ;
589
+ }
590
+ continue ;
591
+ }
592
+
532
593
auto & processingState = *bucket.InMemoryProcessingState ;
533
594
534
595
for (size_t i = 0 ; i < KeyWidth; ++i) {
@@ -540,16 +601,58 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
540
601
// jumping into unsafe world, refusing ownership
541
602
static_cast <NUdf::TUnboxedValue&>(processingState.Throat [i - KeyWidth]) = std::move (keyAndState[i]);
542
603
}
604
+
605
+ if (InMemoryBucketsCount && !HasMemoryForProcessing () && IsSpillingWhileStateSplitAllowed ()) {
606
+ ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber ();
607
+
608
+ SplitStateSpillingBucket = bucketNumToSpill;
609
+ InMemoryBucketsCount--;
610
+
611
+ auto & bucket = SpilledBuckets[bucketNumToSpill];
612
+ bucket.BucketState = TSpilledBucket::EBucketState::SpillingState;
613
+
614
+ while (const auto keyAndState = static_cast <NUdf::TUnboxedValue*>(bucket.InMemoryProcessingState ->Extract ())) {
615
+ bucket.AsyncWriteOperation = bucket.SpilledState ->WriteWideItem ({keyAndState, KeyAndStateType->GetElementsCount ()});
616
+ for (size_t i = 0 ; i < KeyAndStateType->GetElementsCount (); ++i) {
617
+ // releasing values stored in unsafe TUnboxedValue buffer
618
+ keyAndState[i].UnRef ();
619
+ }
620
+ if (bucket.AsyncWriteOperation ) return true ;
621
+ }
622
+
623
+ bucket.AsyncWriteOperation = bucket.SpilledState ->FinishWriting ();
624
+ if (bucket.AsyncWriteOperation ) return true ;
625
+ }
626
+ }
627
+
628
+ for (ui64 i = 0 ; i < SpilledBucketCount; ++i) {
629
+ auto & bucket = SpilledBuckets[i];
630
+ if (bucket.BucketState == TSpilledBucket::EBucketState::SpillingState) {
631
+ if (bucket.AsyncWriteOperation .has_value ()) {
632
+ if (!bucket.AsyncWriteOperation ->HasValue ()) return true ;
633
+ bucket.SpilledState ->AsyncWriteCompleted (bucket.AsyncWriteOperation ->ExtractValue ());
634
+ bucket.AsyncWriteOperation = std::nullopt;
635
+ }
636
+
637
+ bucket.AsyncWriteOperation = bucket.SpilledState ->FinishWriting ();
638
+ if (bucket.AsyncWriteOperation ) return true ;
639
+ bucket.InMemoryProcessingState ->ReadMore <false >();
640
+
641
+ bucket.BucketState = TSpilledBucket::EBucketState::SpillingData;
642
+ }
543
643
}
544
644
545
645
InMemoryProcessingState.ReadMore <false >();
646
+ IsInMemoryProcessingStateSplitted = true ;
647
+ SwitchMode (EOperatingMode::Spilling);
648
+ return false ;
546
649
}
547
650
548
651
bool CheckMemoryAndSwitchToSpilling () {
549
652
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition ()) {
550
653
LogMemoryUsage ();
551
654
552
- SwitchMode (EOperatingMode::Spilling );
655
+ SwitchMode (EOperatingMode::SplittingState );
553
656
return true ;
554
657
}
555
658
@@ -619,15 +722,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
619
722
return true ;
620
723
}
621
724
while (InMemoryBucketsCount > 0 ) {
622
- ui64 maxLineCount = 0 ;
623
- ui32 maxLineBucketInd = (ui32)-1 ;
624
- for (ui64 i = 0 ; i < SpilledBucketCount; ++i) {
625
- const auto & bucket = SpilledBuckets[i];
626
- if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory && (maxLineBucketInd == (ui32)-1 || bucket.LineCount > maxLineCount)) {
627
- maxLineCount = bucket.LineCount ;
628
- maxLineBucketInd = i;
629
- }
630
- }
725
+ ui32 maxLineBucketInd = GetLargestInMemoryBucketNumber ();
631
726
MKQL_ENSURE (maxLineBucketInd != (ui32)-1 , " Internal logic error" );
632
727
633
728
auto & bucketToSpill = SpilledBuckets[maxLineBucketInd];
@@ -701,8 +796,8 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
701
796
MKQL_ENSURE (false , " Internal logic error" );
702
797
break ;
703
798
}
704
- case EOperatingMode::Spilling : {
705
- YQL_LOG (INFO) << " switching Memory mode to Spilling " ;
799
+ case EOperatingMode::SplittingState : {
800
+ YQL_LOG (INFO) << " switching Memory mode to SplittingState " ;
706
801
MKQL_ENSURE (EOperatingMode::InMemory == Mode, " Internal logic error" );
707
802
SpilledBuckets.resize (SpilledBucketCount);
708
803
auto spiller = Ctx.SpillerFactory ->CreateSpiller ();
@@ -711,7 +806,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
711
806
b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB);
712
807
b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount () - KeyWidth, Hasher, Equal);
713
808
}
714
- SplitStateIntoBuckets ();
809
+ break ;
810
+ }
811
+ case EOperatingMode::Spilling: {
812
+ YQL_LOG (INFO) << " switching Memory mode to Spilling" ;
813
+ MKQL_ENSURE (EOperatingMode::SplittingState == Mode || EOperatingMode::InMemory == Mode, " Internal logic error" );
715
814
716
815
Tongue = ViewForKeyAndState.data ();
717
816
break ;
@@ -744,6 +843,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
744
843
bool IsEverythingExtracted = false ;
745
844
746
845
TState InMemoryProcessingState;
846
+ bool IsInMemoryProcessingStateSplitted = false ;
747
847
const TMultiType* const UsedInputItemType;
748
848
const TMultiType* const KeyAndStateType;
749
849
const size_t KeyWidth;
@@ -760,6 +860,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
760
860
ui64 BufferForUsedInputItemsBucketId;
761
861
TUnboxedValueVector BufferForUsedInputItems;
762
862
std::vector<NUdf::TUnboxedValuePod, TMKQLAllocator<NUdf::TUnboxedValuePod>> ViewForKeyAndState;
863
+ i64 SplitStateSpillingBucket = -1 ;
763
864
764
865
TMemoryUsageInfo* MemInfo = nullptr ;
765
866
TEqualsFunc const Equal;
0 commit comments