File tree 1 file changed +6
-1
lines changed
ydb/library/yql/dq/runtime
1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change @@ -433,6 +433,10 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
433
433
return CurrBlockIndex_ >= BlockLen_;
434
434
}
435
435
436
+ bool IsFinished () const {
437
+ return IsFinished_;
438
+ }
439
+
436
440
void NextRow () {
437
441
Y_DEBUG_ABORT_UNLESS (!IsEmpty ());
438
442
++CurrBlockIndex_;
@@ -645,7 +649,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
645
649
input.NextRow ();
646
650
InputRows_.pop_back ();
647
651
if (input.IsEmpty ()) {
648
- auto status = input. FetchNext ( );
652
+ auto status = FetchInput (inputIndex );
649
653
if (status == NUdf::EFetchStatus::Yield) {
650
654
StartInputIndex_ = inputIndex;
651
655
return status;
@@ -657,6 +661,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
657
661
}
658
662
659
663
if (!OutputBlockLen_) {
664
+ YQL_ENSURE (AllOf (InputData_, [](const TDqInputBatch& input) { return input.IsEmpty () && input.IsFinished (); }));
660
665
return NUdf::EFetchStatus::Finish;
661
666
}
662
667
You can’t perform that action at this time.
0 commit comments