Skip to content

Commit 038c363

Browse files
authored
Merge a863229 into d9565af
2 parents d9565af + a863229 commit 038c363

File tree

2 files changed

+149
-201
lines changed

2 files changed

+149
-201
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp

Lines changed: 146 additions & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,154 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) {
2121
}));
2222
}
2323

24+
template <bool RightRequired>
25+
class TBlockJoinState : public TBlockState {
26+
public:
27+
TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
28+
const TVector<TType*>& inputItems,
29+
const TVector<TType*> outputItems,
30+
NUdf::TUnboxedValue**const fields)
31+
: TBlockState(memInfo, outputItems.size())
32+
, InputWidth_(inputItems.size() - 1)
33+
, OutputWidth_(outputItems.size() - 1)
34+
, Inputs_(inputItems.size())
35+
, InputsDescr_(ToValueDescr(inputItems))
36+
{
37+
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
38+
MaxLength_ = CalcMaxBlockLength(outputItems);
39+
for (size_t i = 0; i < inputItems.size(); i++) {
40+
fields[i] = &Inputs_[i];
41+
const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
42+
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
43+
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
44+
}
45+
// The last output column (i.e. block length) doesn't require a block builder.
46+
for (size_t i = 0; i < OutputWidth_; i++) {
47+
const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
48+
Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
49+
}
50+
MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
51+
}
52+
53+
void CopyRow() {
54+
// Copy items from the "left" flow.
55+
for (size_t i = 0; i < InputWidth_; i++) {
56+
AddItem(GetItem(i), i);
57+
}
58+
OutputRows_++;
59+
}
60+
61+
void MakeRow(const NUdf::TUnboxedValuePod& value) {
62+
// Copy items from the "left" flow.
63+
for (size_t i = 0; i < InputWidth_; i++) {
64+
AddItem(GetItem(i), i);
65+
}
66+
// Convert and append items from the "right" dict.
67+
if constexpr (RightRequired) {
68+
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
69+
AddValue(value.GetElement(j), i);
70+
}
71+
} else {
72+
if (value) {
73+
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
74+
AddValue(value.GetElement(j), i);
75+
}
76+
} else {
77+
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
78+
AddValue(value, i);
79+
}
80+
}
81+
}
82+
OutputRows_++;
83+
}
84+
85+
void MakeBlocks(const THolderFactory& holderFactory) {
86+
Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
87+
OutputRows_ = 0;
88+
BuilderAllocatedSize_ = 0;
89+
90+
for (size_t i = 0; i < Builders_.size(); i++) {
91+
Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
92+
}
93+
FillArrays();
94+
}
95+
96+
TBlockItem GetItem(size_t idx) const {
97+
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
98+
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
99+
if (datum.is_scalar()) {
100+
return Readers_[idx]->GetScalarItem(*datum.scalar());
101+
}
102+
MKQL_ENSURE(datum.is_array(), "Expecting array");
103+
return Readers_[idx]->GetItem(*datum.array(), Current_);
104+
}
105+
106+
NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
107+
return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
108+
}
109+
110+
void Reset() {
111+
Next_ = 0;
112+
InputRows_ = GetBlockCount(Inputs_.back());
113+
}
114+
115+
void Finish() {
116+
IsFinished_ = true;
117+
}
118+
119+
bool NextRow() {
120+
if (Next_ >= InputRows_) {
121+
return false;
122+
}
123+
Current_ = Next_++;
124+
return true;
125+
}
126+
127+
bool IsNotFull() const {
128+
return OutputRows_ < MaxLength_
129+
&& BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
130+
}
131+
132+
bool IsEmpty() const {
133+
return OutputRows_ == 0;
134+
}
135+
136+
bool IsFinished() const {
137+
return IsFinished_;
138+
}
139+
140+
private:
141+
void AddItem(const TBlockItem& item, size_t idx) {
142+
Builders_[idx]->Add(item);
143+
}
144+
145+
void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
146+
Builders_[idx]->Add(value);
147+
}
148+
149+
size_t Current_ = 0;
150+
size_t Next_ = 0;
151+
bool IsFinished_ = false;
152+
size_t MaxLength_;
153+
size_t BuilderAllocatedSize_ = 0;
154+
size_t MaxBuilderAllocatedSize_ = 0;
155+
static const size_t MaxAllocatedFactor_ = 4;
156+
size_t InputRows_ = 0;
157+
size_t OutputRows_ = 0;
158+
size_t InputWidth_;
159+
size_t OutputWidth_;
160+
TUnboxedValueVector Inputs_;
161+
const std::vector<arrow::ValueDescr> InputsDescr_;
162+
TVector<std::unique_ptr<IBlockReader>> Readers_;
163+
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
164+
TVector<std::unique_ptr<IArrayBuilder>> Builders_;
165+
};
166+
24167
template <bool WithoutRight, bool RightRequired>
25168
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>
26169
{
27170
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>;
171+
using TState = TBlockJoinState<RightRequired>;
28172
public:
29173
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
30174
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
@@ -36,7 +180,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
36180
, LeftKeyColumns_(std::move(leftKeyColumns))
37181
, Flow_(flow)
38182
, Dict_(dict)
39-
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(ResultJoinItems_.size()))
183+
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
40184
{}
41185

42186
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -79,7 +223,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
79223
if (s.IsEmpty()) {
80224
return EFetchResult::Finish;
81225
}
82-
s.MakeBlocks();
226+
s.MakeBlocks(ctx.HolderFactory);
83227
const auto sliceSize = s.Slice();
84228

85229
for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
@@ -98,198 +242,6 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
98242
}
99243
}
100244

101-
class TState : public TComputationValue<TState> {
102-
using TBase = TComputationValue<TState>;
103-
size_t Current_ = 0;
104-
size_t Next_ = 0;
105-
bool IsFinished_ = false;
106-
size_t MaxLength_;
107-
size_t BuilderAllocatedSize_ = 0;
108-
size_t MaxBuilderAllocatedSize_ = 0;
109-
static const size_t MaxAllocatedFactor_ = 4;
110-
size_t InputRows_ = 0;
111-
size_t OutputRows_ = 0;
112-
size_t InputWidth_;
113-
size_t OutputWidth_;
114-
TUnboxedValueVector Inputs_;
115-
const std::vector<arrow::ValueDescr> InputsDescr_;
116-
TVector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques;
117-
TVector<std::shared_ptr<arrow::ArrayData>> Arrays;
118-
TVector<std::unique_ptr<IBlockReader>> Readers_;
119-
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
120-
TVector<std::unique_ptr<IArrayBuilder>> Builders_;
121-
122-
public:
123-
TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
124-
const TVector<TType*>& inputItems, const TVector<TType*> outputItems,
125-
NUdf::TUnboxedValue**const fields)
126-
: TBase(memInfo)
127-
, InputWidth_(inputItems.size() - 1)
128-
, OutputWidth_(outputItems.size() - 1)
129-
, Inputs_(inputItems.size())
130-
, InputsDescr_(ToValueDescr(inputItems))
131-
, Deques(OutputWidth_)
132-
, Arrays(OutputWidth_)
133-
{
134-
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
135-
MaxLength_ = CalcMaxBlockLength(outputItems);
136-
for (size_t i = 0; i < inputItems.size(); i++) {
137-
fields[i] = &Inputs_[i];
138-
const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
139-
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
140-
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
141-
}
142-
// The last output column (i.e. block length) doesn't require a block builder.
143-
for (size_t i = 0; i < OutputWidth_; i++) {
144-
const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
145-
Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
146-
}
147-
MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
148-
}
149-
150-
void Reset() {
151-
Next_ = 0;
152-
InputRows_ = GetBlockCount(Inputs_.back());
153-
}
154-
155-
void Finish() {
156-
IsFinished_ = true;
157-
}
158-
159-
bool NextRow() {
160-
if (Next_ >= InputRows_) {
161-
return false;
162-
}
163-
Current_ = Next_++;
164-
return true;
165-
}
166-
167-
bool IsNotFull() {
168-
return OutputRows_ < MaxLength_
169-
&& BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
170-
}
171-
172-
bool IsEmpty() {
173-
return OutputRows_ == 0;
174-
}
175-
176-
bool IsFinished() {
177-
return IsFinished_;
178-
}
179-
180-
TBlockItem GetItem(size_t idx) const {
181-
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
182-
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
183-
if (datum.is_scalar()) {
184-
return Readers_[idx]->GetScalarItem(*datum.scalar());
185-
}
186-
MKQL_ENSURE(datum.is_array(), "Expecting array");
187-
return Readers_[idx]->GetItem(*datum.array(), Current_);
188-
}
189-
190-
NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
191-
return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
192-
}
193-
194-
void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
195-
Builders_[idx]->Add(value);
196-
}
197-
198-
void AddItem(const TBlockItem& item, size_t idx) {
199-
Builders_[idx]->Add(item);
200-
}
201-
202-
void CopyRow() {
203-
// Copy items from the "left" flow.
204-
for (size_t i = 0; i < InputWidth_; i++) {
205-
AddItem(GetItem(i), i);
206-
}
207-
OutputRows_++;
208-
}
209-
210-
void MakeRow(const NUdf::TUnboxedValuePod& value) {
211-
// Copy items from the "left" flow.
212-
for (size_t i = 0; i < InputWidth_; i++) {
213-
AddItem(GetItem(i), i);
214-
}
215-
// Convert and append items from the "right" dict.
216-
if (value) {
217-
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
218-
AddValue(value.GetElement(j), i);
219-
}
220-
} else {
221-
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
222-
AddValue(value, i);
223-
}
224-
}
225-
OutputRows_++;
226-
}
227-
228-
void CopyArray(size_t idx, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) {
229-
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
230-
Y_ENSURE(datum.is_array());
231-
Builders_[idx]->AddMany(*datum.array(), popCount, sparseBitmap, bitmapSize);
232-
}
233-
234-
void MakeBlocks() {
235-
if (OutputRows_ == 0) {
236-
return;
237-
}
238-
BuilderAllocatedSize_ = 0;
239-
240-
for (size_t i = 0; i < Builders_.size(); i++) {
241-
const auto& datum = Builders_[i]->Build(IsFinished_);
242-
Deques[i].clear();
243-
MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
244-
ForEachArrayData(datum, [this, i](const auto& arrayData) {
245-
Deques[i].push_back(arrayData);
246-
});
247-
}
248-
}
249-
250-
ui64 Slice() {
251-
auto sliceSize = OutputRows_;
252-
for (size_t i = 0; i < Deques.size(); i++) {
253-
const auto& arrays = Deques[i];
254-
if (arrays.empty()) {
255-
continue;
256-
}
257-
Y_ABORT_UNLESS(ui64(arrays.front()->length) <= OutputRows_);
258-
sliceSize = std::min<ui64>(sliceSize, arrays.front()->length);
259-
}
260-
261-
for (size_t i = 0; i < Arrays.size(); i++) {
262-
auto& arrays = Deques[i];
263-
if (arrays.empty()) {
264-
continue;
265-
}
266-
if (auto& head = arrays.front(); ui64(head->length) == sliceSize) {
267-
Arrays[i] = std::move(head);
268-
arrays.pop_front();
269-
} else {
270-
Arrays[i] = Chop(head, sliceSize);
271-
}
272-
}
273-
274-
OutputRows_ -= sliceSize;
275-
return sliceSize;
276-
}
277-
278-
NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
279-
MKQL_ENSURE(idx <= OutputWidth_, "Deques index overflow");
280-
// Return the slice length as the last column value (i.e. block length).
281-
if (idx == OutputWidth_) {
282-
return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
283-
}
284-
if (auto array = Arrays[idx]) {
285-
return holderFactory.CreateArrowBlock(std::move(array));
286-
} else {
287-
return NUdf::TUnboxedValuePod();
288-
}
289-
}
290-
291-
};
292-
293245
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
294246
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
295247
}

ydb/library/yql/minikql/computation/mkql_block_impl.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -357,13 +357,9 @@ void TBlockState::FillArrays() {
357357
return;
358358
}
359359
MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
360-
if (datum.is_array()) {
361-
Deques[i].push_back(datum.array());
362-
} else {
363-
for (auto& chunk : datum.chunks()) {
364-
Deques[i].push_back(chunk->data());
365-
}
366-
}
360+
ForEachArrayData(datum, [this, i](const auto& arrayData) {
361+
Deques[i].push_back(arrayData);
362+
});
367363
}
368364
}
369365
}

0 commit comments

Comments
 (0)