Skip to content

Commit bfbba84

Browse files
authored
Add index state to DataShard (#6210)
1 parent e1feb74 commit bfbba84

10 files changed

+77
-16
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,9 +1041,16 @@ message TBuildIndexOutcome {
10411041
optional NKikimrProto.TPathID IndexPathId = 1;
10421042
}
10431043

1044+
message TApply {
1045+
// Path id of the index whose creation was done
1046+
optional NKikimrProto.TPathID IndexPathId = 1;
1047+
}
1048+
1049+
reserved 1; // Apply type was changed from Empty to TApply
1050+
10441051
oneof Result {
1045-
google.protobuf.Empty Apply = 1;
10461052
TCancel Cancel = 2;
1053+
TApply Apply = 3;
10471054
}
10481055
}
10491056

ydb/core/tx/datashard/change_collector_async_index.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
106106
const auto updatedTagToPos = MakeTagToPos(updates, [](const TUpdateOp& op) { return op.Tag; });
107107

108108
for (const auto& [pathId, index] : userTable->Indexes) {
109-
if (index.Type != TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) {
109+
if (index.Type != TUserTable::TTableIndex::EType::EIndexTypeGlobalAsync) {
110110
continue;
111111
}
112112

@@ -203,7 +203,7 @@ auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const {
203203
TCachedTagsBuilder builder;
204204

205205
for (const auto& [_, index] : userTable->Indexes) {
206-
if (index.Type != TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) {
206+
if (index.Type != TUserTable::TTableIndex::EType::EIndexTypeGlobalAsync) {
207207
continue;
208208
}
209209

ydb/core/tx/datashard/change_sender.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ class TChangeSender: public TActor<TChangeSender> {
276276
const auto fullTableId = TTableId(self->GetPathOwnerId(), tableId);
277277

278278
for (const auto& [indexPathId, indexInfo] : tableInfo->Indexes) {
279-
if (indexInfo.Type != TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) {
279+
if (indexInfo.Type != TUserTable::TTableIndex::EType::EIndexTypeGlobalAsync) {
280280
continue;
281281
}
282282

ydb/core/tx/datashard/datashard.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1564,6 +1564,21 @@ TUserTable::TPtr TDataShard::AlterTableAddIndex(
15641564
return tableInfo;
15651565
}
15661566

1567+
TUserTable::TPtr TDataShard::AlterTableSwitchIndexState(
1568+
const TActorContext& ctx, TTransactionContext& txc,
1569+
const TPathId& pathId, ui64 tableSchemaVersion,
1570+
const TPathId& streamPathId, NKikimrSchemeOp::EIndexState state)
1571+
{
1572+
auto tableInfo = AlterTableSchemaVersion(ctx, txc, pathId, tableSchemaVersion, false);
1573+
tableInfo->SwitchIndexState(streamPathId, state);
1574+
1575+
// This isn't really necessary now, because no one rely on index state
1576+
NIceDb::TNiceDb db(txc.DB);
1577+
PersistUserTable(db, pathId.LocalPathId, *tableInfo);
1578+
1579+
return tableInfo;
1580+
}
1581+
15671582
TUserTable::TPtr TDataShard::AlterTableDropIndex(
15681583
const TActorContext& ctx, TTransactionContext& txc,
15691584
const TPathId& pathId, ui64 tableSchemaVersion,

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1857,6 +1857,11 @@ class TDataShard
18571857
const TPathId& pathId, ui64 tableSchemaVersion,
18581858
const NKikimrSchemeOp::TIndexDescription& indexDesc);
18591859

1860+
TUserTable::TPtr AlterTableSwitchIndexState(
1861+
const TActorContext& ctx, TTransactionContext& txc,
1862+
const TPathId& pathId, ui64 tableSchemaVersion,
1863+
const TPathId& streamPathId, NKikimrSchemeOp::EIndexState state);
1864+
18601865
TUserTable::TPtr AlterTableDropIndex(
18611866
const TActorContext& ctx, TTransactionContext& txc,
18621867
const TPathId& pathId, ui64 tableSchemaVersion,

ydb/core/tx/datashard/datashard_user_table.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ void TUserTable::AddIndex(const NKikimrSchemeOp::TIndexDescription& indexDesc) {
7272
}
7373

7474
Indexes.emplace(addIndexPathId, TTableIndex(indexDesc, Columns));
75-
AsyncIndexCount += ui32(indexDesc.GetType() == TTableIndex::EIndexType::EIndexTypeGlobalAsync);
75+
AsyncIndexCount += ui32(indexDesc.GetType() == TTableIndex::EType::EIndexTypeGlobalAsync);
7676

7777
NKikimrSchemeOp::TTableDescription schema;
7878
GetSchema(schema);
@@ -81,13 +81,37 @@ void TUserTable::AddIndex(const NKikimrSchemeOp::TIndexDescription& indexDesc) {
8181
SetSchema(schema);
8282
}
8383

84+
void TUserTable::SwitchIndexState(const TPathId& indexPathId, TTableIndex::EState state) {
85+
auto it = Indexes.find(indexPathId);
86+
if (it == Indexes.end()) {
87+
return;
88+
}
89+
90+
it->second.State = state;
91+
92+
// This isn't really necessary now, because no one rely on index state
93+
NKikimrSchemeOp::TTableDescription schema;
94+
GetSchema(schema);
95+
96+
auto& indexes = *schema.MutableTableIndexes();
97+
for (auto it = indexes.begin(); it != indexes.end(); ++it) {
98+
if (indexPathId == TPathId(it->GetPathOwnerId(), it->GetLocalPathId())) {
99+
it->SetState(state);
100+
SetSchema(schema);
101+
return;
102+
}
103+
}
104+
105+
Y_ABORT("unreachable");
106+
}
107+
84108
void TUserTable::DropIndex(const TPathId& indexPathId) {
85109
auto it = Indexes.find(indexPathId);
86110
if (it == Indexes.end()) {
87111
return;
88112
}
89113

90-
AsyncIndexCount -= ui32(it->second.Type == TTableIndex::EIndexType::EIndexTypeGlobalAsync);
114+
AsyncIndexCount -= ui32(it->second.Type == TTableIndex::EType::EIndexTypeGlobalAsync);
91115
Indexes.erase(it);
92116

93117
NKikimrSchemeOp::TTableDescription schema;
@@ -294,7 +318,7 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
294318
for (const auto& indexDesc : descr.GetTableIndexes()) {
295319
Y_ABORT_UNLESS(indexDesc.HasPathOwnerId() && indexDesc.HasLocalPathId());
296320
Indexes.emplace(TPathId(indexDesc.GetPathOwnerId(), indexDesc.GetLocalPathId()), TTableIndex(indexDesc, Columns));
297-
AsyncIndexCount += ui32(indexDesc.GetType() == TTableIndex::EIndexType::EIndexTypeGlobalAsync);
321+
AsyncIndexCount += ui32(indexDesc.GetType() == TTableIndex::EType::EIndexTypeGlobalAsync);
298322
}
299323

300324
for (const auto& streamDesc : descr.GetCdcStreams()) {

ydb/core/tx/datashard/datashard_user_table.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,12 @@ struct TUserTable : public TThrRefBase {
251251
};
252252

253253
struct TTableIndex {
254-
using EIndexType = NKikimrSchemeOp::EIndexType;
254+
using EType = NKikimrSchemeOp::EIndexType;
255+
using EState = NKikimrSchemeOp::EIndexState;
255256

256257
TString Name;
257-
EIndexType Type;
258+
EType Type;
259+
EState State;
258260
TVector<ui32> KeyColumnIds;
259261
TVector<ui32> DataColumnIds;
260262

@@ -263,6 +265,7 @@ struct TUserTable : public TThrRefBase {
263265
TTableIndex(const NKikimrSchemeOp::TIndexDescription& indexDesc, const TMap<ui32, TUserColumn>& columns)
264266
: Name(indexDesc.GetName())
265267
, Type(indexDesc.GetType())
268+
, State(indexDesc.GetState())
266269
{
267270
THashMap<TStringBuf, ui32> nameToId;
268271
for (const auto& [id, column] : columns) {
@@ -443,6 +446,7 @@ struct TUserTable : public TThrRefBase {
443446
bool ResetTableSchemaVersion();
444447

445448
void AddIndex(const NKikimrSchemeOp::TIndexDescription& indexDesc);
449+
void SwitchIndexState(const TPathId& indexPathId, TTableIndex::EState state);
446450
void DropIndex(const TPathId& indexPathId);
447451
bool HasAsyncIndexes() const;
448452

ydb/core/tx/datashard/drop_table_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op,
8181
Y_ABORT_UNLESS(it != DataShard.GetUserTables().end());
8282
{
8383
for (const auto& [indexPathId, indexInfo] : it->second->Indexes) {
84-
if (indexInfo.Type == TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) {
84+
if (indexInfo.Type == TUserTable::TTableIndex::EType::EIndexTypeGlobalAsync) {
8585
RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(indexPathId));
8686
}
8787
}

ydb/core/tx/datashard/finalize_build_index_unit.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,17 @@ class TFinalizeBuildIndexUnit : public TExecutionUnit {
3737
Y_ABORT_UNLESS(version);
3838

3939
TUserTable::TPtr tableInfo;
40-
if (params.HasOutcome() && params.GetOutcome().HasCancel()) {
40+
if (params.HasOutcome() && params.GetOutcome().HasApply()) {
41+
const auto indexPathId = PathIdFromPathId(params.GetOutcome().GetApply().GetIndexPathId());
42+
43+
tableInfo = DataShard.AlterTableSwitchIndexState(ctx, txc, pathId, version, indexPathId, NKikimrSchemeOp::EIndexStateReady);
44+
} else if (params.HasOutcome() && params.GetOutcome().HasCancel()) {
45+
const auto indexPathId = PathIdFromPathId(params.GetOutcome().GetCancel().GetIndexPathId());
46+
4147
const auto& userTables = DataShard.GetUserTables();
4248
Y_ABORT_UNLESS(userTables.contains(pathId.LocalPathId));
4349
const auto& indexes = userTables.at(pathId.LocalPathId)->Indexes;
44-
45-
const auto indexPathId = PathIdFromPathId(params.GetOutcome().GetCancel().GetIndexPathId());
4650
auto it = indexes.find(indexPathId);
47-
4851
if (it != indexes.end() && it->second.Type == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync) {
4952
RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId));
5053
}
@@ -66,7 +69,7 @@ class TFinalizeBuildIndexUnit : public TExecutionUnit {
6669
Y_ABORT_UNLESS(step != 0);
6770

6871
if (DataShard.GetBuildIndexManager().Contains(params.GetBuildIndexId())) {
69-
auto record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId());
72+
auto record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId());
7073
DataShard.CancelScan(tableInfo->LocalTid, record.ScanId);
7174
DataShard.GetBuildIndexManager().Drop(params.GetBuildIndexId());
7275
}

ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ TVector<ISubOperation::TPtr> ApplyBuildIndex(TOperationId nextId, const TTxTrans
3030
op->SetTableName(table.LeafName());
3131
op->SetSnapshotTxId(config.GetSnaphotTxId()); //TODO: fix spelling error in flat_scheme_op.proto first
3232
op->SetBuildIndexId(config.GetBuildIndexId());
33-
op->MutableOutcome()->MutableApply();
33+
if (!indexName.empty()) {
34+
TPath index = table.Child(indexName);
35+
PathIdFromPathId(index.Base()->PathId, op->MutableOutcome()->MutableApply()->MutableIndexPathId());
36+
}
3437

3538
result.push_back(CreateFinalizeBuildIndexMainTable(NextPartId(nextId, result), finalize));
3639
}

0 commit comments

Comments
 (0)