Skip to content

Commit e644918

Browse files
authored
Merge 7994fa3 into 0f80a46
2 parents 0f80a46 + 7994fa3 commit e644918

File tree

3 files changed

+142
-26
lines changed

3 files changed

+142
-26
lines changed

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

+92
Original file line numberDiff line numberDiff line change
@@ -3103,6 +3103,98 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
31033103
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
31043104
}
31053105

3106+
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineDistanceNotNullableLevel3) {
3107+
NKikimrConfig::TFeatureFlags featureFlags;
3108+
featureFlags.SetEnableVectorIndex(true);
3109+
auto setting = NKikimrKqp::TKqpSetting();
3110+
auto serverSettings = TKikimrSettings()
3111+
.SetFeatureFlags(featureFlags)
3112+
.SetKqpSettings({setting});
3113+
3114+
TKikimrRunner kikimr(serverSettings);
3115+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
3116+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
3117+
3118+
auto db = kikimr.GetTableClient();
3119+
auto session = DoCreateTableForPrefixedVectorIndex(db, false);
3120+
{
3121+
const TString createIndex(Q_(R"(
3122+
ALTER TABLE `/Root/TestTable`
3123+
ADD INDEX index
3124+
GLOBAL USING vector_kmeans_tree
3125+
ON (user, emb)
3126+
WITH (distance=cosine, vector_type="uint8", vector_dimension=2, levels=3, clusters=2);
3127+
)"));
3128+
3129+
auto result = session.ExecuteSchemeQuery(createIndex)
3130+
.ExtractValueSync();
3131+
3132+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3133+
}
3134+
{
3135+
auto result = session.DescribeTable("/Root/TestTable").ExtractValueSync();
3136+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
3137+
const auto& indexes = result.GetTableDescription().GetIndexDescriptions();
3138+
UNIT_ASSERT_EQUAL(indexes.size(), 1);
3139+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexName(), "index");
3140+
std::vector<std::string> indexKeyColumns{"user", "emb"};
3141+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexColumns(), indexKeyColumns);
3142+
const auto& settings = std::get<TKMeansTreeSettings>(indexes[0].GetIndexSettings());
3143+
UNIT_ASSERT_EQUAL(settings.Settings.Metric, NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance);
3144+
UNIT_ASSERT_EQUAL(settings.Settings.VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8);
3145+
UNIT_ASSERT_EQUAL(settings.Settings.VectorDimension, 2);
3146+
UNIT_ASSERT_EQUAL(settings.Levels, 3);
3147+
UNIT_ASSERT_EQUAL(settings.Clusters, 2);
3148+
}
3149+
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
3150+
}
3151+
3152+
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineDistanceNotNullableLevel4) {
3153+
NKikimrConfig::TFeatureFlags featureFlags;
3154+
featureFlags.SetEnableVectorIndex(true);
3155+
auto setting = NKikimrKqp::TKqpSetting();
3156+
auto serverSettings = TKikimrSettings()
3157+
.SetFeatureFlags(featureFlags)
3158+
.SetKqpSettings({setting});
3159+
3160+
TKikimrRunner kikimr(serverSettings);
3161+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
3162+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
3163+
3164+
auto db = kikimr.GetTableClient();
3165+
auto session = DoCreateTableForPrefixedVectorIndex(db, false);
3166+
{
3167+
const TString createIndex(Q_(R"(
3168+
ALTER TABLE `/Root/TestTable`
3169+
ADD INDEX index
3170+
GLOBAL USING vector_kmeans_tree
3171+
ON (user, emb)
3172+
WITH (distance=cosine, vector_type="uint8", vector_dimension=2, levels=4, clusters=2);
3173+
)"));
3174+
3175+
auto result = session.ExecuteSchemeQuery(createIndex)
3176+
.ExtractValueSync();
3177+
3178+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3179+
}
3180+
{
3181+
auto result = session.DescribeTable("/Root/TestTable").ExtractValueSync();
3182+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
3183+
const auto& indexes = result.GetTableDescription().GetIndexDescriptions();
3184+
UNIT_ASSERT_EQUAL(indexes.size(), 1);
3185+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexName(), "index");
3186+
std::vector<std::string> indexKeyColumns{"user", "emb"};
3187+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexColumns(), indexKeyColumns);
3188+
const auto& settings = std::get<TKMeansTreeSettings>(indexes[0].GetIndexSettings());
3189+
UNIT_ASSERT_EQUAL(settings.Settings.Metric, NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance);
3190+
UNIT_ASSERT_EQUAL(settings.Settings.VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8);
3191+
UNIT_ASSERT_EQUAL(settings.Settings.VectorDimension, 2);
3192+
UNIT_ASSERT_EQUAL(settings.Levels, 4);
3193+
UNIT_ASSERT_EQUAL(settings.Clusters, 2);
3194+
}
3195+
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
3196+
}
3197+
31063198
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineSimilarityNotNullableLevel2) {
31073199
NKikimrConfig::TFeatureFlags featureFlags;
31083200
featureFlags.SetEnableVectorIndex(true);

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

+44-21
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
807807
}
808808
}
809809

810-
bool FillTable(TIndexBuildInfo& buildInfo) {
810+
bool FillSecondaryIndex(TIndexBuildInfo& buildInfo) {
811811
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
812812
AddAllShards(buildInfo);
813813
}
@@ -823,6 +823,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
823823
buildInfo.DoneShards.size() == buildInfo.Shards.size();
824824
}
825825

826+
bool FillLocalKMeans(TIndexBuildInfo& buildInfo) {
827+
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
828+
AddAllShards(buildInfo);
829+
}
830+
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansLocalRequest(shardIdx, buildInfo); }) &&
831+
buildInfo.DoneShards.size() == buildInfo.Shards.size();
832+
}
833+
826834
bool InitSingleKMeans(TIndexBuildInfo& buildInfo) {
827835
if (!buildInfo.DoneShards.empty() || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()) {
828836
return false;
@@ -934,50 +942,58 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
934942
);
935943
}
936944

937-
bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
938-
if (buildInfo.IsBuildPrefixedVectorIndex() && buildInfo.KMeans.Level == 1) {
939-
LOG_D("FillIndex::Prefixed::Level1::Start " << buildInfo.KMeansTreeToDebugStr());
940-
if (!FillTable(buildInfo)) {
945+
bool FillPrefixedVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
946+
Y_ASSERT(buildInfo.IsBuildPrefixedVectorIndex());
947+
LOG_D("FillPrefixedIndex::Level::Start " << buildInfo.KMeansTreeToDebugStr());
948+
949+
if (buildInfo.KMeans.Level == 1) {
950+
if (!FillSecondaryIndex(buildInfo)) {
941951
return false;
942952
}
943-
const ui64 doneShards = buildInfo.DoneShards.size();
953+
LOG_D("FillPrefixedIndex::Level::Done " << buildInfo.KMeansTreeToDebugStr());
944954

955+
const ui64 doneShards = buildInfo.DoneShards.size();
945956
ClearDoneShards(txc, buildInfo);
946957
// it's approximate but upper bound, so it's ok
947958
buildInfo.KMeans.TableSize = std::max<ui64>(1, buildInfo.Processed.GetUploadRows());
948959
buildInfo.KMeans.PrefixIndexDone(doneShards);
960+
LOG_D("FillIndex::Prefixed::NextLevel " << buildInfo.KMeansTreeToDebugStr());
961+
949962
PersistKMeansState(txc, buildInfo);
950963
NIceDb::TNiceDb db{txc.DB};
951964
Self->PersistBuildIndexUploadReset(db, buildInfo);
952-
LOG_D("FillIndex::Prefixed::Level1::Done " << buildInfo.KMeansTreeToDebugStr());
953965
ChangeState(BuildId, TIndexBuildInfo::EState::CreateBuild);
954966
Progress(BuildId);
955967
return false;
956-
}
957-
958-
if (buildInfo.IsBuildPrefixedVectorIndex() && buildInfo.KMeans.Level == 2) {
959-
LOG_D("FillIndex::Prefixed::Level2::Start " << buildInfo.KMeansTreeToDebugStr());
960-
if (!FillPrefixKMeans(buildInfo)) {
968+
} else {
969+
bool filled = buildInfo.KMeans.Level == 2
970+
? FillPrefixKMeans(buildInfo)
971+
: FillLocalKMeans(buildInfo);
972+
if (!filled) {
961973
return false;
962974
}
975+
LOG_D("FillPrefixedIndex::Level::Done " << buildInfo.KMeansTreeToDebugStr());
963976

964977
ClearDoneShards(txc, buildInfo);
965978
Y_ASSERT(buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::MultiLocal);
966979
const bool needsAnotherLevel = buildInfo.KMeans.NextLevel();
967980
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::MultiLocal;
968981
buildInfo.KMeans.Parent = buildInfo.KMeans.ParentEnd();
982+
LOG_D("FillPrefixedIndex::NextLevel " << buildInfo.KMeansTreeToDebugStr());
983+
969984
PersistKMeansState(txc, buildInfo);
970985
NIceDb::TNiceDb db{txc.DB};
971986
Self->PersistBuildIndexUploadReset(db, buildInfo);
972-
LOG_D("FillIndex::Prefixed::Level2::Done " << buildInfo.KMeansTreeToDebugStr());
973987
if (!needsAnotherLevel) {
974988
return true;
975989
}
976990
ChangeState(BuildId, TIndexBuildInfo::EState::DropBuild);
977991
Progress(BuildId);
978992
return false;
979993
}
994+
}
980995

996+
bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
981997
if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Upload) {
982998
return false;
983999
}
@@ -1061,11 +1077,17 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
10611077
NIceDb::TNiceDb db(txc.DB);
10621078
InitiateShards(db, buildInfo);
10631079
}
1064-
if (buildInfo.IsBuildVectorIndex()) {
1065-
return FillVectorIndex(txc, buildInfo);
1066-
} else {
1067-
Y_ASSERT(buildInfo.IsBuildSecondaryIndex() || buildInfo.IsBuildColumns());
1068-
return FillTable(buildInfo);
1080+
switch (buildInfo.BuildKind) {
1081+
case TIndexBuildInfo::EBuildKind::BuildSecondaryIndex:
1082+
case TIndexBuildInfo::EBuildKind::BuildColumns:
1083+
return FillSecondaryIndex(buildInfo);
1084+
case TIndexBuildInfo::EBuildKind::BuildVectorIndex:
1085+
return FillVectorIndex(txc, buildInfo);
1086+
case TIndexBuildInfo::EBuildKind::BuildPrefixedVectorIndex:
1087+
return FillPrefixedVectorIndex(txc, buildInfo);
1088+
default:
1089+
Y_ASSERT(false);
1090+
return true;
10691091
}
10701092
}
10711093

@@ -1317,15 +1339,16 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
13171339
auto tableColumns = NTableIndex::ExtractInfo(table); // skip dropped columns
13181340
TSerializedTableRange shardRange = InfiniteRange(tableColumns.Keys.size());
13191341
static constexpr std::string_view LogPrefix = "";
1320-
LOG_D("infinite range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));
13211342

13221343
buildInfo.Cluster2Shards.clear();
13231344
for (const auto& x: table->GetPartitions()) {
13241345
Y_ABORT_UNLESS(Self->ShardInfos.contains(x.ShardIdx));
13251346
TSerializedCellVec bound{x.EndOfRange};
13261347
shardRange.To = bound;
1327-
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));
1328-
buildInfo.AddParent(shardRange, x.ShardIdx);
1348+
if (buildInfo.BuildKind == TIndexBuildInfo::EBuildKind::BuildVectorIndex) {
1349+
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange));
1350+
buildInfo.AddParent(shardRange, x.ShardIdx);
1351+
}
13291352
auto [it, emplaced] = buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), "", buildInfo.Shards.size()});
13301353
Y_ASSERT(emplaced);
13311354
shardRange.From = std::move(bound);

ydb/core/tx/schemeshard/schemeshard_info_types.h

+6-5
Original file line numberDiff line numberDiff line change
@@ -3285,7 +3285,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
32853285
return {parentFrom, parentTo};
32863286
}
32873287

3288-
TString RangeToDebugStr(const TSerializedTableRange& range, ui32 rootLevel) const {
3288+
TString RangeToDebugStr(const TSerializedTableRange& range) const {
32893289
auto toStr = [&](const TSerializedCellVec& v) -> TString {
32903290
const auto cells = v.GetCells();
32913291
if (cells.empty()) {
@@ -3295,7 +3295,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
32953295
return "-inf";
32963296
}
32973297
auto str = TStringBuilder{} << "{ count: " << cells.size();
3298-
if (Level > rootLevel) {
3298+
if (Level > 1) {
32993299
str << ", parent: " << cells[0].AsValue<NTableIndex::TClusterId>();
33003300
if (cells.size() != 1 && cells[1].IsNull()) {
33013301
str << ", pk: null";
@@ -3684,9 +3684,10 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
36843684

36853685
TSerializedTableRange bound{range};
36863686
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::BUILD_INDEX,
3687-
"AddShardStatus id# " << Id << " shard " << shardIdx <<
3688-
" range " << KMeans.RangeToDebugStr(bound, IsBuildPrefixedVectorIndex() ? 2 : 1));
3689-
AddParent(bound, shardIdx);
3687+
"AddShardStatus id# " << Id << " shard " << shardIdx);
3688+
if (BuildKind == TIndexBuildInfo::EBuildKind::BuildVectorIndex) {
3689+
AddParent(bound, shardIdx);
3690+
}
36903691
Shards.emplace(
36913692
shardIdx, TIndexBuildInfo::TShardStatus(std::move(bound), std::move(lastKeyAck), Shards.size()));
36923693
TIndexBuildInfo::TShardStatus &shardStatus = Shards.at(shardIdx);

0 commit comments

Comments
 (0)