Skip to content

Refactor arrow kernels to include valueBuilder #6231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ BEGIN_SIMPLE_ARROW_UDF(TInc, i32(i32)) {

struct TIncKernelExec : public NYql::NUdf::TUnaryKernelExec<TIncKernelExec> {
template <typename TSink>
static void Process(NYql::NUdf::TBlockItem arg, const TSink& sink) {
static void Process(const NYql::NUdf::IValueBuilder* valueBuilder, NYql::NUdf::TBlockItem arg, const TSink& sink) {
Y_UNUSED(valueBuilder);
sink(NYql::NUdf::TBlockItem(arg.As<i32>() + 1));
}
};
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/yql/public/udf/arrow/block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class TFixedSizeBlockReaderBase : public IBlockReader {
if (IsNull(data, index)) {
return {};
}
} else {
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
}
return static_cast<TDerived*>(this)->MakeBlockItem(data.GetValues<T>(1)[index]);
}
Expand Down Expand Up @@ -129,6 +131,8 @@ class TStringBlockReader final : public IBlockReader {
if (IsNull(data, index)) {
return {};
}
} else {
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
}

const TOffset* offsets = data.GetValues<TOffset>(1);
Expand Down Expand Up @@ -210,6 +214,8 @@ class TTupleBlockReaderBase : public IBlockReader {
if constexpr (Nullable) {
if (IsNull(data, index)) {
return {};
} else {
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
}
}
return static_cast<TDerived*>(this)->GetChildrenItems(data, index);
Expand Down
37 changes: 22 additions & 15 deletions ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ using TExec = arrow::Status(*)(arrow::compute::KernelContext*, const arrow::comp

class TUdfKernelState : public arrow::compute::KernelState {
public:
TUdfKernelState(const TVector<const TType*>& argTypes, const TType* outputType, bool onlyScalars, const ITypeInfoHelper* typeInfoHelper, const IPgBuilder& pgBuilder)
TUdfKernelState(const TVector<const TType*>& argTypes, const TType* outputType, bool onlyScalars, const ITypeInfoHelper* typeInfoHelper, const IValueBuilder* valueBuilder)
: ArgTypes_(argTypes)
, OutputType_(outputType)
, OnlyScalars_(onlyScalars)
, TypeInfoHelper_(typeInfoHelper)
, PgBuilder_(pgBuilder)
, ValueBuilder_(valueBuilder)
{
Readers_.resize(ArgTypes_.size());
}
Expand All @@ -48,7 +48,7 @@ class TUdfKernelState : public arrow::compute::KernelState {
IArrayBuilder& GetArrayBuilder() {
Y_ENSURE(!OnlyScalars_);
if (!ArrayBuilder_) {
ArrayBuilder_ = MakeArrayBuilder(*TypeInfoHelper_, OutputType_, *GetYqlMemoryPool(), TypeInfoHelper_->GetMaxBlockLength(OutputType_), &PgBuilder_);
ArrayBuilder_ = MakeArrayBuilder(*TypeInfoHelper_, OutputType_, *GetYqlMemoryPool(), TypeInfoHelper_->GetMaxBlockLength(OutputType_), &ValueBuilder_->GetPgBuilder());
}

return *ArrayBuilder_;
Expand All @@ -62,13 +62,18 @@ class TUdfKernelState : public arrow::compute::KernelState {

return *ScalarBuilder_;
}

const IValueBuilder& GetValueBuilder() {
Y_ENSURE(ValueBuilder_);
return *ValueBuilder_;
}

private:
const TVector<const TType*> ArgTypes_;
const TType* OutputType_;
const bool OnlyScalars_;
const ITypeInfoHelper* TypeInfoHelper_;
const IPgBuilder& PgBuilder_;
const IValueBuilder* ValueBuilder_;
TVector<std::unique_ptr<IBlockReader>> Readers_;
std::unique_ptr<IArrayBuilder> ArrayBuilder_;
std::unique_ptr<IScalarBuilder> ScalarBuilder_;
Expand Down Expand Up @@ -157,7 +162,7 @@ class TSimpleArrowUdfImpl : public TBoxedValue {
}
}

TUdfKernelState kernelState(ArgTypes_, OutputType_, OnlyScalars_, TypeInfoHelper_.Get(), valueBuilder->GetPgBuilder());
TUdfKernelState kernelState(ArgTypes_, OutputType_, OnlyScalars_, TypeInfoHelper_.Get(), valueBuilder);
arrow::compute::ExecContext execContext(GetYqlMemoryPool());
arrow::compute::KernelContext kernelContext(&execContext);
kernelContext.SetState(&kernelState);
Expand Down Expand Up @@ -351,6 +356,7 @@ TReader* CastToBlockReaderImpl(IBlockReader& reader) {

template <typename TDerived, typename TReader = IBlockReader, typename TArrayBuilderImpl = IArrayBuilder, typename TScalarBuilderImpl = IScalarBuilder>
struct TUnaryKernelExec {

static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
auto& state = dynamic_cast<TUdfKernelState&>(*ctx->state());
auto& reader = state.GetReader(0);
Expand All @@ -362,7 +368,7 @@ struct TUnaryKernelExec {
auto* builderImpl = CastToScalarBuilderImpl<TScalarBuilderImpl>(builder);

auto item = readerImpl->GetScalarItem(*arg.scalar());
TDerived::Process(item, [&](TBlockItem out) {
TDerived::Process(&state.GetValueBuilder(), item, [&](TBlockItem out) {
*res = builderImpl->Build(out);
});
}
Expand All @@ -377,7 +383,7 @@ struct TUnaryKernelExec {
for (int64_t i = 0; i < array.length;) {
for (size_t j = 0; j < maxBlockLength && i < array.length; ++j, ++i) {
auto item = readerImpl->GetItem(array, i);
TDerived::Process(item, [&](TBlockItem out) {
TDerived::Process(&state.GetValueBuilder(), item, [&](TBlockItem out) {
builderImpl->Add(out);
});
}
Expand Down Expand Up @@ -411,7 +417,8 @@ struct TBinaryKernelExec {

auto item1 = reader1Impl->GetScalarItem(*arg1.scalar());
auto item2 = reader2Impl->GetScalarItem(*arg2.scalar());
TDerived::Process(item1, item2, [&](TBlockItem out) {

TDerived::Process(&state.GetValueBuilder(), item1, item2, [&](TBlockItem out) {
*res = builderImpl->Build(out);
});
}
Expand All @@ -427,7 +434,7 @@ struct TBinaryKernelExec {
for (int64_t i = 0; i < array2.length;) {
for (size_t j = 0; j < maxBlockLength && i < array2.length; ++j, ++i) {
auto item2 = reader2Impl->GetItem(array2, i);
TDerived::Process(item1, item2, [&](TBlockItem out) {
TDerived::Process(&state.GetValueBuilder(), item1, item2, [&](TBlockItem out) {
builderImpl->Add(out);
});
}
Expand All @@ -448,7 +455,7 @@ struct TBinaryKernelExec {
for (int64_t i = 0; i < array1.length;) {
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
auto item1 = reader1Impl->GetItem(array1, i);
TDerived::Process(item1, item2, [&](TBlockItem out) {
TDerived::Process(&state.GetValueBuilder(), item1, item2, [&](TBlockItem out) {
builderImpl->Add(out);
});
}
Expand All @@ -470,9 +477,9 @@ struct TBinaryKernelExec {
Y_ENSURE(array1.length == array2.length);
for (int64_t i = 0; i < array1.length;) {
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
auto item1 = reader1.GetItem(array1, i);
auto item2 = reader2.GetItem(array2, i);
TDerived::Process(item1, item2, [&](TBlockItem out) {
auto item1 = reader1Impl->GetItem(array1, i);
auto item2 = reader2Impl->GetItem(array2, i);
TDerived::Process(&state.GetValueBuilder(), item1, item2, [&](TBlockItem out) {
builderImpl->Add(out);
});
}
Expand Down Expand Up @@ -529,7 +536,7 @@ struct TGenericKernelExec {
auto& reader = state.GetReader(k);
args[k] = reader.GetScalarItem(*batch[k].scalar());
}
TDerived::Process(items, [&](TBlockItem out) {
TDerived::Process(&state.GetValueBuilder(), items, [&](TBlockItem out) {
*res = builderImpl->Build(out);
});
} else {
Expand Down Expand Up @@ -559,7 +566,7 @@ struct TGenericKernelExec {

args[k] = reader.GetItem(*batch[k].array(), i);
}
TDerived::Process(items, [&](TBlockItem out) {
TDerived::Process(&state.GetValueBuilder(), items, [&](TBlockItem out) {
builderImpl->Add(out);
});
}
Expand Down
Loading
Loading