Skip to content

Commit 8432391

Browse files
authored
Merge 5219891 into a196e19
2 parents a196e19 + 5219891 commit 8432391

File tree

94 files changed

+1539
-935
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+1539
-935
lines changed

ydb/core/formats/arrow/arrow_helpers.cpp

+14-1
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ std::shared_ptr<arrow::Scalar> DefaultScalar(const std::shared_ptr<arrow::DataTy
548548
}
549549
return true;
550550
});
551-
Y_ABORT_UNLESS(out);
551+
AFL_VERIFY(out)("type", type->ToString());
552552
return out;
553553
}
554554

@@ -634,6 +634,19 @@ int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr
634634
return ScalarCompare(*x, *y);
635635
}
636636

637+
int ScalarCompareNullable(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) {
638+
if (!x && !!y) {
639+
return -1;
640+
}
641+
if (!!x && !y) {
642+
return 1;
643+
}
644+
if (!x && !y) {
645+
return 0;
646+
}
647+
return ScalarCompare(*x, *y);
648+
}
649+
637650
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
638651
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
639652
auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);

ydb/core/formats/arrow/arrow_helpers.h

+1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar
9898
bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
9999
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
100100
int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
101+
int ScalarCompareNullable(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
101102
std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
102103
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
103104
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);

ydb/core/formats/arrow/common/accessor.cpp

+13
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "accessor.h"
2+
#include <ydb/core/formats/arrow/size_calcer.h>
23
#include <ydb/core/formats/arrow/switch/compare.h>
34
#include <ydb/core/formats/arrow/switch/switch_type.h>
45
#include <ydb/library/actors/core/log.h>
@@ -94,6 +95,10 @@ class TChunkAccessor {
9495

9596
}
9697

98+
std::optional<ui64> TTrivialArray::DoGetRawSize() const {
99+
return NArrow::GetArrayDataSize(Array);
100+
}
101+
97102
std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
98103
AFL_VERIFY(StartPosition <= position);
99104
AFL_VERIFY(position < FinishPosition);
@@ -119,4 +124,12 @@ IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::
119124
return SelectChunk(chunkCurrent, position, accessor);
120125
}
121126

127+
std::optional<ui64> TTrivialChunkedArray::DoGetRawSize() const {
128+
ui64 result = 0;
129+
for (auto&& i : Array->chunks()) {
130+
result += NArrow::GetArrayDataSize(i);
131+
}
132+
return result;
133+
}
134+
122135
}

ydb/core/formats/arrow/common/accessor.h

+16-5
Original file line numberDiff line numberDiff line change
@@ -84,19 +84,23 @@ class IChunkedArray {
8484
YDB_READONLY_DEF(std::shared_ptr<arrow::DataType>, DataType);
8585
YDB_READONLY(ui64, RecordsCount, 0);
8686
YDB_READONLY(EType, Type, EType::Undefined);
87+
virtual std::optional<ui64> DoGetRawSize() const = 0;
8788
protected:
8889
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const = 0;
8990
virtual TCurrentChunkAddress DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const = 0;
9091

9192
template <class TChunkAccessor>
9293
TCurrentChunkAddress SelectChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position, const TChunkAccessor& accessor) const {
93-
if (!chunkCurrent || position >= chunkCurrent->GetStartPosition() + chunkCurrent->GetLength()) {
94+
if (!chunkCurrent || position >= chunkCurrent->GetStartPosition()) {
9495
ui32 startIndex = 0;
9596
ui64 idx = 0;
9697
if (chunkCurrent) {
97-
AFL_VERIFY(chunkCurrent->GetChunkIndex() + 1 < accessor.GetChunksCount());
98-
startIndex = chunkCurrent->GetChunkIndex() + 1;
99-
idx = chunkCurrent->GetStartPosition() + chunkCurrent->GetLength();
98+
if (position < chunkCurrent->GetFinishPosition()) {
99+
return *chunkCurrent;
100+
}
101+
AFL_VERIFY(chunkCurrent->GetChunkIndex() < accessor.GetChunksCount());
102+
startIndex = chunkCurrent->GetChunkIndex();
103+
idx = chunkCurrent->GetStartPosition();
100104
}
101105
for (ui32 i = startIndex; i < accessor.GetChunksCount(); ++i) {
102106
const ui64 nextIdx = idx + accessor.GetChunkLength(i);
@@ -105,7 +109,7 @@ class IChunkedArray {
105109
}
106110
idx = nextIdx;
107111
}
108-
} else if (position < chunkCurrent->GetStartPosition()) {
112+
} else {
109113
AFL_VERIFY(chunkCurrent->GetChunkIndex() > 0);
110114
ui64 idx = chunkCurrent->GetStartPosition();
111115
for (i32 i = chunkCurrent->GetChunkIndex() - 1; i >= 0; --i) {
@@ -156,6 +160,10 @@ class IChunkedArray {
156160
TString DebugString(const ui32 position) const;
157161
};
158162

163+
std::optional<ui64> GetRawSize() const {
164+
return DoGetRawSize();
165+
}
166+
159167
std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const {
160168
return DoGetChunkedArray();
161169
}
@@ -180,6 +188,8 @@ class TTrivialArray: public IChunkedArray {
180188
using TBase = IChunkedArray;
181189
const std::shared_ptr<arrow::Array> Array;
182190
protected:
191+
virtual std::optional<ui64> DoGetRawSize() const override;
192+
183193
virtual TCurrentChunkAddress DoGetChunk(const std::optional<TCurrentChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override {
184194
return TCurrentChunkAddress(Array, 0, 0);
185195
}
@@ -204,6 +214,7 @@ class TTrivialChunkedArray: public IChunkedArray {
204214
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override {
205215
return Array;
206216
}
217+
virtual std::optional<ui64> DoGetRawSize() const override;
207218

208219
public:
209220
TTrivialChunkedArray(const std::shared_ptr<arrow::ChunkedArray>& data)

ydb/core/formats/arrow/common/adapter.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class TDataBuilderPolicy<TGeneralContainer> {
9191
return batch;
9292
}
9393
[[nodiscard]] static std::shared_ptr<TGeneralContainer> ApplyArrowFilter(const std::shared_ptr<TGeneralContainer>& batch, const std::shared_ptr<arrow::BooleanArray>& filter) {
94-
auto table = batch->BuildTable();
94+
auto table = batch->BuildTableVerified();
9595
return std::make_shared<TGeneralContainer>(TDataBuilderPolicy<arrow::Table>::ApplyArrowFilter(table, filter));
9696
}
9797
[[nodiscard]] static std::shared_ptr<TGeneralContainer> GetEmptySame(const std::shared_ptr<TGeneralContainer>& batch) {

ydb/core/formats/arrow/common/container.cpp

+119-21
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,60 @@
11
#include "container.h"
22
#include <ydb/library/actors/core/log.h>
3+
#include <ydb/core/formats/arrow/arrow_helpers.h>
34
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
45

56
namespace NKikimr::NArrow {
67

7-
NKikimr::TConclusionStatus TGeneralContainer::MergeColumnsStrictly(const TGeneralContainer& container) {
8-
if (RecordsCount != container.RecordsCount) {
8+
TConclusionStatus TGeneralContainer::MergeColumnsStrictly(const TGeneralContainer& container) {
9+
if (!container.RecordsCount) {
10+
return TConclusionStatus::Success();
11+
}
12+
if (!RecordsCount) {
13+
RecordsCount = container.RecordsCount;
14+
}
15+
if (*RecordsCount != *container.RecordsCount) {
916
return TConclusionStatus::Fail(TStringBuilder() << "inconsistency records count in additional container: " <<
1017
container.GetSchema()->ToString() << ". expected: " << RecordsCount << ", reality: " << container.GetRecordsCount());
1118
}
1219
for (i32 i = 0; i < container.Schema->num_fields(); ++i) {
1320
auto addFieldResult = AddField(container.Schema->field(i), container.Columns[i]);
14-
if (!addFieldResult) {
21+
if (addFieldResult.IsFail()) {
1522
return addFieldResult;
1623
}
1724
}
1825
return TConclusionStatus::Success();
1926
}
2027

21-
NKikimr::TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<NAccessor::IChunkedArray>& data) {
28+
TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<NAccessor::IChunkedArray>& data) {
2229
AFL_VERIFY(f);
2330
AFL_VERIFY(data);
24-
if (data->GetRecordsCount() != RecordsCount) {
31+
if (RecordsCount && data->GetRecordsCount() != *RecordsCount) {
2532
return TConclusionStatus::Fail(TStringBuilder() << "inconsistency records count in new column: " <<
2633
f->name() << ". expected: " << RecordsCount << ", reality: " << data->GetRecordsCount());
2734
}
2835
if (!data->GetDataType()->Equals(f->type())) {
2936
return TConclusionStatus::Fail("schema and data type are not equals: " + data->GetDataType()->ToString() + " vs " + f->type()->ToString());
3037
}
31-
if (Schema->GetFieldByName(f->name())) {
32-
return TConclusionStatus::Fail("field name duplication: " + f->name());
33-
}
34-
auto resultAdd = Schema->AddField(Schema->num_fields(), f);
35-
if (!resultAdd.ok()) {
36-
return TConclusionStatus::Fail("internal schema error on add field: " + resultAdd.status().ToString());
38+
{
39+
auto conclusion = Schema->AddField(f);
40+
if (conclusion.IsFail()) {
41+
return conclusion;
42+
}
3743
}
38-
Schema = *resultAdd;
44+
RecordsCount = data->GetRecordsCount();
3945
Columns.emplace_back(data);
4046
return TConclusionStatus::Success();
4147
}
4248

43-
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Schema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
44-
: Schema(schema)
45-
, Columns(std::move(columns))
46-
{
47-
AFL_VERIFY(schema);
49+
TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<arrow::ChunkedArray>& data) {
50+
return AddField(f, std::make_shared<NAccessor::TTrivialChunkedArray>(data));
51+
}
52+
53+
TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<arrow::Array>& data) {
54+
return AddField(f, std::make_shared<NAccessor::TTrivialArray>(data));
55+
}
56+
57+
void TGeneralContainer::Initialize() {
4858
std::optional<ui64> recordsCount;
4959
AFL_VERIFY(Schema->num_fields() == (i32)Columns.size())("schema", Schema->num_fields())("columns", Columns.size());
5060
for (i32 i = 0; i < Schema->num_fields(); ++i) {
@@ -58,12 +68,34 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Schema>& schem
5868
}
5969
}
6070
AFL_VERIFY(recordsCount);
71+
AFL_VERIFY(!RecordsCount || *RecordsCount == *recordsCount);
6172
RecordsCount = *recordsCount;
6273
}
6374

75+
TGeneralContainer::TGeneralContainer(const std::vector<std::shared_ptr<arrow::Field>>& fields, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
76+
: Schema(std::make_shared<NModifier::TSchema>(fields))
77+
, Columns(std::move(columns))
78+
{
79+
Initialize();
80+
}
81+
82+
TGeneralContainer::TGeneralContainer(const std::shared_ptr<NModifier::TSchema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
83+
: Schema(std::make_shared<NModifier::TSchema>(schema))
84+
, Columns(std::move(columns))
85+
{
86+
Initialize();
87+
}
88+
89+
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Schema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
90+
: Schema(std::make_shared<NModifier::TSchema>(schema))
91+
, Columns(std::move(columns))
92+
{
93+
Initialize();
94+
}
95+
6496
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table) {
6597
AFL_VERIFY(table);
66-
Schema = table->schema();
98+
Schema = std::make_shared<NModifier::TSchema>(table->schema());
6799
RecordsCount = table->num_rows();
68100
for (auto&& i : table->columns()) {
69101
if (i->num_chunks() == 1) {
@@ -72,15 +104,17 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table)
72104
Columns.emplace_back(std::make_shared<NAccessor::TTrivialChunkedArray>(i));
73105
}
74106
}
107+
Initialize();
75108
}
76109

77110
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::RecordBatch>& table) {
78111
AFL_VERIFY(table);
79-
Schema = table->schema();
112+
Schema = std::make_shared<NModifier::TSchema>(table->schema());
80113
RecordsCount = table->num_rows();
81114
for (auto&& i : table->columns()) {
82115
Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(i));
83116
}
117+
Initialize();
84118
}
85119

86120
std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TGeneralContainer::GetAccessorByNameVerified(const std::string& fieldId) const {
@@ -110,14 +144,78 @@ std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::o
110144
if (fields.empty()) {
111145
return nullptr;
112146
}
113-
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, RecordsCount);
147+
AFL_VERIFY(RecordsCount);
148+
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
114149
}
115150

116-
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTable(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
151+
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
117152
auto result = BuildTableOptional(columnNames);
118153
AFL_VERIFY(result);
119154
AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size());
120155
return result;
121156
}
122157

158+
std::shared_ptr<NArrow::NAccessor::IChunkedArray> TGeneralContainer::GetAccessorByNameOptional(const std::string& fieldId) const {
159+
int idx = Schema->GetFieldIndex(fieldId);
160+
if (idx == -1) {
161+
return nullptr;
162+
}
163+
AFL_VERIFY((ui32)idx < Columns.size())("idx", idx)("count", Columns.size());
164+
return Columns[idx];
165+
}
166+
167+
TConclusionStatus TGeneralContainer::SyncSchemaTo(const std::shared_ptr<arrow::Schema>& schema, const IFieldsConstructor* defaultFieldsConstructor, const bool forceDefaults) {
168+
std::shared_ptr<NModifier::TSchema> schemaNew = std::make_shared<NModifier::TSchema>();
169+
std::vector<std::shared_ptr<NAccessor::IChunkedArray>> columnsNew;
170+
if (!RecordsCount) {
171+
return TConclusionStatus::Fail("original container has not data");
172+
}
173+
for (auto&& i : schema->fields()) {
174+
const int idx = Schema->GetFieldIndex(i->name());
175+
if (idx == -1) {
176+
if (!defaultFieldsConstructor) {
177+
return TConclusionStatus::Fail("haven't field for sync: '" + i->name() + "'");
178+
} else {
179+
schemaNew->AddField(i).Validate();
180+
auto defConclusion = defaultFieldsConstructor->GetDefaultColumnElementValue(i, forceDefaults);
181+
if (defConclusion.IsFail()) {
182+
return defConclusion;
183+
}
184+
columnsNew.emplace_back(std::make_shared<NAccessor::TTrivialArray>(NArrow::TThreadSimpleArraysCache::Get(i->type(), *defConclusion, *RecordsCount)));
185+
}
186+
} else {
187+
const auto& fOwned = Schema->GetFieldVerified(idx);
188+
if (!fOwned->type()->Equals(i->type())) {
189+
return TConclusionStatus::Fail("different field types for '" + i->name() + "'. Have " + fOwned->type()->ToString() + ", need " + i->type()->ToString());
190+
}
191+
schemaNew->AddField(fOwned).Validate();
192+
columnsNew.emplace_back(Columns[idx]);
193+
}
194+
}
195+
std::swap(Schema, schemaNew);
196+
std::swap(columnsNew, Columns);
197+
return TConclusionStatus::Success();
198+
}
199+
200+
TString TGeneralContainer::DebugString() const {
201+
TStringBuilder result;
202+
if (RecordsCount) {
203+
result << "records_count=" << *RecordsCount << ";";
204+
}
205+
result << "schema=" << Schema->ToString() << ";";
206+
return result;
207+
}
208+
209+
TConclusion<std::shared_ptr<arrow::Scalar>> IFieldsConstructor::GetDefaultColumnElementValue(const std::shared_ptr<arrow::Field>& field, const bool force) const {
210+
AFL_VERIFY(field);
211+
auto result = DoGetDefaultColumnElementValue(field->name());
212+
if (result) {
213+
return result;
214+
}
215+
if (force) {
216+
return NArrow::DefaultScalar(field->type());
217+
}
218+
return TConclusionStatus::Fail("have not default value for column " + field->name());
219+
}
220+
123221
}

0 commit comments

Comments
 (0)