Skip to content

Commit e8c31b0

Browse files
committed
add sink as lvalue
1 parent b36b77c commit e8c31b0

File tree

1 file changed

+30
-17
lines changed

1 file changed

+30
-17
lines changed

ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -368,9 +368,10 @@ static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute
368368
auto* builderImpl = CastToScalarBuilderImpl<TScalarBuilderImpl>(builder);
369369

370370
auto item = readerImpl->GetScalarItem(*arg.scalar());
371-
TDerived::Process(item, state.GetValueBuilder(), [&](auto out) {
371+
auto sink = [&](auto out) {
372372
*res = builderImpl->Build(out);
373-
});
373+
};
374+
TDerived::Process(item, state.GetValueBuilder(), sink);
374375
}
375376
else {
376377
auto& array = *arg.array();
@@ -383,9 +384,10 @@ static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute
383384
for (int64_t i = 0; i < array.length;) {
384385
for (size_t j = 0; j < maxBlockLength && i < array.length; ++j, ++i) {
385386
auto item = readerImpl->GetItem(array, i);
386-
TDerived::Process(item, state.GetValueBuilder(), [&](auto out) {
387-
builder.Add(out);
388-
});
387+
auto sink = [&](auto out) {
388+
builderImpl->Add(out);
389+
};
390+
TDerived::Process(item, state.GetValueBuilder(), sink);
389391
}
390392
auto outputDatum = builderImpl->Build(false);
391393
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
@@ -418,9 +420,10 @@ struct TBinaryKernelExec {
418420
auto item1 = reader1Impl->GetScalarItem(*arg1.scalar());
419421
auto item2 = reader2Impl->GetScalarItem(*arg2.scalar());
420422

421-
TDerived::Process(item1, item2, state.GetValueBuilder(), [&](TBlockItem out) {
423+
auto sink = [&](TBlockItem out) {
422424
*res = builderImpl->Build(out);
423-
});
425+
};
426+
TDerived::Process(item1, item2, state.GetValueBuilder(), sink);
424427
}
425428
else if (arg1.is_scalar() && arg2.is_array()) {
426429
auto item1 = reader1Impl->GetScalarItem(*arg1.scalar());
@@ -434,9 +437,11 @@ struct TBinaryKernelExec {
434437
for (int64_t i = 0; i < array2.length;) {
435438
for (size_t j = 0; j < maxBlockLength && i < array2.length; ++j, ++i) {
436439
auto item2 = reader2Impl->GetItem(array2, i);
437-
TDerived::Process(item1, item2, state.GetValueBuilder(), [&](TBlockItem out) {
440+
441+
auto sink = [&](TBlockItem out) {
438442
builderImpl->Add(out);
439-
});
443+
};
444+
TDerived::Process(item1, item2, state.GetValueBuilder(), sink);
440445
}
441446
auto outputDatum = builder.Build(false);
442447
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
@@ -455,9 +460,11 @@ struct TBinaryKernelExec {
455460
for (int64_t i = 0; i < array1.length;) {
456461
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
457462
auto item1 = reader1Impl->GetItem(array1, i);
458-
TDerived::Process(item1, item2, state.GetValueBuilder(), [&](TBlockItem out) {
463+
464+
auto sink = [&](TBlockItem out) {
459465
builderImpl->Add(out);
460-
});
466+
};
467+
TDerived::Process(item1, item2, state.GetValueBuilder(), sink);
461468
}
462469
auto outputDatum = builder.Build(false);
463470
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
@@ -479,9 +486,11 @@ struct TBinaryKernelExec {
479486
for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) {
480487
auto item1 = reader1Impl->GetItem(array1, i);
481488
auto item2 = reader2Impl->GetItem(array2, i);
482-
TDerived::Process(item1, item2, state.GetValueBuilder(), [&](TBlockItem out) {
489+
490+
auto sink = [&](TBlockItem out) {
483491
builderImpl->Add(out);
484-
});
492+
};
493+
TDerived::Process(item1, item2, state.GetValueBuilder(), sink);
485494
}
486495
auto outputDatum = builder.Build(false);
487496
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
@@ -536,9 +545,11 @@ struct TGenericKernelExec {
536545
auto& reader = state.GetReader(k);
537546
args[k] = reader.GetScalarItem(*batch[k].scalar());
538547
}
539-
TDerived::Process(items, state.GetValueBuilder(), [&](TBlockItem out) {
548+
549+
auto sink = [&](TBlockItem out) {
540550
*res = builderImpl->Build(out);
541-
});
551+
};
552+
TDerived::Process(items, state.GetValueBuilder(), sink);
542553
} else {
543554
auto& builder = state.GetArrayBuilder();
544555
auto* builderImpl = CastToArrayBuilderImpl<TArrayBuilderImpl>(builder);
@@ -566,9 +577,11 @@ struct TGenericKernelExec {
566577

567578
args[k] = reader.GetItem(*batch[k].array(), i);
568579
}
569-
TDerived::Process(items, state.GetValueBuilder(), [&](TBlockItem out) {
580+
581+
auto sink = [&](TBlockItem out) {
570582
builderImpl->Add(out);
571-
});
583+
};
584+
TDerived::Process(items, state.GetValueBuilder(), sink);
572585
}
573586
auto outputDatum = builderImpl->Build(false);
574587
ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });

0 commit comments

Comments
 (0)