Skip to content

Commit 0199538

Browse files
committed
value builder
1 parent ea9ad92 commit 0199538

File tree

3 files changed

+68
-85
lines changed

ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h

Lines changed: 11 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -357,15 +357,6 @@ TReader* CastToBlockReaderImpl(IBlockReader& reader) {
357357
template <typename TDerived, typename TReader = IBlockReader, typename TArrayBuilderImpl = IArrayBuilder, typename TScalarBuilderImpl = IScalarBuilder>
358358
struct TUnaryKernelExec {
359359

360-
template<typename TSink>
361-
static void Process(const TBlockItem& arg, TUdfKernelState& state, const TSink& sink) {
362-
if constexpr (std::is_invocable_v<decltype(&TDerived::template Process<TSink>), TBlockItem, TUdfKernelState&, TSink&>) {
363-
TDerived::Process(arg, state, sink);
364-
} else {
365-
TDerived::Process(arg, sink);
366-
}
367-
}
368-
369360
static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
370361
auto& state = dynamic_cast<TUdfKernelState&>(*ctx->state());
371362
auto& reader = state.GetReader(0);
@@ -377,7 +368,7 @@ static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute
377368
auto* builderImpl = CastToScalarBuilderImpl<TScalarBuilderImpl>(builder);
378369

379370
auto item = readerImpl->GetScalarItem(*arg.scalar());
380-
TDerived::Process(item, [&](TBlockItem out) {
371+
TDerived::Process(item, state.GetValueBuilder(), [&](auto out) {
381372
*res = builderImpl->Build(out);
382373
});
383374
}
@@ -392,7 +383,7 @@ static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute
392383
for (int64_t i = 0; i < array.length;) {
393384
for (size_t j = 0; j < maxBlockLength && i < array.length; ++j, ++i) {
394385
auto item = readerImpl->GetItem(array, i);
395-
TDerived::Process(item, [&](TBlockItem out) {
386+
TDerived::Process(item, state.GetValueBuilder(), [&](auto out) {
396387
builderImpl->Add(out);
397388
});
398389
}
@@ -409,15 +400,6 @@ static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute
409400

410401
template <typename TDerived, typename TReader1 = IBlockReader, typename TReader2 = IBlockReader, typename TArrayBuilderImpl = IArrayBuilder, typename TScalarBuilderImpl = IScalarBuilder>
411402
struct TBinaryKernelExec {
412-
template<typename TSink>
413-
static void Process(const TBlockItem& arg1, const TBlockItem& arg2, TUdfKernelState& state, const TSink& sink) {
414-
if constexpr (std::is_invocable_v<decltype(&TDerived::template Process<TSink>), TBlockItem, TBlockItem, TUdfKernelState&, TSink&>) {
415-
TDerived::Process(arg1, arg2, state, sink);
416-
} else {
417-
TDerived::Process(arg1, arg2, sink);
418-
}
419-
}
420-
421403
static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
422404
auto& state = dynamic_cast<TUdfKernelState&>(*ctx->state());
423405

@@ -435,7 +417,8 @@ struct TBinaryKernelExec {
435417

436418
auto item1 = reader1Impl->GetScalarItem(*arg1.scalar());
437419
auto item2 = reader2Impl->GetScalarItem(*arg2.scalar());
438-
TDerived::Process(item1, item2, [&](TBlockItem out) {
420+
421+
TDerived::Process(item1, item2, state.GetValueBuilder(), [&](TBlockItem out) {
439422
*res = builderImpl->Build(out);
440423
});
441424
}
@@ -451,7 +434,7 @@ struct TBinaryKernelExec {
451434
for (int64_t i = 0; i < array2.length;) {
452435
for (size_t j = 0; j < maxBlockLength && i < array2.length; ++j, ++i) {
453436
auto item2 = reader2Impl->GetItem(array2, i);
454-
TDerived::Process(item1, item2, [&](TBlockItem out) {
437+
TDerived::Process(item1, item2, state.GetValueBuilder(), [&](TBlockItem out) {
455438
builderImpl->Add(out);
456439
});
457440
}
@@ -472,7 +455,7 @@ struct TBinaryKernelExec {
472455
for (int64_t i = 0; i < array1.length;) {
473456
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
474457
auto item1 = reader1Impl->GetItem(array1, i);
475-
TDerived::Process(item1, item2, [&](TBlockItem out) {
458+
TDerived::Process(item1, item2, state.GetValueBuilder(), [&](TBlockItem out) {
476459
builderImpl->Add(out);
477460
});
478461
}
@@ -494,9 +477,9 @@ struct TBinaryKernelExec {
494477
Y_ENSURE(array1.length == array2.length);
495478
for (int64_t i = 0; i < array1.length;) {
496479
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
497-
auto item1 = reader1.GetItem(array1, i);
498-
auto item2 = reader2.GetItem(array2, i);
499-
TDerived::Process(item1, item2, [&](TBlockItem out) {
480+
auto item1 = reader1Impl->GetItem(array1, i);
481+
auto item2 = reader2Impl->GetItem(array2, i);
482+
TDerived::Process(item1, item2, state.GetValueBuilder(), [&](TBlockItem out) {
500483
builderImpl->Add(out);
501484
});
502485
}
@@ -553,7 +536,7 @@ struct TGenericKernelExec {
553536
auto& reader = state.GetReader(k);
554537
args[k] = reader.GetScalarItem(*batch[k].scalar());
555538
}
556-
TDerived::Process(items, [&](TBlockItem out) {
539+
TDerived::Process(items, state.GetValueBuilder(), [&](TBlockItem out) {
557540
*res = builderImpl->Build(out);
558541
});
559542
} else {
@@ -583,7 +566,7 @@ struct TGenericKernelExec {
583566

584567
args[k] = reader.GetItem(*batch[k].array(), i);
585568
}
586-
TDerived::Process(items, [&](TBlockItem out) {
569+
TDerived::Process(items, state.GetValueBuilder(), [&](TBlockItem out) {
587570
builderImpl->Add(out);
588571
});
589572
}

ydb/library/yql/udfs/common/string/string_udf.cpp

Lines changed: 36 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -31,24 +31,24 @@ using namespace NUdf;
3131

3232
namespace {
3333

34-
#define STRING_UDF(udfName, function) \
35-
BEGIN_SIMPLE_STRICT_ARROW_UDF(T##udfName, char*(TAutoMap<char*>)) { \
36-
const TString input(args[0].AsStringRef()); \
37-
const auto& result = function(input); \
38-
return valueBuilder->NewString(result); \
39-
} \
40-
\
41-
struct T##udfName##KernelExec \
42-
: public TUnaryKernelExec<T##udfName##KernelExec> \
43-
{ \
44-
template <typename TSink> \
45-
static void Process(TBlockItem arg1, const TSink& sink) { \
46-
const TString input(arg1.AsStringRef()); \
47-
const auto& result = function(input); \
48-
sink(TBlockItem(result)); \
49-
} \
50-
}; \
51-
\
34+
#define STRING_UDF(udfName, function) \
35+
BEGIN_SIMPLE_STRICT_ARROW_UDF(T##udfName, char*(TAutoMap<char*>)) { \
36+
const TString input(args[0].AsStringRef()); \
37+
const auto& result = function(input); \
38+
return valueBuilder->NewString(result); \
39+
} \
40+
\
41+
struct T##udfName##KernelExec \
42+
: public TUnaryKernelExec<T##udfName##KernelExec> \
43+
{ \
44+
template <typename TSink> \
45+
static void Process(TBlockItem arg1, const IValueBuilder&, const TSink& sink) { \
46+
const TString input(arg1.AsStringRef()); \
47+
const auto& result = function(input); \
48+
sink(TBlockItem(result)); \
49+
} \
50+
}; \
51+
\
5252
END_SIMPLE_ARROW_UDF(T##udfName, T##udfName##KernelExec::Do)
5353

5454

@@ -69,7 +69,7 @@ namespace {
6969
: public TUnaryKernelExec<T##udfName##KernelExec> \
7070
{ \
7171
template <typename TSink> \
72-
static void Process(TBlockItem arg1, const TSink& sink) { \
72+
static void Process(TBlockItem arg1, const IValueBuilder&, const TSink& sink) { \
7373
if (!arg1) { \
7474
return sink(TBlockItem()); \
7575
} \
@@ -126,7 +126,7 @@ namespace {
126126
: public TUnaryKernelExec<T##udfName##KernelExec> \
127127
{ \
128128
template <typename TSink> \
129-
static void Process(TBlockItem arg1, const TSink& sink) { \
129+
static void Process(TBlockItem arg1, const IValueBuilder&, const TSink& sink) { \
130130
TString input(arg1.AsStringRef()); \
131131
if (input.function()) { \
132132
sink(TBlockItem(input)); \
@@ -185,7 +185,7 @@ namespace {
185185
: public TUnaryKernelExec<T##function##KernelExec> \
186186
{ \
187187
template <typename TSink> \
188-
static void Process(TBlockItem arg1, const TSink& sink) { \
188+
static void Process(TBlockItem arg1, const IValueBuilder&, const TSink& sink) { \
189189
if (arg1) { \
190190
const TStringBuf input(arg1.AsStringRef()); \
191191
bool result = true; \
@@ -231,7 +231,7 @@ namespace {
231231
: public TGenericKernelExec<T##function##KernelExec, 3> \
232232
{ \
233233
template <typename TSink> \
234-
static void Process(TBlockItem args, const TSink& sink) { \
234+
static void Process(TBlockItem args, const IValueBuilder&, const TSink& sink) { \
235235
TStringStream result; \
236236
const TStringBuf input(args.GetElement(0).AsStringRef()); \
237237
char paddingSymbol = ' '; \
@@ -264,7 +264,7 @@ namespace {
264264
: public TUnaryKernelExec<T##function##KernelExec> \
265265
{ \
266266
template <typename TSink> \
267-
static void Process(TBlockItem arg1, const TSink& sink) { \
267+
static void Process(TBlockItem arg1, const IValueBuilder&, const TSink& sink) { \
268268
TStringStream result; \
269269
result << function(arg1.Get<argType>()); \
270270
sink(TBlockItem(TStringRef(result.Data(), result.Size()))); \
@@ -285,7 +285,7 @@ namespace {
285285
: public TUnaryKernelExec<T##function##KernelExec> \
286286
{ \
287287
template <typename TSink> \
288-
static void Process(TBlockItem arg1, const TSink& sink) { \
288+
static void Process(TBlockItem arg1, const IValueBuilder&, const TSink& sink) { \
289289
TStringStream result; \
290290
const TStringBuf input(arg1.AsStringRef()); \
291291
result << function(input); \
@@ -307,7 +307,7 @@ namespace {
307307
: public TUnaryKernelExec<T##udfName##KernelExec> \
308308
{ \
309309
template <typename TSink> \
310-
static void Process(TBlockItem arg1, const TSink& sink) { \
310+
static void Process(TBlockItem arg1, const IValueBuilder&, const TSink& sink) { \
311311
TStringStream result; \
312312
result << HumanReadableSize(arg1.Get<ui64>(), hrSize); \
313313
sink(TBlockItem(TStringRef(result.Data(), result.Size()))); \
@@ -414,7 +414,7 @@ namespace {
414414
: public TBinaryKernelExec<TCollapseTextKernelExec>
415415
{
416416
template <typename TSink>
417-
static void Process(TBlockItem arg1, TBlockItem arg2, const TSink& sink) {
417+
static void Process(TBlockItem arg1, TBlockItem arg2, const IValueBuilder&, const TSink& sink) {
418418
TString input(arg1.AsStringRef());
419419
ui64 maxLength = arg2.Get<ui64>();
420420
CollapseText(input, maxLength);
@@ -437,7 +437,7 @@ namespace {
437437

438438
struct TContainsKernelExec : public TBinaryKernelExec<TContainsKernelExec> {
439439
template <typename TSink>
440-
static void Process(TBlockItem arg1, TBlockItem arg2, const TSink& sink) {
440+
static void Process(TBlockItem arg1, TBlockItem arg2, const IValueBuilder&, const TSink& sink) {
441441
if (!arg1)
442442
return sink(TBlockItem(false));
443443

@@ -461,7 +461,7 @@ namespace {
461461
: public TGenericKernelExec<TReplaceAllKernelExec, 3>
462462
{
463463
template <typename TSink>
464-
static void Process(TBlockItem args, const TSink& sink) {
464+
static void Process(TBlockItem args, const IValueBuilder&, const TSink& sink) {
465465
TString result(args.GetElement(0).AsStringRef());
466466
const TStringBuf what(args.GetElement(1).AsStringRef());
467467
const TStringBuf with(args.GetElement(2).AsStringRef());
@@ -490,7 +490,7 @@ namespace {
490490
: public TGenericKernelExec<TReplaceFirstKernelExec, 3>
491491
{
492492
template <typename TSink>
493-
static void Process(TBlockItem args, const TSink& sink) {
493+
static void Process(TBlockItem args, const IValueBuilder&, const TSink& sink) {
494494
std::string result(args.GetElement(0).AsStringRef());
495495
const std::string_view what(args.GetElement(1).AsStringRef());
496496
const std::string_view with(args.GetElement(2).AsStringRef());
@@ -519,7 +519,7 @@ namespace {
519519
: public TGenericKernelExec<TReplaceLastKernelExec, 3>
520520
{
521521
template <typename TSink>
522-
static void Process(TBlockItem args, const TSink& sink) {
522+
static void Process(TBlockItem args, const IValueBuilder&, const TSink& sink) {
523523
std::string result(args.GetElement(0).AsStringRef());
524524
const std::string_view what(args.GetElement(1).AsStringRef());
525525
const std::string_view with(args.GetElement(2).AsStringRef());
@@ -558,7 +558,7 @@ namespace {
558558
: public TBinaryKernelExec<TRemoveAllKernelExec>
559559
{
560560
template <typename TSink>
561-
static void Process(TBlockItem arg1, TBlockItem arg2, const TSink& sink) {
561+
static void Process(TBlockItem arg1, TBlockItem arg2, const IValueBuilder&, const TSink& sink) {
562562
std::string input(arg1.AsStringRef());
563563
const std::string_view remove(arg2.AsStringRef());
564564
std::array<bool, 256> chars{};
@@ -602,7 +602,7 @@ namespace {
602602
: public TBinaryKernelExec<TRemoveFirstKernelExec>
603603
{
604604
template <typename TSink>
605-
static void Process(TBlockItem arg1, TBlockItem arg2, const TSink& sink) {
605+
static void Process(TBlockItem arg1, TBlockItem arg2, const IValueBuilder&, const TSink& sink) {
606606
std::string input(arg1.AsStringRef());
607607
const std::string_view remove(arg2.AsStringRef());
608608
std::array<bool, 256> chars{};
@@ -642,7 +642,7 @@ namespace {
642642
: public TBinaryKernelExec<TRemoveLastKernelExec>
643643
{
644644
template <typename TSink>
645-
static void Process(TBlockItem arg1, TBlockItem arg2, const TSink& sink) {
645+
static void Process(TBlockItem arg1, TBlockItem arg2, const IValueBuilder&, const TSink& sink) {
646646
std::string input(arg1.AsStringRef());
647647
const std::string_view remove(arg2.AsStringRef());
648648
std::array<bool, 256> chars{};
@@ -789,7 +789,7 @@ namespace {
789789

790790
struct TLevensteinDistanceKernelExec : public TBinaryKernelExec<TLevensteinDistanceKernelExec> {
791791
template <typename TSink>
792-
static void Process(TBlockItem arg1, TBlockItem arg2, const TSink& sink) {
792+
static void Process(TBlockItem arg1, TBlockItem arg2, const IValueBuilder&, const TSink& sink) {
793793
const std::string_view left(arg1.AsStringRef());
794794
const std::string_view right(arg2.AsStringRef());
795795
const ui64 result = NLevenshtein::Distance(left, right);
@@ -811,7 +811,7 @@ namespace {
811811
: public TUnaryKernelExec<THumanReadableDurationKernelExec>
812812
{
813813
template <typename TSink>
814-
static void Process(TBlockItem arg1, const TSink& sink) {
814+
static void Process(TBlockItem arg1, const IValueBuilder&, const TSink& sink) {
815815
TStringStream result;
816816
result << HumanReadable(TDuration::MicroSeconds(arg1.Get<ui64>()));
817817
sink(TBlockItem(TStringRef(result.Data(), result.Size())));
@@ -829,7 +829,7 @@ namespace {
829829

830830
struct TPrecKernelExec : public TBinaryKernelExec<TPrecKernelExec> {
831831
template <typename TSink>
832-
static void Process(TBlockItem arg1, TBlockItem arg2, const TSink& sink) {
832+
static void Process(TBlockItem arg1, TBlockItem arg2, const IValueBuilder&, const TSink& sink) {
833833
TStringStream result;
834834
result << Prec(arg1.Get<double>(), arg2.Get<ui64>());
835835
sink(TBlockItem(TStringRef(result.Data(), result.Size())));

0 commit comments

Comments
 (0)