Skip to content

Commit 75900f5

Browse files
authored
Refactor arrow kernels to include valueBuilder (#6231)
Refactor arrow kernels to include another argument (valueBuilder). It is required by Datetime UDF.
1 parent 0201d66 commit 75900f5

File tree

5 files changed

+232
-218
lines changed

5 files changed

+232
-218
lines changed

ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ BEGIN_SIMPLE_ARROW_UDF(TInc, i32(i32)) {
1616

1717
struct TIncKernelExec : public NYql::NUdf::TUnaryKernelExec<TIncKernelExec> {
1818
template <typename TSink>
19-
static void Process(NYql::NUdf::TBlockItem arg, const TSink& sink) {
19+
static void Process(const NYql::NUdf::IValueBuilder* valueBuilder, NYql::NUdf::TBlockItem arg, const TSink& sink) {
20+
Y_UNUSED(valueBuilder);
2021
sink(NYql::NUdf::TBlockItem(arg.As<i32>() + 1));
2122
}
2223
};

ydb/library/yql/public/udf/arrow/block_reader.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class TFixedSizeBlockReaderBase : public IBlockReader {
4040
if (IsNull(data, index)) {
4141
return {};
4242
}
43+
} else {
44+
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
4345
}
4446
return static_cast<TDerived*>(this)->MakeBlockItem(data.GetValues<T>(1)[index]);
4547
}
@@ -129,6 +131,8 @@ class TStringBlockReader final : public IBlockReader {
129131
if (IsNull(data, index)) {
130132
return {};
131133
}
134+
} else {
135+
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
132136
}
133137

134138
const TOffset* offsets = data.GetValues<TOffset>(1);
@@ -210,6 +214,8 @@ class TTupleBlockReaderBase : public IBlockReader {
210214
if constexpr (Nullable) {
211215
if (IsNull(data, index)) {
212216
return {};
217+
} else {
218+
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
213219
}
214220
}
215221
return static_cast<TDerived*>(this)->GetChildrenItems(data, index);

ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ using TExec = arrow::Status(*)(arrow::compute::KernelContext*, const arrow::comp
2727

2828
class TUdfKernelState : public arrow::compute::KernelState {
2929
public:
30-
TUdfKernelState(const TVector<const TType*>& argTypes, const TType* outputType, bool onlyScalars, const ITypeInfoHelper* typeInfoHelper, const IPgBuilder& pgBuilder)
30+
TUdfKernelState(const TVector<const TType*>& argTypes, const TType* outputType, bool onlyScalars, const ITypeInfoHelper* typeInfoHelper, const IValueBuilder* valueBuilder)
3131
: ArgTypes_(argTypes)
3232
, OutputType_(outputType)
3333
, OnlyScalars_(onlyScalars)
3434
, TypeInfoHelper_(typeInfoHelper)
35-
, PgBuilder_(pgBuilder)
35+
, ValueBuilder_(valueBuilder)
3636
{
3737
Readers_.resize(ArgTypes_.size());
3838
}
@@ -48,7 +48,7 @@ class TUdfKernelState : public arrow::compute::KernelState {
4848
IArrayBuilder& GetArrayBuilder() {
4949
Y_ENSURE(!OnlyScalars_);
5050
if (!ArrayBuilder_) {
51-
ArrayBuilder_ = MakeArrayBuilder(*TypeInfoHelper_, OutputType_, *GetYqlMemoryPool(), TypeInfoHelper_->GetMaxBlockLength(OutputType_), &PgBuilder_);
51+
ArrayBuilder_ = MakeArrayBuilder(*TypeInfoHelper_, OutputType_, *GetYqlMemoryPool(), TypeInfoHelper_->GetMaxBlockLength(OutputType_), &ValueBuilder_->GetPgBuilder());
5252
}
5353

5454
return *ArrayBuilder_;
@@ -62,13 +62,18 @@ class TUdfKernelState : public arrow::compute::KernelState {
6262

6363
return *ScalarBuilder_;
6464
}
65+
66+
const IValueBuilder& GetValueBuilder() {
67+
Y_ENSURE(ValueBuilder_);
68+
return *ValueBuilder_;
69+
}
6570

6671
private:
6772
const TVector<const TType*> ArgTypes_;
6873
const TType* OutputType_;
6974
const bool OnlyScalars_;
7075
const ITypeInfoHelper* TypeInfoHelper_;
71-
const IPgBuilder& PgBuilder_;
76+
const IValueBuilder* ValueBuilder_;
7277
TVector<std::unique_ptr<IBlockReader>> Readers_;
7378
std::unique_ptr<IArrayBuilder> ArrayBuilder_;
7479
std::unique_ptr<IScalarBuilder> ScalarBuilder_;
@@ -157,7 +162,7 @@ class TSimpleArrowUdfImpl : public TBoxedValue {
157162
}
158163
}
159164

160-
TUdfKernelState kernelState(ArgTypes_, OutputType_, OnlyScalars_, TypeInfoHelper_.Get(), valueBuilder->GetPgBuilder());
165+
TUdfKernelState kernelState(ArgTypes_, OutputType_, OnlyScalars_, TypeInfoHelper_.Get(), valueBuilder);
161166
arrow::compute::ExecContext execContext(GetYqlMemoryPool());
162167
arrow::compute::KernelContext kernelContext(&execContext);
163168
kernelContext.SetState(&kernelState);
@@ -351,6 +356,7 @@ TReader* CastToBlockReaderImpl(IBlockReader& reader) {
351356

352357
template <typename TDerived, typename TReader = IBlockReader, typename TArrayBuilderImpl = IArrayBuilder, typename TScalarBuilderImpl = IScalarBuilder>
353358
struct TUnaryKernelExec {
359+
354360
static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
355361
auto& state = dynamic_cast<TUdfKernelState&>(*ctx->state());
356362
auto& reader = state.GetReader(0);
@@ -362,7 +368,7 @@ struct TUnaryKernelExec {
362368
auto* builderImpl = CastToScalarBuilderImpl<TScalarBuilderImpl>(builder);
363369

364370
auto item = readerImpl->GetScalarItem(*arg.scalar());
365-
TDerived::Process(item, [&](TBlockItem out) {
371+
TDerived::Process(&state.GetValueBuilder(), item, [&](TBlockItem out) {
366372
*res = builderImpl->Build(out);
367373
});
368374
}
@@ -377,7 +383,7 @@ struct TUnaryKernelExec {
377383
for (int64_t i = 0; i < array.length;) {
378384
for (size_t j = 0; j < maxBlockLength && i < array.length; ++j, ++i) {
379385
auto item = readerImpl->GetItem(array, i);
380-
TDerived::Process(item, [&](TBlockItem out) {
386+
TDerived::Process(&state.GetValueBuilder(), item, [&](TBlockItem out) {
381387
builderImpl->Add(out);
382388
});
383389
}
@@ -411,7 +417,8 @@ struct TBinaryKernelExec {
411417

412418
auto item1 = reader1Impl->GetScalarItem(*arg1.scalar());
413419
auto item2 = reader2Impl->GetScalarItem(*arg2.scalar());
414-
TDerived::Process(item1, item2, [&](TBlockItem out) {
420+
421+
TDerived::Process(&state.GetValueBuilder(), item1, item2, [&](TBlockItem out) {
415422
*res = builderImpl->Build(out);
416423
});
417424
}
@@ -427,7 +434,7 @@ struct TBinaryKernelExec {
427434
for (int64_t i = 0; i < array2.length;) {
428435
for (size_t j = 0; j < maxBlockLength && i < array2.length; ++j, ++i) {
429436
auto item2 = reader2Impl->GetItem(array2, i);
430-
TDerived::Process(item1, item2, [&](TBlockItem out) {
437+
TDerived::Process(&state.GetValueBuilder(), item1, item2, [&](TBlockItem out) {
431438
builderImpl->Add(out);
432439
});
433440
}
@@ -448,7 +455,7 @@ struct TBinaryKernelExec {
448455
for (int64_t i = 0; i < array1.length;) {
449456
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
450457
auto item1 = reader1Impl->GetItem(array1, i);
451-
TDerived::Process(item1, item2, [&](TBlockItem out) {
458+
TDerived::Process(&state.GetValueBuilder(), item1, item2, [&](TBlockItem out) {
452459
builderImpl->Add(out);
453460
});
454461
}
@@ -470,9 +477,9 @@ struct TBinaryKernelExec {
470477
Y_ENSURE(array1.length == array2.length);
471478
for (int64_t i = 0; i < array1.length;) {
472479
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
473-
auto item1 = reader1.GetItem(array1, i);
474-
auto item2 = reader2.GetItem(array2, i);
475-
TDerived::Process(item1, item2, [&](TBlockItem out) {
480+
auto item1 = reader1Impl->GetItem(array1, i);
481+
auto item2 = reader2Impl->GetItem(array2, i);
482+
TDerived::Process(&state.GetValueBuilder(), item1, item2, [&](TBlockItem out) {
476483
builderImpl->Add(out);
477484
});
478485
}
@@ -529,7 +536,7 @@ struct TGenericKernelExec {
529536
auto& reader = state.GetReader(k);
530537
args[k] = reader.GetScalarItem(*batch[k].scalar());
531538
}
532-
TDerived::Process(items, [&](TBlockItem out) {
539+
TDerived::Process(&state.GetValueBuilder(), items, [&](TBlockItem out) {
533540
*res = builderImpl->Build(out);
534541
});
535542
} else {
@@ -559,7 +566,7 @@ struct TGenericKernelExec {
559566

560567
args[k] = reader.GetItem(*batch[k].array(), i);
561568
}
562-
TDerived::Process(items, [&](TBlockItem out) {
569+
TDerived::Process(&state.GetValueBuilder(), items, [&](TBlockItem out) {
563570
builderImpl->Add(out);
564571
});
565572
}

0 commit comments

Comments
 (0)