Skip to content

Commit 6819f47

Browse files
committed
Add local kmeans
1 parent a640494 commit 6819f47

6 files changed

+188
-9
lines changed

ydb/core/tx/schemeshard/schemeshard_build_index.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ void TSchemeShard::Handle(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& ev, co
3636
Execute(CreateTxReply(ev), ctx);
3737
}
3838

39+
void TSchemeShard::Handle(TEvDataShard::TEvLocalKMeansResponse::TPtr& ev, const TActorContext& ctx) {
40+
Execute(CreateTxReply(ev), ctx);
41+
}
42+
3943
void TSchemeShard::Handle(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& ev, const TActorContext& ctx) {
4044
Execute(CreateTxReply(ev), ctx);
4145
}

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

+178-7
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,46 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
585585
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
586586
}
587587

588+
void SendKMeansLocalRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
589+
Y_ASSERT(buildInfo.IsBuildVectorIndex());
590+
auto ev = MakeHolder<TEvDataShard::TEvLocalKMeansRequest>();
591+
ev->Record.SetId(ui64(BuildId));
592+
593+
auto path = TPath::Init(buildInfo.TablePathId, Self).Dive(buildInfo.IndexName);
594+
if (buildInfo.KMeans.Level == 0) {
595+
PathIdFromPathId(buildInfo.TablePathId, ev->Record.MutablePathId());
596+
} else {
597+
PathIdFromPathId(path.Dive(buildInfo.KMeans.ReadFrom())->PathId, ev->Record.MutablePathId());
598+
path.Rise();
599+
}
600+
*ev->Record.MutableSettings() = std::get<NKikimrSchemeOp::TVectorIndexKmeansTreeDescription>(
601+
buildInfo.SpecializedIndexDescription).GetSettings().settings();
602+
ev->Record.SetK(buildInfo.KMeans.K);
603+
ev->Record.SetUpload(buildInfo.KMeans.GetUpload());
604+
ev->Record.SetState(NKikimrTxDataShard::TEvLocalKMeansRequest::SAMPLE);
605+
606+
ev->Record.SetDoneRounds(0);
607+
ev->Record.SetNeedsRounds(3); // TODO(mbkkt) should be configurable
608+
609+
ev->Record.SetParent(buildInfo.KMeans.Parent);
610+
ev->Record.SetChild(buildInfo.KMeans.ChildBegin);
611+
612+
ev->Record.SetPostingName(path.Dive(buildInfo.KMeans.WriteTo()).PathString());
613+
path.Rise().Dive(NTableIndex::NTableVectorKmeansTreeIndex::LevelTable);
614+
ev->Record.SetLevelName(path.PathString());
615+
616+
ev->Record.SetEmbeddingColumn(buildInfo.IndexColumns[0]);
617+
*ev->Record.MutableDataColumns() = {
618+
buildInfo.DataColumns.begin(), buildInfo.DataColumns.end()
619+
};
620+
621+
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
622+
ev->Record.SetSeed(ui64(shardId));
623+
LOG_D("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString());
624+
625+
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
626+
}
627+
588628
void SendBuildIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
589629
auto ev = MakeHolder<TEvDataShard::TEvBuildIndexCreateRequest>();
590630
ev->Record.SetBuildIndexId(ui64(BuildId));
@@ -719,10 +759,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
719759
Y_ABORT("Unreachable");
720760
}
721761
}
722-
// TODO(mbkkt) enable detection of Local case
723-
// if (buildInfo.ToUploadShards.size() == 1 && buildInfo.DoneShardsSize == 0) {
724-
// buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local;
725-
// }
762+
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.size() == 1) {
763+
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local;
764+
}
726765
}
727766

728767
bool SendKMeansSample(TIndexBuildInfo& buildInfo) {
@@ -739,6 +778,10 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
739778
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansReshuffleRequest(shardIdx, buildInfo); });
740779
}
741780

781+
bool SendKMeansLocal(TIndexBuildInfo& buildInfo) {
782+
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansLocalRequest(shardIdx, buildInfo); });
783+
}
784+
742785
bool SendVectorIndex(TIndexBuildInfo& buildInfo) {
743786
switch (buildInfo.KMeans.State) {
744787
case TIndexBuildInfo::TKMeans::Sample:
@@ -748,9 +791,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
748791
// return SendKMeansRecompute(buildInfo);
749792
case TIndexBuildInfo::TKMeans::Reshuffle:
750793
return SendKMeansReshuffle(buildInfo);
751-
// TODO(mbkkt)
752-
// case TIndexBuildInfo::TKMeans::Local:
753-
// return SendKMeansLocal(buildInfo);
794+
case TIndexBuildInfo::TKMeans::Local:
795+
return SendKMeansLocal(buildInfo);
754796
}
755797
return true;
756798
}
@@ -1357,6 +1399,131 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
13571399
}
13581400
};
13591401

1402+
struct TSchemeShard::TIndexBuilder::TTxReplyLocalKMeans: public TSchemeShard::TIndexBuilder::TTxReply {
1403+
private:
1404+
TEvDataShard::TEvLocalKMeansResponse::TPtr Local;
1405+
1406+
public:
1407+
explicit TTxReplyLocalKMeans(TSelf* self, TEvDataShard::TEvLocalKMeansResponse::TPtr& local)
1408+
: TTxReply(self)
1409+
, Local(local)
1410+
{
1411+
}
1412+
1413+
bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
1414+
auto& record = Local->Get()->Record;
1415+
1416+
LOG_I("TTxReply : TEvLocalKMeansResponse, Id# " << record.GetId());
1417+
1418+
const auto buildId = TIndexBuildId(record.GetId());
1419+
const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
1420+
if (!buildInfoPtr) {
1421+
return true;
1422+
}
1423+
auto& buildInfo = *buildInfoPtr->Get();
1424+
LOG_D("TTxReply : TEvLocalKMeansResponse"
1425+
<< ", TIndexBuildInfo: " << buildInfo
1426+
<< ", record: " << record.ShortDebugString());
1427+
1428+
TTabletId shardId = TTabletId(record.GetTabletId());
1429+
if (!Self->TabletIdToShardIdx.contains(shardId)) {
1430+
return true;
1431+
}
1432+
1433+
TShardIdx shardIdx = Self->TabletIdToShardIdx.at(shardId);
1434+
if (!buildInfo.Shards.contains(shardIdx)) {
1435+
return true;
1436+
}
1437+
1438+
switch (const auto state = buildInfo.State; state) {
1439+
case TIndexBuildInfo::EState::Filling:
1440+
{
1441+
TIndexBuildInfo::TShardStatus& shardStatus = buildInfo.Shards.at(shardIdx);
1442+
1443+
auto actualSeqNo = std::pair<ui64, ui64>(Self->Generation(), shardStatus.SeqNoRound);
1444+
auto recordSeqNo = std::pair<ui64, ui64>(record.GetRequestSeqNoGeneration(), record.GetRequestSeqNoRound());
1445+
1446+
if (actualSeqNo != recordSeqNo) {
1447+
LOG_D("TTxReply : TEvLocalKMeansResponse"
1448+
<< " ignore progress message by seqNo"
1449+
<< ", TIndexBuildInfo: " << buildInfo
1450+
<< ", actual seqNo for the shard " << shardId << " (" << shardIdx << ") is: " << Self->Generation() << ":" << shardStatus.SeqNoRound
1451+
<< ", record: " << record.ShortDebugString());
1452+
Y_ABORT_UNLESS(actualSeqNo > recordSeqNo);
1453+
return true;
1454+
}
1455+
1456+
TBillingStats stats{record.GetUploadRows(), record.GetUploadBytes(), record.GetReadRows(), record.GetReadBytes()};
1457+
shardStatus.Processed += stats;
1458+
buildInfo.Processed += stats;
1459+
1460+
NYql::TIssues issues;
1461+
NYql::IssuesFromMessage(record.GetIssues(), issues);
1462+
shardStatus.DebugMessage = issues.ToString();
1463+
1464+
NIceDb::TNiceDb db(txc.DB);
1465+
shardStatus.Status = record.GetStatus();
1466+
1467+
switch (shardStatus.Status) {
1468+
case NKikimrIndexBuilder::EBuildStatus::DONE:
1469+
if (buildInfo.InProgressShards.erase(shardIdx)) {
1470+
buildInfo.DoneShards.emplace_back(shardIdx);
1471+
}
1472+
break;
1473+
case NKikimrIndexBuilder::EBuildStatus::ABORTED:
1474+
// datashard gracefully rebooted, reschedule shard
1475+
if (buildInfo.InProgressShards.erase(shardIdx)) {
1476+
buildInfo.ToUploadShards.emplace_front(shardIdx);
1477+
}
1478+
break;
1479+
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
1480+
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
1481+
buildInfo.Issue += TStringBuilder()
1482+
<< "One of the shards report "<< shardStatus.Status
1483+
<< " at Filling stage, process has to be canceled"
1484+
<< ", shardId: " << shardId
1485+
<< ", shardIdx: " << shardIdx;
1486+
Self->PersistBuildIndexIssue(db, buildInfo);
1487+
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
1488+
1489+
Progress(buildId);
1490+
return true;
1491+
case NKikimrIndexBuilder::EBuildStatus::INVALID:
1492+
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
1493+
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
1494+
Y_ABORT("Unreachable");
1495+
}
1496+
Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
1497+
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
1498+
Progress(buildId);
1499+
break;
1500+
}
1501+
case TIndexBuildInfo::EState::AlterMainTable:
1502+
case TIndexBuildInfo::EState::Invalid:
1503+
case TIndexBuildInfo::EState::Locking:
1504+
case TIndexBuildInfo::EState::GatheringStatistics:
1505+
case TIndexBuildInfo::EState::Initiating:
1506+
case TIndexBuildInfo::EState::DropBuild:
1507+
case TIndexBuildInfo::EState::CreateBuild:
1508+
case TIndexBuildInfo::EState::Applying:
1509+
case TIndexBuildInfo::EState::Unlocking:
1510+
case TIndexBuildInfo::EState::Done:
1511+
Y_FAIL_S("Unreachable " << Name(state));
1512+
case TIndexBuildInfo::EState::Cancellation_Applying:
1513+
case TIndexBuildInfo::EState::Cancellation_Unlocking:
1514+
case TIndexBuildInfo::EState::Cancelled:
1515+
case TIndexBuildInfo::EState::Rejection_Applying:
1516+
case TIndexBuildInfo::EState::Rejection_Unlocking:
1517+
case TIndexBuildInfo::EState::Rejected:
1518+
LOG_D("TTxReply : TEvLocalKMeansResponse"
1519+
<< " superfluous message " << record.ShortDebugString());
1520+
break;
1521+
}
1522+
1523+
return true;
1524+
}
1525+
};
1526+
13601527
struct TSchemeShard::TIndexBuilder::TTxReplyReshuffleKMeans: public TSchemeShard::TIndexBuilder::TTxReply {
13611528
private:
13621529
TEvDataShard::TEvReshuffleKMeansResponse::TPtr Reshuffle;
@@ -2112,6 +2279,10 @@ ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvReshuffleKMeansRespon
21122279
return new TIndexBuilder::TTxReplyReshuffleKMeans(this, reshuffle);
21132280
}
21142281

2282+
ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvLocalKMeansResponse::TPtr& local) {
2283+
return new TIndexBuilder::TTxReplyLocalKMeans(this, local);
2284+
}
2285+
21152286
ITransaction* TSchemeShard::CreateTxReply(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload) {
21162287
return new TIndexBuilder::TTxReplyUpload(this, upload);
21172288
}

ydb/core/tx/schemeshard/schemeshard_impl.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -4758,6 +4758,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
47584758
HFuncTraced(TEvPrivate::TEvIndexBuildingMakeABill, Handle);
47594759
HFuncTraced(TEvDataShard::TEvSampleKResponse, Handle);
47604760
HFuncTraced(TEvDataShard::TEvReshuffleKMeansResponse, Handle);
4761+
HFuncTraced(TEvDataShard::TEvLocalKMeansResponse, Handle);
47614762
HFuncTraced(TEvIndexBuilder::TEvUploadSampleKResponse, Handle);
47624763
// } // NIndexBuilder
47634764

ydb/core/tx/schemeshard/schemeshard_impl.h

+3
Original file line numberDiff line numberDiff line change
@@ -1346,6 +1346,7 @@ class TSchemeShard
13461346
struct TTxReplyRetry;
13471347
struct TTxReplySampleK;
13481348
struct TTxReplyReshuffleKMeans;
1349+
struct TTxReplyLocalKMeans;
13491350
struct TTxReplyUpload;
13501351

13511352
struct TTxPipeReset;
@@ -1364,6 +1365,7 @@ class TSchemeShard
13641365
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& progress);
13651366
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvSampleKResponse::TPtr& sampleK);
13661367
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& reshuffle);
1368+
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvLocalKMeansResponse::TPtr& local);
13671369
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload);
13681370
NTabletFlatExecutor::ITransaction* CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId);
13691371
NTabletFlatExecutor::ITransaction* CreateTxBilling(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev);
@@ -1377,6 +1379,7 @@ class TSchemeShard
13771379
void Handle(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& ev, const TActorContext& ctx);
13781380
void Handle(TEvDataShard::TEvSampleKResponse::TPtr& ev, const TActorContext& ctx);
13791381
void Handle(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& ev, const TActorContext& ctx);
1382+
void Handle(TEvDataShard::TEvLocalKMeansResponse::TPtr& ev, const TActorContext& ctx);
13801383
void Handle(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& ev, const TActorContext& ctx);
13811384

13821385
void Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const TActorContext& ctx);

ydb/core/tx/schemeshard/schemeshard_info_types.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -3015,7 +3015,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
30153015
Sample = 0,
30163016
// Recompute,
30173017
Reshuffle,
3018-
// Local,
3018+
Local,
30193019
};
30203020
ui32 Level = 0;
30213021

ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
128128
auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId);
129129
UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE);
130130

131-
const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-200-404-1290-2686","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n";
131+
const TString meteringData = R"({"usage":{"start":2,"quantity":128,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-200-0-1290-0","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n";
132132

133133
UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData);
134134

0 commit comments

Comments
 (0)