Skip to content

Commit 3e87d00

Browse files
MBkktCyberROFL
andauthored
Refactoring: DataShard handling indexes (#6425)
Co-authored-by: Ilnaz Nizametdinov <[email protected]>
1 parent 8c6ee31 commit 3e87d00

10 files changed

+40
-30
lines changed

ydb/core/tx/datashard/build_index.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -696,15 +696,15 @@ void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev,
696696
TUserTable::TCPtr userTable = GetUserTables().at(tableId.PathId.LocalPathId);
697697

698698

699-
if (BuildIndexManager.Contains(buildIndexId)) {
700-
TBuildIndexRecord recCard = BuildIndexManager.Get(buildIndexId);
701-
if (recCard.SeqNo == seqNo) {
699+
if (const auto* recCard = BuildIndexManager.Get(buildIndexId)) {
700+
if (recCard->SeqNo == seqNo) {
702701
// do no start one more scan
703702
ctx.Send(ev->Sender, std::move(response));
704703
return;
705704
}
706705

707-
CancelScan(userTable->LocalTid, recCard.ScanId);
706+
CancelScan(userTable->LocalTid, recCard->ScanId);
707+
BuildIndexManager.Drop(buildIndexId);
708708
}
709709

710710

ydb/core/tx/datashard/build_index.h

+17-11
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,32 @@ struct TBuildIndexRecord {
3333

3434
class TBuildIndexManager {
3535
public:
36-
bool Contains(ui64 id) {
37-
return Records.contains(id);
36+
const TBuildIndexRecord* Get(ui64 id) const {
37+
Y_ABORT_UNLESS(id != 0);
38+
if (BuildIndexId == id) {
39+
return &Record;
40+
}
41+
Y_ABORT_UNLESS(BuildIndexId == 0);
42+
return nullptr;
3843
}
3944

4045
void Set(ui64 id, TBuildIndexRecord record) {
41-
Records.emplace(id, record);
42-
}
43-
44-
TBuildIndexRecord Get(ui64 id) const {
45-
return Records.at(id);
46+
Y_ABORT_UNLESS(id != 0);
47+
Y_ABORT_UNLESS(BuildIndexId == 0);
48+
BuildIndexId = id;
49+
Record = record;
4650
}
4751

4852
void Drop(ui64 id) {
49-
Records.erase(id);
53+
Y_ABORT_UNLESS(Get(id) == &Record);
54+
BuildIndexId = 0;
55+
Record = {};
5056
}
5157

5258
private:
53-
using TBuildIndexIdToScanIdMap = TMap<ui64, TBuildIndexRecord>;
54-
55-
TBuildIndexIdToScanIdMap Records;
59+
// Only single shard scan for build index possible now
60+
ui64 BuildIndexId = 0;
61+
TBuildIndexRecord Record;
5662
};
5763

5864
}

ydb/core/tx/datashard/change_collector_async_index.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
105105
const auto tagToPos = MakeTagToPos(tagsToSelect, [](const auto tag) { return tag; });
106106
const auto updatedTagToPos = MakeTagToPos(updates, [](const TUpdateOp& op) { return op.Tag; });
107107

108-
userTable->ForEachAsyncIndex([&] (const auto& pathId, const auto& index) {
108+
userTable->ForEachAsyncIndex([&](const auto& pathId, const TUserTable::TTableIndex& index) {
109109
if (generateDeletions) {
110110
bool needDeletion = rop == ERowOp::Erase || rop == ERowOp::Reset;
111111

@@ -198,7 +198,7 @@ auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const {
198198

199199
TCachedTagsBuilder builder;
200200

201-
userTable->ForEachAsyncIndex([&] (const auto&, const auto& index) {
201+
userTable->ForEachAsyncIndex([&](const auto&, const TUserTable::TTableIndex& index) {
202202
builder.AddIndexTags(index.KeyColumnIds);
203203
builder.AddDataTags(index.DataColumnIds);
204204
});

ydb/core/tx/datashard/change_record_cdc_serializer.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,9 @@ class TDynamoDBStreamsJsonSerializer: public TJsonSerializer {
370370
} else if (name.StartsWith("__Hash_")) {
371371
bool indexed = false;
372372
for (const auto& [_, index] : schema->Indexes) {
373+
if (index.Type != TUserTable::TTableIndex::EType::EIndexTypeGlobalAsync) {
374+
continue;
375+
}
373376
Y_ABORT_UNLESS(index.KeyColumnIds.size() >= 1);
374377
if (index.KeyColumnIds.at(0) == tag) {
375378
indexed = true;

ydb/core/tx/datashard/change_sender.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ class TChangeSender: public TActor<TChangeSender> {
275275
for (const auto& [tableId, tableInfo] : self->GetUserTables()) {
276276
const auto fullTableId = TTableId(self->GetPathOwnerId(), tableId);
277277

278-
tableInfo->ForEachAsyncIndex([&] (const auto& indexPathId, const auto&) {
278+
tableInfo->ForEachAsyncIndex([&](const auto& indexPathId, const auto&) {
279279
AddChangeSender(indexPathId, fullTableId, ESenderType::AsyncIndex);
280280
});
281281

ydb/core/tx/datashard/datashard.cpp

+1-3
Original file line numberDiff line numberDiff line change
@@ -1825,9 +1825,7 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD
18251825
node.key() = remapNewId;
18261826
auto it = newTableInfo->Indexes.insert(std::move(node)).position;
18271827

1828-
Y_ABORT_UNLESS(move.GetReMapIndex().HasDstName());
1829-
indexDesc.SetName(dstIndexName);
1830-
it->second.Name = dstIndexName;
1828+
it->second.Rename(indexDesc, dstIndexName);
18311829
}
18321830

18331831
newTableInfo->SetSchema(schema);

ydb/core/tx/datashard/datashard_impl.h

-1
Original file line numberDiff line numberDiff line change
@@ -1958,7 +1958,6 @@ class TDataShard
19581958
}
19591959

19601960
TBuildIndexManager& GetBuildIndexManager() { return BuildIndexManager; }
1961-
const TBuildIndexManager& GetBuildIndexManager() const { return BuildIndexManager; }
19621961

19631962
// Returns true when datashard is working in mvcc mode
19641963
bool IsMvccEnabled() const;

ydb/core/tx/datashard/datashard_user_table.h

+8-3
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,6 @@ struct TUserTable : public TThrRefBase {
254254
using EType = NKikimrSchemeOp::EIndexType;
255255
using EState = NKikimrSchemeOp::EIndexState;
256256

257-
TString Name;
258257
EType Type;
259258
EState State;
260259
TVector<ui32> KeyColumnIds;
@@ -263,10 +262,12 @@ struct TUserTable : public TThrRefBase {
263262
TTableIndex() = default;
264263

265264
TTableIndex(const NKikimrSchemeOp::TIndexDescription& indexDesc, const TMap<ui32, TUserColumn>& columns)
266-
: Name(indexDesc.GetName())
267-
, Type(indexDesc.GetType())
265+
: Type(indexDesc.GetType())
268266
, State(indexDesc.GetState())
269267
{
268+
if (Type != EType::EIndexTypeGlobalAsync) {
269+
return;
270+
}
270271
THashMap<TStringBuf, ui32> nameToId;
271272
for (const auto& [id, column] : columns) {
272273
Y_DEBUG_ABORT_UNLESS(!nameToId.contains(column.Name));
@@ -285,6 +286,10 @@ struct TUserTable : public TThrRefBase {
285286
fillColumnIds(indexDesc.GetKeyColumnNames(), KeyColumnIds);
286287
fillColumnIds(indexDesc.GetDataColumnNames(), DataColumnIds);
287288
}
289+
290+
static void Rename(NKikimrSchemeOp::TIndexDescription& indexDesc, const TString& newName) {
291+
indexDesc.SetName(newName);
292+
}
288293
};
289294

290295
struct TCdcStream {

ydb/core/tx/datashard/drop_table_unit.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op,
8080
auto it = DataShard.GetUserTables().find(tableId);
8181
Y_ABORT_UNLESS(it != DataShard.GetUserTables().end());
8282
{
83-
it->second->ForEachAsyncIndex([&] (const auto& indexPathId, const auto&) {
83+
it->second->ForEachAsyncIndex([&](const auto& indexPathId, const auto&) {
8484
RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(indexPathId));
8585
});
8686
for (const auto& [streamPathId, _] : it->second->CdcStreams) {

ydb/core/tx/datashard/finalize_build_index_unit.cpp

+3-4
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class TFinalizeBuildIndexUnit : public TExecutionUnit {
4646

4747
const auto& userTables = DataShard.GetUserTables();
4848
Y_ABORT_UNLESS(userTables.contains(pathId.LocalPathId));
49-
userTables.at(pathId.LocalPathId)->ForAsyncIndex(indexPathId, [&] (const auto&) {
49+
userTables.at(pathId.LocalPathId)->ForAsyncIndex(indexPathId, [&](const auto&) {
5050
RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId));
5151
});
5252

@@ -66,9 +66,8 @@ class TFinalizeBuildIndexUnit : public TExecutionUnit {
6666
ui64 txId = params.GetSnapshotTxId();
6767
Y_ABORT_UNLESS(step != 0);
6868

69-
if (DataShard.GetBuildIndexManager().Contains(params.GetBuildIndexId())) {
70-
auto record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId());
71-
DataShard.CancelScan(tableInfo->LocalTid, record.ScanId);
69+
if (const auto* record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId())) {
70+
DataShard.CancelScan(tableInfo->LocalTid, record->ScanId);
7271
DataShard.GetBuildIndexManager().Drop(params.GetBuildIndexId());
7372
}
7473

0 commit comments

Comments
 (0)