File tree 1 file changed +6
-1
lines changed
ydb/library/yql/dq/runtime
1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change @@ -421,6 +421,10 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
421
421
return CurrBlockIndex_ >= BlockLen_;
422
422
}
423
423
424
+ bool IsFinished () const {
425
+ return IsFinished_;
426
+ }
427
+
424
428
void NextRow () {
425
429
Y_DEBUG_ABORT_UNLESS (!IsEmpty ());
426
430
++CurrBlockIndex_;
@@ -633,7 +637,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
633
637
input.NextRow ();
634
638
InputRows_.pop_back ();
635
639
if (input.IsEmpty ()) {
636
- auto status = input. FetchNext ( );
640
+ auto status = FetchInput (inputIndex );
637
641
if (status == NUdf::EFetchStatus::Yield) {
638
642
StartInputIndex_ = inputIndex;
639
643
return status;
@@ -645,6 +649,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
645
649
}
646
650
647
651
if (!OutputBlockLen_) {
652
+ YQL_ENSURE (AllOf (InputData_, [](const TDqInputBatch& input) { return input.IsEmpty () && input.IsFinished (); }));
648
653
return NUdf::EFetchStatus::Finish;
649
654
}
650
655
You can’t perform that action at this time.
0 commit comments