|
5 | 5 | #include <ydb/library/actors/core/log.h>
|
6 | 6 | #include <ydb/core/formats/arrow/permutations.h>
|
7 | 7 | #include <ydb/core/formats/arrow/arrow_helpers.h>
|
| 8 | +#include <ydb/core/formats/arrow/splitter/simple.h> |
| 9 | +#include <ydb/core/formats/arrow/save_load/saver.h> |
8 | 10 |
|
9 | 11 | namespace NKikimr::NArrow::NAccessor {
|
10 | 12 |
|
@@ -72,56 +74,83 @@ const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& ite
|
72 | 74 | return TComparator::TypedCompare<true>(*Array, Position, *item.Array, item.Position);
|
73 | 75 | }
|
74 | 76 |
|
| 77 | + TChunkedArraySerialized::TChunkedArraySerialized(const std::shared_ptr<IChunkedArray>& array, const TString& serializedData) |
| 78 | + : Array(array) |
| 79 | + , SerializedData(serializedData) { |
| 80 | + AFL_VERIFY(serializedData); |
| 81 | + AFL_VERIFY(Array); |
| 82 | + AFL_VERIFY(Array->GetRecordsCount()); |
| 83 | +} |
| 84 | + |
| 85 | +std::optional<ui64> TTrivialArray::DoGetRawSize() const { |
| 86 | + return NArrow::GetArrayDataSize(Array); |
| 87 | +} |
| 88 | + |
| 89 | +std::vector<NKikimr::NArrow::NAccessor::TChunkedArraySerialized> TTrivialArray::DoSplitBySizes( |
| 90 | + const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) { |
| 91 | + auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("f", GetDataType()) })); |
| 92 | + auto chunks = NArrow::NSplitter::TSimpleSplitter(saver).SplitBySizes( |
| 93 | + arrow::RecordBatch::Make(schema, GetRecordsCount(), { Array }), fullSerializedData, splitSizes); |
| 94 | + std::vector<TChunkedArraySerialized> result; |
| 95 | + for (auto&& i : chunks) { |
| 96 | + AFL_VERIFY(i.GetSlicedBatch()->num_columns() == 1); |
| 97 | + result.emplace_back(std::make_shared<TTrivialArray>(i.GetSlicedBatch()->column(0)), i.GetSerializedChunk()); |
| 98 | + } |
| 99 | + return result; |
| 100 | +} |
| 101 | + |
| 102 | +std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const { |
| 103 | + AFL_VERIFY(GetStartPosition() <= position)("pos", position)("start", GetStartPosition()); |
| 104 | + AFL_VERIFY(position < GetFinishPosition())("pos", position)("finish", GetFinishPosition()); |
| 105 | + AFL_VERIFY(item.GetStartPosition() <= itemPosition)("start", item.GetStartPosition())("item", itemPosition); |
| 106 | + AFL_VERIFY(itemPosition < item.GetFinishPosition())("item", itemPosition)("finish", item.GetFinishPosition()); |
| 107 | + return TComparator::TypedCompare<true>(*Array, position - GetStartPosition(), *item.Array, itemPosition - item.GetStartPosition()); |
| 108 | +} |
| 109 | + |
| 110 | +std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const { |
| 111 | + AFL_VERIFY(GetStartPosition() <= recordIndex); |
| 112 | + AFL_VERIFY(recordIndex < GetFinishPosition()); |
| 113 | + return NArrow::CopyRecords(Array, { recordIndex - GetStartPosition() }); |
| 114 | +} |
| 115 | + |
| 116 | +TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const { |
| 117 | + AFL_VERIFY(position < GetFinishPosition()); |
| 118 | + AFL_VERIFY(GetStartPosition() <= position); |
| 119 | + return NArrow::DebugString(Array, position - GetStartPosition()); |
| 120 | +} |
| 121 | + |
75 | 122 | namespace {
|
76 | 123 | class TChunkAccessor {
|
77 | 124 | private:
|
78 | 125 | std::shared_ptr<arrow::ChunkedArray> ChunkedArray;
|
| 126 | + std::optional<IChunkedArray::TCurrentChunkAddress>* Result; |
| 127 | + |
79 | 128 | public:
|
80 |
| - TChunkAccessor(const std::shared_ptr<arrow::ChunkedArray>& chunkedArray) |
| 129 | + TChunkAccessor(const std::shared_ptr<arrow::ChunkedArray>& chunkedArray, std::optional<IChunkedArray::TCurrentChunkAddress>& result) |
81 | 130 | : ChunkedArray(chunkedArray)
|
| 131 | + , Result(&result) |
82 | 132 | {
|
83 |
| - |
84 | 133 | }
|
85 | 134 | ui64 GetChunksCount() const {
|
86 | 135 | return (ui64)ChunkedArray->num_chunks();
|
87 | 136 | }
|
88 | 137 | ui64 GetChunkLength(const ui32 idx) const {
|
89 | 138 | return (ui64)ChunkedArray->chunk(idx)->length();
|
90 | 139 | }
|
91 |
| - std::shared_ptr<arrow::Array> GetArray(const ui32 idx) const { |
92 |
| - return ChunkedArray->chunk(idx); |
| 140 | + void OnArray(const ui32 idx, const ui32 startPosition, const ui32 /*internalPosition*/) const { |
| 141 | + *Result = IChunkedArray::TCurrentChunkAddress(ChunkedArray->chunk(idx), startPosition, idx); |
93 | 142 | }
|
94 | 143 | };
|
95 | 144 |
|
96 |
| -} |
97 |
| - |
98 |
| -std::optional<ui64> TTrivialArray::DoGetRawSize() const { |
99 |
| - return NArrow::GetArrayDataSize(Array); |
100 |
| -} |
101 |
| - |
102 |
| -std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const { |
103 |
| - AFL_VERIFY(StartPosition <= position); |
104 |
| - AFL_VERIFY(position < FinishPosition); |
105 |
| - AFL_VERIFY(item.StartPosition <= itemPosition); |
106 |
| - AFL_VERIFY(itemPosition < item.FinishPosition); |
107 |
| - return TComparator::TypedCompare<true>(*Array, position - StartPosition, *item.Array, itemPosition - item.StartPosition); |
108 |
| -} |
109 |
| - |
110 |
| -std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const { |
111 |
| - AFL_VERIFY(StartPosition <= recordIndex); |
112 |
| - AFL_VERIFY(recordIndex < FinishPosition); |
113 |
| - return NArrow::CopyRecords(Array, { recordIndex - StartPosition }); |
114 |
| -} |
115 |
| - |
116 |
| -TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const { |
117 |
| - AFL_VERIFY(position < FinishPosition); |
118 |
| - AFL_VERIFY(StartPosition <= position); |
119 |
| - return NArrow::DebugString(Array, position - StartPosition); |
120 |
| -} |
| 145 | +} // namespace |
121 | 146 |
|
122 |
| -IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const { |
123 |
| - TChunkAccessor accessor(Array); |
124 |
| - return SelectChunk(chunkCurrent, position, accessor); |
| 147 | +IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk( |
| 148 | + const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const { |
| 149 | + std::optional<IChunkedArray::TCurrentChunkAddress> result; |
| 150 | + TChunkAccessor accessor(Array, result); |
| 151 | + SelectChunk(chunkCurrent, position, accessor); |
| 152 | + AFL_VERIFY(result); |
| 153 | + return *result; |
125 | 154 | }
|
126 | 155 |
|
127 | 156 | std::optional<ui64> TTrivialChunkedArray::DoGetRawSize() const {
|
|
0 commit comments