File tree Expand file tree Collapse file tree 1 file changed +6
-1
lines changed
ydb/library/yql/dq/runtime Expand file tree Collapse file tree 1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change @@ -432,6 +432,10 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
432
432
return CurrBlockIndex_ >= BlockLen_;
433
433
}
434
434
435
+ bool IsFinished () const {
436
+ return IsFinished_;
437
+ }
438
+
435
439
void NextRow () {
436
440
Y_DEBUG_ABORT_UNLESS (!IsEmpty ());
437
441
++CurrBlockIndex_;
@@ -644,7 +648,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
644
648
input.NextRow ();
645
649
InputRows_.pop_back ();
646
650
if (input.IsEmpty ()) {
647
- auto status = input. FetchNext ( );
651
+ auto status = FetchInput (inputIndex );
648
652
if (status == NUdf::EFetchStatus::Yield) {
649
653
StartInputIndex_ = inputIndex;
650
654
return status;
@@ -656,6 +660,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
656
660
}
657
661
658
662
if (!OutputBlockLen_) {
663
+ YQL_ENSURE (AllOf (InputData_, [](const TDqInputBatch& input) { return input.IsEmpty () && input.IsFinished (); }));
659
664
return NUdf::EFetchStatus::Finish;
660
665
}
661
666
You can’t perform that action at this time.
0 commit comments