Skip to content

Commit 37970c6

Browse files
Merge 86195be into 20441f2
2 parents 20441f2 + 86195be commit 37970c6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1561
-120
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ class IChunkedArray {
4343
SerializedChunkedArray,
4444
CompositeChunkedArray,
4545
SparsedArray,
46-
SubColumnsArray
46+
SubColumnsArray,
47+
SubColumnsPartialArray
4748
};
4849

4950
class TCommonChunkAddress {
@@ -423,6 +424,7 @@ class IChunkedArray {
423424
case EType::SparsedArray:
424425
case EType::ChunkedArray:
425426
case EType::SubColumnsArray:
427+
case EType::SubColumnsPartialArray:
426428
case EType::Array:
427429
return true;
428430
case EType::Undefined:

ydb/core/formats/arrow/accessor/common/chunk_data.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,9 @@ TChunkConstructionData::TChunkConstructionData(const ui32 recordsCount, const st
1515
AFL_VERIFY(!!DefaultSerializer);
1616
}
1717

18+
TChunkConstructionData TChunkConstructionData::GetSubset(const ui32 recordsCount) const {
19+
AFL_VERIFY(recordsCount <= RecordsCount)("sub", recordsCount)("global", RecordsCount);
20+
return TChunkConstructionData(recordsCount, DefaultValue, ColumnType, DefaultSerializer);
21+
}
22+
1823
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/common/chunk_data.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ class TChunkConstructionData {
2020
public:
2121
TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue,
2222
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer);
23+
24+
TChunkConstructionData GetSubset(const ui32 recordsCount) const;
2325
};
2426

2527
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/composite/accessor.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#include "accessor.h"
2+
3+
#include <ydb/core/formats/arrow/arrow_filter.h>
24
namespace NKikimr::NArrow::NAccessor {
35

46
namespace {
@@ -56,6 +58,26 @@ std::shared_ptr<IChunkedArray> ICompositeChunkedArray::DoISlice(const ui32 offse
5658
}
5759
}
5860

61+
std::shared_ptr<IChunkedArray> ICompositeChunkedArray::DoApplyFilter(const TColumnFilter& filter) const {
62+
std::optional<IChunkedArray::TFullChunkedArrayAddress> arrAddress;
63+
std::vector<std::shared_ptr<IChunkedArray>> chunks;
64+
ui32 currentIndex = 0;
65+
while (currentIndex < GetRecordsCount()) {
66+
arrAddress = GetArray(arrAddress, currentIndex, nullptr);
67+
if (!filter.CheckSlice(currentIndex, arrAddress->GetArray()->GetRecordsCount())) {
68+
continue;
69+
}
70+
auto sliceFilter = filter.Slice(currentIndex, arrAddress->GetArray()->GetRecordsCount());
71+
chunks.emplace_back(sliceFilter.Apply(arrAddress->GetArray()));
72+
currentIndex += arrAddress->GetArray()->GetRecordsCount();
73+
}
74+
if (chunks.size() == 1) {
75+
return chunks.front();
76+
} else {
77+
return std::make_shared<TCompositeChunkedArray>(std::move(chunks), filter.GetFilteredCountVerified(), GetDataType());
78+
}
79+
}
80+
5981
IChunkedArray::TLocalDataAddress TCompositeChunkedArray::DoGetLocalData(
6082
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const {
6183
AFL_VERIFY(false);

ydb/core/formats/arrow/accessor/composite/accessor.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class ICompositeChunkedArray: public NArrow::NAccessor::IChunkedArray {
99
private:
1010
using TBase = NArrow::NAccessor::IChunkedArray;
1111
virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override final;
12+
virtual std::shared_ptr<IChunkedArray> DoApplyFilter(const TColumnFilter& filter) const override;
1213

1314
public:
1415
using TBase::TBase;
@@ -54,6 +55,47 @@ class TCompositeChunkedArray: public ICompositeChunkedArray {
5455
, Chunks(std::move(chunks)) {
5556
}
5657

58+
class TIterator: TNonCopyable {
59+
private:
60+
const std::shared_ptr<TCompositeChunkedArray> Owner;
61+
ui32 RecordIndex = 0;
62+
std::optional<TFullChunkedArrayAddress> CurrentChunk;
63+
64+
public:
65+
TIterator(const std::shared_ptr<TCompositeChunkedArray>& owner)
66+
: Owner(owner) {
67+
if (Owner->GetRecordsCount()) {
68+
CurrentChunk = Owner->GetArray(CurrentChunk, RecordIndex, Owner);
69+
}
70+
}
71+
72+
const std::shared_ptr<IChunkedArray>& GetArray() const {
73+
AFL_VERIFY(CurrentChunk);
74+
return CurrentChunk->GetArray();
75+
}
76+
77+
bool IsValid() {
78+
return RecordIndex < Owner->GetRecordsCount();
79+
}
80+
81+
bool Next() {
82+
AFL_VERIFY(IsValid());
83+
AFL_VERIFY(CurrentChunk);
84+
RecordIndex += CurrentChunk->GetArray()->GetRecordsCount();
85+
AFL_VERIFY(RecordIndex <= Owner->GetRecordsCount());
86+
if (IsValid()) {
87+
CurrentChunk = Owner->GetArray(CurrentChunk, RecordIndex, Owner);
88+
return true;
89+
} else {
90+
return false;
91+
}
92+
}
93+
};
94+
95+
static TIterator BuildIterator(std::shared_ptr<TCompositeChunkedArray>& owner) {
96+
return TIterator(owner);
97+
}
98+
5799
class TBuilder {
58100
private:
59101
ui32 RecordsCount = 0;

ydb/core/formats/arrow/accessor/sub_columns/accessor.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ class TSubColumnsArray: public IChunkedArray {
9696
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 /*index*/) const override {
9797
return nullptr;
9898
}
99+
100+
std::shared_ptr<IChunkedArray> GetPathAccessor(const std::string_view svPath, const ui32 recordsCount) const {
101+
auto accResult = ColumnsData.GetPathAccessor(svPath);
102+
if (accResult) {
103+
return accResult;
104+
}
105+
return OthersData.GetPathAccessor(svPath, recordsCount);
106+
}
99107
};
100108

101109
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const {
2626
}
2727

2828
TColumnsData TColumnsData::ApplyFilter(const TColumnFilter& filter) const {
29+
if (!Stats.GetColumnsCount()) {
30+
return *this;
31+
}
2932
auto records = Records;
3033
AFL_VERIFY(filter.Apply(records));
3134
if (records->GetRecordsCount()) {

ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp

Lines changed: 44 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,45 +14,23 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstructDefault(con
1414

1515
TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromString(
1616
const TString& originalData, const TChunkConstructionData& externalInfo) const {
17-
TStringInput si(originalData);
18-
ui32 protoSize;
19-
si.Read(&protoSize, sizeof(protoSize));
20-
ui64 currentIndex = sizeof(protoSize);
21-
NKikimrArrowAccessorProto::TSubColumnsAccessor proto;
22-
if (!proto.ParseFromArray(originalData.data() + currentIndex, protoSize)) {
23-
return TConclusionStatus::Fail("cannot parse proto");
17+
auto headerConclusion = TSubColumnsHeader::ReadHeader(originalData, externalInfo);
18+
if (headerConclusion.IsFail()) {
19+
return headerConclusion;
2420
}
25-
currentIndex += protoSize;
26-
TDictStats columnStats = [&]() {
27-
if (proto.GetColumnStatsSize()) {
28-
std::shared_ptr<arrow::RecordBatch> rbColumnStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize(
29-
TString(originalData.data() + currentIndex, proto.GetColumnStatsSize()), TDictStats::GetStatsSchema()));
30-
return TDictStats(rbColumnStats);
31-
} else {
32-
return TDictStats::BuildEmpty();
33-
}
34-
}();
35-
currentIndex += proto.GetColumnStatsSize();
36-
TDictStats otherStats = [&]() {
37-
if (proto.GetOtherStatsSize()) {
38-
std::shared_ptr<arrow::RecordBatch> rbOtherStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize(
39-
TString(originalData.data() + currentIndex, proto.GetOtherStatsSize()), TDictStats::GetStatsSchema()));
40-
return TDictStats(rbOtherStats);
41-
} else {
42-
return TDictStats::BuildEmpty();
43-
}
44-
}();
45-
currentIndex += proto.GetOtherStatsSize();
21+
ui32 currentIndex = headerConclusion->GetHeaderSize();
22+
const auto& proto = headerConclusion->GetAddressesProto();
4623

4724
std::shared_ptr<TGeneralContainer> columnKeysContainer;
4825
{
4926
std::vector<std::shared_ptr<IChunkedArray>> columns;
50-
auto schema = columnStats.BuildColumnsSchema();
51-
AFL_VERIFY(columnStats.GetColumnsCount() == (ui32)proto.GetKeyColumns().size())("schema", columnStats.GetColumnsCount())(
27+
auto schema = headerConclusion->GetColumnStats().BuildColumnsSchema();
28+
AFL_VERIFY(headerConclusion->GetColumnStats().GetColumnsCount() == (ui32)proto.GetKeyColumns().size())(
29+
"schema", headerConclusion->GetColumnStats().GetColumnsCount())(
5230
"proto", proto.GetKeyColumns().size());
5331
for (ui32 i = 0; i < (ui32)proto.GetKeyColumns().size(); ++i) {
5432
std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>(
55-
externalInfo.GetDefaultSerializer(), columnStats.GetAccessorConstructor(i), schema->field(i), nullptr, 0);
33+
externalInfo.GetDefaultSerializer(), headerConclusion->GetColumnStats().GetAccessorConstructor(i), schema->field(i), nullptr, 0);
5634
std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk(
5735
externalInfo.GetRecordsCount(), TStringBuf(originalData.data() + currentIndex, proto.GetKeyColumns(i).GetSize())) };
5836
columns.emplace_back(std::make_shared<TDeserializeChunkedArray>(externalInfo.GetRecordsCount(), columnLoader, std::move(chunks), true));
@@ -62,26 +40,17 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
6240
}
6341
TOthersData otherData = TOthersData::BuildEmpty();
6442
if (proto.GetOtherColumns().size() && proto.GetOtherRecordsCount()) {
65-
std::shared_ptr<TGeneralContainer> otherKeysContainer;
66-
std::vector<std::shared_ptr<IChunkedArray>> columns;
67-
AFL_VERIFY(TOthersData::GetSchema()->num_fields() == proto.GetOtherColumns().size())("proto", proto.GetOtherColumns().size())(
68-
"schema", TOthersData::GetSchema()->num_fields());
69-
auto schema = TOthersData::GetSchema();
70-
for (ui32 i = 0; i < (ui32)proto.GetOtherColumns().size(); ++i) {
71-
std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>(
72-
externalInfo.GetDefaultSerializer(), std::make_shared<NPlain::TConstructor>(), schema->field(i), nullptr, 0);
73-
std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk(
74-
proto.GetOtherRecordsCount(), TStringBuf(originalData.data() + currentIndex, proto.GetOtherColumns(i).GetSize())) };
75-
columns.emplace_back(std::make_shared<TDeserializeChunkedArray>(proto.GetOtherRecordsCount(), columnLoader, std::move(chunks), true));
76-
currentIndex += proto.GetOtherColumns(i).GetSize();
77-
}
78-
otherKeysContainer = std::make_shared<TGeneralContainer>(schema, std::move(columns));
79-
otherData = TOthersData(otherStats, otherKeysContainer);
43+
AFL_VERIFY(currentIndex < originalData.size());
44+
std::shared_ptr<TGeneralContainer> otherKeysContainer = BuildOthersContainer(
45+
TStringBuf(originalData.data() + currentIndex, originalData.size() - currentIndex), proto, externalInfo).DetachResult();
46+
currentIndex += headerConclusion->GetOthersSize();
47+
otherData = TOthersData(headerConclusion->GetOtherStats(), otherKeysContainer);
8048
}
81-
TColumnsData columnData(columnStats, columnKeysContainer);
49+
TColumnsData columnData(headerConclusion->GetColumnStats(), columnKeysContainer);
8250
auto result = std::make_shared<TSubColumnsArray>(
8351
std::move(columnData), std::move(otherData), externalInfo.GetColumnType(), externalInfo.GetRecordsCount(), Settings);
8452
result->StoreSourceString(originalData);
53+
AFL_VERIFY(currentIndex == originalData.size())("index", currentIndex)("size", originalData.size());
8554
return result;
8655
}
8756

@@ -105,4 +74,32 @@ TString TConstructor::DoSerializeToString(const std::shared_ptr<IChunkedArray>&
10574
return arr->SerializeToString(externalInfo);
10675
}
10776

77+
TConclusion<std::shared_ptr<TGeneralContainer>> TConstructor::BuildOthersContainer(
78+
const TStringBuf data, const NKikimrArrowAccessorProto::TSubColumnsAccessor& proto, const TChunkConstructionData& externalInfo) {
79+
std::vector<std::shared_ptr<IChunkedArray>> columns;
80+
AFL_VERIFY(TOthersData::GetSchema()->num_fields() == proto.GetOtherColumns().size())("proto", proto.GetOtherColumns().size())(
81+
"schema", TOthersData::GetSchema()->num_fields());
82+
auto schema = TOthersData::GetSchema();
83+
ui32 currentIndex = 0;
84+
for (ui32 i = 0; i < (ui32)proto.GetOtherColumns().size(); ++i) {
85+
std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>(
86+
externalInfo.GetDefaultSerializer(), std::make_shared<NPlain::TConstructor>(), schema->field(i), nullptr, 0);
87+
std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk(
88+
proto.GetOtherRecordsCount(), TStringBuf(data.data() + currentIndex, proto.GetOtherColumns(i).GetSize())) };
89+
columns.emplace_back(std::make_shared<TDeserializeChunkedArray>(proto.GetOtherRecordsCount(), columnLoader, std::move(chunks), true));
90+
currentIndex += proto.GetOtherColumns(i).GetSize();
91+
}
92+
return std::make_shared<TGeneralContainer>(schema, std::move(columns));
93+
}
94+
95+
TConclusion<std::shared_ptr<TSubColumnsPartialArray>> TConstructor::BuildPartialReader(
96+
const TString& originalData, const TChunkConstructionData& externalInfo) {
97+
auto headerConclusion = TSubColumnsHeader::ReadHeader(originalData, externalInfo);
98+
if (headerConclusion.IsFail()) {
99+
return headerConclusion;
100+
}
101+
return std::make_shared<TSubColumnsPartialArray>(
102+
headerConclusion.DetachResult(), externalInfo.GetRecordsCount(), externalInfo.GetColumnType());
103+
}
104+
108105
} // namespace NKikimr::NArrow::NAccessor::NSubColumns

ydb/core/formats/arrow/accessor/sub_columns/constructor.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22
#include "data_extractor.h"
3+
#include "partial.h"
34

45
#include <ydb/core/formats/arrow/accessor/abstract/constructor.h>
56
#include <ydb/core/formats/arrow/accessor/common/const.h>
@@ -42,6 +43,12 @@ class TConstructor: public IConstructor {
4243
: TBase(IChunkedArray::EType::SubColumnsArray) {
4344
}
4445

46+
static TConclusion<std::shared_ptr<TGeneralContainer>> BuildOthersContainer(
47+
const TStringBuf data, const NKikimrArrowAccessorProto::TSubColumnsAccessor& proto, const TChunkConstructionData& externalInfo);
48+
49+
static TConclusion<std::shared_ptr<TSubColumnsPartialArray>> BuildPartialReader(
50+
const TString& originalData, const TChunkConstructionData& externalInfo);
51+
4552
TConstructor(const TSettings& settings)
4653
: TBase(IChunkedArray::EType::SubColumnsArray)
4754
, Settings(settings) {
@@ -50,6 +57,31 @@ class TConstructor: public IConstructor {
5057
virtual TString GetClassName() const override {
5158
return GetClassNameStatic();
5259
}
60+
61+
static TConclusion<ui32> GetHeaderSize(const TString& blob) {
62+
TStringInput si(blob);
63+
ui32 protoSize;
64+
if (blob.size() < sizeof(protoSize)) {
65+
return TConclusionStatus::Fail("incorrect blob (too small)");
66+
}
67+
si.Read(&protoSize, sizeof(protoSize));
68+
return (ui32)(protoSize + sizeof(protoSize));
69+
}
70+
71+
static TConclusion<ui32> GetFullHeaderSize(const TString& blob) {
72+
TStringInput si(blob);
73+
ui32 protoSize;
74+
if (blob.size() < sizeof(protoSize)) {
75+
return TConclusionStatus::Fail("incorrect blob (too small)");
76+
}
77+
si.Read(&protoSize, sizeof(protoSize));
78+
ui32 currentIndex = sizeof(protoSize);
79+
NKikimrArrowAccessorProto::TSubColumnsAccessor proto;
80+
if (!proto.ParseFromArray(blob.data() + currentIndex, protoSize)) {
81+
return TConclusionStatus::Fail("cannot parse proto");
82+
}
83+
return (ui32)(protoSize + sizeof(protoSize) + proto.GetColumnStatsSize() + proto.GetOtherStatsSize());
84+
}
5385
};
5486

5587
} // namespace NKikimr::NArrow::NAccessor::NSubColumns

ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ std::shared_ptr<TSubColumnsArray> TDataBuilder::Finish() {
6969
case IChunkedArray::EType::SerializedChunkedArray:
7070
case IChunkedArray::EType::CompositeChunkedArray:
7171
case IChunkedArray::EType::SubColumnsArray:
72+
case IChunkedArray::EType::SubColumnsPartialArray:
7273
case IChunkedArray::EType::ChunkedArray:
7374
AFL_VERIFY(false);
7475
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include "header.h"
2+
3+
#include <ydb/core/formats/arrow/serializer/abstract.h>
4+
5+
#include <ydb/library/formats/arrow/validation/validation.h>
6+
7+
namespace NKikimr::NArrow::NAccessor::NSubColumns {
8+
9+
TConclusion<TSubColumnsHeader> TSubColumnsHeader::ReadHeader(const TString& originalData, const TChunkConstructionData& externalInfo) {
10+
TStringInput si(originalData);
11+
ui32 protoSize;
12+
si.Read(&protoSize, sizeof(protoSize));
13+
ui32 currentIndex = sizeof(protoSize);
14+
NKikimrArrowAccessorProto::TSubColumnsAccessor proto;
15+
if (!proto.ParseFromArray(originalData.data() + currentIndex, protoSize)) {
16+
return TConclusionStatus::Fail("cannot parse proto");
17+
}
18+
currentIndex += protoSize;
19+
TDictStats columnStats = [&]() {
20+
if (proto.GetColumnStatsSize()) {
21+
std::shared_ptr<arrow::RecordBatch> rbColumnStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize(
22+
TString(originalData.data() + currentIndex, proto.GetColumnStatsSize()), TDictStats::GetStatsSchema()));
23+
return TDictStats(rbColumnStats);
24+
} else {
25+
return TDictStats::BuildEmpty();
26+
}
27+
}();
28+
currentIndex += proto.GetColumnStatsSize();
29+
TDictStats otherStats = [&]() {
30+
if (proto.GetOtherStatsSize()) {
31+
std::shared_ptr<arrow::RecordBatch> rbOtherStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize(
32+
TString(originalData.data() + currentIndex, proto.GetOtherStatsSize()), TDictStats::GetStatsSchema()));
33+
return TDictStats(rbOtherStats);
34+
} else {
35+
return TDictStats::BuildEmpty();
36+
}
37+
}();
38+
currentIndex += proto.GetOtherStatsSize();
39+
return TSubColumnsHeader(std::move(columnStats), std::move(otherStats), std::move(proto), currentIndex);
40+
}
41+
42+
} // namespace NKikimr::NArrow::NAccessor::NSubColumns

0 commit comments

Comments
 (0)