Skip to content

Commit b0abac7

Browse files
authored
Introduce computation node for BlockMapJoinCore (LeftOnly and LeftSemi) (#7947)
1 parent 0245f52 commit b0abac7

9 files changed

+634
-26
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
#include "mkql_map_join.h"
2+
3+
#include <ydb/library/yql/minikql/computation/mkql_block_builder.h>
4+
#include <ydb/library/yql/minikql/computation/mkql_block_impl.h>
5+
#include <ydb/library/yql/minikql/computation/mkql_block_reader.h>
6+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders_codegen.h>
7+
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
8+
#include <ydb/library/yql/minikql/mkql_node_cast.h>
9+
#include <ydb/library/yql/minikql/mkql_program_builder.h>
10+
11+
namespace NKikimr {
12+
namespace NMiniKQL {
13+
14+
namespace {
15+
16+
size_t CalcMaxBlockLength(const TVector<TType*>& items) {
17+
return CalcBlockLen(std::accumulate(items.cbegin(), items.cend(), 0ULL,
18+
[](size_t max, const TType* type) {
19+
const TType* itemType = AS_TYPE(TBlockType, type)->GetItemType();
20+
return std::max(max, CalcMaxBlockItemSize(itemType));
21+
}));
22+
}
23+
24+
template <bool RightRequired>
25+
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<RightRequired>>
26+
{
27+
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<RightRequired>>;
28+
public:
29+
// Builds the join node.
//   mutables        - registry used to reserve this node's wide-field slots.
//   resultJoinItems - block types of the output columns.
//   leftFlowItems   - block types of the columns produced by the left flow.
//   leftKeyColumns  - indexes of the key columns within the left flow.
//   flow            - the left (probe) wide flow.
//   dict            - node whose value is probed with Contains() for every left key.
//
// The type vectors are sink parameters taken by value: the previous
// `const TVector<TType*>&&` + std::move combination could never move and always
// copied. Call sites passing const or non-const rvalues keep compiling.
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
    TVector<TType*> resultJoinItems, TVector<TType*> leftFlowItems,
    TVector<ui32>&& leftKeyColumns,
    IComputationWideFlowNode* flow, IComputationNode* dict)
    : TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
    , ResultJoinItems_(std::move(resultJoinItems))
    , LeftFlowItems_(std::move(leftFlowItems))
    , LeftKeyColumns_(std::move(leftKeyColumns))
    , Flow_(flow)
    , Dict_(dict)
    // The reserved wide fields are only used as destinations for the left flow's
    // FetchValues (one slot per left column), so they are sized by the left flow
    // width rather than by the result width.
    , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
{}
41+
42+
// Produces the next output block of the join.
//
// Rows of the current left block are probed against the dict one by one and the
// matching ones (LeftSemi, RightRequired == true) or the non-matching ones
// (LeftOnly, RightRequired == false) are appended to the output builders.
// New input is fetched only after the current block has been read completely.
//
// Returns:
//   Yield  - the left flow has no data available right now;
//   One    - `output` has been filled: data columns followed by the block length;
//   Finish - the left flow is exhausted and nothing is left to emit.
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
    auto& s = GetState(state, ctx);
    auto** fields = ctx.WideFields.data() + WideFieldsIndex_;
    const auto dict = Dict_->GetValue(ctx);

    while (true) {
        while (s.IsNotFull() && s.NextRow()) {
            const auto key = MakeKeysTuple(ctx, s, LeftKeyColumns_);
            // The parentheses matter: an empty (NULL) key never matches, so such a
            // row has to be dropped by LeftSemi but kept by LeftOnly. Without them
            // the expression parses as `key && (Contains(key) == RightRequired)`
            // and LeftOnly loses every row with a NULL key.
            if ((key && dict.Contains(key)) == RightRequired) {
                s.CopyRow();
            }
        }

        // Emit what has been collected when the output is full or the input is
        // over. Fetching while the output is full would overwrite the current
        // input block and lose its rows that have not been examined yet; they
        // are picked up on the next call instead.
        if (!s.IsNotFull() || s.IsFinished()) {
            break;
        }

        const auto fetchResult = Flow_->FetchValues(ctx, fields);
        if (EFetchResult::Yield == fetchResult) {
            return EFetchResult::Yield;
        }
        if (EFetchResult::Finish == fetchResult) {
            s.Finish();
            break;
        }
        // EFetchResult::One: start reading the freshly fetched block.
        s.Reset();
    }

    if (s.IsEmpty()) {
        return EFetchResult::Finish;
    }
    s.MakeBlocks();
    const auto sliceSize = s.Slice();

    for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
        // A null output slot means the consumer does not need that column.
        if (const auto out = output[i]) {
            *out = s.Get(sliceSize, ctx.HolderFactory, i);
        }
    }

    return EFetchResult::One;
}
85+
86+
private:
87+
void RegisterDependencies() const final {
88+
if (const auto flow = this->FlowDependsOn(Flow_)) {
89+
this->DependsOn(flow, Dict_);
90+
}
91+
}
92+
93+
// Mutable state of the join node: the current input block, a read cursor over
// its rows, the per-column output builders and the built output arrays that are
// waiting to be handed out slice by slice.
class TState : public TComputationValue<TState> {
    using TBase = TComputationValue<TState>;
    size_t Current_ = 0;                  // Input row read by GetItem().
    size_t Next_ = 0;                     // Input row NextRow() selects next.
    bool IsFinished_ = false;             // Set by Finish().
    size_t MaxLength_;                    // Row limit of an output block.
    size_t BuilderAllocatedSize_ = 0;     // Handed to every builder by pointer; presumably updated by them.
    size_t MaxBuilderAllocatedSize_ = 0;  // Allocation limit checked by IsNotFull().
    static constexpr size_t MaxAllocatedFactor_ = 4;
    size_t InputRows_ = 0;                // Row count of the current input block.
    size_t OutputRows_ = 0;               // Rows collected but not yet handed out by Slice().
    size_t InputWidth_;                   // Input columns, the trailing block length excluded.
    size_t OutputWidth_;                  // Output columns, the trailing block length excluded.
    TUnboxedValueVector Inputs_;
    const std::vector<arrow::ValueDescr> InputsDescr_;
    TVector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques;
    TVector<std::shared_ptr<arrow::ArrayData>> Arrays;
    TVector<std::unique_ptr<IBlockReader>> Readers_;
    TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
    TVector<std::unique_ptr<IArrayBuilder>> Builders_;

    // True while arrays produced by an earlier MakeBlocks() are still queued.
    bool HasPendingChunks() const {
        for (const auto& chunks : Deques) {
            if (!chunks.empty()) {
                return true;
            }
        }
        return false;
    }

public:
    // inputItems / outputItems are the block types of the left flow columns and
    // of the result columns; both are expected to be non-empty and to end with
    // the block length column. `fields` receives the addresses of the input
    // slots, so that fetching into `fields` fills Inputs_.
    TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
        const TVector<TType*>& inputItems, const TVector<TType*>& outputItems,
        NUdf::TUnboxedValue**const fields)
        : TBase(memInfo)
        , InputWidth_(inputItems.size() - 1)
        , OutputWidth_(outputItems.size() - 1)
        , Inputs_(inputItems.size())
        , InputsDescr_(ToValueDescr(inputItems))
        , Deques(OutputWidth_)
        , Arrays(OutputWidth_)
    {
        const auto& pgBuilder = ctx.Builder->GetPgBuilder();
        MaxLength_ = CalcMaxBlockLength(outputItems);
        for (size_t i = 0; i < inputItems.size(); i++) {
            fields[i] = &Inputs_[i];
            const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
            Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
            Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
        }
        // The last output column (i.e. block length) doesn't require a block builder.
        for (size_t i = 0; i < OutputWidth_; i++) {
            const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
            Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
        }
        MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
    }

    // Rewinds the read cursor for a freshly fetched input block.
    void Reset() {
        Next_ = 0;
        InputRows_ = GetBlockCount(Inputs_.back());
    }

    // Marks the input flow as exhausted.
    void Finish() {
        IsFinished_ = true;
    }

    // Selects the next row of the current input block; false when none is left.
    bool NextRow() {
        if (Next_ >= InputRows_) {
            return false;
        }
        Current_ = Next_++;
        return true;
    }

    // True while more rows may be appended to the output builders.
    bool IsNotFull() const {
        return OutputRows_ < MaxLength_
            && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
    }

    // True when no output rows are waiting to be emitted.
    bool IsEmpty() const {
        return OutputRows_ == 0;
    }

    bool IsFinished() const {
        return IsFinished_;
    }

    // Reads column `idx` at the current row; a scalar input ignores the row.
    TBlockItem GetItem(size_t idx) const {
        const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
        ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
        if (datum.is_scalar()) {
            return Readers_[idx]->GetScalarItem(*datum.scalar());
        }
        MKQL_ENSURE(datum.is_array(), "Expecting array");
        return Readers_[idx]->GetItem(*datum.array(), Current_);
    }

    // Same as GetItem(), converted to an unboxed value.
    NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
        return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
    }

    void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
        Builders_[idx]->Add(value);
    }

    void AddItem(const TBlockItem& item, size_t idx) {
        Builders_[idx]->Add(item);
    }

    // Appends the current input row to the output builders.
    void CopyRow() {
        // Copy items from the "left" flow.
        for (size_t i = 0; i < InputWidth_; i++) {
            AddItem(GetItem(i), i);
        }
        OutputRows_++;
    }

    void CopyArray(size_t idx, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) {
        const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
        Y_ENSURE(datum.is_array());
        Builders_[idx]->AddMany(*datum.array(), popCount, sparseBitmap, bitmapSize);
    }

    // Turns the rows accumulated in the builders into arrays queued for Slice().
    void MakeBlocks() {
        if (OutputRows_ == 0) {
            return;
        }
        // A build may yield several chunks while Slice() hands out one slice per
        // call. Rebuilding now would clear chunks that OutputRows_ still counts,
        // so the queue is drained first; rows added to the builders meanwhile
        // are built once it is empty.
        if (HasPendingChunks()) {
            return;
        }
        BuilderAllocatedSize_ = 0;

        for (size_t i = 0; i < Builders_.size(); i++) {
            const auto& datum = Builders_[i]->Build(IsFinished_);
            Deques[i].clear();
            MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
            ForEachArrayData(datum, [this, i](const auto& arrayData) {
                Deques[i].push_back(arrayData);
            });
        }
    }

    // Moves the next slice of every column from the queues into Arrays and
    // returns its length: the shortest head chunk, capped by OutputRows_.
    ui64 Slice() {
        auto sliceSize = OutputRows_;
        for (size_t i = 0; i < Deques.size(); i++) {
            const auto& arrays = Deques[i];
            if (arrays.empty()) {
                continue;
            }
            Y_ABORT_UNLESS(ui64(arrays.front()->length) <= OutputRows_);
            sliceSize = std::min<ui64>(sliceSize, arrays.front()->length);
        }

        for (size_t i = 0; i < Arrays.size(); i++) {
            auto& arrays = Deques[i];
            if (arrays.empty()) {
                continue;
            }
            if (auto& head = arrays.front(); ui64(head->length) == sliceSize) {
                Arrays[i] = std::move(head);
                arrays.pop_front();
            } else {
                // The head chunk is longer than the slice: take only its prefix.
                Arrays[i] = Chop(head, sliceSize);
            }
        }

        OutputRows_ -= sliceSize;
        return sliceSize;
    }

    // Returns output column `idx` of the slice prepared by Slice(); index
    // OutputWidth_ yields the block length scalar.
    NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
        MKQL_ENSURE(idx <= OutputWidth_, "Deques index overflow");
        // Return the slice length as the last column value (i.e. block length).
        if (idx == OutputWidth_) {
            return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
        }
        if (auto array = Arrays[idx]) {
            return holderFactory.CreateArrowBlock(std::move(array));
        } else {
            return NUdf::TUnboxedValuePod();
        }
    }

};
266+
267+
// Creates a TState for this node and stores it in `state`. The state is given
// the left flow and result column types plus this node's wide-field slots.
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
    const auto inputSlots = ctx.WideFields.data() + WideFieldsIndex_;
    state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, inputSlots);
}
270+
271+
// Returns the node state held in `state`, creating it on first use.
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
    const bool isInitialized = state.HasValue();
    if (!isInitialized) {
        MakeState(ctx, state);
    }
    auto* const boxed = state.AsBoxed().Get();
    return *static_cast<TState*>(boxed);
}
277+
278+
// Builds the lookup key for the current input row of `state`. Only the first
// entry of `keyColumns` is used for now.
NUdf::TUnboxedValue MakeKeysTuple(const TComputationContext& ctx, const TState& state, const TVector<ui32>& keyColumns) const {
    // TODO: Handle complex key.
    // TODO: Handle converters.
    const ui32 keyColumn = keyColumns.front();
    return state.GetValue(ctx.HolderFactory, keyColumn);
}
283+
284+
const TVector<TType*> ResultJoinItems_;
285+
const TVector<TType*> LeftFlowItems_;
286+
const TVector<ui32> LeftKeyColumns_;
287+
IComputationWideFlowNode* const Flow_;
288+
IComputationNode* const Dict_;
289+
ui32 WideFieldsIndex_;
290+
};
291+
292+
} // namespace
293+
294+
// Factory for the "BlockMapJoinCore" callable.
//
// Expected inputs:
//   0 - left wide flow of blocks;
//   1 - dict probed with the left keys;
//   2 - join kind literal (only LeftSemi and LeftOnly are supported so far);
//   3 - tuple literal with the left key column indexes (exactly one so far).
//
// Returns a newly allocated computation node; throws if the callable does not
// match the expectations above.
IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");

    const auto joinType = callable.GetType()->GetReturnType();
    MKQL_ENSURE(joinType->IsFlow(), "Expected WideFlow as a resulting stream");
    const auto joinFlowType = AS_TYPE(TFlowType, joinType);
    MKQL_ENSURE(joinFlowType->GetItemType()->IsMulti(),
                "Expected Multi as a resulting item type");
    const auto joinComponents = GetWideComponents(joinFlowType);
    MKQL_ENSURE(joinComponents.size() > 0, "Expected at least one column");
    // Deliberately non-const: the vector is moved into the node below, and
    // std::move on a const object would silently copy.
    TVector<TType*> joinItems(joinComponents.cbegin(), joinComponents.cend());

    const auto leftFlowNode = callable.GetInput(0);
    MKQL_ENSURE(leftFlowNode.GetStaticType()->IsFlow(),
                "Expected WideFlow as a left stream");
    const auto leftFlowType = AS_TYPE(TFlowType, leftFlowNode);
    MKQL_ENSURE(leftFlowType->GetItemType()->IsMulti(),
                "Expected Multi as a left stream item type");
    const auto leftFlowComponents = GetWideComponents(leftFlowType);
    MKQL_ENSURE(leftFlowComponents.size() > 0, "Expected at least one column");
    // Non-const for the same reason as joinItems.
    TVector<TType*> leftFlowItems(leftFlowComponents.cbegin(), leftFlowComponents.cend());

    const auto joinKindNode = callable.GetInput(2);
    const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
    const auto joinKind = GetJoinKind(rawKind);
    // TODO: Handle other join types.
    Y_ENSURE(joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly);

    const auto tupleLiteral = AS_VALUE(TTupleLiteral, callable.GetInput(3));
    TVector<ui32> leftKeyColumns;
    leftKeyColumns.reserve(tupleLiteral->GetValuesCount());
    for (ui32 i = 0; i < tupleLiteral->GetValuesCount(); i++) {
        const auto item = AS_VALUE(TDataLiteral, tupleLiteral->GetValue(i));
        leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
    }
    // TODO: Handle multi keys.
    Y_ENSURE(leftKeyColumns.size() == 1);
    // The key index is later used to address the per-column readers of the left
    // flow without any further check, so reject an out-of-range index here.
    MKQL_ENSURE(leftKeyColumns.front() < leftFlowItems.size(),
                "Key column index is out of range of the left stream columns");

    const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
    const auto dict = LocateNode(ctx.NodeLocator, callable, 1);

    switch (joinKind) {
    case EJoinKind::LeftSemi:
        return new TBlockWideMapJoinWrapper<true>(ctx.Mutables, std::move(joinItems),
            std::move(leftFlowItems), std::move(leftKeyColumns),
            static_cast<IComputationWideFlowNode*>(flow), dict);
    case EJoinKind::LeftOnly:
        return new TBlockWideMapJoinWrapper<false>(ctx.Mutables, std::move(joinItems),
            std::move(leftFlowItems), std::move(leftKeyColumns),
            static_cast<IComputationWideFlowNode*>(flow), dict);
    default:
        // Unreachable: the join kind has been validated above.
        Y_ABORT();
    }
}
348+
349+
} // namespace NMiniKQL
350+
} // namespace NKikimr
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#pragma once
2+
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
3+
4+
namespace NKikimr {
5+
namespace NMiniKQL {
6+
7+
// Creates the computation node for the "BlockMapJoinCore" callable.
// `callable` describes the join (inputs and result type); `ctx` supplies the
// node locator and mutables used while building the node.
IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx);
8+
9+
} // namespace NMiniKQL
10+
} // namespace NKikimr

ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "mkql_block_if.h"
1515
#include "mkql_block_just.h"
1616
#include "mkql_block_logical.h"
17+
#include "mkql_block_map_join.h"
1718
#include "mkql_block_compress.h"
1819
#include "mkql_block_skiptake.h"
1920
#include "mkql_block_top.h"
@@ -309,6 +310,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
309310
{"BlockMergeFinalizeHashed", &WrapBlockMergeFinalizeHashed},
310311
{"BlockMergeManyFinalizeHashed", &WrapBlockMergeManyFinalizeHashed},
311312
{"ScalarApply", &WrapScalarApply},
313+
{"BlockMapJoinCore", &WrapBlockMapJoinCore},
312314
{"MakeHeap", &WrapMakeHeap},
313315
{"PushHeap", &WrapPushHeap},
314316
{"PopHeap", &WrapPopHeap},

0 commit comments

Comments
 (0)