Skip to content

Commit 5e95651

Browse files
Skip default columns on indexation (#6899)
1 parent 1c15550 commit 5e95651

36 files changed

+424
-173
lines changed

ydb/core/formats/arrow/arrow_helpers.cpp

+14-1
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ std::shared_ptr<arrow::Scalar> DefaultScalar(const std::shared_ptr<arrow::DataTy
548548
}
549549
return true;
550550
});
551-
Y_ABORT_UNLESS(out);
551+
AFL_VERIFY(out)("type", type->ToString());
552552
return out;
553553
}
554554

@@ -634,6 +634,19 @@ int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr
634634
return ScalarCompare(*x, *y);
635635
}
636636

637+
// Three-way comparison of two scalars, either of which may be absent (a null pointer).
// An absent scalar orders before any present one, and two absent scalars compare equal.
// When both are present the result is whatever ScalarCompare returns for the pointees.
int ScalarCompareNullable(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) {
    const bool hasX = static_cast<bool>(x);
    const bool hasY = static_cast<bool>(y);
    if (hasX && hasY) {
        return ScalarCompare(*x, *y);
    }
    // At least one side is absent: (0 - 1) = -1, (1 - 0) = 1, (0 - 0) = 0.
    return static_cast<int>(hasX) - static_cast<int>(hasY);
}
649+
637650
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
638651
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
639652
auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);

ydb/core/formats/arrow/arrow_helpers.h

+1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar
9898
bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
9999
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
100100
int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
101+
int ScalarCompareNullable(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
101102
std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
102103
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
103104
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
ydb/core/formats/arrow/modifier/subset.cpp

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#include "subset.h"
2+
#include <ydb/library/actors/core/log.h>
3+
4+
namespace NKikimr::NArrow {
5+
6+
// Builds a compact description of which of the `fieldsCount` schema positions are present.
//
// fieldsIdx   - positions (0-based) of the fields that are present; must be non-empty and
//               every position must be less than fieldsCount.
// fieldsCount - total number of fields in the full schema.
//
// Resulting state:
//  * all fields present      -> FieldIdx stays empty (Exclude stays false);
//  * fewer absent than present -> Exclude = true and FieldIdx holds the absent positions;
//  * otherwise               -> Exclude = false and FieldIdx holds the present positions.
// In both non-trivial cases FieldIdx is in ascending order.
TSchemaSubset::TSchemaSubset(const std::set<ui32>& fieldsIdx, const ui32 fieldsCount) {
    AFL_VERIFY(fieldsIdx.size() <= fieldsCount);
    AFL_VERIFY(fieldsIdx.size());
    // std::set iterates in ascending order, so the last element is the largest index.
    // Without this check an out-of-range index could be mistaken for "all fields present"
    // (when sizes happen to match) or be stored and only fail later on use.
    AFL_VERIFY(*fieldsIdx.rbegin() < fieldsCount)("max_idx", *fieldsIdx.rbegin())("fields_count", fieldsCount);
    if (fieldsCount == fieldsIdx.size()) {
        return;
    }
    const size_t absentCount = fieldsCount - fieldsIdx.size();
    Exclude = absentCount < fieldsIdx.size();
    if (!Exclude) {
        FieldIdx.assign(fieldsIdx.begin(), fieldsIdx.end());
        return;
    }
    // Store the complement: walk all positions and keep those not listed in fieldsIdx.
    FieldIdx.reserve(absentCount);
    auto it = fieldsIdx.begin();
    for (ui32 i = 0; i < fieldsCount; ++i) {
        if (it != fieldsIdx.end() && *it == i) {
            ++it;
        } else {
            FieldIdx.emplace_back(i);
        }
    }
    // Every present position must have been matched while walking [0, fieldsCount).
    AFL_VERIFY(it == fieldsIdx.end());
}
28+
29+
// Serializes the subset into the `List` branch of the proto message:
// the Exclude flag followed by every stored index in its current order.
NKikimrArrowSchema::TSchemaSubset TSchemaSubset::SerializeToProto() const {
    NKikimrArrowSchema::TSchemaSubset proto;
    auto* list = proto.MutableList();
    list->SetExclude(Exclude);
    for (const ui32 idx : FieldIdx) {
        list->AddFieldsIdx(idx);
    }
    return proto;
}
37+
38+
// Restores the subset from the `List` branch of the proto message.
// Returns a failure status (leaving this object untouched) when the message carries no List;
// otherwise overwrites Exclude and FieldIdx with the stored values and returns success.
TConclusionStatus TSchemaSubset::DeserializeFromProto(const NKikimrArrowSchema::TSchemaSubset& proto) {
    if (proto.HasList()) {
        const auto& list = proto.GetList();
        Exclude = list.GetExclude();
        const auto& storedIdx = list.GetFieldsIdx();
        std::vector<ui32> restored(storedIdx.begin(), storedIdx.end());
        FieldIdx.swap(restored);
        return TConclusionStatus::Success();
    }
    return TConclusionStatus::Fail("no schema subset data");
}
50+
51+
}
ydb/core/formats/arrow/modifier/subset.h

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#pragma once
2+
#include <ydb/core/formats/arrow/protos/fields.pb.h>
3+
#include <ydb/library/conclusion/result.h>
4+
#include <ydb/library/actors/core/log.h>
5+
6+
namespace NKikimr::NArrow {
7+
8+
class TSchemaSubset {
9+
private:
10+
std::vector<ui32> FieldIdx;
11+
TString FieldBits;
12+
bool Exclude = false;
13+
public:
14+
TSchemaSubset() = default;
15+
TSchemaSubset(const std::set<ui32>& fieldsIdx, const ui32 fieldsCount);
16+
17+
template <class T>
18+
std::vector<T> Apply(const std::vector<T>& fullSchema) const {
19+
if (FieldIdx.empty()) {
20+
return fullSchema;
21+
}
22+
std::vector<T> fields;
23+
if (!Exclude) {
24+
for (auto&& i : FieldIdx) {
25+
AFL_VERIFY(i < fullSchema.size());
26+
fields.emplace_back(fullSchema[i]);
27+
}
28+
} else {
29+
auto it = FieldIdx.begin();
30+
for (ui32 i = 0; i < fullSchema.size(); ++i) {
31+
if (it == FieldIdx.end() || i < *it) {
32+
AFL_VERIFY(i < fullSchema.size());
33+
fields.emplace_back(fullSchema[i]);
34+
} else if (i == *it) {
35+
++it;
36+
} else {
37+
AFL_VERIFY(false);
38+
}
39+
}
40+
}
41+
return fields;
42+
}
43+
44+
NKikimrArrowSchema::TSchemaSubset SerializeToProto() const;
45+
[[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrArrowSchema::TSchemaSubset& proto);
46+
};
47+
48+
}

ydb/core/formats/arrow/modifier/ya.make

+2
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ PEERDIR(
44
contrib/libs/apache/arrow
55
ydb/library/conclusion
66
ydb/core/formats/arrow/switch
7+
ydb/core/formats/arrow/protos
78
ydb/library/actors/core
89
)
910

1011
SRCS(
1112
schema.cpp
13+
subset.cpp
1214
)
1315

1416
END()

ydb/core/formats/arrow/process_columns.cpp

+20-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "process_columns.h"
22
#include "common/adapter.h"
3+
#include "modifier/subset.h"
34

45
#include <util/string/join.h>
56

@@ -28,16 +29,23 @@ std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(const std::shared_ptr
2829

2930
template <class TDataContainer>
3031
TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_ptr<TDataContainer>& srcBatch,
31-
const std::shared_ptr<arrow::Schema>& dstSchema) {
32+
const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
3233
AFL_VERIFY(srcBatch);
3334
AFL_VERIFY(dstSchema);
3435
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
3536
columns.reserve(dstSchema->num_fields());
36-
37+
std::vector<std::shared_ptr<arrow::Field>> fields;
38+
fields.reserve(dstSchema->num_fields());
39+
std::set<ui32> fieldIdx;
40+
ui32 idx = 0;
3741
for (auto& field : dstSchema->fields()) {
3842
const int index = srcBatch->schema()->GetFieldIndex(field->name());
3943
if (index > -1) {
44+
if (subset) {
45+
fieldIdx.emplace(idx);
46+
}
4047
columns.push_back(srcBatch->column(index));
48+
fields.emplace_back(field);
4149
auto srcField = srcBatch->schema()->field(index);
4250
if (field->Equals(srcField)) {
4351
AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
@@ -47,14 +55,17 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_
4755
("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
4856
return TConclusionStatus::Fail("incompatible column types");
4957
}
50-
} else {
58+
} else if (!subset) {
5159
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())
5260
("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
5361
return TConclusionStatus::Fail("not found column '" + field->name() + "'");
5462
}
63+
++idx;
5564
}
56-
57-
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(dstSchema, std::move(columns), srcBatch->num_rows());
65+
if (subset) {
66+
*subset = TSchemaSubset(fieldIdx, dstSchema->num_fields());
67+
}
68+
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), srcBatch->num_rows());
5869
}
5970

6071
template <class TDataContainer, class TStringType>
@@ -114,12 +125,12 @@ std::shared_ptr<arrow::Table> TColumnOperator::Extract(const std::shared_ptr<arr
114125
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
115126
}
116127

117-
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema) {
118-
return AdaptColumnsImpl(incoming, dstSchema);
128+
// Adapts a record batch to `dstSchema` by forwarding all arguments to AdaptColumnsImpl.
// Per the visible part of AdaptColumnsImpl: when `subset` is non-null, destination fields that
// are missing from `incoming` are skipped instead of producing a "not found column" failure,
// and `*subset` is assigned the positions (within dstSchema) of the fields that were found.
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
129+
return AdaptColumnsImpl(incoming, dstSchema, subset);
119130
}
120131

121-
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema) {
122-
return AdaptColumnsImpl(incoming, dstSchema);
132+
// Adapts a table to `dstSchema` by forwarding all arguments to AdaptColumnsImpl.
// Per the visible part of AdaptColumnsImpl: when `subset` is non-null, destination fields that
// are missing from `incoming` are skipped instead of producing a "not found column" failure,
// and `*subset` is assigned the positions (within dstSchema) of the fields that were found.
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
133+
return AdaptColumnsImpl(incoming, dstSchema, subset);
123134
}
124135

125136
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {

ydb/core/formats/arrow/process_columns.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
namespace NKikimr::NArrow {
77

8+
class TSchemaSubset;
9+
810
class TColumnOperator {
911
public:
1012
enum class EExtractProblemsPolicy {
@@ -36,8 +38,8 @@ class TColumnOperator {
3638
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
3739
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);
3840

39-
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema);
40-
TConclusion<std::shared_ptr<arrow::Table>> Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema);
41+
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
42+
TConclusion<std::shared_ptr<arrow::Table>> Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
4143

4244
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
4345
TConclusion<std::shared_ptr<arrow::Table>> Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
ydb/core/formats/arrow/protos/fields.proto

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package NKikimrArrowSchema;
2+
3+
// Serialized form of NKikimr::NArrow::TSchemaSubset: which positions of a full schema
// are present in a piece of data.
message TSchemaSubset {
4+
5+
// Explicit list of schema positions.
message TFieldsList {
6+
// When true, FieldsIdx lists the positions to leave out; when false (or unset),
// FieldsIdx lists the positions to keep.
optional bool Exclude = 1;
7+
// 0-based positions within the full schema. An empty list is treated by
// TSchemaSubset::Apply as "all fields".
repeated uint32 FieldsIdx = 2;
8+
}
9+
10+
// Alternative encodings of the subset.
oneof Implementation {
11+
// The only branch written by TSchemaSubset::SerializeToProto and required by
// TSchemaSubset::DeserializeFromProto.
TFieldsList List = 1;
12+
// NOTE(review): not read or written by the C++ serialization code shown alongside this
// file - presumably a reserved bitmask encoding; confirm before relying on it.
string Bits = 2;
13+
14+
}
15+
}

ydb/core/formats/arrow/protos/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ PROTO_LIBRARY()
22

33
SRCS(
44
ssa.proto
5+
fields.proto
56
)
67

78
PEERDIR(

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -3087,6 +3087,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
30873087
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
30883088
}
30893089

3090+
{
3091+
auto it = client.ExecuteQuery(R"(
3092+
UPSERT INTO `/Root/DataShard` (Col3) VALUES ('null');
3093+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3094+
UNIT_ASSERT(!it.IsSuccess());
3095+
}
3096+
30903097
{
30913098
auto it = client.StreamExecuteQuery(R"(
30923099
SELECT * FROM `/Root/DataShard` ORDER BY Col1;

ydb/core/protos/tx_columnshard.proto

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import "ydb/core/protos/long_tx_service.proto";
44
import "ydb/core/protos/statistics.proto";
55
import "ydb/core/protos/subdomains.proto";
66
import "ydb/core/protos/tx.proto";
7+
import "ydb/core/formats/arrow/protos/fields.proto";
78

89
package NKikimrTxColumnShard;
910
option java_package = "ru.yandex.kikimr.proto";
@@ -88,6 +89,7 @@ message TLogicalMetadata {
8889
optional uint64 DirtyWriteTimeSeconds = 5;
8990
optional string SpecialKeysRawData = 6;
9091
optional TEvWrite.EModificationType ModificationType = 7;
92+
optional NKikimrArrowSchema.TSchemaSubset SchemaSubset = 8;
9193
}
9294

9395
message TEvWriteResult {

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
1818

1919
const auto& writeMeta = batch.GetAggregation().GetWriteMeta();
2020
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
21+
*meta.MutableSchemaSubset() = batch.GetAggregation().GetSchemaSubset().SerializeToProto();
2122
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
2223
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);
2324

24-
NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
25+
NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange,
26+
meta, tableSchema->GetVersion(),
27+
batch->GetData());
2528
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
2629
if (ok) {
2730
Self->UpdateInsertTableCounters();

ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

+34-7
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,23 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(
3030
resultSchema->GetIndexInfo().GetReplaceKey(), resultDataSchema, false, IIndexInfo::GetSnapshotColumnNames());
3131

3232
THashSet<ui64> portionsInUsage;
33+
std::set<ui32> columnIds;
3334
for (auto&& i : portions) {
35+
if (columnIds.size() != resultSchema->GetColumnsCount()) {
36+
for (auto id : i.GetPortionInfo().GetColumnIds()) {
37+
if (resultSchema->GetFieldIndex(id) > 0) {
38+
columnIds.emplace(id);
39+
}
40+
}
41+
}
3442
AFL_VERIFY(portionsInUsage.emplace(i.GetPortionInfo().GetPortionId()).second);
3543
}
44+
AFL_VERIFY(columnIds.size() <= resultSchema->GetColumnsCount());
3645

3746
for (auto&& i : portions) {
3847
auto dataSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions);
3948
auto batch = i.RestoreBatch(dataSchema, *resultSchema);
40-
batch = resultSchema->NormalizeBatch(*dataSchema, batch).DetachResult();
49+
batch = resultSchema->NormalizeBatch(*dataSchema, batch, columnIds).DetachResult();
4150
IIndexInfo::NormalizeDeletionColumn(*batch);
4251
auto filter = BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), portionsInUsage, resultSchema);
4352
mergeStream.AddSource(batch, filter);
@@ -175,13 +184,32 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
175184
}
176185

177186
std::shared_ptr<TSerializationStats> stats = std::make_shared<TSerializationStats>();
178-
for (auto&& i : SwitchedPortions) {
179-
stats->Merge(i.GetSerializationStat(*resultSchema));
187+
std::set<ui32> columnIds;
188+
{
189+
{
190+
THashMap<ui64, ISnapshotSchema::TPtr> schemas;
191+
for (auto& portion : SwitchedPortions) {
192+
auto dataSchema = portion.GetSchema(context.SchemaVersions);
193+
schemas.emplace(dataSchema->GetVersion(), dataSchema);
194+
}
195+
columnIds = ISnapshotSchema::GetColumnsWithDifferentDefaults(schemas, resultSchema);
196+
}
197+
for (auto&& i : SwitchedPortions) {
198+
stats->Merge(i.GetSerializationStat(*resultSchema));
199+
if (columnIds.size() != resultSchema->GetColumnsCount()) {
200+
for (auto id : i.GetColumnIds()) {
201+
if (resultSchema->HasColumnId(id)) {
202+
columnIds.emplace(id);
203+
}
204+
}
205+
}
206+
}
207+
AFL_VERIFY(columnIds.size() <= resultSchema->GetColumnsCount());
180208
}
181209

182210
std::vector<std::map<ui32, std::vector<TColumnPortionResult>>> chunkGroups;
183211
chunkGroups.resize(batchResults.size());
184-
for (auto&& columnId : resultSchema->GetIndexInfo().GetColumnIds()) {
212+
for (auto&& columnId : columnIds) {
185213
NActors::TLogContextGuard logGuard(
186214
NActors::TLogContextBuilder::Build()("field_name", resultSchema->GetIndexInfo().GetColumnName(columnId)));
187215
auto columnInfo = stats->GetColumnInfo(columnId);
@@ -196,11 +224,10 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
196224
if (!p.ExtractColumnChunks(columnId, records, chunks)) {
197225
if (!loader) {
198226
loader = resultSchema->GetColumnLoaderVerified(columnId);
199-
} else {
200-
AFL_VERIFY(dataSchema->IsSpecialColumnId(columnId));
201227
}
228+
auto f = resultSchema->GetFieldByColumnIdVerified(columnId);
202229
chunks.emplace_back(std::make_shared<NChunks::TDefaultChunkPreparation>(columnId, p.GetPortionInfo().GetRecordsCount(),
203-
p.GetPortionInfo().GetColumnRawBytes({ columnId }), resultField, resultSchema->GetDefaultValueVerified(columnId),
230+
resultField, resultSchema->GetExternalDefaultValueVerified(columnId),
204231
resultSchema->GetColumnSaver(columnId)));
205232
records = { nullptr };
206233
}

0 commit comments

Comments
 (0)