|
1 | 1 | #include "abstract_scheme.h"
|
2 |
| - |
3 |
| -#include <ydb/core/tx/columnshard/engines/index_info.h> |
4 |
| -#include <ydb/core/formats/arrow/arrow_helpers.h> |
5 |
| -#include <util/string/join.h> |
6 |
| - |
7 |
| -namespace NKikimr::NOlap { |
8 |
| - |
9 |
| -std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByIndex(const int index) const { |
10 |
| - auto schema = GetSchema(); |
11 |
| - if (!schema || index < 0 || index >= schema->num_fields()) { |
12 |
| - return nullptr; |
13 |
| - } |
14 |
| - return schema->field(index); |
15 |
| -} |
16 |
| -std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnIdOptional(const ui32 columnId) const { |
17 |
| - return GetFieldByIndex(GetFieldIndex(columnId)); |
18 |
| -} |
19 |
| - |
20 |
| -std::set<ui32> ISnapshotSchema::GetPkColumnsIds() const { |
21 |
| - std::set<ui32> result; |
22 |
| - for (auto&& field : GetIndexInfo().GetReplaceKey()->fields()) { |
23 |
| - result.emplace(GetColumnId(field->name())); |
24 |
| - } |
25 |
| - return result; |
26 |
| - |
27 |
| -} |
28 |
| - |
29 |
| -std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const { |
30 |
| - if (dataSchema.GetSnapshot() == GetSnapshot()) { |
31 |
| - return batch; |
32 |
| - } |
33 |
| - Y_ABORT_UNLESS(dataSchema.GetSnapshot() < GetSnapshot()); |
34 |
| - const std::shared_ptr<arrow::Schema>& resultArrowSchema = GetSchema(); |
35 |
| - std::vector<std::shared_ptr<arrow::Array>> newColumns; |
36 |
| - newColumns.reserve(resultArrowSchema->num_fields()); |
37 |
| - |
38 |
| - for (size_t i = 0; i < resultArrowSchema->fields().size(); ++i) { |
39 |
| - auto& resultField = resultArrowSchema->fields()[i]; |
40 |
| - auto columnId = GetIndexInfo().GetColumnId(resultField->name()); |
41 |
| - auto oldColumnIndex = dataSchema.GetFieldIndex(columnId); |
42 |
| - if (oldColumnIndex >= 0) { // ColumnExists |
43 |
| - auto oldColumnInfo = dataSchema.GetFieldByIndex(oldColumnIndex); |
44 |
| - Y_ABORT_UNLESS(oldColumnInfo); |
45 |
| - auto columnData = batch->GetColumnByName(oldColumnInfo->name()); |
46 |
| - Y_ABORT_UNLESS(columnData); |
47 |
| - newColumns.push_back(columnData); |
48 |
| - } else { // AddNullColumn |
49 |
| - auto nullColumn = NArrow::MakeEmptyBatch(arrow::schema({resultField}), batch->num_rows()); |
50 |
| - newColumns.push_back(nullColumn->column(0)); |
51 |
| - } |
52 |
| - } |
53 |
| - return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns); |
54 |
| -} |
55 |
| - |
56 |
| -std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const { |
57 |
| - std::shared_ptr<arrow::Schema> dstSchema = GetIndexInfo().ArrowSchema(); |
58 |
| - auto batch = NArrow::DeserializeBatch(data, (dataSchema ? dataSchema : dstSchema)); |
59 |
| - if (!batch) { |
60 |
| - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "DeserializeBatch() failed"); |
61 |
| - return nullptr; |
62 |
| - } |
63 |
| - if (batch->num_rows() == 0) { |
64 |
| - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "empty batch"); |
65 |
| - return nullptr; |
66 |
| - } |
67 |
| - |
68 |
| - // Correct schema |
69 |
| - if (dataSchema) { |
70 |
| - batch = NArrow::ExtractColumns(batch, dstSchema, true); |
71 |
| - if (!batch) { |
72 |
| - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot correct schema"); |
73 |
| - return nullptr; |
74 |
| - } |
75 |
| - } |
76 |
| - |
77 |
| - if (!batch->schema()->Equals(dstSchema)) { |
78 |
| - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "unexpected schema for insert batch: '" << batch->schema()->ToString() << "'"); |
79 |
| - return nullptr; |
80 |
| - } |
81 |
| - |
82 |
| - const auto& sortingKey = GetIndexInfo().GetPrimaryKey(); |
83 |
| - Y_ABORT_UNLESS(sortingKey); |
84 |
| - |
85 |
| - // Check PK is NOT NULL |
86 |
| - for (auto& field : sortingKey->fields()) { |
87 |
| - auto column = batch->GetColumnByName(field->name()); |
88 |
| - if (!column) { |
89 |
| - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "missing PK column '" << field->name() << "'"); |
90 |
| - return nullptr; |
91 |
| - } |
92 |
| - if (NArrow::HasNulls(column)) { |
93 |
| - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "PK column '" << field->name() << "' contains NULLs"); |
94 |
| - return nullptr; |
95 |
| - } |
96 |
| - } |
97 |
| - |
98 |
| - auto status = batch->ValidateFull(); |
99 |
| - if (!status.ok()) { |
100 |
| - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", status.ToString()); |
101 |
| - return nullptr; |
102 |
| - } |
103 |
| - batch = NArrow::SortBatch(batch, sortingKey, true); |
104 |
| - Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(batch, sortingKey)); |
105 |
| - return batch; |
106 |
| -} |
107 |
| - |
108 |
| -ui32 ISnapshotSchema::GetColumnId(const std::string& columnName) const { |
109 |
| - auto id = GetColumnIdOptional(columnName); |
110 |
| - AFL_VERIFY(id)("column_name", columnName)("schema", JoinSeq(",", GetSchema()->field_names())); |
111 |
| - return *id; |
112 |
| -} |
113 |
| - |
114 |
| -std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnIdVerified(const ui32 columnId) const { |
115 |
| - auto result = GetFieldByColumnIdOptional(columnId); |
116 |
| - AFL_VERIFY(result)("event", "unknown_column")("column_id", columnId)("schema", DebugString()); |
117 |
| - return result; |
118 |
| -} |
119 |
| - |
120 |
| -std::shared_ptr<NKikimr::NOlap::TColumnLoader> ISnapshotSchema::GetColumnLoaderVerified(const ui32 columnId) const { |
121 |
| - auto result = GetColumnLoaderOptional(columnId); |
122 |
| - AFL_VERIFY(result); |
123 |
| - return result; |
124 |
| -} |
125 |
| - |
126 |
| -std::shared_ptr<NKikimr::NOlap::TColumnLoader> ISnapshotSchema::GetColumnLoaderVerified(const std::string& columnName) const { |
127 |
| - auto result = GetColumnLoaderOptional(columnName); |
128 |
| - AFL_VERIFY(result); |
129 |
| - return result; |
130 |
| -} |
131 |
| - |
132 |
| -std::shared_ptr<NKikimr::NOlap::TColumnLoader> ISnapshotSchema::GetColumnLoaderOptional(const std::string& columnName) const { |
133 |
| - const std::optional<ui32> id = GetColumnIdOptional(columnName); |
134 |
| - if (id) { |
135 |
| - return GetColumnLoaderOptional(*id); |
136 |
| - } else { |
137 |
| - return nullptr; |
138 |
| - } |
139 |
| -} |
140 |
| - |
141 |
| -} |
0 commit comments