Skip to content

Commit fd4aef4

Browse files
authored
Merge 502e507 into 8a1ba16
2 parents 8a1ba16 + 502e507 commit fd4aef4

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp

+9-5
Original file line numberDiff line numberDiff line change
@@ -698,17 +698,20 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapp
698698
TUnboxedValueVector Values_;
699699
std::vector<std::unique_ptr<IBlockReader>> Readers_;
700700
std::vector<std::unique_ptr<IBlockItemConverter>> Converters_;
701+
const std::vector<arrow::ValueDescr> ValuesDescr_;
701702

702703
TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types)
703704
: TComputationValue(memInfo)
704705
, Values_(types.size() + 1)
706+
, ValuesDescr_(ToValueDescr(types))
705707
{
706708
Pointer_ = Values_.data();
707709

708710
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
709711
for (size_t i = 0; i < types.size(); ++i) {
710-
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), types[i]));
711-
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), types[i], pgBuilder));
712+
const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType();
713+
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
714+
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
712715
}
713716
}
714717

@@ -718,7 +721,9 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapp
718721

719722
NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const {
720723
TBlockItem item;
721-
if (const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum(); datum.is_scalar()) {
724+
const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum();
725+
Y_DEBUG_ABORT_UNLESS(ValuesDescr_[idx] == datum.descr());
726+
if (datum.is_scalar()) {
722727
item = Readers_[idx]->GetScalarItem(*datum.scalar());
723728
} else {
724729
MKQL_ENSURE(datum.is_array(), "Expecting array");
@@ -1230,8 +1235,7 @@ IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNode
12301235
MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column");
12311236
TVector<TType*> items;
12321237
for (ui32 i = 0; i < wideComponents.size() - 1; ++i) {
1233-
const auto blockType = AS_TYPE(TBlockType, wideComponents[i]);
1234-
items.push_back(blockType->GetItemType());
1238+
items.push_back(AS_TYPE(TBlockType, wideComponents[i]));
12351239
}
12361240

12371241
const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));

0 commit comments

Comments
 (0)