Skip to content

Vector index coordination: enable local kmeans case #10497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ void TSchemeShard::Handle(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& ev, co
Execute(CreateTxReply(ev), ctx);
}

void TSchemeShard::Handle(TEvDataShard::TEvLocalKMeansResponse::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxReply(ev), ctx);
}

void TSchemeShard::Handle(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxReply(ev), ctx);
}
Expand Down
185 changes: 178 additions & 7 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,46 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
}

void SendKMeansLocalRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
Y_ASSERT(buildInfo.IsBuildVectorIndex());
auto ev = MakeHolder<TEvDataShard::TEvLocalKMeansRequest>();
ev->Record.SetId(ui64(BuildId));

auto path = TPath::Init(buildInfo.TablePathId, Self).Dive(buildInfo.IndexName);
if (buildInfo.KMeans.Level == 0) {
PathIdFromPathId(buildInfo.TablePathId, ev->Record.MutablePathId());
} else {
PathIdFromPathId(path.Dive(buildInfo.KMeans.ReadFrom())->PathId, ev->Record.MutablePathId());
path.Rise();
}
*ev->Record.MutableSettings() = std::get<NKikimrSchemeOp::TVectorIndexKmeansTreeDescription>(
buildInfo.SpecializedIndexDescription).GetSettings().settings();
ev->Record.SetK(buildInfo.KMeans.K);
ev->Record.SetUpload(buildInfo.KMeans.GetUpload());
ev->Record.SetState(NKikimrTxDataShard::TEvLocalKMeansRequest::SAMPLE);

ev->Record.SetDoneRounds(0);
ev->Record.SetNeedsRounds(3); // TODO(mbkkt) should be configurable

ev->Record.SetParent(buildInfo.KMeans.Parent);
ev->Record.SetChild(buildInfo.KMeans.ChildBegin);

ev->Record.SetPostingName(path.Dive(buildInfo.KMeans.WriteTo()).PathString());
path.Rise().Dive(NTableIndex::NTableVectorKmeansTreeIndex::LevelTable);
ev->Record.SetLevelName(path.PathString());

ev->Record.SetEmbeddingColumn(buildInfo.IndexColumns[0]);
*ev->Record.MutableDataColumns() = {
buildInfo.DataColumns.begin(), buildInfo.DataColumns.end()
};

auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
ev->Record.SetSeed(ui64(shardId));
LOG_D("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString());

ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
}

void SendBuildIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
auto ev = MakeHolder<TEvDataShard::TEvBuildIndexCreateRequest>();
ev->Record.SetBuildIndexId(ui64(BuildId));
Expand Down Expand Up @@ -719,10 +759,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
Y_ABORT("Unreachable");
}
}
// TODO(mbkkt) enable detection of Local case
// if (buildInfo.ToUploadShards.size() == 1 && buildInfo.DoneShardsSize == 0) {
// buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local;
// }
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.size() == 1) {
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local;
}
}

bool SendKMeansSample(TIndexBuildInfo& buildInfo) {
Expand All @@ -739,6 +778,10 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansReshuffleRequest(shardIdx, buildInfo); });
}

bool SendKMeansLocal(TIndexBuildInfo& buildInfo) {
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansLocalRequest(shardIdx, buildInfo); });
}

bool SendVectorIndex(TIndexBuildInfo& buildInfo) {
switch (buildInfo.KMeans.State) {
case TIndexBuildInfo::TKMeans::Sample:
Expand All @@ -748,9 +791,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
// return SendKMeansRecompute(buildInfo);
case TIndexBuildInfo::TKMeans::Reshuffle:
return SendKMeansReshuffle(buildInfo);
// TODO(mbkkt)
// case TIndexBuildInfo::TKMeans::Local:
// return SendKMeansLocal(buildInfo);
case TIndexBuildInfo::TKMeans::Local:
return SendKMeansLocal(buildInfo);
}
return true;
}
Expand Down Expand Up @@ -1357,6 +1399,131 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
}
};

struct TSchemeShard::TIndexBuilder::TTxReplyLocalKMeans: public TSchemeShard::TIndexBuilder::TTxReply {
private:
TEvDataShard::TEvLocalKMeansResponse::TPtr Local;

public:
explicit TTxReplyLocalKMeans(TSelf* self, TEvDataShard::TEvLocalKMeansResponse::TPtr& local)
: TTxReply(self)
, Local(local)
{
}

bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
auto& record = Local->Get()->Record;

LOG_I("TTxReply : TEvLocalKMeansResponse, Id# " << record.GetId());

const auto buildId = TIndexBuildId(record.GetId());
const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
if (!buildInfoPtr) {
return true;
}
auto& buildInfo = *buildInfoPtr->Get();
LOG_D("TTxReply : TEvLocalKMeansResponse"
<< ", TIndexBuildInfo: " << buildInfo
<< ", record: " << record.ShortDebugString());

TTabletId shardId = TTabletId(record.GetTabletId());
if (!Self->TabletIdToShardIdx.contains(shardId)) {
return true;
}

TShardIdx shardIdx = Self->TabletIdToShardIdx.at(shardId);
if (!buildInfo.Shards.contains(shardIdx)) {
return true;
}

switch (const auto state = buildInfo.State; state) {
case TIndexBuildInfo::EState::Filling:
{
TIndexBuildInfo::TShardStatus& shardStatus = buildInfo.Shards.at(shardIdx);

auto actualSeqNo = std::pair<ui64, ui64>(Self->Generation(), shardStatus.SeqNoRound);
auto recordSeqNo = std::pair<ui64, ui64>(record.GetRequestSeqNoGeneration(), record.GetRequestSeqNoRound());

if (actualSeqNo != recordSeqNo) {
LOG_D("TTxReply : TEvLocalKMeansResponse"
<< " ignore progress message by seqNo"
<< ", TIndexBuildInfo: " << buildInfo
<< ", actual seqNo for the shard " << shardId << " (" << shardIdx << ") is: " << Self->Generation() << ":" << shardStatus.SeqNoRound
<< ", record: " << record.ShortDebugString());
Y_ABORT_UNLESS(actualSeqNo > recordSeqNo);
return true;
}

TBillingStats stats{record.GetUploadRows(), record.GetUploadBytes(), record.GetReadRows(), record.GetReadBytes()};
shardStatus.Processed += stats;
buildInfo.Processed += stats;

NYql::TIssues issues;
NYql::IssuesFromMessage(record.GetIssues(), issues);
shardStatus.DebugMessage = issues.ToString();

NIceDb::TNiceDb db(txc.DB);
shardStatus.Status = record.GetStatus();

switch (shardStatus.Status) {
case NKikimrIndexBuilder::EBuildStatus::DONE:
if (buildInfo.InProgressShards.erase(shardIdx)) {
buildInfo.DoneShards.emplace_back(shardIdx);
}
break;
case NKikimrIndexBuilder::EBuildStatus::ABORTED:
// datashard gracefully rebooted, reschedule shard
if (buildInfo.InProgressShards.erase(shardIdx)) {
buildInfo.ToUploadShards.emplace_front(shardIdx);
}
break;
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
buildInfo.Issue += TStringBuilder()
<< "One of the shards report "<< shardStatus.Status
<< " at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
<< ", shardIdx: " << shardIdx;
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);

Progress(buildId);
return true;
case NKikimrIndexBuilder::EBuildStatus::INVALID:
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
Y_ABORT("Unreachable");
}
Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
Self->IndexBuildPipes.Close(buildId, shardId, ctx);
Progress(buildId);
break;
}
case TIndexBuildInfo::EState::AlterMainTable:
case TIndexBuildInfo::EState::Invalid:
case TIndexBuildInfo::EState::Locking:
case TIndexBuildInfo::EState::GatheringStatistics:
case TIndexBuildInfo::EState::Initiating:
case TIndexBuildInfo::EState::DropBuild:
case TIndexBuildInfo::EState::CreateBuild:
case TIndexBuildInfo::EState::Applying:
case TIndexBuildInfo::EState::Unlocking:
case TIndexBuildInfo::EState::Done:
Y_FAIL_S("Unreachable " << Name(state));
case TIndexBuildInfo::EState::Cancellation_Applying:
case TIndexBuildInfo::EState::Cancellation_Unlocking:
case TIndexBuildInfo::EState::Cancelled:
case TIndexBuildInfo::EState::Rejection_Applying:
case TIndexBuildInfo::EState::Rejection_Unlocking:
case TIndexBuildInfo::EState::Rejected:
LOG_D("TTxReply : TEvLocalKMeansResponse"
<< " superfluous message " << record.ShortDebugString());
break;
}

return true;
}
};

struct TSchemeShard::TIndexBuilder::TTxReplyReshuffleKMeans: public TSchemeShard::TIndexBuilder::TTxReply {
private:
TEvDataShard::TEvReshuffleKMeansResponse::TPtr Reshuffle;
Expand Down Expand Up @@ -2112,6 +2279,10 @@ ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvReshuffleKMeansRespon
return new TIndexBuilder::TTxReplyReshuffleKMeans(this, reshuffle);
}

ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvLocalKMeansResponse::TPtr& local) {
return new TIndexBuilder::TTxReplyLocalKMeans(this, local);
}

ITransaction* TSchemeShard::CreateTxReply(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload) {
return new TIndexBuilder::TTxReplyUpload(this, upload);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4758,6 +4758,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvPrivate::TEvIndexBuildingMakeABill, Handle);
HFuncTraced(TEvDataShard::TEvSampleKResponse, Handle);
HFuncTraced(TEvDataShard::TEvReshuffleKMeansResponse, Handle);
HFuncTraced(TEvDataShard::TEvLocalKMeansResponse, Handle);
HFuncTraced(TEvIndexBuilder::TEvUploadSampleKResponse, Handle);
// } // NIndexBuilder

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,7 @@ class TSchemeShard
struct TTxReplyRetry;
struct TTxReplySampleK;
struct TTxReplyReshuffleKMeans;
struct TTxReplyLocalKMeans;
struct TTxReplyUpload;

struct TTxPipeReset;
Expand All @@ -1364,6 +1365,7 @@ class TSchemeShard
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& progress);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvSampleKResponse::TPtr& sampleK);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& reshuffle);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvLocalKMeansResponse::TPtr& local);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload);
NTabletFlatExecutor::ITransaction* CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId);
NTabletFlatExecutor::ITransaction* CreateTxBilling(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev);
Expand All @@ -1377,6 +1379,7 @@ class TSchemeShard
void Handle(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvSampleKResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvLocalKMeansResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const TActorContext& ctx);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3015,7 +3015,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
Sample = 0,
// Recompute,
Reshuffle,
// Local,
Local,
};
ui32 Level = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId);
UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE);

const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-200-404-1290-2686","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n";
const TString meteringData = R"({"usage":{"start":2,"quantity":128,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-200-0-1290-0","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n";

UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData);

Expand Down
Loading