Skip to content

Commit c78925f

Browse files
authored
Merge 2f5a4ad into cc81c7e
2 parents cc81c7e + 2f5a4ad commit c78925f

File tree

7 files changed

+123
-82
lines changed

7 files changed

+123
-82
lines changed

ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ BEGIN_SIMPLE_ARROW_UDF(TInc, i32(i32)) {
1616

1717
// Unary kernel used by the TInc test UDF (declared above as i32(i32)):
// it forwards its i32 argument plus one to the sink.
struct TIncKernelExec : public NYql::NUdf::TUnaryKernelExec<TIncKernelExec> {
1818
template <typename TSink>
19-
// Pre-change signature (removed by this commit): item and sink only.
static void Process(NYql::NUdf::TBlockItem arg, const TSink& sink) {
19+
// Post-change signature: the value builder is passed between the item and the sink.
static void Process(NYql::NUdf::TBlockItem arg, const NYql::NUdf::IValueBuilder& valueBuilder, const TSink& sink) {
20+
// This kernel does not use the value builder.
Y_UNUSED(valueBuilder);
2021
// Emit exactly one result: the argument read as i32, incremented by one.
sink(NYql::NUdf::TBlockItem(arg.As<i32>() + 1));
2122
}
2223
};

ydb/library/yql/public/udf/arrow/block_builder.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class IScalarBuilder {
5555
public:
5656
virtual ~IScalarBuilder() = default;
5757
// Builds an arrow::Datum from a single block item.
virtual arrow::Datum Build(TBlockItem value) const = 0;
58+
// Overload added by this commit: builds an arrow::Datum from an unboxed value.
// NOTE(review): every concrete IScalarBuilder must now implement this overload as well.
virtual arrow::Datum Build(NUdf::TUnboxedValuePod value) const = 0;
5859
};
5960

6061
inline std::shared_ptr<arrow::DataType> GetArrowType(const ITypeInfoHelper& typeInfoHelper, const TType* type) {

ydb/library/yql/public/udf/arrow/block_item.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,19 @@ class TBlockItem {
5959
Raw.Halfs[1] = high;
6060
}
6161

62+
// Creates a TBlockItem that stores the bytes of `value` inline in its embedded buffer.
// The input must be at most sizeof(TRawEmbeddedValue::Buffer) bytes; anything longer
// fails the UDF_VERIFY check below.
inline static TBlockItem Embedded(const TStringRef& value) {
63+
UDF_VERIFY(value.Size() <= sizeof(TRawEmbeddedValue::Buffer));
64+
65+
TBlockItem v;
66+
v.Raw.Embedded.Size = value.Size();
67+
// Tag the item with the Embedded marker.
v.Raw.Embedded.Meta = static_cast<ui8>(EMarkers::Embedded);
68+
// Skip the copy entirely for an empty string.
if (v.Raw.Embedded.Size) {
69+
std::memcpy(v.Raw.Embedded.Buffer, value.Data(), v.Raw.Embedded.Size);
70+
}
71+
72+
return v;
73+
}
74+
6275
// Returns Raw.Halfs[0], the first raw half of the item, as ui64.
inline ui64 Low() const {
6376
return Raw.Halfs[0];
6477
}

ydb/library/yql/public/udf/arrow/block_reader.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class TFixedSizeBlockReaderBase : public IBlockReader {
4040
if (IsNull(data, index)) {
4141
return {};
4242
}
43+
} else {
44+
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
4345
}
4446
return static_cast<TDerived*>(this)->MakeBlockItem(data.GetValues<T>(1)[index]);
4547
}
@@ -129,6 +131,8 @@ class TStringBlockReader final : public IBlockReader {
129131
if (IsNull(data, index)) {
130132
return {};
131133
}
134+
} else {
135+
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
132136
}
133137

134138
const TOffset* offsets = data.GetValues<TOffset>(1);
@@ -210,6 +214,8 @@ class TTupleBlockReaderBase : public IBlockReader {
210214
if constexpr (Nullable) {
211215
if (IsNull(data, index)) {
212216
return {};
217+
} else {
218+
Y_DEBUG_ABORT_UNLESS(!data.MayHaveNulls());
213219
}
214220
}
215221
return static_cast<TDerived*>(this)->GetChildrenItems(data, index);

ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ using TExec = arrow::Status(*)(arrow::compute::KernelContext*, const arrow::comp
2727

2828
class TUdfKernelState : public arrow::compute::KernelState {
2929
public:
30-
// Pre-change constructor signature (removed by this commit): took the IPgBuilder by reference.
TUdfKernelState(const TVector<const TType*>& argTypes, const TType* outputType, bool onlyScalars, const ITypeInfoHelper* typeInfoHelper, const IPgBuilder& pgBuilder)
30+
// Post-change signature: takes the IValueBuilder by pointer instead of the IPgBuilder.
// The pointer is stored as-is and is not checked for null here.
TUdfKernelState(const TVector<const TType*>& argTypes, const TType* outputType, bool onlyScalars, const ITypeInfoHelper* typeInfoHelper, const IValueBuilder* valueBuilder)
3131
: ArgTypes_(argTypes)
3232
, OutputType_(outputType)
3333
, OnlyScalars_(onlyScalars)
3434
, TypeInfoHelper_(typeInfoHelper)
35-
, PgBuilder_(pgBuilder)
35+
, ValueBuilder_(valueBuilder)
3636
{
3737
// One (initially empty) reader slot per argument type.
Readers_.resize(ArgTypes_.size());
3838
}
@@ -48,7 +48,7 @@ class TUdfKernelState : public arrow::compute::KernelState {
4848
IArrayBuilder& GetArrayBuilder() {
4949
Y_ENSURE(!OnlyScalars_);
5050
if (!ArrayBuilder_) {
51-
ArrayBuilder_ = MakeArrayBuilder(*TypeInfoHelper_, OutputType_, *GetYqlMemoryPool(), TypeInfoHelper_->GetMaxBlockLength(OutputType_), &PgBuilder_);
51+
ArrayBuilder_ = MakeArrayBuilder(*TypeInfoHelper_, OutputType_, *GetYqlMemoryPool(), TypeInfoHelper_->GetMaxBlockLength(OutputType_), &ValueBuilder_->GetPgBuilder());
5252
}
5353

5454
return *ArrayBuilder_;
@@ -62,13 +62,18 @@ class TUdfKernelState : public arrow::compute::KernelState {
6262

6363
return *ScalarBuilder_;
6464
}
65+
66+
// Returns the value builder supplied at construction; Y_ENSURE fails if the pointer is null.
// NOTE(review): GetArrayBuilder() dereferences ValueBuilder_ without an equivalent check.
const IValueBuilder& GetValueBuilder() {
67+
Y_ENSURE(ValueBuilder_);
68+
return *ValueBuilder_;
69+
}
6570

6671
private:
6772
const TVector<const TType*> ArgTypes_;
6873
const TType* OutputType_;
6974
const bool OnlyScalars_;
7075
const ITypeInfoHelper* TypeInfoHelper_;
71-
const IPgBuilder& PgBuilder_;
76+
const IValueBuilder* ValueBuilder_;
7277
TVector<std::unique_ptr<IBlockReader>> Readers_;
7378
std::unique_ptr<IArrayBuilder> ArrayBuilder_;
7479
std::unique_ptr<IScalarBuilder> ScalarBuilder_;
@@ -157,7 +162,7 @@ class TSimpleArrowUdfImpl : public TBoxedValue {
157162
}
158163
}
159164

160-
TUdfKernelState kernelState(ArgTypes_, OutputType_, OnlyScalars_, TypeInfoHelper_.Get(), valueBuilder->GetPgBuilder());
165+
TUdfKernelState kernelState(ArgTypes_, OutputType_, OnlyScalars_, TypeInfoHelper_.Get(), valueBuilder);
161166
arrow::compute::ExecContext execContext(GetYqlMemoryPool());
162167
arrow::compute::KernelContext kernelContext(&execContext);
163168
kernelContext.SetState(&kernelState);
@@ -351,7 +356,8 @@ TReader* CastToBlockReaderImpl(IBlockReader& reader) {
351356

352357
template <typename TDerived, typename TReader = IBlockReader, typename TArrayBuilderImpl = IArrayBuilder, typename TScalarBuilderImpl = IScalarBuilder>
353358
struct TUnaryKernelExec {
354-
static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
359+
360+
static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
355361
auto& state = dynamic_cast<TUdfKernelState&>(*ctx->state());
356362
auto& reader = state.GetReader(0);
357363
auto* readerImpl = CastToBlockReaderImpl<TReader>(reader);
@@ -362,9 +368,10 @@ struct TUnaryKernelExec {
362368
auto* builderImpl = CastToScalarBuilderImpl<TScalarBuilderImpl>(builder);
363369

364370
auto item = readerImpl->GetScalarItem(*arg.scalar());
365-
TDerived::Process(item, [&](TBlockItem out) {
371+
auto sink = [&](auto out) {
366372
*res = builderImpl->Build(out);
367-
});
373+
};
374+
TDerived::Process(item, state.GetValueBuilder(), sink);
368375
}
369376
else {
370377
auto& array = *arg.array();
@@ -377,9 +384,10 @@ struct TUnaryKernelExec {
377384
for (int64_t i = 0; i < array.length;) {
378385
for (size_t j = 0; j < maxBlockLength && i < array.length; ++j, ++i) {
379386
auto item = readerImpl->GetItem(array, i);
380-
TDerived::Process(item, [&](TBlockItem out) {
387+
auto sink = [&](auto out) {
381388
builderImpl->Add(out);
382-
});
389+
};
390+
TDerived::Process(item, state.GetValueBuilder(), sink);
383391
}
384392
auto outputDatum = builderImpl->Build(false);
385393
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
@@ -411,9 +419,11 @@ struct TBinaryKernelExec {
411419

412420
auto item1 = reader1Impl->GetScalarItem(*arg1.scalar());
413421
auto item2 = reader2Impl->GetScalarItem(*arg2.scalar());
414-
TDerived::Process(item1, item2, [&](TBlockItem out) {
422+
423+
auto sink = [&](TBlockItem out) {
415424
*res = builderImpl->Build(out);
416-
});
425+
};
426+
TDerived::Process(item1, item2, state.GetValueBuilder(), sink);
417427
}
418428
else if (arg1.is_scalar() && arg2.is_array()) {
419429
auto item1 = reader1Impl->GetScalarItem(*arg1.scalar());
@@ -427,9 +437,11 @@ struct TBinaryKernelExec {
427437
for (int64_t i = 0; i < array2.length;) {
428438
for (size_t j = 0; j < maxBlockLength && i < array2.length; ++j, ++i) {
429439
auto item2 = reader2Impl->GetItem(array2, i);
430-
TDerived::Process(item1, item2, [&](TBlockItem out) {
440+
441+
auto sink = [&](TBlockItem out) {
431442
builderImpl->Add(out);
432-
});
443+
};
444+
TDerived::Process(item1, item2, state.GetValueBuilder(), sink);
433445
}
434446
auto outputDatum = builder.Build(false);
435447
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
@@ -448,9 +460,11 @@ struct TBinaryKernelExec {
448460
for (int64_t i = 0; i < array1.length;) {
449461
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
450462
auto item1 = reader1Impl->GetItem(array1, i);
451-
TDerived::Process(item1, item2, [&](TBlockItem out) {
463+
464+
auto sink = [&](TBlockItem out) {
452465
builderImpl->Add(out);
453-
});
466+
};
467+
TDerived::Process(item1, item2, state.GetValueBuilder(), sink);
454468
}
455469
auto outputDatum = builder.Build(false);
456470
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
@@ -470,11 +484,13 @@ struct TBinaryKernelExec {
470484
Y_ENSURE(array1.length == array2.length);
471485
for (int64_t i = 0; i < array1.length;) {
472486
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
473-
auto item1 = reader1.GetItem(array1, i);
474-
auto item2 = reader2.GetItem(array2, i);
475-
TDerived::Process(item1, item2, [&](TBlockItem out) {
487+
auto item1 = reader1Impl->GetItem(array1, i);
488+
auto item2 = reader2Impl->GetItem(array2, i);
489+
490+
auto sink = [&](TBlockItem out) {
476491
builderImpl->Add(out);
477-
});
492+
};
493+
TDerived::Process(item1, item2, state.GetValueBuilder(), sink);
478494
}
479495
auto outputDatum = builder.Build(false);
480496
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
@@ -529,9 +545,11 @@ struct TGenericKernelExec {
529545
auto& reader = state.GetReader(k);
530546
args[k] = reader.GetScalarItem(*batch[k].scalar());
531547
}
532-
TDerived::Process(items, [&](TBlockItem out) {
548+
549+
auto sink = [&](TBlockItem out) {
533550
*res = builderImpl->Build(out);
534-
});
551+
};
552+
TDerived::Process(items, state.GetValueBuilder(), sink);
535553
} else {
536554
auto& builder = state.GetArrayBuilder();
537555
auto* builderImpl = CastToArrayBuilderImpl<TArrayBuilderImpl>(builder);
@@ -559,9 +577,11 @@ struct TGenericKernelExec {
559577

560578
args[k] = reader.GetItem(*batch[k].array(), i);
561579
}
562-
TDerived::Process(items, [&](TBlockItem out) {
580+
581+
auto sink = [&](TBlockItem out) {
563582
builderImpl->Add(out);
564-
});
583+
};
584+
TDerived::Process(items, state.GetValueBuilder(), sink);
565585
}
566586
auto outputDatum = builderImpl->Build(false);
567587
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });

0 commit comments

Comments
 (0)