@@ -132,6 +132,24 @@ struct TCombinerNodes {
132
132
}
133
133
}
134
134
135
+ void ExtractValues (TComputationContext& ctx, NUdf::TUnboxedValue** from, NUdf::TUnboxedValue* to) const {
136
+ for (ui32 i = 0U ; i < ItemNodes.size (); ++i) {
137
+ if (from[i]) {
138
+ to[i] = std::move (*(from[i]));
139
+ }
140
+ }
141
+ }
142
+
143
+ void ExtractValues (TComputationContext& ctx, NUdf::TUnboxedValue* from, NUdf::TUnboxedValue** to) const {
144
+ for (size_t i = 0 , j = 0 ; i != ItemNodes.size (); ++i) {
145
+ if (IsInputItemNodeUsed (i)) {
146
+ *to[i] = std::move (from[j++]);
147
+ } else {
148
+ to[i] = nullptr ;
149
+ }
150
+ }
151
+ }
152
+
135
153
void ProcessItem (TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue* state) const {
136
154
if (keys) {
137
155
std::fill_n (keys, KeyResultNodes.size (), NUdf::TUnboxedValuePod ());
@@ -346,16 +364,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
346
364
enum class ETasteResult : i8 {
347
365
Init = -1 ,
348
366
Update,
349
- Skip
367
+ ConsumeRawData,
368
+ ExtractRawData
350
369
};
351
370
TSpillingSupportState (
352
- TMemoryUsageInfo* memInfo, size_t wideFieldsIndex,
371
+ TMemoryUsageInfo* memInfo,
353
372
const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, size_t itemNodesSize,
354
373
const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
355
374
)
356
375
: TBase(memInfo)
357
376
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount () - keyWidth, hash, equal)
358
- , WideFieldsIndex(wideFieldsIndex)
359
377
, UsedInputItemType(usedInputItemType)
360
378
, KeyAndStateType(keyAndStateType)
361
379
, KeyWidth(keyWidth)
@@ -380,7 +398,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
380
398
bool IsProcessingRequired () const {
381
399
if (InputStatus != EFetchResult::Finish) return true ;
382
400
383
- return HasDataForProcessing;
401
+ return HasRawDataToExtract || HasDataForProcessing;
384
402
}
385
403
386
404
bool UpdateAndWait () {
@@ -424,10 +442,19 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
424
442
return isNew ? ETasteResult::Init : ETasteResult::Update;
425
443
}
426
444
if (GetMode () == EOperatingMode::ProcessSpilled) {
445
+ if (HasRawDataToExtract) {
446
+ // Tongue not used here.
447
+ Throat = BufferForUsedInputItems.data ();
448
+ HasRawDataToExtract = false ;
449
+ HasDataForProcessing = true ;
450
+ return ETasteResult::ExtractRawData;
451
+ }
452
+ HasDataForProcessing = false ;
427
453
// while restoration we process buckets one by one starting from the first in a queue
428
454
bool isNew = SpilledBuckets.front ().InMemoryProcessingState ->TasteIt ();
429
455
Throat = SpilledBuckets.front ().InMemoryProcessingState ->Throat ;
430
456
Tongue = SpilledBuckets.front ().InMemoryProcessingState ->Tongue ;
457
+ BufferForUsedInputItems.resize (0 );
431
458
return isNew ? ETasteResult::Init : ETasteResult::Update;
432
459
}
433
460
@@ -445,9 +472,13 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
445
472
446
473
// Corresponding bucket is spilled, we don't need a key anymore, full input will be spilled
447
474
BufferForKeyAndState.resize (0 );
448
- TryToSpillRawData (bucket, bucketId);
475
+ // Prepare space for raw data
476
+ MKQL_ENSURE (BufferForUsedInputItems.size () == 0 , " Internal logic error" );
477
+ BufferForUsedInputItems.resize (ItemNodesSize);
478
+ BufferForUsedInputItemsBucketId = bucketId;
479
+ Throat = BufferForUsedInputItems.data ();
449
480
450
- return ETasteResult::Skip ;
481
+ return ETasteResult::ConsumeRawData ;
451
482
}
452
483
453
484
NUdf::TUnboxedValuePod* Extract () {
@@ -472,25 +503,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
472
503
BufferForKeyAndState.resize (0 );
473
504
}
474
505
475
- // Copies data from WideFields to local and tries to spill it using suitable bucket.
476
- // if the bucket is already busy, then the buffer will wait for the next iteration.
477
- void TryToSpillRawData (TSpilledBucket& bucket, size_t bucketId) {
478
- auto **fields = Ctx.WideFields .data () + WideFieldsIndex;
479
- MKQL_ENSURE (BufferForUsedInputItems.empty (), " Internal logic error" );
480
-
481
- for (size_t i = 0 ; i < ItemNodesSize; ++i) {
482
- if (fields[i]) {
483
- BufferForUsedInputItems.push_back (*fields[i]);
484
- }
485
- }
486
- if (bucket.AsyncWriteOperation .has_value ()) {
487
- BufferForUsedInputItemsBucketId = bucketId;
488
- return ;
489
- }
490
- bucket.AsyncWriteOperation = bucket.SpilledData ->WriteWideItem (BufferForUsedInputItems);
491
- BufferForUsedInputItems.resize (0 );
492
- }
493
-
494
506
bool FlushSpillingBuffersAndWait () {
495
507
UpdateSpillingBuckets ();
496
508
@@ -620,8 +632,14 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
620
632
}
621
633
AsyncReadOperation = std::nullopt;
622
634
}
635
+
623
636
auto & bucket = SpilledBuckets.front ();
624
637
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false ;
638
+ if (HasDataForProcessing) {
639
+ Tongue = bucket.InMemoryProcessingState ->Tongue ;
640
+ Throat = bucket.InMemoryProcessingState ->Throat ;
641
+ return false ;
642
+ }
625
643
// recover spilled state
626
644
while (!bucket.SpilledState ->Empty ()) {
627
645
RecoverState = true ;
@@ -651,17 +669,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
651
669
if (AsyncReadOperation) {
652
670
return true ;
653
671
}
654
- auto **fields = Ctx.WideFields .data () + WideFieldsIndex;
655
- for (size_t i = 0 , j = 0 ; i < ItemNodesSize; ++i) {
656
- if (fields[i]) {
657
- fields[i] = &(BufferForUsedInputItems[j++]);
658
- }
659
- }
660
672
661
673
Tongue = bucket.InMemoryProcessingState ->Tongue ;
662
674
Throat = bucket.InMemoryProcessingState ->Throat ;
663
675
664
- HasDataForProcessing = true ;
676
+ HasRawDataToExtract = true ;
665
677
return false ;
666
678
}
667
679
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
@@ -725,8 +737,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
725
737
726
738
bool HasDataForProcessing = false ;
727
739
740
+ bool HasRawDataToExtract = false ;
741
+
728
742
TState InMemoryProcessingState;
729
- const size_t WideFieldsIndex;
730
743
const TMultiType* const UsedInputItemType;
731
744
const TMultiType* const KeyAndStateType;
732
745
const size_t KeyWidth;
@@ -1237,6 +1250,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
1237
1250
, AllowSpilling(allowSpilling)
1238
1251
{}
1239
1252
1253
+ // MARK: DoCAlculate
1240
1254
EFetchResult DoCalculate (NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const * output) const {
1241
1255
if (!state.HasValue ()) {
1242
1256
MakeState (ctx, state);
@@ -1246,14 +1260,12 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
1246
1260
auto **fields = ctx.WideFields .data () + WideFieldsIndex;
1247
1261
1248
1262
while (true ) {
1249
- for (auto i = 0U ; i < Nodes.ItemNodes .size (); ++i)
1250
- fields[i] = Nodes.GetUsedInputItemNodePtrOrNull (ctx, i);
1251
-
1252
1263
if (ptr->UpdateAndWait ()) {
1253
1264
return EFetchResult::Yield;
1254
1265
}
1255
-
1256
1266
if (ptr->InputStatus != EFetchResult::Finish) {
1267
+ for (auto i = 0U ; i < Nodes.ItemNodes .size (); ++i)
1268
+ fields[i] = Nodes.GetUsedInputItemNodePtrOrNull (ctx, i);
1257
1269
switch (ptr->InputStatus = Flow->FetchValues (ctx, fields)) {
1258
1270
case EFetchResult::One:
1259
1271
break ;
@@ -1274,7 +1286,11 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
1274
1286
case TSpillingSupportState::ETasteResult::Update:
1275
1287
Nodes.ProcessItem (ctx, static_cast <NUdf::TUnboxedValue*>(ptr->Tongue ), static_cast <NUdf::TUnboxedValue*>(ptr->Throat ));
1276
1288
break ;
1277
- case TSpillingSupportState::ETasteResult::Skip:
1289
+ case TSpillingSupportState::ETasteResult::ConsumeRawData:
1290
+ Nodes.ExtractValues (ctx, fields, static_cast <NUdf::TUnboxedValue*>(ptr->Throat ));
1291
+ break ;
1292
+ case TSpillingSupportState::ETasteResult::ExtractRawData:
1293
+ Nodes.ExtractValues (ctx, static_cast <NUdf::TUnboxedValue*>(ptr->Throat ), fields);
1278
1294
break ;
1279
1295
}
1280
1296
continue ;
@@ -1553,8 +1569,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
1553
1569
#endif
1554
1570
private:
1555
1571
void MakeState (TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
1556
- state = ctx.HolderFactory .Create <TSpillingSupportState>(WideFieldsIndex,
1557
- UsedInputItemType, KeyAndStateType,
1572
+ state = ctx.HolderFactory .Create <TSpillingSupportState>(UsedInputItemType, KeyAndStateType,
1558
1573
Nodes.KeyNodes .size (),
1559
1574
Nodes.ItemNodes .size (),
1560
1575
#ifdef MKQL_DISABLE_CODEGEN
0 commit comments