Skip to content

Commit 8a9b6af

Browse files
Merge 09bd1cf into 7954aba
2 parents 7954aba + 09bd1cf commit 8a9b6af

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1831
-182
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,17 @@ std::shared_ptr<IChunkedArray> IChunkedArray::DoApplyFilter(const TColumnFilter&
120120
}
121121

122122
std::shared_ptr<IChunkedArray> IChunkedArray::ApplyFilter(const TColumnFilter& filter, const std::shared_ptr<IChunkedArray>& selfPtr) const {
123+
AFL_VERIFY(selfPtr);
123124
if (filter.IsTotalAllowFilter()) {
124125
return selfPtr;
125126
}
126127
if (filter.IsTotalDenyFilter()) {
127128
return TTrivialArray::BuildEmpty(GetDataType());
128129
}
129-
return DoApplyFilter(filter);
130+
auto result = DoApplyFilter(filter);
131+
AFL_VERIFY(result);
132+
AFL_VERIFY(result->GetRecordsCount() == filter.GetFilteredCountVerified());
133+
return result;
130134
}
131135

132136
TString IChunkedArray::TReader::DebugString(const ui32 position) const {

ydb/core/formats/arrow/accessor/abstract/accessor.h

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ class IChunkedArray {
4343
SerializedChunkedArray,
4444
CompositeChunkedArray,
4545
SparsedArray,
46-
SubColumnsArray
46+
SubColumnsArray,
47+
SubColumnsPartialArray
4748
};
4849

4950
class TCommonChunkAddress {
@@ -322,6 +323,21 @@ class IChunkedArray {
322323
public:
323324
std::shared_ptr<IChunkedArray> ApplyFilter(const TColumnFilter& filter, const std::shared_ptr<IChunkedArray>& selfPtr) const;
324325

326+
template <class TResult, class TActor>
327+
static std::optional<TResult> VisitDataOwners(const std::shared_ptr<IChunkedArray>& arr, const TActor& actor) {
328+
AFL_VERIFY(arr);
329+
std::optional<IChunkedArray::TFullChunkedArrayAddress> arrCurrent;
330+
for (ui32 currentIndex = 0; currentIndex < arr->GetRecordsCount();) {
331+
arrCurrent = arr->GetArray(arrCurrent, currentIndex, arr);
332+
auto result = actor(arrCurrent->GetArray());
333+
if (!!result) {
334+
return result;
335+
}
336+
currentIndex = currentIndex + arrCurrent->GetArray()->GetRecordsCount();
337+
}
338+
return std::nullopt;
339+
}
340+
325341
NJson::TJsonValue DebugJson() const {
326342
NJson::TJsonValue result = NJson::JSON_MAP;
327343
result.InsertValue("type", ::ToString(Type));
@@ -415,14 +431,18 @@ class IChunkedArray {
415431
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
416432
std::shared_ptr<IChunkedArray> ISlice(const ui32 offset, const ui32 count) const {
417433
AFL_VERIFY(offset + count <= GetRecordsCount())("offset", offset)("count", count)("records", GetRecordsCount());
418-
return DoISlice(offset, count);
434+
auto result = DoISlice(offset, count);
435+
AFL_VERIFY(result);
436+
AFL_VERIFY(result->GetRecordsCount() == count)("records", result->GetRecordsCount())("count", count);
437+
return result;
419438
}
420439

421440
bool IsDataOwner() const {
422441
switch (Type) {
423442
case EType::SparsedArray:
424443
case EType::ChunkedArray:
425444
case EType::SubColumnsArray:
445+
case EType::SubColumnsPartialArray:
426446
case EType::Array:
427447
return true;
428448
case EType::Undefined:

ydb/core/formats/arrow/accessor/common/chunk_data.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,9 @@ TChunkConstructionData::TChunkConstructionData(const ui32 recordsCount, const st
1515
AFL_VERIFY(!!DefaultSerializer);
1616
}
1717

18+
TChunkConstructionData TChunkConstructionData::GetSubset(const ui32 recordsCount) const {
19+
AFL_VERIFY(recordsCount <= RecordsCount)("sub", recordsCount)("global", RecordsCount);
20+
return TChunkConstructionData(recordsCount, DefaultValue, ColumnType, DefaultSerializer);
21+
}
22+
1823
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/common/chunk_data.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ class TChunkConstructionData {
2020
public:
2121
TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue,
2222
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer);
23+
24+
TChunkConstructionData GetSubset(const ui32 recordsCount) const;
2325
};
2426

2527
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/composite/accessor.cpp

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#include "accessor.h"
2+
3+
#include <ydb/core/formats/arrow/arrow_filter.h>
24
namespace NKikimr::NArrow::NAccessor {
35

46
namespace {
@@ -29,33 +31,49 @@ class TCompositeChunkAccessor {
2931
} // namespace
3032

3133
std::shared_ptr<IChunkedArray> ICompositeChunkedArray::DoISlice(const ui32 offset, const ui32 count) const {
32-
ui32 slicedRecordsCount = 0;
33-
ui32 currentIndex = offset;
34+
ui32 currentIndex = 0;
3435
std::optional<IChunkedArray::TFullChunkedArrayAddress> arrAddress;
3536
std::vector<std::shared_ptr<IChunkedArray>> chunks;
36-
while (slicedRecordsCount < count && currentIndex < GetRecordsCount()) {
37+
while (currentIndex < offset + count) {
3738
arrAddress = GetArray(arrAddress, currentIndex, nullptr);
38-
const ui32 localIndex = arrAddress->GetAddress().GetLocalIndex(currentIndex);
39-
const ui32 localCount = (arrAddress->GetArray()->GetRecordsCount() + slicedRecordsCount < count)
40-
? arrAddress->GetArray()->GetRecordsCount()
41-
: (count - slicedRecordsCount);
42-
43-
if (localIndex == 0 && localCount == arrAddress->GetArray()->GetRecordsCount()) {
44-
chunks.emplace_back(arrAddress->GetArray());
39+
const auto& arr = arrAddress->GetArray();
40+
if (currentIndex + arr->GetRecordsCount() < offset) {
41+
} else if (currentIndex >= offset && currentIndex + arr->GetRecordsCount() <= offset + count) {
42+
chunks.emplace_back(arr);
4543
} else {
46-
chunks.emplace_back(arrAddress->GetArray()->ISlice(localIndex, localCount));
44+
const ui32 localStart = std::max<ui32>(offset, currentIndex);
45+
const ui32 localFinish = std::min<ui32>(offset + count, currentIndex + arr->GetRecordsCount());
46+
AFL_VERIFY(localStart < localFinish)("start", localStart)("finish", localFinish);
47+
chunks.emplace_back(arrAddress->GetArray()->ISlice(localStart - currentIndex, localFinish - localStart));
4748
}
48-
slicedRecordsCount += localCount;
49-
currentIndex += localCount;
49+
currentIndex += arr->GetRecordsCount();
5050
}
51-
AFL_VERIFY(slicedRecordsCount == count)("sliced", slicedRecordsCount)("count", count);
5251
if (chunks.size() == 1) {
5352
return chunks.front();
5453
} else {
5554
return std::make_shared<TCompositeChunkedArray>(std::move(chunks), count, GetDataType());
5655
}
5756
}
5857

58+
std::shared_ptr<IChunkedArray> ICompositeChunkedArray::DoApplyFilter(const TColumnFilter& filter) const {
59+
std::optional<IChunkedArray::TFullChunkedArrayAddress> arrAddress;
60+
std::vector<std::shared_ptr<IChunkedArray>> chunks;
61+
ui32 currentIndex = 0;
62+
while (currentIndex < GetRecordsCount()) {
63+
arrAddress = GetArray(arrAddress, currentIndex, nullptr);
64+
if (filter.CheckSlice(currentIndex, arrAddress->GetArray()->GetRecordsCount())) {
65+
auto sliceFilter = filter.Slice(currentIndex, arrAddress->GetArray()->GetRecordsCount());
66+
chunks.emplace_back(sliceFilter.Apply(arrAddress->GetArray()));
67+
}
68+
currentIndex += arrAddress->GetArray()->GetRecordsCount();
69+
}
70+
if (chunks.size() == 1) {
71+
return chunks.front();
72+
} else {
73+
return std::make_shared<TCompositeChunkedArray>(std::move(chunks), filter.GetFilteredCountVerified(), GetDataType());
74+
}
75+
}
76+
5977
IChunkedArray::TLocalDataAddress TCompositeChunkedArray::DoGetLocalData(
6078
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const {
6179
AFL_VERIFY(false);

ydb/core/formats/arrow/accessor/composite/accessor.h

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class ICompositeChunkedArray: public NArrow::NAccessor::IChunkedArray {
99
private:
1010
using TBase = NArrow::NAccessor::IChunkedArray;
1111
virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override final;
12+
virtual std::shared_ptr<IChunkedArray> DoApplyFilter(const TColumnFilter& filter) const override;
1213

1314
public:
1415
using TBase::TBase;
@@ -54,6 +55,47 @@ class TCompositeChunkedArray: public ICompositeChunkedArray {
5455
, Chunks(std::move(chunks)) {
5556
}
5657

58+
class TIterator: TNonCopyable {
59+
private:
60+
const std::shared_ptr<TCompositeChunkedArray> Owner;
61+
ui32 RecordIndex = 0;
62+
std::optional<TFullChunkedArrayAddress> CurrentChunk;
63+
64+
public:
65+
TIterator(const std::shared_ptr<TCompositeChunkedArray>& owner)
66+
: Owner(owner) {
67+
if (Owner->GetRecordsCount()) {
68+
CurrentChunk = Owner->GetArray(CurrentChunk, RecordIndex, Owner);
69+
}
70+
}
71+
72+
const std::shared_ptr<IChunkedArray>& GetArray() const {
73+
AFL_VERIFY(CurrentChunk);
74+
return CurrentChunk->GetArray();
75+
}
76+
77+
bool IsValid() {
78+
return RecordIndex < Owner->GetRecordsCount();
79+
}
80+
81+
bool Next() {
82+
AFL_VERIFY(IsValid());
83+
AFL_VERIFY(CurrentChunk);
84+
RecordIndex += CurrentChunk->GetArray()->GetRecordsCount();
85+
AFL_VERIFY(RecordIndex <= Owner->GetRecordsCount());
86+
if (IsValid()) {
87+
CurrentChunk = Owner->GetArray(CurrentChunk, RecordIndex, Owner);
88+
return true;
89+
} else {
90+
return false;
91+
}
92+
}
93+
};
94+
95+
static TIterator BuildIterator(std::shared_ptr<TCompositeChunkedArray>& owner) {
96+
return TIterator(owner);
97+
}
98+
5799
class TBuilder {
58100
private:
59101
ui32 RecordsCount = 0;
@@ -74,9 +116,12 @@ class TCompositeChunkedArray: public ICompositeChunkedArray {
74116
RecordsCount += arr->GetRecordsCount();
75117
}
76118

77-
std::shared_ptr<TCompositeChunkedArray> Finish() {
119+
std::shared_ptr<IChunkedArray> Finish() {
78120
AFL_VERIFY(!Finished);
79121
Finished = true;
122+
if (Chunks.size() == 1) {
123+
return Chunks.front();
124+
}
80125
return std::shared_ptr<TCompositeChunkedArray>(new TCompositeChunkedArray(std::move(Chunks), RecordsCount, Type));
81126
}
82127
};
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#include <ydb/core/formats/arrow/accessor/composite/accessor.h>
2+
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
3+
#include <ydb/core/formats/arrow/arrow_filter.h>
4+
5+
#include <library/cpp/testing/unittest/registar.h>
6+
7+
#include <regex>
8+
9+
Y_UNIT_TEST_SUITE(CompositeArrayAccessor) {
10+
using namespace NKikimr::NArrow::NAccessor;
11+
using namespace NKikimr::NArrow;
12+
13+
std::string PrepareToCompare(const std::string& str) {
14+
return std::regex_replace(str, std::regex(" |\\n"), "");
15+
}
16+
17+
static std::shared_ptr<IChunkedArray> BuildCompositeArray() {
18+
TCompositeChunkedArray::TBuilder builder(arrow::utf8());
19+
{
20+
TTrivialArray::TPlainBuilder arrBuilder;
21+
arrBuilder.AddRecord(1, "a1");
22+
arrBuilder.AddRecord(3, "a3");
23+
arrBuilder.AddRecord(7, "a7");
24+
builder.AddChunk(arrBuilder.Finish(10));
25+
}
26+
{
27+
TTrivialArray::TPlainBuilder arrBuilder;
28+
arrBuilder.AddRecord(1, "b1");
29+
arrBuilder.AddRecord(2, "b2");
30+
arrBuilder.AddRecord(3, "b3");
31+
builder.AddChunk(arrBuilder.Finish(5));
32+
}
33+
{
34+
TTrivialArray::TPlainBuilder arrBuilder;
35+
arrBuilder.AddRecord(0, "c0");
36+
arrBuilder.AddRecord(2, "c2");
37+
arrBuilder.AddRecord(3, "c3");
38+
builder.AddChunk(arrBuilder.Finish(4));
39+
}
40+
return builder.Finish();
41+
}
42+
43+
Y_UNIT_TEST(SlicesSimple) {
44+
auto arr = BuildCompositeArray();
45+
Cerr << PrepareToCompare(arr->GetChunkedArray()->ToString()) << Endl;
46+
{
47+
auto slice = arr->ISlice(0, 10);
48+
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
49+
AFL_VERIFY(arrString == R"([[null,"a1",null,"a3",null,null,null,"a7",null,null]])")("string", arrString);
50+
}
51+
{
52+
auto slice = arr->ISlice(3, 8);
53+
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
54+
AFL_VERIFY(arrString == R"([["a3",null,null,null,"a7",null,null],[null]])")("string", arrString);
55+
}
56+
{
57+
auto slice = arr->ISlice(3, 16);
58+
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
59+
AFL_VERIFY(arrString == R"([["a3",null,null,null,"a7",null,null],[null,"b1","b2","b3",null],["c0",null,"c2","c3"]])")("string", arrString);
60+
}
61+
{
62+
auto slice = arr->ISlice(8, 11);
63+
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
64+
AFL_VERIFY(arrString == R"([[null,null],[null,"b1","b2","b3",null],["c0",null,"c2","c3"]])")("string", arrString);
65+
}
66+
{
67+
auto slice = arr->ISlice(8, 2);
68+
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
69+
AFL_VERIFY(arrString == R"([[null,null]])")("string", arrString);
70+
}
71+
{
72+
auto slice = arr->ISlice(9, 3);
73+
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
74+
AFL_VERIFY(arrString == R"([[null],[null,"b1"]])")("string", arrString);
75+
}
76+
}
77+
78+
Y_UNIT_TEST(FilterSimple) {
79+
auto arr = BuildCompositeArray();
80+
{
81+
const TColumnFilter filter = TColumnFilter::BuildConstFilter(true, { 1, 2, 3, 4, 5, 4 });
82+
83+
auto filtered = arr->ApplyFilter(filter, arr);
84+
const TString arrString = PrepareToCompare(filtered->GetChunkedArray()->ToString());
85+
AFL_VERIFY(arrString == R"([[null,"a3",null,null],[null,"b1","b2","b3",null]])")("string", arrString);
86+
}
87+
{
88+
const TColumnFilter filter = TColumnFilter::BuildConstFilter(false, { 1, 2, 3, 4, 5, 4 });
89+
90+
auto filtered = arr->ApplyFilter(filter, arr);
91+
const TString arrString = PrepareToCompare(filtered->GetChunkedArray()->ToString());
92+
AFL_VERIFY(arrString == R"([["a1",null,null,"a7",null,null],["c0",null,"c2","c3"]])")("string", arrString);
93+
}
94+
{
95+
const TColumnFilter filter = TColumnFilter::BuildConstFilter(false, { 3, 1, 3, 1, 3, 1, 3, 1, 3 });
96+
97+
auto filtered = arr->ApplyFilter(filter, arr);
98+
const TString arrString = PrepareToCompare(filtered->GetChunkedArray()->ToString());
99+
AFL_VERIFY(arrString == R"([["a3","a7"],["b1"],["c0"]])")("string", arrString);
100+
}
101+
}
102+
};
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
UNITTEST_FOR(ydb/core/formats/arrow/accessor/sparsed)
2+
3+
SIZE(SMALL)
4+
5+
PEERDIR(
6+
ydb/core/formats/arrow/accessor/sparsed
7+
ydb/core/formats/arrow/accessor/plain
8+
ydb/core/formats/arrow
9+
yql/essentials/public/udf/service/stub
10+
ydb/core/formats/arrow
11+
)
12+
13+
YQL_LAST_ABI_VERSION()
14+
15+
SRCS(
16+
ut_composite.cpp
17+
)
18+
19+
END()

ydb/core/formats/arrow/accessor/composite/ya.make

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,7 @@ SRCS(
1010
)
1111

1212
END()
13+
14+
RECURSE_FOR_TESTS(
15+
ut
16+
)

ydb/core/formats/arrow/accessor/sub_columns/accessor.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ class TSubColumnsArray: public IChunkedArray {
9696
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 /*index*/) const override {
9797
return nullptr;
9898
}
99+
100+
std::shared_ptr<IChunkedArray> GetPathAccessor(const std::string_view svPath, const ui32 recordsCount) const {
101+
auto accResult = ColumnsData.GetPathAccessor(svPath);
102+
if (accResult) {
103+
return accResult;
104+
}
105+
return OthersData.GetPathAccessor(svPath, recordsCount);
106+
}
99107
};
100108

101109
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const {
2626
}
2727

2828
TColumnsData TColumnsData::ApplyFilter(const TColumnFilter& filter) const {
29+
if (!Stats.GetColumnsCount()) {
30+
return *this;
31+
}
2932
auto records = Records;
3033
AFL_VERIFY(filter.Apply(records));
3134
if (records->GetRecordsCount()) {

0 commit comments

Comments
 (0)