1
1
#include " container.h"
2
2
#include < ydb/library/actors/core/log.h>
3
+ #include < ydb/core/formats/arrow/arrow_helpers.h>
3
4
#include < ydb/core/formats/arrow/simple_arrays_cache.h>
4
5
5
6
namespace NKikimr ::NArrow {
6
7
7
- NKikimr::TConclusionStatus TGeneralContainer::MergeColumnsStrictly (const TGeneralContainer& container) {
8
- if (RecordsCount != container.RecordsCount ) {
8
+ TConclusionStatus TGeneralContainer::MergeColumnsStrictly (const TGeneralContainer& container) {
9
+ if (!container.RecordsCount ) {
10
+ return TConclusionStatus::Success ();
11
+ }
12
+ if (!RecordsCount) {
13
+ RecordsCount = container.RecordsCount ;
14
+ }
15
+ if (*RecordsCount != *container.RecordsCount ) {
9
16
return TConclusionStatus::Fail (TStringBuilder () << " inconsistency records count in additional container: " <<
10
17
container.GetSchema ()->ToString () << " . expected: " << RecordsCount << " , reality: " << container.GetRecordsCount ());
11
18
}
12
19
for (i32 i = 0 ; i < container.Schema ->num_fields (); ++i) {
13
20
auto addFieldResult = AddField (container.Schema ->field (i), container.Columns [i]);
14
- if (! addFieldResult) {
21
+ if (addFieldResult. IsFail () ) {
15
22
return addFieldResult;
16
23
}
17
24
}
18
25
return TConclusionStatus::Success ();
19
26
}
20
27
21
- NKikimr:: TConclusionStatus TGeneralContainer::AddField (const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<NAccessor::IChunkedArray>& data) {
28
+ TConclusionStatus TGeneralContainer::AddField (const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<NAccessor::IChunkedArray>& data) {
22
29
AFL_VERIFY (f);
23
30
AFL_VERIFY (data);
24
- if (data->GetRecordsCount () != RecordsCount) {
31
+ if (RecordsCount && data->GetRecordsCount () != * RecordsCount) {
25
32
return TConclusionStatus::Fail (TStringBuilder () << " inconsistency records count in new column: " <<
26
33
f->name () << " . expected: " << RecordsCount << " , reality: " << data->GetRecordsCount ());
27
34
}
28
35
if (!data->GetDataType ()->Equals (f->type ())) {
29
36
return TConclusionStatus::Fail (" schema and data type are not equals: " + data->GetDataType ()->ToString () + " vs " + f->type ()->ToString ());
30
37
}
31
- if (Schema->GetFieldByName (f->name ())) {
32
- return TConclusionStatus::Fail (" field name duplication: " + f->name ());
33
- }
34
- auto resultAdd = Schema->AddField (Schema->num_fields (), f);
35
- if (!resultAdd.ok ()) {
36
- return TConclusionStatus::Fail (" internal schema error on add field: " + resultAdd.status ().ToString ());
38
+ {
39
+ auto conclusion = Schema->AddField (f);
40
+ if (conclusion.IsFail ()) {
41
+ return conclusion;
42
+ }
37
43
}
38
- Schema = *resultAdd ;
44
+ RecordsCount = data-> GetRecordsCount () ;
39
45
Columns.emplace_back (data);
40
46
return TConclusionStatus::Success ();
41
47
}
42
48
43
- TGeneralContainer::TGeneralContainer (const std::shared_ptr<arrow::Schema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
44
- : Schema(schema)
45
- , Columns(std::move(columns))
46
- {
47
- AFL_VERIFY (schema);
49
+ TConclusionStatus TGeneralContainer::AddField (const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<arrow::ChunkedArray>& data) {
50
+ return AddField (f, std::make_shared<NAccessor::TTrivialChunkedArray>(data));
51
+ }
52
+
53
+ TConclusionStatus TGeneralContainer::AddField (const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<arrow::Array>& data) {
54
+ return AddField (f, std::make_shared<NAccessor::TTrivialArray>(data));
55
+ }
56
+
57
+ void TGeneralContainer::Initialize () {
48
58
std::optional<ui64> recordsCount;
49
59
AFL_VERIFY (Schema->num_fields () == (i32)Columns.size ())(" schema" , Schema->num_fields ())(" columns" , Columns.size ());
50
60
for (i32 i = 0 ; i < Schema->num_fields (); ++i) {
@@ -58,12 +68,34 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Schema>& schem
58
68
}
59
69
}
60
70
AFL_VERIFY (recordsCount);
71
+ AFL_VERIFY (!RecordsCount || *RecordsCount == *recordsCount);
61
72
RecordsCount = *recordsCount;
62
73
}
63
74
75
+ TGeneralContainer::TGeneralContainer (const std::vector<std::shared_ptr<arrow::Field>>& fields, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
76
+ : Schema(std::make_shared<NModifier::TSchema>(fields))
77
+ , Columns(std::move(columns))
78
+ {
79
+ Initialize ();
80
+ }
81
+
82
+ TGeneralContainer::TGeneralContainer (const std::shared_ptr<NModifier::TSchema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
83
+ : Schema(std::make_shared<NModifier::TSchema>(schema))
84
+ , Columns(std::move(columns))
85
+ {
86
+ Initialize ();
87
+ }
88
+
89
+ TGeneralContainer::TGeneralContainer (const std::shared_ptr<arrow::Schema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
90
+ : Schema(std::make_shared<NModifier::TSchema>(schema))
91
+ , Columns(std::move(columns))
92
+ {
93
+ Initialize ();
94
+ }
95
+
64
96
TGeneralContainer::TGeneralContainer (const std::shared_ptr<arrow::Table>& table) {
65
97
AFL_VERIFY (table);
66
- Schema = table->schema ();
98
+ Schema = std::make_shared<NModifier::TSchema>( table->schema () );
67
99
RecordsCount = table->num_rows ();
68
100
for (auto && i : table->columns ()) {
69
101
if (i->num_chunks () == 1 ) {
@@ -72,15 +104,17 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table)
72
104
Columns.emplace_back (std::make_shared<NAccessor::TTrivialChunkedArray>(i));
73
105
}
74
106
}
107
+ Initialize ();
75
108
}
76
109
77
110
TGeneralContainer::TGeneralContainer (const std::shared_ptr<arrow::RecordBatch>& table) {
78
111
AFL_VERIFY (table);
79
- Schema = table->schema ();
112
+ Schema = std::make_shared<NModifier::TSchema>( table->schema () );
80
113
RecordsCount = table->num_rows ();
81
114
for (auto && i : table->columns ()) {
82
115
Columns.emplace_back (std::make_shared<NAccessor::TTrivialArray>(i));
83
116
}
117
+ Initialize ();
84
118
}
85
119
86
120
std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TGeneralContainer::GetAccessorByNameVerified (const std::string& fieldId) const {
@@ -110,14 +144,78 @@ std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::o
110
144
if (fields.empty ()) {
111
145
return nullptr ;
112
146
}
113
- return arrow::Table::Make (std::make_shared<arrow::Schema>(fields), columns, RecordsCount);
147
+ AFL_VERIFY (RecordsCount);
148
+ return arrow::Table::Make (std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
114
149
}
115
150
116
- std::shared_ptr<arrow::Table> TGeneralContainer::BuildTable (const std::optional<std::set<std::string>>& columnNames /* = {}*/ ) const {
151
+ std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified (const std::optional<std::set<std::string>>& columnNames /* = {}*/ ) const {
117
152
auto result = BuildTableOptional (columnNames);
118
153
AFL_VERIFY (result);
119
154
AFL_VERIFY (!columnNames || result->schema ()->num_fields () == (i32)columnNames->size ());
120
155
return result;
121
156
}
122
157
158
+ std::shared_ptr<NArrow::NAccessor::IChunkedArray> TGeneralContainer::GetAccessorByNameOptional (const std::string& fieldId) const {
159
+ int idx = Schema->GetFieldIndex (fieldId);
160
+ if (idx == -1 ) {
161
+ return nullptr ;
162
+ }
163
+ AFL_VERIFY ((ui32)idx < Columns.size ())(" idx" , idx)(" count" , Columns.size ());
164
+ return Columns[idx];
165
+ }
166
+
167
+ TConclusionStatus TGeneralContainer::SyncSchemaTo (const std::shared_ptr<arrow::Schema>& schema, const IFieldsConstructor* defaultFieldsConstructor, const bool forceDefaults) {
168
+ std::shared_ptr<NModifier::TSchema> schemaNew = std::make_shared<NModifier::TSchema>();
169
+ std::vector<std::shared_ptr<NAccessor::IChunkedArray>> columnsNew;
170
+ if (!RecordsCount) {
171
+ return TConclusionStatus::Fail (" original container has not data" );
172
+ }
173
+ for (auto && i : schema->fields ()) {
174
+ const int idx = Schema->GetFieldIndex (i->name ());
175
+ if (idx == -1 ) {
176
+ if (!defaultFieldsConstructor) {
177
+ return TConclusionStatus::Fail (" haven't field for sync: '" + i->name () + " '" );
178
+ } else {
179
+ schemaNew->AddField (i).Validate ();
180
+ auto defConclusion = defaultFieldsConstructor->GetDefaultColumnElementValue (i, forceDefaults);
181
+ if (defConclusion.IsFail ()) {
182
+ return defConclusion;
183
+ }
184
+ columnsNew.emplace_back (std::make_shared<NAccessor::TTrivialArray>(NArrow::TThreadSimpleArraysCache::Get (i->type (), *defConclusion, *RecordsCount)));
185
+ }
186
+ } else {
187
+ const auto & fOwned = Schema->GetFieldVerified (idx);
188
+ if (!fOwned ->type ()->Equals (i->type ())) {
189
+ return TConclusionStatus::Fail (" different field types for '" + i->name () + " '. Have " + fOwned ->type ()->ToString () + " , need " + i->type ()->ToString ());
190
+ }
191
+ schemaNew->AddField (fOwned ).Validate ();
192
+ columnsNew.emplace_back (Columns[idx]);
193
+ }
194
+ }
195
+ std::swap (Schema, schemaNew);
196
+ std::swap (columnsNew, Columns);
197
+ return TConclusionStatus::Success ();
198
+ }
199
+
200
+ TString TGeneralContainer::DebugString () const {
201
+ TStringBuilder result;
202
+ if (RecordsCount) {
203
+ result << " records_count=" << *RecordsCount << " ;" ;
204
+ }
205
+ result << " schema=" << Schema->ToString () << " ;" ;
206
+ return result;
207
+ }
208
+
209
+ TConclusion<std::shared_ptr<arrow::Scalar>> IFieldsConstructor::GetDefaultColumnElementValue (const std::shared_ptr<arrow::Field>& field, const bool force) const {
210
+ AFL_VERIFY (field);
211
+ auto result = DoGetDefaultColumnElementValue (field->name ());
212
+ if (result) {
213
+ return result;
214
+ }
215
+ if (force) {
216
+ return NArrow::DefaultScalar (field->type ());
217
+ }
218
+ return TConclusionStatus::Fail (" have not default value for column " + field->name ());
219
+ }
220
+
123
221
}
0 commit comments