Skip to content

Commit fd849bf

Browse files
authored
Merge 3557ee2 into aa5a493
2 parents aa5a493 + 3557ee2 commit fd849bf

File tree

13 files changed

+104
-35
lines changed

13 files changed

+104
-35
lines changed

ydb/core/formats/arrow/common/container.cpp

+7-6
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
#include <ydb/library/actors/core/log.h>
33
#include <ydb/core/formats/arrow/arrow_helpers.h>
44
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
5+
#include <ydb/library/accessor/validator.h>
56

67
namespace NKikimr::NArrow {
78

@@ -93,10 +94,10 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Schema>& schem
9394
Initialize();
9495
}
9596

96-
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table) {
97+
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table)
98+
: RecordsCount(TValidator::CheckNotNull(table)->num_rows())
99+
, Schema(std::make_shared<NModifier::TSchema>(TValidator::CheckNotNull(table)->schema())) {
97100
AFL_VERIFY(table);
98-
Schema = std::make_shared<NModifier::TSchema>(table->schema());
99-
RecordsCount = table->num_rows();
100101
for (auto&& i : table->columns()) {
101102
if (i->num_chunks() == 1) {
102103
Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(i->chunk(0)));
@@ -107,10 +108,10 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table)
107108
Initialize();
108109
}
109110

110-
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::RecordBatch>& table) {
111+
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::RecordBatch>& table)
112+
: RecordsCount(TValidator::CheckNotNull(table)->num_rows())
113+
, Schema(std::make_shared<NModifier::TSchema>(TValidator::CheckNotNull(table)->schema())) {
111114
AFL_VERIFY(table);
112-
Schema = std::make_shared<NModifier::TSchema>(table->schema());
113-
RecordsCount = table->num_rows();
114115
for (auto&& i : table->columns()) {
115116
Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(i));
116117
}

ydb/core/formats/arrow/common/container.h

+2-2
Original file line number | Diff line number | Diff line change
@@ -24,14 +24,14 @@ class IFieldsConstructor {
2424

2525
class TGeneralContainer {
2626
private:
27-
YDB_READONLY_DEF(std::optional<ui64>, RecordsCount);
27+
std::optional<ui64> RecordsCount;
2828
YDB_READONLY_DEF(std::shared_ptr<NModifier::TSchema>, Schema);
2929
std::vector<std::shared_ptr<NAccessor::IChunkedArray>> Columns;
3030
void Initialize();
3131
public:
3232
TGeneralContainer(const ui32 recordsCount);
3333

34-
ui32 GetRecordsCountVerified() const {
34+
// Returns the stored row count; AFL_VERIFY guards against dereferencing an unset RecordsCount.
// NOTE(review): RecordsCount is declared as std::optional<ui64>, but this accessor returns ui32,
// so the value is implicitly narrowed on return — confirm that counts above the ui32 range
// cannot occur, or widen the return type together with the callers.
ui32 GetRecordsCount() const {
3535
AFL_VERIFY(RecordsCount);
3636
return *RecordsCount;
3737
}

ydb/core/formats/arrow/modifier/subset.h

+6
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,12 @@ class TSchemaSubset {
1414
TSchemaSubset() = default;
1515
TSchemaSubset(const std::set<ui32>& fieldsIdx, const ui32 fieldsCount);
1616

17+
// Factory: builds a default-constructed subset and switches it into Exclude mode.
// NOTE(review): presumably an exclude-mode subset with no listed field indexes rejects nothing,
// i.e. every field passes — confirm against Apply().
static TSchemaSubset AllFieldsAccepted() {
18+
TSchemaSubset result;
19+
result.Exclude = true;
20+
return result;
21+
}
22+
1723
template <class T>
1824
std::vector<T> Apply(const std::vector<T>& fullSchema) const {
1925
if (FieldIdx.empty()) {

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

+37
Original file line number | Diff line number | Diff line change
@@ -2624,6 +2624,43 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
26242624

26252625
}
26262626

2627+
// Scenario: data is written while background indexation is disabled, the table store schema is
// extended once before and once after the writes, and only then is indexation switched back on.
// NOTE(review): judging by the test name, the point is that indexation must cope with a batch
// lacking a column that the current schema has — confirm. Apart from the ALTER status checks
// there are no explicit assertions in this test.
Y_UNIT_TEST(NormalizeAbsentColumn) {
2628+
// Start the cluster without the default sample tables.
auto settings = TKikimrSettings().SetWithSampleTables(false);
2629+
TKikimrRunner kikimr(settings);
2630+
TLocalHelper testHelper(kikimr);
2631+
2632+
// Install a test column-shard controller and keep Indexation disabled for now, so the writes
// below happen before any indexation is allowed to run.
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
2633+
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
2634+
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
2635+
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
2636+
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
2637+
2638+
testHelper.CreateTestOlapTable();
2639+
auto tableClient = kikimr.GetTableClient();
2640+
2641+
// Debug-level logging for the column shard and its scan component.
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();
2642+
2643+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
2644+
2645+
// First schema change: add new_column1 before any data is written.
{
2646+
auto alterQuery = TStringBuilder() << "ALTER TABLESTORE `/Root/olapStore` ADD COLUMN new_column1 Uint64;";
2647+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
2648+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
2649+
}
2650+
// Two writes issued after the first ALTER and before the second one.
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 1000);
2651+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 1000);
2652+
2653+
// Second schema change: add new_column2 after the data has been written.
{
2654+
auto alterQuery = TStringBuilder() << "ALTER TABLESTORE `/Root/olapStore` ADD COLUMN new_column2 Uint64;";
2655+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
2656+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
2657+
}
2658+
2659+
// Re-enable indexation only now, after both schema changes, and wait on it with a 5 second argument.
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
2660+
csController->WaitIndexation(TDuration::Seconds(5));
2661+
2662+
}
2663+
26272664
Y_UNIT_TEST(MultiInsertWithSinks) {
26282665
NKikimrConfig::TAppConfig appConfig;
26292666
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

+9
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,8 @@
22
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
33
#include <ydb/core/kqp/ut/common/columnshard.h>
44
#include <ydb/core/testlib/common_helper.h>
5+
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
6+
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
57
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
68
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
79
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
@@ -3039,6 +3041,11 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
30393041

30403042
auto session = Kikimr->GetTableClient().CreateSession().GetValueSync().GetSession();
30413043

3044+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
3045+
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
3046+
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
3047+
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
3048+
30423049
const TString query = Sprintf(R"(
30433050
CREATE TABLE `/Root/DataShard` (
30443051
Col1 Uint64 NOT NULL,
@@ -3054,6 +3061,8 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
30543061
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
30553062
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
30563063
DoExecute();
3064+
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
3065+
csController->WaitIndexation(TDuration::Seconds(5));
30573066
}
30583067

30593068
};

ydb/core/kqp/ut/service/ya.make

+1
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ PEERDIR(
2424
library/cpp/threading/local_executor
2525
ydb/core/kqp
2626
ydb/core/kqp/ut/common
27+
ydb/core/tx/columnshard/hooks/testing
2728
ydb/library/yql/sql/pg
2829
ydb/library/yql/parser/pg_wrapper
2930
ydb/public/lib/ut_helpers

ydb/core/protos/feature_flags.proto

+2
Original file line number | Diff line number | Diff line change
@@ -146,4 +146,6 @@ message TFeatureFlags {
146146
optional bool EnableSingleCompositeActionGroup = 131 [default = false];
147147
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
148148
optional bool EnableChangefeedsOnIndexTables = 134 [default = false];
149+
optional bool EnableResourcePoolsCounters = 135 [default = false];
150+
optional bool EnableOptionalColumnsInColumnShard = 136 [default = false];
149151
}

ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.cpp

+10
Original file line number | Diff line number | Diff line change
@@ -2,4 +2,14 @@
22

33
namespace NKikimr::NOlap::NCompaction {
44

5+
// Begins a column merge over the given input arrays. Start may be called only once per merger:
// the Started flag is verified to be unset, then set, and the inputs are forwarded to DoStart.
void IColumnMerger::Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
6+
AFL_VERIFY(!Started);
7+
Started = true;
8+
// NOTE(review): the per-input check of the data type id against the result field (below) is
// commented out, so inputs with a mismatching type are not rejected at this point.
// for (auto&& i : input) {
9+
// AFL_VERIFY(i->GetDataType()->id() == Context.GetResultField()->type()->id())("input", i->GetDataType()->ToString())(
10+
// "result", Context.GetResultField()->ToString());
11+
// }
12+
return DoStart(input);
13+
}
14+
515
}

ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h

+1-5
Original file line number | Diff line number | Diff line change
@@ -21,11 +21,7 @@ class IColumnMerger {
2121

2222
virtual ~IColumnMerger() = default;
2323

24-
void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
25-
AFL_VERIFY(!Started);
26-
Started = true;
27-
return DoStart(input);
28-
}
24+
void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input);
2925

3026
std::vector<TColumnPortionResult> Execute(
3127
const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) {

ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp

+2-1
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,8 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
3030

3131
ui32 idx = 0;
3232
for (auto&& batch : Batches) {
33+
AFL_VERIFY(batch->GetColumnsCount() == resultFiltered->GetColumnsCount())("data", batch->GetColumnsCount())(
34+
"schema", resultFiltered->GetColumnsCount());
3335
{
3436
NArrow::NConstruction::IArrayBuilder::TPtr column =
3537
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(
@@ -54,7 +56,6 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
5456
NActors::TLogContextGuard logGuard(
5557
NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId)));
5658
auto columnInfo = stats->GetColumnInfo(columnId);
57-
auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId);
5859
std::shared_ptr<IColumnMerger> merger = std::make_shared<TPlainMerger>();
5960
// resultFiltered->BuildColumnMergerVerified(columnId);
6061

ydb/core/tx/columnshard/engines/changes/indexation.cpp

+1-11
Original file line number | Diff line number | Diff line change
@@ -130,17 +130,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
130130
Y_ABORT_UNLESS(!DataToIndex.empty());
131131
Y_ABORT_UNLESS(AppendedPortions.empty());
132132

133-
auto maxSnapshot = TSnapshot::Zero();
134-
for (auto& inserted : DataToIndex) {
135-
TSnapshot insertSnap = inserted.GetSnapshot();
136-
Y_ABORT_UNLESS(insertSnap.Valid());
137-
if (insertSnap > maxSnapshot) {
138-
maxSnapshot = insertSnap;
139-
}
140-
}
141-
Y_ABORT_UNLESS(maxSnapshot.Valid());
142-
143-
auto resultSchema = context.SchemaVersions.GetSchema(maxSnapshot);
133+
auto resultSchema = context.SchemaVersions.GetLastSchema();
144134
Y_ABORT_UNLESS(resultSchema->GetIndexInfo().IsSorted());
145135

146136
TPathesData pathBatches;

ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp

+18-9
Original file line number | Diff line number | Diff line change
@@ -29,23 +29,32 @@ std::set<ui32> ISnapshotSchema::GetPkColumnsIds() const {
2929

3030
TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> ISnapshotSchema::NormalizeBatch(
3131
const ISnapshotSchema& dataSchema, const std::shared_ptr<NArrow::TGeneralContainer>& batch, const std::set<ui32>& restoreColumnIds) const {
32+
AFL_VERIFY(dataSchema.GetSnapshot() <= GetSnapshot());
3233
if (dataSchema.GetSnapshot() == GetSnapshot()) {
33-
return batch;
34+
if (batch->GetColumnsCount() == GetColumnsCount()) {
35+
return batch;
36+
}
3437
}
35-
AFL_VERIFY(dataSchema.GetSnapshot() < GetSnapshot());
3638
const std::shared_ptr<arrow::Schema>& resultArrowSchema = GetSchema();
3739

38-
std::shared_ptr<NArrow::TGeneralContainer> result = std::make_shared<NArrow::TGeneralContainer>(batch->GetRecordsCountVerified());
40+
std::shared_ptr<NArrow::TGeneralContainer> result = std::make_shared<NArrow::TGeneralContainer>(batch->GetRecordsCount());
3941
for (size_t i = 0; i < resultArrowSchema->fields().size(); ++i) {
4042
auto& resultField = resultArrowSchema->fields()[i];
4143
auto columnId = GetIndexInfo().GetColumnId(resultField->name());
4244
auto oldField = dataSchema.GetFieldByColumnIdOptional(columnId);
4345
if (oldField) {
44-
auto conclusion = result->AddField(resultField, batch->GetAccessorByNameVerified(oldField->name()));
45-
if (conclusion.IsFail()) {
46-
return conclusion;
46+
auto fAccessor = batch->GetAccessorByNameOptional(oldField->name());
47+
if (fAccessor) {
48+
auto conclusion = result->AddField(resultField, fAccessor);
49+
if (conclusion.IsFail()) {
50+
return conclusion;
51+
}
52+
continue;
4753
}
48-
} else if (restoreColumnIds.contains(columnId)) {
54+
}
55+
if (restoreColumnIds.contains(columnId)) {
56+
AFL_VERIFY(!!GetExternalDefaultValueVerified(columnId) || GetIndexInfo().IsNullableVerified(columnId))("column_name",
57+
GetIndexInfo().GetColumnName(columnId, false))("id", columnId);
4958
result->AddField(resultField,
5059
NArrow::TThreadSimpleArraysCache::Get(resultField->type(), GetExternalDefaultValueVerified(columnId), batch->num_rows()))
5160
.Validate();
@@ -107,8 +116,9 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
107116
Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(batch, GetIndexInfo().GetPrimaryKey()));
108117

109118
switch (mType) {
119+
case NEvWrite::EModificationType::Replace:
110120
case NEvWrite::EModificationType::Upsert: {
111-
AFL_VERIFY(batch->num_columns() <= dstSchema->num_fields());
121+
AFL_VERIFY(batch->num_columns() <= dstSchema->num_fields());
112122
if (batch->num_columns() < dstSchema->num_fields()) {
113123
for (auto&& f : dstSchema->fields()) {
114124
if (GetIndexInfo().IsNullableVerified(f->name())) {
@@ -125,7 +135,6 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
125135
return batch;
126136
}
127137
case NEvWrite::EModificationType::Delete:
128-
case NEvWrite::EModificationType::Replace:
129138
case NEvWrite::EModificationType::Insert:
130139
case NEvWrite::EModificationType::Update:
131140
return batch;

ydb/core/tx/columnshard/operations/slice_builder/builder.cpp

+8-1
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,14 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
5555
if (OriginalBatch->num_columns() != indexSchema->num_fields()) {
5656
AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())(
5757
"index", indexSchema->num_fields());
58-
58+
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard()) {
59+
subset = NArrow::TSchemaSubset::AllFieldsAccepted();
60+
const std::vector<ui32>& columnIdsVector = ActualSchema->GetIndexInfo().GetColumnIds(false);
61+
const std::set<ui32> columnIdsSet(columnIdsVector.begin(), columnIdsVector.end());
62+
auto normalized =
63+
ActualSchema->NormalizeBatch(*ActualSchema, std::make_shared<NArrow::TGeneralContainer>(OriginalBatch), columnIdsSet).DetachResult();
64+
OriginalBatch = NArrow::ToBatch(normalized->BuildTableVerified(), true);
65+
}
5966
}
6067
WriteData.MutableWriteMeta().SetWriteMiddle2StartInstant(TMonotonic::Now());
6168
auto batches = BuildSlices();

0 commit comments

Comments
 (0)