Skip to content

Commit 5d08216

Browse files
committed
Fix
1 parent 318bca5 commit 5d08216

5 files changed

+67
-93
lines changed

ydb/core/tx/schemeshard/schemeshard__monitoring.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
717717

718718
<< "Shards.size: " << info.Shards.size() << Endl
719719
<< "ToUploadShards.size: " << info.ToUploadShards.size() << Endl
720-
<< "DoneShards.size: " << info.DoneShardsSize << Endl
720+
<< "DoneShards.size: " << info.DoneShards.size() << Endl
721721
<< "InProgressShards.size: " << info.InProgressShards.size() << Endl
722722

723723
<< "DomainPathId: " << LinkToPathInfo(info.DomainPathId) << Endl

ydb/core/tx/schemeshard/schemeshard_build_index.cpp

+14-8
Original file line numberDiff line numberDiff line change
@@ -190,27 +190,33 @@ void TSchemeShard::PersistBuildIndexBilling(NIceDb::TNiceDb& db, const TIndexBui
190190
);
191191
}
192192

193-
void TSchemeShard::PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo, const TShardIdx& shardIdx) {
194-
const TIndexBuildInfo::TShardStatus& shardStatus = indexInfo.Shards.at(shardIdx);
195-
db.Table<Schema::IndexBuildShardStatus>().Key(indexInfo.Id, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update(
193+
void TSchemeShard::PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, const TIndexBuildInfo::TShardStatus& shardStatus) {
194+
db.Table<Schema::IndexBuildShardStatus>().Key(buildId, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update(
196195
NIceDb::TUpdate<Schema::IndexBuildShardStatus::LastKeyAck>(shardStatus.LastKeyAck),
197196
NIceDb::TUpdate<Schema::IndexBuildShardStatus::Status>(shardStatus.Status),
198197
NIceDb::TUpdate<Schema::IndexBuildShardStatus::Message>(shardStatus.DebugMessage),
199198
NIceDb::TUpdate<Schema::IndexBuildShardStatus::UploadStatus>(shardStatus.UploadStatus),
200199
NIceDb::TUpdate<Schema::IndexBuildShardStatus::RowsProcessed>(shardStatus.Processed.GetRows()),
201200
NIceDb::TUpdate<Schema::IndexBuildShardStatus::BytesProcessed>(shardStatus.Processed.GetBytes())
202-
);
201+
);
203202
}
204203

205-
void TSchemeShard::PersistBuildIndexUploadInitiate(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo, const TShardIdx& shardIdx) {
206-
const TIndexBuildInfo::TShardStatus& shardStatus = indexInfo.Shards.at(shardIdx);
204+
void TSchemeShard::PersistBuildIndexUploadInitiate(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, const TIndexBuildInfo::TShardStatus& shardStatus) {
207205
NKikimrTx::TKeyRange range;
208206
shardStatus.Range.Serialize(range);
209-
db.Table<Schema::IndexBuildShardStatus>().Key(indexInfo.Id, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update(
207+
db.Table<Schema::IndexBuildShardStatus>().Key(buildId, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update(
210208
NIceDb::TUpdate<Schema::IndexBuildShardStatus::Range>(range),
211209
NIceDb::TUpdate<Schema::IndexBuildShardStatus::LastKeyAck>(shardStatus.LastKeyAck),
212210
NIceDb::TUpdate<Schema::IndexBuildShardStatus::Status>(shardStatus.Status),
213-
NIceDb::TUpdate<Schema::IndexBuildShardStatus::UploadStatus>(shardStatus.UploadStatus));
211+
NIceDb::TUpdate<Schema::IndexBuildShardStatus::UploadStatus>(shardStatus.UploadStatus)
212+
);
213+
}
214+
215+
void TSchemeShard::PersistBuildIndexUploadReset(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, TIndexBuildInfo::TShardStatus& shardStatus) {
216+
shardStatus.Status = NKikimrIndexBuilder::EBuildStatus::INVALID;
217+
db.Table<Schema::IndexBuildShardStatus>().Key(buildId, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update(
218+
NIceDb::TUpdate<Schema::IndexBuildShardStatus::Status>(shardStatus.Status)
219+
);
214220
}
215221

216222
void TSchemeShard::PersistBuildIndexForget(NIceDb::TNiceDb& db, const TIndexBuildInfo& info) {

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

+46-79
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
590590
ev->Record.SetId(ui64(BuildId));
591591

592592
PathIdFromPathId(buildInfo.TablePathId, ev->Record.MutablePathId());
593-
*ev->Record.MutableSettings() = std::get<1>(buildInfo.SpecializedIndexDescription).GetSettings();
593+
*ev->Record.MutableSettings() = std::get<NKikimrSchemeOp::TVectorIndexKmeansTreeDescription>(
594+
buildInfo.SpecializedIndexDescription).GetSettings().settings();
594595
ev->Record.SetK(buildInfo.KMeans.K);
595596
ev->Record.SetUpload(buildInfo.KMeans.GetUpload());
596597
ev->Record.SetState(NKikimrTxDataShard::TEvLocalKMeansRequest::SAMPLE);
@@ -690,7 +691,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
690691
}
691692

692693
void ClearAfterFill(const TActorContext& ctx, TIndexBuildInfo& buildInfo) {
693-
buildInfo.DoneShardsSize = 0;
694+
buildInfo.DoneShards = {};
694695
buildInfo.InProgressShards = {};
695696
buildInfo.ToUploadShards = {};
696697

@@ -710,7 +711,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
710711
}
711712

712713
bool FillTable(TIndexBuildInfo& buildInfo) {
713-
if (buildInfo.DoneShardsSize == 0 && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
714+
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
714715
for (const auto& [idx, status] : buildInfo.Shards) {
715716
switch (status.Status) {
716717
case NKikimrIndexBuilder::EBuildStatus::INVALID:
@@ -720,7 +721,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
720721
buildInfo.ToUploadShards.emplace_back(idx);
721722
break;
722723
case NKikimrIndexBuilder::EBuildStatus::DONE:
723-
++buildInfo.DoneShardsSize;
724+
buildInfo.DoneShards.emplace_back(idx);
724725
break;
725726
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
726727
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
@@ -729,11 +730,11 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
729730
}
730731
}
731732
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildIndexRequest(shardIdx, buildInfo); }) &&
732-
buildInfo.DoneShardsSize == buildInfo.Shards.size();
733+
buildInfo.DoneShards.size() == buildInfo.Shards.size();
733734
}
734735

735736
void ComputeKMeansState(TIndexBuildInfo& buildInfo) {
736-
if (buildInfo.DoneShardsSize != 0 || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()) {
737+
if (!buildInfo.DoneShards.empty() || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()) {
737738
return;
738739
}
739740
std::array<NScheme::TTypeInfo, 1> typeInfos{NScheme::NTypeIds::Uint32};
@@ -754,14 +755,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
754755
buildInfo.ToUploadShards.emplace_back(idx);
755756
break;
756757
case NKikimrIndexBuilder::EBuildStatus::DONE:
757-
++buildInfo.DoneShardsSize;
758+
buildInfo.DoneShards.emplace_back(idx);
758759
break;
759760
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
760761
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
761762
Y_ABORT("Unreachable");
762763
}
763764
}
764-
if (buildInfo.DoneShardsSize == 0 && buildInfo.ToUploadShards.size() == 1) {
765+
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.size() == 1) {
765766
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local;
766767
}
767768
}
@@ -799,12 +800,17 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
799800
return true;
800801
}
801802

802-
bool FillVectorIndex(TIndexBuildInfo& buildInfo) {
803+
bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
803804
ComputeKMeansState(buildInfo);
804805
if (!SendVectorIndex(buildInfo)) {
805806
return false;
806807
}
807-
buildInfo.DoneShardsSize = 0;
808+
NIceDb::TNiceDb db{txc.DB};
809+
for (const auto& idx : buildInfo.DoneShards) {
810+
auto& status = buildInfo.Shards.at(idx);
811+
Self->PersistBuildIndexUploadReset(db, BuildId, idx, status);
812+
}
813+
buildInfo.DoneShards.clear();
808814

809815
if (!buildInfo.Sample.Sent && !buildInfo.Sample.Rows.empty()) {
810816
SendUploadSampleKRequest(buildInfo);
@@ -843,7 +849,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
843849
InitiateShards(db, buildInfo);
844850
}
845851
if (buildInfo.IsBuildVectorIndex()) {
846-
return FillVectorIndex(buildInfo);
852+
return FillVectorIndex(txc, buildInfo);
847853
} else {
848854
Y_ASSERT(buildInfo.IsBuildSecondaryIndex() || buildInfo.IsBuildColumns());
849855
return FillTable(buildInfo);
@@ -1089,14 +1095,16 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
10891095
auto tableColumns = NTableIndex::ExtractInfo(table); // skip dropped columns
10901096
TSerializedTableRange shardRange = InfiniteRange(tableColumns.Keys.size());
10911097

1098+
Y_ASSERT(buildInfo.Shards.empty());
10921099
for (const auto& x: table->GetPartitions()) {
10931100
Y_ABORT_UNLESS(Self->ShardInfos.contains(x.ShardIdx));
10941101
TSerializedCellVec bound{x.EndOfRange};
10951102
shardRange.To = bound;
1096-
buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), ""});
1103+
auto [it, emplaced] = buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), ""});
1104+
Y_ASSERT(emplaced);
10971105
shardRange.From = std::move(bound);
10981106

1099-
Self->PersistBuildIndexUploadInitiate(db, buildInfo, x.ShardIdx);
1107+
Self->PersistBuildIndexUploadInitiate(db, BuildId, x.ShardIdx, it->second);
11001108
}
11011109
}
11021110

@@ -1316,33 +1324,21 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
13161324
NYql::TIssues issues;
13171325
NYql::IssuesFromMessage(record.GetIssues(), issues);
13181326
shardStatus.DebugMessage = issues.ToString();
1319-
1320-
NIceDb::TNiceDb db(txc.DB);
1321-
Self->PersistBuildIndexUploadProgress(db, buildInfo, shardIdx);
13221327
shardStatus.Status = record.GetStatus();
13231328

1329+
NIceDb::TNiceDb db(txc.DB);
13241330
switch (shardStatus.Status) {
13251331
case NKikimrIndexBuilder::EBuildStatus::DONE:
13261332
if (buildInfo.InProgressShards.erase(shardIdx)) {
1327-
++buildInfo.DoneShardsSize;
1328-
1329-
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1330-
1331-
Progress(buildId);
1333+
buildInfo.DoneShards.emplace_back(shardIdx);
13321334
}
13331335
break;
1334-
13351336
case NKikimrIndexBuilder::EBuildStatus::ABORTED:
13361337
// datashard gracefully rebooted, reschedule shard
13371338
if (buildInfo.InProgressShards.erase(shardIdx)) {
13381339
buildInfo.ToUploadShards.emplace_front(shardIdx);
1339-
1340-
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1341-
1342-
Progress(buildId);
13431340
}
13441341
break;
1345-
13461342
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
13471343
buildInfo.Issue += TStringBuilder()
13481344
<< "One of the shards report BAD_REQUEST at Filling stage, process has to be canceled"
@@ -1352,14 +1348,16 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
13521348
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
13531349

13541350
Progress(buildId);
1355-
break;
1356-
1351+
return true;
13571352
case NKikimrIndexBuilder::EBuildStatus::INVALID:
13581353
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
13591354
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
13601355
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
13611356
Y_ABORT("Unreachable");
13621357
}
1358+
Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
1359+
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1360+
Progress(buildId);
13631361
break;
13641362
}
13651363
case TIndexBuildInfo::EState::AlterMainTable:
@@ -1379,8 +1377,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
13791377
case TIndexBuildInfo::EState::Rejection_Applying:
13801378
case TIndexBuildInfo::EState::Rejection_Unlocking:
13811379
case TIndexBuildInfo::EState::Rejected:
1382-
LOG_D("TTxReply : TEvSampleKResponse"
1383-
<< " superfluous message " << record.ShortDebugString());
1380+
LOG_D("TTxReply : TEvSampleKResponse superfluous message "
1381+
<< record.ShortDebugString());
13841382
break;
13851383
}
13861384

@@ -1449,31 +1447,20 @@ struct TSchemeShard::TIndexBuilder::TTxReplyLocalKMeans: public TSchemeShard::TI
14491447
shardStatus.DebugMessage = issues.ToString();
14501448

14511449
NIceDb::TNiceDb db(txc.DB);
1452-
Self->PersistBuildIndexUploadProgress(db, buildInfo, shardIdx);
14531450
shardStatus.Status = record.GetStatus();
14541451

14551452
switch (shardStatus.Status) {
14561453
case NKikimrIndexBuilder::EBuildStatus::DONE:
14571454
if (buildInfo.InProgressShards.erase(shardIdx)) {
1458-
++buildInfo.DoneShardsSize;
1459-
1460-
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1461-
1462-
Progress(buildId);
1455+
buildInfo.DoneShards.emplace_back(shardIdx);
14631456
}
14641457
break;
1465-
14661458
case NKikimrIndexBuilder::EBuildStatus::ABORTED:
14671459
// datashard gracefully rebooted, reschedule shard
14681460
if (buildInfo.InProgressShards.erase(shardIdx)) {
14691461
buildInfo.ToUploadShards.emplace_front(shardIdx);
1470-
1471-
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1472-
1473-
Progress(buildId);
14741462
}
14751463
break;
1476-
14771464
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
14781465
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
14791466
buildInfo.Issue += TStringBuilder()
@@ -1485,13 +1472,15 @@ struct TSchemeShard::TIndexBuilder::TTxReplyLocalKMeans: public TSchemeShard::TI
14851472
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
14861473

14871474
Progress(buildId);
1488-
break;
1489-
1475+
return true;
14901476
case NKikimrIndexBuilder::EBuildStatus::INVALID:
14911477
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
14921478
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
14931479
Y_ABORT("Unreachable");
14941480
}
1481+
Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
1482+
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1483+
Progress(buildId);
14951484
break;
14961485
}
14971486
case TIndexBuildInfo::EState::AlterMainTable:
@@ -1581,31 +1570,20 @@ struct TSchemeShard::TIndexBuilder::TTxReplyReshuffleKMeans: public TSchemeShard
15811570
shardStatus.DebugMessage = issues.ToString();
15821571

15831572
NIceDb::TNiceDb db(txc.DB);
1584-
Self->PersistBuildIndexUploadProgress(db, buildInfo, shardIdx);
15851573
shardStatus.Status = record.GetStatus();
15861574

15871575
switch (shardStatus.Status) {
15881576
case NKikimrIndexBuilder::EBuildStatus::DONE:
15891577
if (buildInfo.InProgressShards.erase(shardIdx)) {
1590-
++buildInfo.DoneShardsSize;
1591-
1592-
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1593-
1594-
Progress(buildId);
1578+
buildInfo.DoneShards.emplace_back(shardIdx);
15951579
}
15961580
break;
1597-
15981581
case NKikimrIndexBuilder::EBuildStatus::ABORTED:
15991582
// datashard gracefully rebooted, reschedule shard
16001583
if (buildInfo.InProgressShards.erase(shardIdx)) {
16011584
buildInfo.ToUploadShards.emplace_front(shardIdx);
1602-
1603-
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1604-
1605-
Progress(buildId);
16061585
}
16071586
break;
1608-
16091587
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
16101588
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
16111589
buildInfo.Issue += TStringBuilder()
@@ -1615,15 +1593,16 @@ struct TSchemeShard::TIndexBuilder::TTxReplyReshuffleKMeans: public TSchemeShard
16151593
<< ", shardIdx: " << shardIdx;
16161594
Self->PersistBuildIndexIssue(db, buildInfo);
16171595
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
1618-
16191596
Progress(buildId);
1620-
break;
1621-
1597+
return true;
16221598
case NKikimrIndexBuilder::EBuildStatus::INVALID:
16231599
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
16241600
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
16251601
Y_ABORT("Unreachable");
16261602
}
1603+
Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
1604+
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1605+
Progress(buildId);
16271606
break;
16281607
}
16291608
case TIndexBuildInfo::EState::AlterMainTable:
@@ -1824,38 +1803,25 @@ struct TSchemeShard::TIndexBuilder::TTxReplyProgress: public TSchemeShard::TInde
18241803
shardStatus.DebugMessage = issues.ToString();
18251804

18261805
NIceDb::TNiceDb db(txc.DB);
1827-
Self->PersistBuildIndexUploadProgress(db, buildInfo, shardIdx);
1828-
18291806
switch (shardStatus.Status) {
18301807
case NKikimrIndexBuilder::EBuildStatus::INVALID:
18311808
Y_ABORT("Unreachable");
1832-
18331809
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
18341810
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
18351811
// no oop, wait resolution. Progress key are persisted
1836-
break;
1837-
1812+
Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
1813+
return true;
18381814
case NKikimrIndexBuilder::EBuildStatus::DONE:
18391815
if (buildInfo.InProgressShards.erase(shardIdx)) {
1840-
++buildInfo.DoneShardsSize;
1841-
1842-
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1843-
1844-
Progress(buildId);
1816+
buildInfo.DoneShards.emplace_back(shardIdx);
18451817
}
18461818
break;
1847-
18481819
case NKikimrIndexBuilder::EBuildStatus::ABORTED:
18491820
// datashard gracefully rebooted, reschedule shard
18501821
if (buildInfo.InProgressShards.erase(shardIdx)) {
18511822
buildInfo.ToUploadShards.emplace_front(shardIdx);
1852-
1853-
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1854-
1855-
Progress(buildId);
18561823
}
18571824
break;
1858-
18591825
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
18601826
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
18611827
buildInfo.Issue += TStringBuilder()
@@ -1865,11 +1831,12 @@ struct TSchemeShard::TIndexBuilder::TTxReplyProgress: public TSchemeShard::TInde
18651831
<< ", shardIdx: " << shardIdx;
18661832
Self->PersistBuildIndexIssue(db, buildInfo);
18671833
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
1868-
18691834
Progress(buildId);
1870-
break;
1835+
return true;
18711836
}
1872-
1837+
Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
1838+
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1839+
Progress(buildId);
18731840
break;
18741841
}
18751842
case TIndexBuildInfo::EState::AlterMainTable:

0 commit comments

Comments
 (0)