Skip to content

Commit 0d2f9d1

Browse files
special json accessors (#13933)
Write JSON as a sub-columns accessor to provide a fast way to select column data
1 parent 709b413 commit 0d2f9d1

File tree

102 files changed

+4292
-670
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+4292
-670
lines changed

.github/config/muted_ya.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select
2828
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
2929
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
3030
ydb/core/kqp/ut/olap KqpOlap.TableSinkWithOlapStore
31+
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictActualization
32+
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization
3133
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1
3234
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean
3335
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restarts

ydb/core/formats/arrow/accessor/abstract/constructor.h

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#pragma once
22

3-
#include <ydb/library/formats/arrow/protos/accessor.pb.h>
43
#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
54
#include <ydb/library/formats/arrow/accessor/common/chunk_data.h>
5+
#include <ydb/library/formats/arrow/protos/accessor.pb.h>
66
#include <ydb/services/bg_tasks/abstract/interface.h>
77

88
#include <library/cpp/object_factory/object_factory.h>
@@ -15,27 +15,33 @@ class IConstructor {
1515
using TProto = NKikimrArrowAccessorProto::TConstructor;
1616

1717
private:
18-
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
19-
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const = 0;
20-
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstructDefault(
21-
const TChunkConstructionData& externalInfo) const = 0;
18+
YDB_READONLY(IChunkedArray::EType, Type, IChunkedArray::EType::Undefined);
19+
20+
virtual TConclusion<std::shared_ptr<IChunkedArray>> DoDeserializeFromString(
21+
const TString& originalData, const TChunkConstructionData& externalInfo) const = 0;
22+
virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstructDefault(const TChunkConstructionData& externalInfo) const = 0;
2223
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const = 0;
2324
virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) = 0;
24-
virtual std::shared_ptr<arrow::Schema> DoGetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const = 0;
2525
virtual TString DoDebugString() const {
2626
return "";
2727
}
2828
virtual bool DoIsEqualWithSameTypeTo(const IConstructor& item) const = 0;
29-
virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
30-
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const = 0;
29+
virtual TString DoSerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const = 0;
30+
31+
virtual TConclusion<std::shared_ptr<IChunkedArray>> DoConstruct(
32+
const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const = 0;
3133

3234
public:
35+
IConstructor(const IChunkedArray::EType type)
36+
: Type(type) {
37+
}
38+
3339
virtual ~IConstructor() = default;
3440

35-
std::shared_ptr<arrow::RecordBatch> Construct(
36-
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
41+
TString SerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
3742
AFL_VERIFY(columnData);
38-
return DoConstruct(columnData, externalInfo);
43+
AFL_VERIFY(columnData->GetType() == Type)("column", columnData->GetType())("current", Type);
44+
return DoSerializeToString(columnData, externalInfo);
3945
}
4046

4147
bool IsEqualWithSameTypeTo(const IConstructor& item) const {
@@ -46,15 +52,25 @@ class IConstructor {
4652
return TStringBuilder() << GetClassName() << ":" << DoDebugString();
4753
}
4854

49-
TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> Construct(
50-
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const {
51-
return DoConstruct(originalData, externalInfo);
55+
TConclusion<std::shared_ptr<IChunkedArray>> DeserializeFromString(
56+
const TString& originalData, const TChunkConstructionData& externalInfo) const {
57+
return DoDeserializeFromString(originalData, externalInfo);
5258
}
5359

54-
TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> ConstructDefault(const TChunkConstructionData& externalInfo) const {
60+
TConclusion<std::shared_ptr<IChunkedArray>> ConstructDefault(const TChunkConstructionData& externalInfo) const {
5561
return DoConstructDefault(externalInfo);
5662
}
5763

64+
TConclusion<std::shared_ptr<IChunkedArray>> Construct(
65+
const std::shared_ptr<IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const {
66+
AFL_VERIFY(originalArray);
67+
if (originalArray->GetType() == GetType()) {
68+
return originalArray;
69+
} else {
70+
return DoConstruct(originalArray, externalInfo);
71+
}
72+
}
73+
5874
bool DeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) {
5975
return DoDeserializeFromProto(proto);
6076
}
@@ -67,17 +83,13 @@ class IConstructor {
6783
proto = DoSerializeToProto();
6884
}
6985

70-
std::shared_ptr<arrow::Schema> GetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const {
71-
AFL_VERIFY(resultColumn);
72-
return DoGetExpectedSchema(resultColumn);
73-
}
74-
7586
virtual TString GetClassName() const = 0;
7687
};
7788

7889
class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IConstructor> {
7990
private:
8091
using TBase = NBackgroundTasks::TInterfaceProtoContainer<IConstructor>;
92+
8193
public:
8294
using TBase::TBase;
8395

@@ -94,9 +106,15 @@ class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<I
94106
}
95107
}
96108

97-
std::shared_ptr<arrow::RecordBatch> Construct(const std::shared_ptr<IChunkedArray>& batch, const TChunkConstructionData& externalInfo) const {
109+
TString SerializeToString(const std::shared_ptr<IChunkedArray>& batch, const TChunkConstructionData& externalInfo) const {
110+
AFL_VERIFY(!!GetObjectPtr());
111+
return GetObjectPtr()->SerializeToString(batch, externalInfo);
112+
}
113+
114+
TConclusion<std::shared_ptr<IChunkedArray>> DeserializeFromString(
115+
const TString& originalData, const TChunkConstructionData& externalInfo) const {
98116
AFL_VERIFY(!!GetObjectPtr());
99-
return GetObjectPtr()->Construct(batch, externalInfo);
117+
return GetObjectPtr()->DeserializeFromString(originalData, externalInfo);
100118
}
101119

102120
static TConstructorContainer GetDefaultConstructor();

ydb/core/formats/arrow/accessor/composite_serial/accessor.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
#pragma once
22
#include <ydb/core/formats/arrow/save_load/loader.h>
3+
34
#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
5+
#include <ydb/library/formats/arrow/accessor/composite/accessor.h>
46

57
namespace NKikimr::NArrow::NAccessor {
68

7-
class TDeserializeChunkedArray: public NArrow::NAccessor::IChunkedArray {
9+
class TDeserializeChunkedArray: public ICompositeChunkedArray {
810
private:
9-
using TBase = NArrow::NAccessor::IChunkedArray;
11+
using TBase = ICompositeChunkedArray;
1012

1113
public:
1214
class TChunk {
@@ -40,16 +42,19 @@ class TDeserializeChunkedArray: public NArrow::NAccessor::IChunkedArray {
4042
std::vector<TChunk> Chunks;
4143

4244
protected:
45+
virtual ui32 DoGetNullsCount() const override {
46+
AFL_VERIFY(false);
47+
return 0;
48+
}
49+
virtual ui32 DoGetValueRawBytes() const override {
50+
AFL_VERIFY(false);
51+
return 0;
52+
}
53+
4354
virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray(
4455
const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override;
4556
virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override;
4657

47-
virtual std::vector<TChunkedArraySerialized> DoSplitBySizes(
48-
const TColumnSaver& /*saver*/, const TString& /*fullSerializedData*/, const std::vector<ui64>& /*splitSizes*/) override {
49-
AFL_VERIFY(false);
50-
return {};
51-
}
52-
5358
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 /*index*/) const override {
5459
AFL_VERIFY(false)("problem", "cannot use method");
5560
return nullptr;
@@ -61,7 +66,7 @@ class TDeserializeChunkedArray: public NArrow::NAccessor::IChunkedArray {
6166
AFL_VERIFY(false);
6267
return nullptr;
6368
}
64-
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override {
69+
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const override {
6570
AFL_VERIFY(false);
6671
return nullptr;
6772
}

ydb/core/formats/arrow/accessor/plain/accessor.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "accessor.h"
22

33
#include <ydb/core/formats/arrow/arrow_helpers.h>
4+
#include <ydb/core/formats/arrow/save_load/loader.h>
45
#include <ydb/core/formats/arrow/size_calcer.h>
56
#include <ydb/core/formats/arrow/splitter/simple.h>
67

@@ -10,24 +11,15 @@ std::optional<ui64> TTrivialArray::DoGetRawSize() const {
1011
return NArrow::GetArrayDataSize(Array);
1112
}
1213

13-
std::vector<NKikimr::NArrow::NAccessor::TChunkedArraySerialized> TTrivialArray::DoSplitBySizes(
14-
const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) {
15-
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("f", GetDataType()) }));
16-
auto chunks = NArrow::NSplitter::TSimpleSplitter(saver).SplitBySizes(
17-
arrow::RecordBatch::Make(schema, GetRecordsCount(), { Array }), fullSerializedData, splitSizes);
18-
std::vector<TChunkedArraySerialized> result;
19-
for (auto&& i : chunks) {
20-
AFL_VERIFY(i.GetSlicedBatch()->num_columns() == 1);
21-
result.emplace_back(std::make_shared<TTrivialArray>(i.GetSlicedBatch()->column(0)), i.GetSerializedChunk());
22-
}
23-
return result;
24-
}
25-
2614
std::shared_ptr<arrow::Scalar> TTrivialArray::DoGetMaxScalar() const {
2715
auto minMaxPos = NArrow::FindMinMaxPosition(Array);
2816
return NArrow::TStatusValidator::GetValid(Array->GetScalar(minMaxPos.second));
2917
}
3018

19+
ui32 TTrivialArray::DoGetValueRawBytes() const {
20+
return NArrow::GetArrayDataSize(Array);
21+
}
22+
3123
namespace {
3224
class TChunkAccessor {
3325
private:
@@ -85,4 +77,12 @@ std::shared_ptr<arrow::Scalar> TTrivialChunkedArray::DoGetMaxScalar() const {
8577
return result;
8678
}
8779

80+
ui32 TTrivialChunkedArray::DoGetValueRawBytes() const {
81+
ui32 result = 0;
82+
for (auto&& i : Array->chunks()) {
83+
result += NArrow::GetArrayDataSize(i);
84+
}
85+
return result;
86+
}
87+
8888
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/plain/accessor.h

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22
#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
3+
#include <ydb/library/formats/arrow/arrow_helpers.h>
34
#include <ydb/library/formats/arrow/validation/validation.h>
45

56
namespace NKikimr::NArrow::NAccessor {
@@ -12,24 +13,21 @@ class TTrivialArray: public IChunkedArray {
1213
protected:
1314
virtual std::optional<ui64> DoGetRawSize() const override;
1415

15-
virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override {
16+
virtual TLocalDataAddress DoGetLocalData(
17+
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override {
1618
return TLocalDataAddress(Array, 0, 0);
1719
}
18-
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override {
19-
return std::make_shared<arrow::ChunkedArray>(Array);
20-
}
2120
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
2221
return NArrow::TStatusValidator::GetValid(Array->GetScalar(index));
2322
}
2423
virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override;
25-
virtual std::vector<TChunkedArraySerialized> DoSplitBySizes(
26-
const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) override;
27-
28-
virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray(
29-
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override {
30-
AFL_VERIFY(false);
31-
return TLocalChunkedArrayAddress(nullptr, TCommonChunkAddress(0, GetRecordsCount(), 0));
24+
virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override {
25+
return std::make_shared<TTrivialArray>(Array->Slice(offset, count));
26+
}
27+
virtual ui32 DoGetNullsCount() const override {
28+
return Array->null_count();
3229
}
30+
virtual ui32 DoGetValueRawBytes() const override;
3331

3432
public:
3533
const std::shared_ptr<arrow::Array>& GetArray() const {
@@ -40,6 +38,43 @@ class TTrivialArray: public IChunkedArray {
4038
: TBase(data->length(), EType::Array, data->type())
4139
, Array(data) {
4240
}
41+
42+
template <class TArrowDataType = arrow::StringType>
43+
class TPlainBuilder {
44+
private:
45+
std::unique_ptr<arrow::ArrayBuilder> Builder;
46+
std::optional<ui32> LastRecordIndex;
47+
48+
public:
49+
TPlainBuilder(const ui32 reserveItems = 0, const ui32 reserveSize = 0) {
50+
Builder = NArrow::MakeBuilder(arrow::TypeTraits<TArrowDataType>::type_singleton(), reserveItems, reserveSize);
51+
}
52+
53+
void AddRecord(const ui32 recordIndex, const std::string_view value) {
54+
if (LastRecordIndex) {
55+
AFL_VERIFY(*LastRecordIndex < recordIndex)("last", LastRecordIndex)("index", recordIndex);
56+
TStatusValidator::Validate(Builder->AppendNulls(recordIndex - *LastRecordIndex - 1));
57+
} else {
58+
TStatusValidator::Validate(Builder->AppendNulls(recordIndex));
59+
}
60+
LastRecordIndex = recordIndex;
61+
AFL_VERIFY(NArrow::Append<TArrowDataType>(*Builder, arrow::util::string_view(value.data(), value.size())));
62+
}
63+
64+
std::shared_ptr<IChunkedArray> Finish(const ui32 recordsCount) {
65+
if (LastRecordIndex) {
66+
AFL_VERIFY(*LastRecordIndex < recordsCount)("last", LastRecordIndex)("count", recordsCount);
67+
TStatusValidator::Validate(Builder->AppendNulls(recordsCount - *LastRecordIndex - 1));
68+
} else {
69+
TStatusValidator::Validate(Builder->AppendNulls(recordsCount));
70+
}
71+
return std::make_shared<TTrivialArray>(NArrow::FinishBuilder(std::move(Builder)));
72+
}
73+
};
74+
75+
static TPlainBuilder<arrow::StringType> MakeBuilderUtf8(const ui32 reserveItems = 0, const ui32 reserveSize = 0) {
76+
return TPlainBuilder<arrow::StringType>(reserveItems, reserveSize);
77+
}
4378
};
4479

4580
class TTrivialChunkedArray: public IChunkedArray {
@@ -48,28 +83,21 @@ class TTrivialChunkedArray: public IChunkedArray {
4883
const std::shared_ptr<arrow::ChunkedArray> Array;
4984

5085
protected:
51-
virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override;
52-
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override {
53-
return Array;
86+
virtual ui32 DoGetValueRawBytes() const override;
87+
virtual ui32 DoGetNullsCount() const override {
88+
return Array->null_count();
5489
}
90+
virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override;
5591
virtual std::optional<ui64> DoGetRawSize() const override;
5692
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
5793
auto chunk = GetChunkSlow(index);
5894
return NArrow::TStatusValidator::GetValid(chunk.GetArray()->GetScalar(chunk.GetAddress().GetLocalIndex(index)));
5995
}
60-
virtual std::vector<TChunkedArraySerialized> DoSplitBySizes(
61-
const TColumnSaver& /*saver*/, const TString& /*fullSerializedData*/, const std::vector<ui64>& /*splitSizes*/) override {
62-
AFL_VERIFY(false);
63-
return {};
64-
}
6596

66-
virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override;
67-
68-
virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray(
69-
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override {
70-
AFL_VERIFY(false);
71-
return TLocalChunkedArrayAddress(nullptr, TCommonChunkAddress(0, 0, 0));
97+
virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override {
98+
return std::make_shared<TTrivialChunkedArray>(Array->Slice(offset, count));
7299
}
100+
virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override;
73101

74102
public:
75103
TTrivialChunkedArray(const std::shared_ptr<arrow::ChunkedArray>& data)

0 commit comments

Comments
 (0)