Skip to content

Commit 8404d71

Browse files
authored
Call FillLocalKMeans for prefixed index explicitly (#17321)
1 parent 307663a commit 8404d71

File tree

3 files changed

+187
-54
lines changed

3 files changed

+187
-54
lines changed

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

+92
Original file line numberDiff line numberDiff line change
@@ -3103,6 +3103,98 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
31033103
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
31043104
}
31053105

3106+
// Test: adds a vector_kmeans_tree index with levels=3 and clusters=2 on (user, emb),
// verifies the index description reported by DescribeTable, and then runs the shared
// DoPositiveQueriesPrefixedVectorIndexOrderByCosine query checks on the same session.
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineDistanceNotNullableLevel3) {
3107+
NKikimrConfig::TFeatureFlags featureFlags;
3108+
// The vector index feature flag is switched on explicitly for this test server.
featureFlags.SetEnableVectorIndex(true);
3109+
auto setting = NKikimrKqp::TKqpSetting();
3110+
auto serverSettings = TKikimrSettings()
3111+
.SetFeatureFlags(featureFlags)
3112+
.SetKqpSettings({setting});
3113+
3114+
TKikimrRunner kikimr(serverSettings);
3115+
// Raise index-build and schemeshard logging to TRACE priority for this run.
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
3116+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
3117+
3118+
auto db = kikimr.GetTableClient();
3119+
// NOTE(review): the false argument presumably selects non-nullable columns, matching
// the NotNullable part of the test name; confirm against the helper definition.
auto session = DoCreateTableForPrefixedVectorIndex(db, false);
3120+
{
3121+
const TString createIndex(Q_(R"(
3122+
ALTER TABLE `/Root/TestTable`
3123+
ADD INDEX index
3124+
GLOBAL USING vector_kmeans_tree
3125+
ON (user, emb)
3126+
WITH (distance=cosine, vector_type="uint8", vector_dimension=2, levels=3, clusters=2);
3127+
)"));
3128+
3129+
auto result = session.ExecuteSchemeQuery(createIndex)
3130+
.ExtractValueSync();
3131+
3132+
// The index build must succeed; the issues text is attached to the failure message.
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3133+
}
3134+
{
3135+
// Read the table description back and check that it reports exactly the index created above.
auto result = session.DescribeTable("/Root/TestTable").ExtractValueSync();
3136+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
3137+
const auto& indexes = result.GetTableDescription().GetIndexDescriptions();
3138+
UNIT_ASSERT_EQUAL(indexes.size(), 1);
3139+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexName(), "index");
3140+
std::vector<std::string> indexKeyColumns{"user", "emb"};
3141+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexColumns(), indexKeyColumns);
3142+
const auto& settings = std::get<TKMeansTreeSettings>(indexes[0].GetIndexSettings());
3143+
UNIT_ASSERT_EQUAL(settings.Settings.Metric, NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance);
3144+
UNIT_ASSERT_EQUAL(settings.Settings.VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8);
3145+
UNIT_ASSERT_EQUAL(settings.Settings.VectorDimension, 2);
3146+
// Levels and Clusters must match the values given in the WITH clause of the ALTER TABLE above.
UNIT_ASSERT_EQUAL(settings.Levels, 3);
3147+
UNIT_ASSERT_EQUAL(settings.Clusters, 2);
3148+
}
3149+
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
3150+
}
3151+
3152+
// Test: adds a vector_kmeans_tree index with levels=4 and clusters=2 on (user, emb),
// verifies the index description reported by DescribeTable, and then runs the shared
// DoPositiveQueriesPrefixedVectorIndexOrderByCosine query checks on the same session.
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineDistanceNotNullableLevel4) {
3153+
NKikimrConfig::TFeatureFlags featureFlags;
3154+
// The vector index feature flag is switched on explicitly for this test server.
featureFlags.SetEnableVectorIndex(true);
3155+
auto setting = NKikimrKqp::TKqpSetting();
3156+
auto serverSettings = TKikimrSettings()
3157+
.SetFeatureFlags(featureFlags)
3158+
.SetKqpSettings({setting});
3159+
3160+
TKikimrRunner kikimr(serverSettings);
3161+
// Raise index-build and schemeshard logging to TRACE priority for this run.
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
3162+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
3163+
3164+
auto db = kikimr.GetTableClient();
3165+
// NOTE(review): the false argument presumably selects non-nullable columns, matching
// the NotNullable part of the test name; confirm against the helper definition.
auto session = DoCreateTableForPrefixedVectorIndex(db, false);
3166+
{
3167+
const TString createIndex(Q_(R"(
3168+
ALTER TABLE `/Root/TestTable`
3169+
ADD INDEX index
3170+
GLOBAL USING vector_kmeans_tree
3171+
ON (user, emb)
3172+
WITH (distance=cosine, vector_type="uint8", vector_dimension=2, levels=4, clusters=2);
3173+
)"));
3174+
3175+
auto result = session.ExecuteSchemeQuery(createIndex)
3176+
.ExtractValueSync();
3177+
3178+
// The index build must succeed; the issues text is attached to the failure message.
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3179+
}
3180+
{
3181+
// Read the table description back and check that it reports exactly the index created above.
auto result = session.DescribeTable("/Root/TestTable").ExtractValueSync();
3182+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
3183+
const auto& indexes = result.GetTableDescription().GetIndexDescriptions();
3184+
UNIT_ASSERT_EQUAL(indexes.size(), 1);
3185+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexName(), "index");
3186+
std::vector<std::string> indexKeyColumns{"user", "emb"};
3187+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexColumns(), indexKeyColumns);
3188+
const auto& settings = std::get<TKMeansTreeSettings>(indexes[0].GetIndexSettings());
3189+
UNIT_ASSERT_EQUAL(settings.Settings.Metric, NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance);
3190+
UNIT_ASSERT_EQUAL(settings.Settings.VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8);
3191+
UNIT_ASSERT_EQUAL(settings.Settings.VectorDimension, 2);
3192+
// Levels and Clusters must match the values given in the WITH clause of the ALTER TABLE above.
UNIT_ASSERT_EQUAL(settings.Levels, 4);
3193+
UNIT_ASSERT_EQUAL(settings.Clusters, 2);
3194+
}
3195+
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
3196+
}
3197+
31063198
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineSimilarityNotNullableLevel2) {
31073199
NKikimrConfig::TFeatureFlags featureFlags;
31083200
featureFlags.SetEnableVectorIndex(true);

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

+71-37
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateBuildPropose(
370370
static constexpr std::string_view LogPrefix = "Create build table boundaries for ";
371371
LOG_D(buildInfo.Id << " table " << suffix
372372
<< ", count: " << count << ", parts: " << parts << ", step: " << step
373-
<< ", kmeans: " << buildInfo.KMeansTreeToDebugStr());
373+
<< ", " << buildInfo.DebugString());
374374
if (parts > 1) {
375375
const auto from = buildInfo.KMeans.ChildBegin;
376376
for (auto i = from + step, e = from + count; i < e; i += step) {
@@ -699,7 +699,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
699699
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
700700
}
701701

702-
void SendBuildIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
702+
void SendBuildSecondaryIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
703703
auto ev = MakeHolder<TEvDataShard::TEvBuildIndexCreateRequest>();
704704
ev->Record.SetBuildIndexId(ui64(BuildId));
705705

@@ -807,12 +807,20 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
807807
}
808808
}
809809

810-
bool FillTable(TIndexBuildInfo& buildInfo) {
810+
bool FillSecondaryIndex(TIndexBuildInfo& buildInfo) {
811+
LOG_D("FillSecondaryIndex Start");
812+
811813
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
812814
AddAllShards(buildInfo);
813815
}
814-
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildIndexRequest(shardIdx, buildInfo); }) &&
816+
auto done = SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildSecondaryIndexRequest(shardIdx, buildInfo); }) &&
815817
buildInfo.DoneShards.size() == buildInfo.Shards.size();
818+
819+
if (done) {
820+
LOG_D("FillSecondaryIndex Done");
821+
}
822+
823+
return done;
816824
}
817825

818826
bool FillPrefixKMeans(TIndexBuildInfo& buildInfo) {
@@ -823,6 +831,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
823831
buildInfo.DoneShards.size() == buildInfo.Shards.size();
824832
}
825833

834+
// Dispatches SendKMeansLocalRequest to the shards of this build via SendToShards.
// Returns true only when SendToShards returns true and the number of done shards
// equals the total number of shards in buildInfo; otherwise returns false.
bool FillLocalKMeans(TIndexBuildInfo& buildInfo) {
835+
// When the done, to-upload and in-progress shard sets are all empty, call AddAllShards first.
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
836+
AddAllShards(buildInfo);
837+
}
838+
// The lambda is the per-shard send action handed to SendToShards.
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansLocalRequest(shardIdx, buildInfo); }) &&
839+
buildInfo.DoneShards.size() == buildInfo.Shards.size();
840+
}
841+
826842
bool InitSingleKMeans(TIndexBuildInfo& buildInfo) {
827843
if (!buildInfo.DoneShards.empty() || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()) {
828844
return false;
@@ -934,102 +950,112 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
934950
);
935951
}
936952

937-
bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
938-
if (buildInfo.IsBuildPrefixedVectorIndex() && buildInfo.KMeans.Level == 1) {
939-
LOG_D("FillIndex::Prefixed::Level1::Start " << buildInfo.KMeansTreeToDebugStr());
940-
if (!FillTable(buildInfo)) {
953+
bool FillPrefixedVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
954+
LOG_D("FillPrefixedVectorIndex Start " << buildInfo.DebugString());
955+
956+
if (buildInfo.KMeans.Level == 1) {
957+
if (!FillSecondaryIndex(buildInfo)) {
941958
return false;
942959
}
943-
const ui64 doneShards = buildInfo.DoneShards.size();
960+
LOG_D("FillPrefixedVectorIndex DoneLevel " << buildInfo.DebugString());
944961

962+
const ui64 doneShards = buildInfo.DoneShards.size();
945963
ClearDoneShards(txc, buildInfo);
946964
// it's approximate but upper bound, so it's ok
947965
buildInfo.KMeans.TableSize = std::max<ui64>(1, buildInfo.Processed.GetUploadRows());
948966
buildInfo.KMeans.PrefixIndexDone(doneShards);
967+
LOG_D("FillPrefixedVectorIndex PrefixIndexDone " << buildInfo.DebugString());
968+
949969
PersistKMeansState(txc, buildInfo);
950970
NIceDb::TNiceDb db{txc.DB};
951971
Self->PersistBuildIndexUploadReset(db, buildInfo);
952-
LOG_D("FillIndex::Prefixed::Level1::Done " << buildInfo.KMeansTreeToDebugStr());
953972
ChangeState(BuildId, TIndexBuildInfo::EState::CreateBuild);
954973
Progress(BuildId);
955974
return false;
956-
}
957-
958-
if (buildInfo.IsBuildPrefixedVectorIndex() && buildInfo.KMeans.Level == 2) {
959-
LOG_D("FillIndex::Prefixed::Level2::Start " << buildInfo.KMeansTreeToDebugStr());
960-
if (!FillPrefixKMeans(buildInfo)) {
975+
} else {
976+
bool filled = buildInfo.KMeans.Level == 2
977+
? FillPrefixKMeans(buildInfo)
978+
: FillLocalKMeans(buildInfo);
979+
if (!filled) {
961980
return false;
962981
}
982+
LOG_D("FillPrefixedVectorIndex DoneLevel " << buildInfo.DebugString());
963983

964984
ClearDoneShards(txc, buildInfo);
965985
Y_ASSERT(buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::MultiLocal);
966986
const bool needsAnotherLevel = buildInfo.KMeans.NextLevel();
967987
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::MultiLocal;
968-
buildInfo.KMeans.Parent = buildInfo.KMeans.ParentEnd();
988+
if (buildInfo.KMeans.Level == 2) {
989+
buildInfo.KMeans.Parent = buildInfo.KMeans.ParentEnd();
990+
}
991+
LOG_D("FillPrefixedVectorIndex NextLevel " << buildInfo.DebugString());
992+
969993
PersistKMeansState(txc, buildInfo);
970994
NIceDb::TNiceDb db{txc.DB};
971995
Self->PersistBuildIndexUploadReset(db, buildInfo);
972-
LOG_D("FillIndex::Prefixed::Level2::Done " << buildInfo.KMeansTreeToDebugStr());
973996
if (!needsAnotherLevel) {
997+
LOG_D("FillPrefixedVectorIndex Done " << buildInfo.DebugString());
974998
return true;
975999
}
9761000
ChangeState(BuildId, TIndexBuildInfo::EState::DropBuild);
9771001
Progress(BuildId);
9781002
return false;
9791003
}
1004+
}
1005+
1006+
bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
1007+
LOG_D("FillVectorIndex Start " << buildInfo.DebugString());
9801008

9811009
if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Upload) {
9821010
return false;
9831011
}
9841012
if (InitSingleKMeans(buildInfo)) {
985-
LOG_D("FillIndex::SingleKMeans::Start " << buildInfo.KMeansTreeToDebugStr());
1013+
LOG_D("FillVectorIndex SingleKMeans " << buildInfo.DebugString());
9861014
}
9871015
if (!SendVectorIndex(buildInfo)) {
9881016
return false;
9891017
}
9901018

991-
LOG_D("FillIndex::SendVectorIndex::Done " << buildInfo.KMeansTreeToDebugStr());
9921019
if (!buildInfo.Sample.Rows.empty()) {
9931020
if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Collect) {
994-
LOG_D("FillIndex::SendSample::Start " << buildInfo.KMeansTreeToDebugStr());
1021+
LOG_D("FillVectorIndex SendUploadSampleKRequest " << buildInfo.DebugString());
9951022
SendUploadSampleKRequest(buildInfo);
9961023
return false;
9971024
}
998-
LOG_D("FillIndex::SendSample::Done " << buildInfo.KMeansTreeToDebugStr());
9991025
}
10001026

1001-
LOG_D("FillIndex::ClearDoneShards " << buildInfo.KMeansTreeToDebugStr());
1027+
LOG_D("FillVectorIndex DoneLevel " << buildInfo.DebugString());
10021028
ClearDoneShards(txc, buildInfo);
10031029

10041030
if (!buildInfo.Sample.Rows.empty()) {
10051031
if (buildInfo.KMeans.NextState()) {
1006-
LOG_D("FillIndex::NextState::Start " << buildInfo.KMeansTreeToDebugStr());
1032+
LOG_D("FillVectorIndex NextState " << buildInfo.DebugString());
10071033
PersistKMeansState(txc, buildInfo);
10081034
Progress(BuildId);
10091035
return false;
10101036
}
10111037
buildInfo.Sample.Clear();
10121038
NIceDb::TNiceDb db{txc.DB};
10131039
Self->PersistBuildIndexSampleForget(db, buildInfo);
1014-
LOG_D("FillIndex::NextState::Done " << buildInfo.KMeansTreeToDebugStr());
1040+
LOG_D("FillVectorIndex DoneState " << buildInfo.DebugString());
10151041
}
10161042

10171043
if (buildInfo.KMeans.NextParent()) {
1018-
LOG_D("FillIndex::NextParent::Start " << buildInfo.KMeansTreeToDebugStr());
1044+
LOG_D("FillVectorIndex NextParent " << buildInfo.DebugString());
10191045
PersistKMeansState(txc, buildInfo);
10201046
Progress(BuildId);
10211047
return false;
10221048
}
10231049

10241050
if (InitMultiKMeans(buildInfo)) {
1025-
LOG_D("FillIndex::MultiKMeans::Start " << buildInfo.KMeansTreeToDebugStr());
1051+
LOG_D("FillVectorIndex MultiKMeans " << buildInfo.DebugString());
10261052
PersistKMeansState(txc, buildInfo);
10271053
Progress(BuildId);
10281054
return false;
10291055
}
10301056

10311057
if (buildInfo.KMeans.NextLevel()) {
1032-
LOG_D("FillIndex::NextLevel::Start " << buildInfo.KMeansTreeToDebugStr());
1058+
LOG_D("FillVectorIndex NextLevel " << buildInfo.DebugString());
10331059
PersistKMeansState(txc, buildInfo);
10341060
NIceDb::TNiceDb db{txc.DB};
10351061
Self->PersistBuildIndexUploadReset(db, buildInfo);
@@ -1039,7 +1065,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
10391065
Progress(BuildId);
10401066
return false;
10411067
}
1042-
LOG_D("FillIndex::Done " << buildInfo.KMeansTreeToDebugStr());
1068+
LOG_D("FillVectorIndex Done " << buildInfo.DebugString());
10431069
return true;
10441070
}
10451071

@@ -1057,15 +1083,20 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
10571083
Y_ABORT_UNLESS(buildInfo.SnapshotStep);
10581084
}
10591085
if (buildInfo.Shards.empty()) {
1060-
LOG_D("FillIndex::InitiateShards " << buildInfo.KMeansTreeToDebugStr());
10611086
NIceDb::TNiceDb db(txc.DB);
10621087
InitiateShards(db, buildInfo);
10631088
}
1064-
if (buildInfo.IsBuildVectorIndex()) {
1065-
return FillVectorIndex(txc, buildInfo);
1066-
} else {
1067-
Y_ASSERT(buildInfo.IsBuildSecondaryIndex() || buildInfo.IsBuildColumns());
1068-
return FillTable(buildInfo);
1089+
switch (buildInfo.BuildKind) {
1090+
case TIndexBuildInfo::EBuildKind::BuildSecondaryIndex:
1091+
case TIndexBuildInfo::EBuildKind::BuildColumns:
1092+
return FillSecondaryIndex(buildInfo);
1093+
case TIndexBuildInfo::EBuildKind::BuildVectorIndex:
1094+
return FillVectorIndex(txc, buildInfo);
1095+
case TIndexBuildInfo::EBuildKind::BuildPrefixedVectorIndex:
1096+
return FillPrefixedVectorIndex(txc, buildInfo);
1097+
default:
1098+
Y_ASSERT(false);
1099+
return true;
10691100
}
10701101
}
10711102

@@ -1302,6 +1333,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
13021333
}
13031334

13041335
void InitiateShards(NIceDb::TNiceDb& db, TIndexBuildInfo& buildInfo) {
1336+
LOG_D("InitiateShards " << buildInfo.DebugString());
1337+
13051338
Y_ASSERT(buildInfo.Shards.empty());
13061339
Y_ASSERT(buildInfo.ToUploadShards.empty());
13071340
Y_ASSERT(buildInfo.InProgressShards.empty());
@@ -1317,15 +1350,16 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
13171350
auto tableColumns = NTableIndex::ExtractInfo(table); // skip dropped columns
13181351
TSerializedTableRange shardRange = InfiniteRange(tableColumns.Keys.size());
13191352
static constexpr std::string_view LogPrefix = "";
1320-
LOG_D("infinite range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));
13211353

13221354
buildInfo.Cluster2Shards.clear();
13231355
for (const auto& x: table->GetPartitions()) {
13241356
Y_ABORT_UNLESS(Self->ShardInfos.contains(x.ShardIdx));
13251357
TSerializedCellVec bound{x.EndOfRange};
13261358
shardRange.To = bound;
1327-
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));
1328-
buildInfo.AddParent(shardRange, x.ShardIdx);
1359+
if (buildInfo.BuildKind == TIndexBuildInfo::EBuildKind::BuildVectorIndex) {
1360+
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange));
1361+
buildInfo.AddParent(shardRange, x.ShardIdx);
1362+
}
13291363
auto [it, emplaced] = buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), "", buildInfo.Shards.size()});
13301364
Y_ASSERT(emplaced);
13311365
shardRange.From = std::move(bound);

0 commit comments

Comments
 (0)