Skip to content

Commit b0bd8f9

Browse files
correct indexation and fix json construction (#15222)
1 parent 11dd5a0 commit b0bd8f9

File tree

35 files changed

+367
-93
lines changed

35 files changed

+367
-93
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
44
#include <ydb/core/formats/arrow/arrow_filter.h>
5+
#include <ydb/core/formats/arrow/arrow_helpers.h>
56

67
#include <ydb/library/actors/core/log.h>
78
#include <ydb/library/formats/arrow/arrow_helpers.h>
@@ -133,6 +134,17 @@ std::shared_ptr<IChunkedArray> IChunkedArray::ApplyFilter(const TColumnFilter& f
133134
return result;
134135
}
135136

137+
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArray() const {
138+
std::vector<std::shared_ptr<arrow::Array>> chunks;
139+
std::optional<TFullDataAddress> address;
140+
for (ui32 position = 0; position < GetRecordsCount();) {
141+
address = GetChunk(address, position);
142+
chunks.emplace_back(address->GetArray());
143+
position += address->GetArray()->length();
144+
}
145+
return std::make_shared<arrow::ChunkedArray>(chunks, GetDataType());
146+
}
147+
136148
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
137149
auto address = GetReadChunk(position);
138150
return NArrow::DebugString(address.GetArray(), address.GetPosition());
@@ -188,4 +200,8 @@ TString IChunkedArray::TFullDataAddress::DebugString(const ui64 position) const
188200
return NArrow::DebugString(Array, Address.GetLocalIndex(position));
189201
}
190202

203+
void IChunkedArray::TLocalDataAddress::Reallocate() {
204+
Array = NArrow::ReallocateArray(Array);
205+
}
206+
191207
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/abstract/accessor.h

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class IChunkedArray {
4747
SubColumnsPartialArray
4848
};
4949

50+
using TValuesSimpleVisitor = std::function<void(std::shared_ptr<arrow::Array>)>;
51+
5052
class TCommonChunkAddress {
5153
private:
5254
YDB_READONLY(ui64, StartPosition, 0);
@@ -192,6 +194,8 @@ class IChunkedArray {
192194
TCommonChunkAddress Address;
193195

194196
public:
197+
void Reallocate();
198+
195199
const TCommonChunkAddress& GetAddress() const {
196200
return Address;
197201
}
@@ -254,6 +258,7 @@ class IChunkedArray {
254258
virtual ui32 DoGetNullsCount() const = 0;
255259
virtual ui32 DoGetValueRawBytes() const = 0;
256260
virtual std::shared_ptr<IChunkedArray> DoApplyFilter(const TColumnFilter& filter) const;
261+
virtual void DoVisitValues(const TValuesSimpleVisitor& visitor) const = 0;
257262

258263
protected:
259264
std::shared_ptr<arrow::Schema> GetArraySchema() const {
@@ -323,6 +328,14 @@ class IChunkedArray {
323328
public:
324329
std::shared_ptr<IChunkedArray> ApplyFilter(const TColumnFilter& filter, const std::shared_ptr<IChunkedArray>& selfPtr) const;
325330

331+
virtual void Reallocate() {
332+
333+
}
334+
335+
void VisitValues(const TValuesSimpleVisitor& visitor) const {
336+
DoVisitValues(visitor);
337+
}
338+
326339
template <class TResult, class TActor>
327340
static std::optional<TResult> VisitDataOwners(const std::shared_ptr<IChunkedArray>& arr, const TActor& actor) {
328341
AFL_VERIFY(arr);
@@ -416,16 +429,7 @@ class IChunkedArray {
416429
return *result;
417430
}
418431

419-
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const {
420-
std::vector<std::shared_ptr<arrow::Array>> chunks;
421-
std::optional<TFullDataAddress> address;
422-
for (ui32 position = 0; position < GetRecordsCount();) {
423-
address = GetChunk(address, position);
424-
chunks.emplace_back(address->GetArray());
425-
position += address->GetArray()->length();
426-
}
427-
return std::make_shared<arrow::ChunkedArray>(chunks, GetDataType());
428-
}
432+
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const;
429433
virtual ~IChunkedArray() = default;
430434

431435
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;

ydb/core/formats/arrow/accessor/abstract/constructor.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,13 @@ class IConstructor {
6868
if (originalArray->GetType() == GetType()) {
6969
return originalArray;
7070
} else {
71-
return DoConstruct(originalArray, externalInfo);
71+
auto result = DoConstruct(originalArray, externalInfo);
72+
if (result.IsFail()) {
73+
return result;
74+
}
75+
AFL_VERIFY(result.GetResult()->GetRecordsCount() == originalArray->GetRecordsCount())("result", result.GetResult()->GetRecordsCount())(
76+
"original", originalArray->GetRecordsCount());
77+
return result;
7278
}
7379
}
7480

ydb/core/formats/arrow/accessor/composite/accessor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ class TCompositeChunkedArray: public ICompositeChunkedArray {
2222
private:
2323
YDB_READONLY_DEF(std::vector<std::shared_ptr<IChunkedArray>>, Chunks);
2424

25+
virtual void DoVisitValues(const TValuesSimpleVisitor& visitor) const override {
26+
for (auto&& i : Chunks) {
27+
i->VisitValues(visitor);
28+
}
29+
}
30+
2531
protected:
2632
virtual ui32 DoGetNullsCount() const override {
2733
AFL_VERIFY(false);

ydb/core/formats/arrow/accessor/composite/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ PEERDIR(
77
ydb/core/formats/arrow/accessor/plain
88
ydb/core/formats/arrow
99
yql/essentials/public/udf/service/stub
10+
yql/essentials/sql/pg_dummy
1011
ydb/core/formats/arrow
1112
)
1213

ydb/core/formats/arrow/accessor/composite_serial/accessor.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
#include "accessor.h"
22

3+
#include <ydb/core/formats/arrow/accessor/sparsed/constructor.h>
4+
5+
#include <ydb/library/actors/prof/tag.h>
6+
37
namespace NKikimr::NArrow::NAccessor {
48

59
namespace {
@@ -26,8 +30,21 @@ class TSerializedChunkAccessor {
2630
Result = IChunkedArray::TLocalChunkedArrayAddress(Chunks[chunkIdx].GetArrayVerified(Loader), startPosition, chunkIdx);
2731
}
2832
};
33+
2934
} // namespace
3035

36+
std::shared_ptr<IChunkedArray> TDeserializeChunkedArray::TChunk::GetArrayVerified(const std::shared_ptr<TColumnLoader>& loader) const {
37+
if (PredefinedArray) {
38+
return PredefinedArray;
39+
}
40+
if (!!Data) {
41+
return loader->ApplyVerified(Data, RecordsCount);
42+
} else {
43+
AFL_VERIFY(!!DataBuffer);
44+
return loader->ApplyVerified(TString(DataBuffer.data(), DataBuffer.size()), RecordsCount);
45+
}
46+
}
47+
3148
IChunkedArray::TLocalDataAddress TDeserializeChunkedArray::DoGetLocalData(
3249
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const {
3350
AFL_VERIFY(false);

ydb/core/formats/arrow/accessor/composite_serial/accessor.h

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,24 +34,20 @@ class TDeserializeChunkedArray: public ICompositeChunkedArray {
3434
, DataBuffer(dataBuffer) {
3535
}
3636

37-
std::shared_ptr<IChunkedArray> GetArrayVerified(const std::shared_ptr<TColumnLoader>& loader) const {
38-
if (PredefinedArray) {
39-
return PredefinedArray;
40-
}
41-
if (!!Data) {
42-
return loader->ApplyVerified(Data, RecordsCount);
43-
} else {
44-
AFL_VERIFY(!!DataBuffer);
45-
return loader->ApplyVerified(TString(DataBuffer.data(), DataBuffer.size()), RecordsCount);
46-
}
47-
}
37+
std::shared_ptr<IChunkedArray> GetArrayVerified(const std::shared_ptr<TColumnLoader>& loader) const;
4838
};
4939

5040
private:
5141
std::shared_ptr<TColumnLoader> Loader;
5242
std::vector<TChunk> Chunks;
5343
const bool ForLazyInitialization = false;
5444

45+
virtual void DoVisitValues(const TValuesSimpleVisitor& visitor) const override {
46+
for (auto&& i : Chunks) {
47+
i.GetArrayVerified(Loader)->VisitValues(visitor);
48+
}
49+
}
50+
5551
protected:
5652
virtual ui32 DoGetNullsCount() const override {
5753
AFL_VERIFY(false);

ydb/core/formats/arrow/accessor/plain/accessor.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,24 @@ std::shared_ptr<TTrivialArray> TTrivialArray::BuildEmpty(const std::shared_ptr<a
2626
return std::make_shared<TTrivialArray>(TThreadSimpleArraysCache::GetNull(type, 0));
2727
}
2828

29+
void TTrivialArray::Reallocate() {
30+
Array = NArrow::ReallocateArray(Array);
31+
}
32+
33+
std::shared_ptr<arrow::Array> TTrivialArray::BuildArrayFromOptionalScalar(
34+
const std::shared_ptr<arrow::Scalar>& scalar, const std::shared_ptr<arrow::DataType>& type) {
35+
if (scalar) {
36+
AFL_VERIFY(scalar->type->id() == type->id());
37+
auto builder = NArrow::MakeBuilder(scalar->type, 1);
38+
TStatusValidator::Validate(builder->AppendScalar(*scalar));
39+
return NArrow::FinishBuilder(std::move(builder));
40+
} else {
41+
auto builder = NArrow::MakeBuilder(type, 1);
42+
TStatusValidator::Validate(builder->AppendNull());
43+
return NArrow::FinishBuilder(std::move(builder));
44+
}
45+
}
46+
2947
namespace {
3048
class TChunkAccessor {
3149
private:

ydb/core/formats/arrow/accessor/plain/accessor.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ namespace NKikimr::NArrow::NAccessor {
1010
class TTrivialArray: public IChunkedArray {
1111
private:
1212
using TBase = IChunkedArray;
13-
const std::shared_ptr<arrow::Array> Array;
13+
std::shared_ptr<arrow::Array> Array;
14+
15+
virtual void DoVisitValues(const TValuesSimpleVisitor& visitor) const override {
16+
visitor(Array);
17+
}
1418

1519
protected:
1620
virtual std::optional<ui64> DoGetRawSize() const override;
@@ -32,6 +36,8 @@ class TTrivialArray: public IChunkedArray {
3236
virtual ui32 DoGetValueRawBytes() const override;
3337

3438
public:
39+
virtual void Reallocate() override;
40+
3541
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const override {
3642
return std::make_shared<arrow::ChunkedArray>(Array);
3743
}
@@ -54,6 +60,9 @@ class TTrivialArray: public IChunkedArray {
5460
return NArrow::FinishBuilder(std::move(builder));
5561
}
5662

63+
static std::shared_ptr<arrow::Array> BuildArrayFromOptionalScalar(
64+
const std::shared_ptr<arrow::Scalar>& scalar, const std::shared_ptr<arrow::DataType>& type);
65+
5766
TTrivialArray(const std::shared_ptr<arrow::Scalar>& scalar)
5867
: TBase(1, EType::Array, TValidator::CheckNotNull(scalar)->type)
5968
, Array(BuildArrayFromScalar(scalar)) {
@@ -102,6 +111,12 @@ class TTrivialChunkedArray: public IChunkedArray {
102111
using TBase = IChunkedArray;
103112
const std::shared_ptr<arrow::ChunkedArray> Array;
104113

114+
virtual void DoVisitValues(const TValuesSimpleVisitor& visitor) const override {
115+
for (auto&& i : Array->chunks()) {
116+
visitor(i);
117+
}
118+
}
119+
105120
protected:
106121
virtual ui32 DoGetValueRawBytes() const override;
107122
virtual ui32 DoGetNullsCount() const override {

ydb/core/formats/arrow/accessor/sparsed/accessor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ TSparsedArrayChunk TSparsedArray::MakeDefaultChunk(
9797
return TSparsedArrayChunk(recordsCount, it->second, defaultValue);
9898
}
9999

100+
void TSparsedArray::Reallocate() {
101+
Record = TSparsedArrayChunk(GetRecordsCount(), NArrow::ReallocateBatch(Record.GetRecords()), DefaultValue);
102+
}
103+
100104
IChunkedArray::TLocalDataAddress TSparsedArrayChunk::GetChunk(
101105
const std::optional<IChunkedArray::TCommonChunkAddress>& /*chunkCurrent*/, const ui64 position) const {
102106
const auto predCompare = [](const ui32 position, const TInternalChunkInfo& item) {
@@ -134,6 +138,7 @@ TSparsedArrayChunk::TSparsedArrayChunk(
134138
if (DefaultValue) {
135139
AFL_VERIFY(DefaultValue->type->id() == ColValue->type_id());
136140
}
141+
DefaultsArray = TTrivialArray::BuildArrayFromOptionalScalar(DefaultValue, ColValue->type());
137142
ui32 nextIndex = 0;
138143
ui32 startIndexExt = 0;
139144
ui32 startIndexInt = 0;

ydb/core/formats/arrow/accessor/sparsed/accessor.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22
#include <ydb/core/formats/arrow/accessor/abstract/accessor.h>
3+
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
34
#include <ydb/core/formats/arrow/arrow_helpers.h>
45

56
#include <ydb/library/accessor/accessor.h>
@@ -51,8 +52,14 @@ class TSparsedArrayChunk: public TMoveOnly {
5152
};
5253

5354
std::vector<TInternalChunkInfo> RemapExternalToInternal;
55+
std::shared_ptr<arrow::Array> DefaultsArray;
5456

5557
public:
58+
void VisitValues(const IChunkedArray::TValuesSimpleVisitor& visitor) const {
59+
visitor(ColValue);
60+
visitor(DefaultsArray);
61+
}
62+
5663
ui32 GetFinishPosition() const {
5764
return RecordsCount;
5865
}
@@ -101,6 +108,10 @@ class TSparsedArray: public IChunkedArray {
101108
TSparsedArrayChunk Record;
102109
friend class TSparsedArrayChunk;
103110

111+
virtual void DoVisitValues(const IChunkedArray::TValuesSimpleVisitor& visitor) const override {
112+
Record.VisitValues(visitor);
113+
}
114+
104115
protected:
105116
virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override;
106117

@@ -148,6 +159,8 @@ class TSparsedArray: public IChunkedArray {
148159
const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount);
149160

150161
public:
162+
virtual void Reallocate() override;
163+
151164
static std::shared_ptr<TSparsedArray> Make(const IChunkedArray& defaultArray, const std::shared_ptr<arrow::Scalar>& defaultValue);
152165

153166
TSparsedArray(const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount)

ydb/core/formats/arrow/accessor/sparsed/ut/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ PEERDIR(
77
ydb/core/formats/arrow/accessor/plain
88
ydb/core/formats/arrow
99
yql/essentials/public/udf/service/stub
10-
ydb/core/formats/arrow
10+
yql/essentials/sql/pg_dummy
1111
)
1212

1313
YQL_LAST_ABI_VERSION()

ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ TConclusion<std::shared_ptr<TSubColumnsArray>> TSubColumnsArray::Make(const std:
2424
for (ui32 i = 0; i < reader.GetRecordsCount();) {
2525
auto address = reader.GetReadChunk(i);
2626
storage.emplace_back(address.GetArray());
27-
adapter->AddDataToBuilders(address.GetArray(), builder);
27+
auto conclusion = adapter->AddDataToBuilders(address.GetArray(), builder);
28+
if (conclusion.IsFail()) {
29+
return conclusion;
30+
}
2831
i += address.GetArray()->length();
2932
AFL_VERIFY(i <= reader.GetRecordsCount());
3033
}

ydb/core/formats/arrow/accessor/sub_columns/accessor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ class TSubColumnsArray: public IChunkedArray {
6363
}
6464

6565
public:
66+
virtual void DoVisitValues(const std::function<void(std::shared_ptr<arrow::Array>)>& /*visitor*/) const override {
67+
AFL_VERIFY(false);
68+
}
69+
6670
void StoreSourceString(const TString& sourceDeserializationString) {
6771
AFL_VERIFY(!SourceDeserializationString);
6872
SourceDeserializationString = sourceDeserializationString;

0 commit comments

Comments
 (0)