Skip to content

Commit bf1621b

Browse files
Merge 6868cc4 into e77b182
2 parents e77b182 + 6868cc4 commit bf1621b

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

79 files changed

+1589
-945
lines changed

ydb/core/formats/arrow/common/accessor.cpp

Lines changed: 57 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@
55
#include <ydb/library/actors/core/log.h>
66
#include <ydb/core/formats/arrow/permutations.h>
77
#include <ydb/core/formats/arrow/arrow_helpers.h>
8+
#include <ydb/core/formats/arrow/splitter/simple.h>
9+
#include <ydb/core/formats/arrow/save_load/saver.h>
810

911
namespace NKikimr::NArrow::NAccessor {
1012

@@ -72,38 +74,36 @@ const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& ite
7274
return TComparator::TypedCompare<true>(*Array, Position, *item.Array, item.Position);
7375
}
7476

75-
namespace {
76-
// Pre-change variant of the chunk accessor (every line here sits under an "NN-" diff marker,
// i.e. it belongs to the removed side of the diff).
// Thin read-only adapter over an arrow::ChunkedArray that exposes the chunk count,
// the length of a chunk and the chunk array itself by index.
class TChunkAccessor {
77-
private:
78-
// Chunked array whose chunks are exposed through the getters below.
std::shared_ptr<arrow::ChunkedArray> ChunkedArray;
79-
public:
80-
// Keeps a shared reference to `chunkedArray`; no validation is done here.
TChunkAccessor(const std::shared_ptr<arrow::ChunkedArray>& chunkedArray)
81-
: ChunkedArray(chunkedArray)
82-
{
83-
84-
}
85-
// Number of chunks in the wrapped array, cast to ui64.
ui64 GetChunksCount() const {
86-
return (ui64)ChunkedArray->num_chunks();
87-
}
88-
// Length of chunk `idx`, cast to ui64.
ui64 GetChunkLength(const ui32 idx) const {
89-
return (ui64)ChunkedArray->chunk(idx)->length();
90-
}
91-
// Returns chunk `idx` of the wrapped array.
std::shared_ptr<arrow::Array> GetArray(const ui32 idx) const {
92-
return ChunkedArray->chunk(idx);
93-
}
94-
};
95-
77+
// Couples a chunk accessor with the serialized bytes that represent it.
// The constructor stores both arguments and then verifies that the serialized data
// is truthy (presumably: non-empty), that the accessor is non-null and that it
// reports a non-zero record count.
TChunkedArraySerialized::TChunkedArraySerialized(
    const std::shared_ptr<IChunkedArray>& array, const TString& serializedData)
    : Array(array)
    , SerializedData(serializedData)
{
    AFL_VERIFY(serializedData);
    // The null check has to precede the dereference in the record-count check.
    AFL_VERIFY(Array);
    AFL_VERIFY(Array->GetRecordsCount());
}
9784

9885
std::optional<ui64> TTrivialArray::DoGetRawSize() const {
9986
return NArrow::GetArrayDataSize(Array);
10087
}
10188

89+
std::vector<NKikimr::NArrow::NAccessor::TChunkedArraySerialized> TTrivialArray::DoSplitBySizes(
90+
const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) {
91+
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("f", GetDataType()) }));
92+
auto chunks = NArrow::NSplitter::TSimpleSplitter(saver).SplitBySizes(
93+
arrow::RecordBatch::Make(schema, GetRecordsCount(), { Array }), fullSerializedData, splitSizes);
94+
std::vector<TChunkedArraySerialized> result;
95+
for (auto&& i : chunks) {
96+
AFL_VERIFY(i.GetSlicedBatch()->num_columns() == 1);
97+
result.emplace_back(std::make_shared<TTrivialArray>(i.GetSlicedBatch()->column(0)), i.GetSerializedChunk());
98+
}
99+
return result;
100+
}
101+
102102
// Compares the element at global `position` inside this chunk with the element at global
// `itemPosition` inside `item`'s chunk.
// Both positions are verified to lie in [StartPosition, FinishPosition) of their own chunk and
// are rebased by the chunk's StartPosition before being passed to TComparator::TypedCompare<true>.
// Diff note: the four checks under "NNN-" markers are the removed form; the four under "NNN+"
// markers are the same conditions with the offending values attached as key/value context.
std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
103-
AFL_VERIFY(StartPosition <= position);
104-
AFL_VERIFY(position < FinishPosition);
105-
AFL_VERIFY(item.StartPosition <= itemPosition);
106-
AFL_VERIFY(itemPosition < item.FinishPosition);
103+
AFL_VERIFY(StartPosition <= position)("pos", position)("start", StartPosition);
104+
AFL_VERIFY(position < FinishPosition)("pos", position)("finish", FinishPosition);
105+
AFL_VERIFY(item.StartPosition <= itemPosition)("start", item.StartPosition)("item", itemPosition);
106+
AFL_VERIFY(itemPosition < item.FinishPosition)("item", itemPosition)("finish", item.FinishPosition);
107107
// Convert the global positions into chunk-local indexes for the typed comparison.
return TComparator::TypedCompare<true>(*Array, position - StartPosition, *item.Array, itemPosition - item.StartPosition);
108108
}
109109

@@ -119,9 +119,38 @@ TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) co
119119
return NArrow::DebugString(Array, position - StartPosition);
120120
}
121121

122-
IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
123-
TChunkAccessor accessor(Array);
124-
return SelectChunk(chunkCurrent, position, accessor);
122+
namespace {
123+
class TChunkAccessor {
124+
private:
125+
std::shared_ptr<arrow::ChunkedArray> ChunkedArray;
126+
std::optional<IChunkedArray::TCurrentChunkAddress>* Result;
127+
128+
public:
129+
TChunkAccessor(const std::shared_ptr<arrow::ChunkedArray>& chunkedArray, std::optional<IChunkedArray::TCurrentChunkAddress>& result)
130+
: ChunkedArray(chunkedArray)
131+
, Result(&result)
132+
{
133+
}
134+
ui64 GetChunksCount() const {
135+
return (ui64)ChunkedArray->num_chunks();
136+
}
137+
ui64 GetChunkLength(const ui32 idx) const {
138+
return (ui64)ChunkedArray->chunk(idx)->length();
139+
}
140+
void OnArray(const ui32 idx, const ui32 startPosition, const ui32 /*internalPosition*/) const {
141+
*Result = IChunkedArray::TCurrentChunkAddress(ChunkedArray->chunk(idx), startPosition, idx);
142+
}
143+
};
144+
145+
} // namespace
146+
147+
IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(
148+
const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
149+
std::optional<IChunkedArray::TCurrentChunkAddress> result;
150+
TChunkAccessor accessor(Array, result);
151+
SelectChunk(chunkCurrent, position, accessor);
152+
AFL_VERIFY(result);
153+
return *result;
125154
}
126155

127156
std::optional<ui64> TTrivialChunkedArray::DoGetRawSize() const {

0 commit comments

Comments
 (0)