Skip to content

Commit ab71459

Browse files
Merge 6feabce into ae7146c
2 parents ae7146c + 6feabce commit ab71459

File tree

19 files changed

+440
-231
lines changed

19 files changed

+440
-231
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.cpp

Lines changed: 74 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, con
2525
ui32 currentOffset = offset;
2626
ui32 countLeast = count;
2727
std::vector<std::shared_ptr<arrow::Array>> chunks;
28-
auto address = GetChunk({}, offset);
28+
auto address = GetChunkSlow(offset);
2929
while (countLeast) {
30-
address = GetChunk(address, currentOffset);
31-
const ui64 internalPos = currentOffset - address.GetStartPosition();
30+
address = GetChunk(address.GetAddress(), currentOffset);
31+
const ui64 internalPos = address.GetAddress().GetLocalIndex(currentOffset);
3232
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
3333
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
3434
break;
@@ -43,6 +43,64 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, con
4343
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
4444
}
4545

46+
// Resolves a record position to the data chunk that holds it and returns that chunk's
// array together with the chain of addresses that leads to it.
//
// chunkCurrent - optional address chain from a previous lookup; here it is only
//                checked for the expected length.
// position     - record index; must be less than GetRecordsCount() (verified below).
//
// Returns a TFullDataAddress built from the array reported by DoGetLocalData and an
// address chain whose last element is the address reported by DoGetLocalData.
NKikimr::NArrow::NAccessor::IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(
47+
const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const {
48+
AFL_VERIFY(position < GetRecordsCount());
49+
// NOTE(review): `address` is never assigned in this function, so both DoGetLocalData
// calls below always receive an empty optional; the hint in chunkCurrent is validated
// but not forwarded. Confirm whether that is intended.
std::optional<TCommonChunkAddress> address;
50+
51+
if (IsDataOwner()) {
52+
if (chunkCurrent) {
53+
// When this object is the data owner, a supplied chain must have exactly one element.
AFL_VERIFY(chunkCurrent->GetSize() == 1)("size", chunkCurrent->GetSize());
54+
}
55+
auto localAddress = DoGetLocalData(address, position);
56+
// The resulting chain consists of the single local address.
TAddressChain addressChain;
57+
addressChain.Add(localAddress.GetAddress());
58+
return TFullDataAddress(localAddress.GetArray(), std::move(addressChain));
59+
} else {
60+
// Not the data owner: first locate the nested array via GetArray.
auto chunkedArrayAddress = GetArray(chunkCurrent, position, nullptr);
61+
if (chunkCurrent) {
62+
// A supplied chain must be exactly one element longer than the chain GetArray returned.
AFL_VERIFY(chunkCurrent->GetSize() == 1 + chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
63+
"chunked", chunkedArrayAddress.GetAddress().GetSize());
64+
}
65+
// The lookup inside the nested array uses the index produced by GetLocalIndex(position).
auto localAddress = chunkedArrayAddress.GetArray()->DoGetLocalData(address, chunkedArrayAddress.GetAddress().GetLocalIndex(position));
66+
// Reuse the chain from GetArray (moved out) and append the data chunk address to it.
auto fullAddress = std::move(chunkedArrayAddress.MutableAddress());
67+
fullAddress.Add(localAddress.GetAddress());
68+
return TFullDataAddress(localAddress.GetArray(), std::move(fullAddress));
69+
}
70+
}
71+
72+
// Walks down through nested chunked arrays, starting at this object, until it reaches
// one for which IsDataOwner() is true, and returns that array with the chain of
// addresses collected on the way.
//
// chunkCurrent - optional address chain from a previous lookup; its idx-th element is
//                passed as a hint to the idx-th DoGetLocalChunkedArray call.
// position     - record index; must be less than GetRecordsCount() (verified below).
// selfPtr      - shared pointer returned as the result array when this object is itself
//                the data owner; must be non-null in that case (verified below).
IChunkedArray::TFullChunkedArrayAddress IChunkedArray::GetArray(
73+
const std::optional<TAddressChain>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& selfPtr) const {
74+
AFL_VERIFY(position < GetRecordsCount());
75+
if (IsDataOwner()) {
76+
AFL_VERIFY(selfPtr);
77+
TAddressChain chain;
78+
// Single-element chain for the whole array.
// NOTE(review): presumably the arguments are (start, finish, index) - confirm against TCommonChunkAddress.
chain.Add(TCommonChunkAddress(0, GetRecordsCount(), 0));
79+
return IChunkedArray::TFullChunkedArrayAddress(selfPtr, std::move(chain));
80+
}
81+
TAddressChain addressChain;
82+
83+
auto* currentLevel = this;
84+
// NOTE(review): `position` is ui64 but is stored in a ui32 here, which narrows the value - confirm this is safe.
ui32 currentPosition = position;
85+
ui32 idx = 0;
86+
// currentLevel is a raw pointer into the last element of this vector; the vector
// presumably exists to keep the intermediate arrays alive during the walk.
std::vector<std::shared_ptr<IChunkedArray>> chainForTemporarySave;
87+
while (!currentLevel->IsDataOwner()) {
88+
std::optional<TCommonChunkAddress> currentAddress;
89+
if (chunkCurrent) {
90+
// Use the element of the previous chain at this depth as the lookup hint.
currentAddress = chunkCurrent->GetAddress(idx);
91+
}
92+
auto nextChunkedArray = currentLevel->DoGetLocalChunkedArray(currentAddress, currentPosition);
93+
chainForTemporarySave.emplace_back(nextChunkedArray.GetArray());
94+
currentLevel = chainForTemporarySave.back().get();
95+
addressChain.Add(nextChunkedArray.GetAddress());
96+
AFL_VERIFY(nextChunkedArray.GetAddress().GetStartPosition() <= currentPosition);
97+
// Rebase the position so it is relative to the start of the array just entered.
currentPosition -= nextChunkedArray.GetAddress().GetStartPosition();
98+
++idx;
99+
}
100+
// A supplied chain may have at most one element beyond the levels walked above.
// NOTE(review): if GetSize() is unsigned, a chain shorter than idx would wrap in the subtraction - confirm.
AFL_VERIFY(!chunkCurrent || chunkCurrent->GetSize() - idx <= 1)("idx", idx)("size", chunkCurrent->GetSize());
101+
return TFullChunkedArrayAddress(chainForTemporarySave.back(), std::move(addressChain));
102+
}
103+
46104
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
47105
auto address = GetReadChunk(position);
48106
return NArrow::DebugString(address.GetArray(), address.GetPosition());
@@ -63,11 +121,11 @@ std::partial_ordering IChunkedArray::TReader::CompareColumns(const std::vector<T
63121

64122
IChunkedArray::TAddress IChunkedArray::TReader::GetReadChunk(const ui64 position) const {
65123
AFL_VERIFY(position < ChunkedArray->GetRecordsCount());
66-
if (CurrentChunkAddress && position < CurrentChunkAddress->GetStartPosition() + CurrentChunkAddress->GetArray()->length() && CurrentChunkAddress->GetStartPosition() <= position) {
124+
if (CurrentChunkAddress && CurrentChunkAddress->GetAddress().Contains(position)) {
67125
} else {
68-
CurrentChunkAddress = ChunkedArray->DoGetChunk(CurrentChunkAddress, position);
126+
CurrentChunkAddress = ChunkedArray->GetChunk(CurrentChunkAddress, position);
69127
}
70-
return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), position - CurrentChunkAddress->GetStartPosition(), CurrentChunkAddress->GetChunkIndex());
128+
return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), CurrentChunkAddress->GetAddress().GetLocalIndex(position));
71129
}
72130

73131
const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& item) const {
@@ -82,24 +140,20 @@ const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& ite
82140
AFL_VERIFY(Array->GetRecordsCount());
83141
}
84142

85-
std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
86-
AFL_VERIFY(GetStartPosition() <= position)("pos", position)("start", GetStartPosition());
87-
AFL_VERIFY(position < GetFinishPosition())("pos", position)("finish", GetFinishPosition());
88-
AFL_VERIFY(item.GetStartPosition() <= itemPosition)("start", item.GetStartPosition())("item", itemPosition);
89-
AFL_VERIFY(itemPosition < item.GetFinishPosition())("item", itemPosition)("finish", item.GetFinishPosition());
90-
return TComparator::TypedCompare<true>(*Array, position - GetStartPosition(), *item.Array, itemPosition - item.GetStartPosition());
143+
// Compares the record at `position` in this chunk with the record at `itemPosition`
// in `item`.
//
// Both positions must be contained in their respective Address (verified below);
// each is converted with GetLocalIndex before the arrays are compared.
// Returns the result of TComparator::TypedCompare<true> on the two arrays.
std::partial_ordering IChunkedArray::TFullDataAddress::Compare(
144+
const ui64 position, const TFullDataAddress& item, const ui64 itemPosition) const {
145+
// NOTE(review): the log key "start" carries the whole Address.DebugString(), not only a start position.
AFL_VERIFY(Address.Contains(position))("pos", position)("start", Address.DebugString());
146+
AFL_VERIFY(item.Address.Contains(itemPosition))("pos", itemPosition)("start", item.Address.DebugString());
147+
return TComparator::TypedCompare<true>(
148+
*Array, Address.GetLocalIndex(position), *item.Array, item.Address.GetLocalIndex(itemPosition));
91149
}
92150

93-
std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const {
94-
AFL_VERIFY(GetStartPosition() <= recordIndex);
95-
AFL_VERIFY(recordIndex < GetFinishPosition());
96-
return NArrow::CopyRecords(Array, { recordIndex - GetStartPosition() });
151+
// Returns the result of NArrow::CopyRecords for the single record `recordIndex`,
// after converting it with Address.GetLocalIndex.
// NOTE(review): there is no explicit range check here - confirm that GetLocalIndex
// verifies recordIndex lies inside this chunk.
std::shared_ptr<arrow::Array> IChunkedArray::TFullDataAddress::CopyRecord(const ui64 recordIndex) const {
152+
return NArrow::CopyRecords(Array, { Address.GetLocalIndex(recordIndex) });
97153
}
98154

99-
TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const {
100-
AFL_VERIFY(position < GetFinishPosition());
101-
AFL_VERIFY(GetStartPosition() <= position);
102-
return NArrow::DebugString(Array, position - GetStartPosition());
155+
// Returns NArrow::DebugString for the record at `position`, after converting it with
// Address.GetLocalIndex.
// NOTE(review): there is no explicit range check here - confirm that GetLocalIndex
// verifies position lies inside this chunk.
TString IChunkedArray::TFullDataAddress::DebugString(const ui64 position) const {
156+
return NArrow::DebugString(Array, Address.GetLocalIndex(position));
103157
}
104158

105159
}

0 commit comments

Comments
 (0)