@@ -584,6 +584,48 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
584
584
ToTabletSend.emplace_back (shardId, ui64 (BuildId), std::move (ev));
585
585
}
586
586
587
+ void SendKMeansLocalRequest (TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
588
+ Y_ASSERT (buildInfo.IsBuildVectorIndex ());
589
+ auto ev = MakeHolder<TEvDataShard::TEvLocalKMeansRequest>();
590
+ ev->Record .SetId (ui64 (BuildId));
591
+
592
+ PathIdFromPathId (buildInfo.TablePathId , ev->Record .MutablePathId ());
593
+ *ev->Record .MutableSettings () = std::get<1 >(buildInfo.SpecializedIndexDescription ).GetSettings ();
594
+ ev->Record .SetK (buildInfo.KMeans .K );
595
+ ev->Record .SetUpload (buildInfo.KMeans .GetUpload ());
596
+ ev->Record .SetState (NKikimrTxDataShard::TEvLocalKMeansRequest::SAMPLE);
597
+
598
+ ev->Record .SetDoneRounds (0 );
599
+ ev->Record .SetNeedsRounds (3 ); // TODO(mbkkt) should be configurable
600
+
601
+ ev->Record .SetParent (buildInfo.KMeans .Parent );
602
+ ev->Record .SetChild (buildInfo.KMeans .ChildBegin );
603
+
604
+ auto path = TPath::Init (buildInfo.TablePathId , Self).Dive (buildInfo.IndexName );
605
+ if (buildInfo.KMeans .GetUpload () == NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING
606
+ || buildInfo.KMeans .GetUpload () == NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING) {
607
+ path.Dive (NTableIndex::NTableVectorKmeansTreeIndex::PostingTable);
608
+ } else if (buildInfo.KMeans .Level % 2 == 0 ) {
609
+ path.Dive (NTableIndex::NTableVectorKmeansTreeIndex::BuildPostingTableSuffix0);
610
+ } else {
611
+ path.Dive (NTableIndex::NTableVectorKmeansTreeIndex::BuildPostingTableSuffix1);
612
+ }
613
+ ev->Record .SetPostingName (path.PathString ());
614
+ path.Rise ().Dive (NTableIndex::NTableVectorKmeansTreeIndex::LevelTable);
615
+ ev->Record .SetLevelName (path.PathString ());
616
+
617
+ ev->Record .SetEmbeddingColumn (buildInfo.IndexColumns [0 ]);
618
+ *ev->Record .MutableDataColumns () = {
619
+ buildInfo.DataColumns .begin (), buildInfo.DataColumns .end ()
620
+ };
621
+
622
+ auto shardId = CommonFillRecord (ev->Record , shardIdx, buildInfo);
623
+ ev->Record .SetSeed (ui64 (shardId));
624
+ LOG_D (" TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record .ShortDebugString ());
625
+
626
+ ToTabletSend.emplace_back (shardId, ui64 (BuildId), std::move (ev));
627
+ }
628
+
587
629
void SendBuildIndexRequest (TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
588
630
auto ev = MakeHolder<TEvDataShard::TEvBuildIndexCreateRequest>();
589
631
ev->Record .SetBuildIndexId (ui64 (BuildId));
@@ -719,10 +761,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
719
761
Y_ABORT (" Unreachable" );
720
762
}
721
763
}
722
- // TODO(mbkkt) enable detection of Local case
723
- // if (buildInfo.ToUploadShards.size() == 1 && buildInfo.DoneShardsSize == 0) {
724
- // buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local;
725
- // }
764
+ if (buildInfo.DoneShardsSize == 0 && buildInfo.ToUploadShards .size () == 1 ) {
765
+ buildInfo.KMeans .State = TIndexBuildInfo::TKMeans::Local;
766
+ }
726
767
}
727
768
728
769
bool SendKMeansSample (TIndexBuildInfo& buildInfo) {
@@ -739,6 +780,10 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
739
780
return SendToShards (buildInfo, [&](TShardIdx shardIdx) { SendKMeansReshuffleRequest (shardIdx, buildInfo); });
740
781
}
741
782
783
+ bool SendKMeansLocal (TIndexBuildInfo& buildInfo) {
784
+ return SendToShards (buildInfo, [&](TShardIdx shardIdx) { SendKMeansLocalRequest (shardIdx, buildInfo); });
785
+ }
786
+
742
787
bool SendVectorIndex (TIndexBuildInfo& buildInfo) {
743
788
switch (buildInfo.KMeans .State ) {
744
789
case TIndexBuildInfo::TKMeans::Sample:
@@ -748,9 +793,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
748
793
// return SendKMeansRecompute(buildInfo);
749
794
case TIndexBuildInfo::TKMeans::Reshuffle:
750
795
return SendKMeansReshuffle (buildInfo);
751
- // TODO(mbkkt)
752
- // case TIndexBuildInfo::TKMeans::Local:
753
- // return SendKMeansLocal(buildInfo);
796
+ case TIndexBuildInfo::TKMeans::Local:
797
+ return SendKMeansLocal (buildInfo);
754
798
}
755
799
return true ;
756
800
}
@@ -1344,6 +1388,138 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
1344
1388
}
1345
1389
};
1346
1390
1391
+ struct TSchemeShard ::TIndexBuilder::TTxReplyLocalKMeans: public TSchemeShard::TIndexBuilder::TTxReply {
1392
+ private:
1393
+ TEvDataShard::TEvLocalKMeansResponse::TPtr Local;
1394
+
1395
+ public:
1396
+ explicit TTxReplyLocalKMeans (TSelf* self, TEvDataShard::TEvLocalKMeansResponse::TPtr& local)
1397
+ : TTxReply(self)
1398
+ , Local(local)
1399
+ {
1400
+ }
1401
+
1402
+ bool DoExecute (TTransactionContext& txc, const TActorContext& ctx) override {
1403
+ auto & record = Local->Get ()->Record ;
1404
+
1405
+ LOG_I (" TTxReply : TEvLocalKMeansResponse, Id# " << record.GetId ());
1406
+
1407
+ const auto buildId = TIndexBuildId (record.GetId ());
1408
+ const auto * buildInfoPtr = Self->IndexBuilds .FindPtr (buildId);
1409
+ if (!buildInfoPtr) {
1410
+ return true ;
1411
+ }
1412
+ auto & buildInfo = *buildInfoPtr->Get ();
1413
+ LOG_D (" TTxReply : TEvLocalKMeansResponse"
1414
+ << " , TIndexBuildInfo: " << buildInfo
1415
+ << " , record: " << record.ShortDebugString ());
1416
+
1417
+ TTabletId shardId = TTabletId (record.GetTabletId ());
1418
+ if (!Self->TabletIdToShardIdx .contains (shardId)) {
1419
+ return true ;
1420
+ }
1421
+
1422
+ TShardIdx shardIdx = Self->TabletIdToShardIdx .at (shardId);
1423
+ if (!buildInfo.Shards .contains (shardIdx)) {
1424
+ return true ;
1425
+ }
1426
+
1427
+ switch (const auto state = buildInfo.State ; state) {
1428
+ case TIndexBuildInfo::EState::Filling:
1429
+ {
1430
+ TIndexBuildInfo::TShardStatus& shardStatus = buildInfo.Shards .at (shardIdx);
1431
+
1432
+ auto actualSeqNo = std::pair<ui64, ui64>(Self->Generation (), shardStatus.SeqNoRound );
1433
+ auto recordSeqNo = std::pair<ui64, ui64>(record.GetRequestSeqNoGeneration (), record.GetRequestSeqNoRound ());
1434
+
1435
+ if (actualSeqNo != recordSeqNo) {
1436
+ LOG_D (" TTxReply : TEvLocalKMeansResponse"
1437
+ << " ignore progress message by seqNo"
1438
+ << " , TIndexBuildInfo: " << buildInfo
1439
+ << " , actual seqNo for the shard " << shardId << " (" << shardIdx << " ) is: " << Self->Generation () << " :" << shardStatus.SeqNoRound
1440
+ << " , record: " << record.ShortDebugString ());
1441
+ Y_ABORT_UNLESS (actualSeqNo > recordSeqNo);
1442
+ return true ;
1443
+ }
1444
+
1445
+ // TODO(mbkkt) add billing
1446
+
1447
+ NYql::TIssues issues;
1448
+ NYql::IssuesFromMessage (record.GetIssues (), issues);
1449
+ shardStatus.DebugMessage = issues.ToString ();
1450
+
1451
+ NIceDb::TNiceDb db (txc.DB );
1452
+ Self->PersistBuildIndexUploadProgress (db, buildInfo, shardIdx);
1453
+ shardStatus.Status = record.GetStatus ();
1454
+
1455
+ switch (shardStatus.Status ) {
1456
+ case NKikimrIndexBuilder::EBuildStatus::DONE:
1457
+ if (buildInfo.InProgressShards .erase (shardIdx)) {
1458
+ ++buildInfo.DoneShardsSize ;
1459
+
1460
+ Self->IndexBuildPipes .Close (buildId, shardId, ctx);
1461
+
1462
+ Progress (buildId);
1463
+ }
1464
+ break ;
1465
+
1466
+ case NKikimrIndexBuilder::EBuildStatus::ABORTED:
1467
+ // datashard gracefully rebooted, reschedule shard
1468
+ if (buildInfo.InProgressShards .erase (shardIdx)) {
1469
+ buildInfo.ToUploadShards .emplace_front (shardIdx);
1470
+
1471
+ Self->IndexBuildPipes .Close (buildId, shardId, ctx);
1472
+
1473
+ Progress (buildId);
1474
+ }
1475
+ break ;
1476
+
1477
+ case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
1478
+ case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
1479
+ buildInfo.Issue += TStringBuilder ()
1480
+ << " One of the shards report " << shardStatus.Status
1481
+ << " at Filling stage, process has to be canceled"
1482
+ << " , shardId: " << shardId
1483
+ << " , shardIdx: " << shardIdx;
1484
+ Self->PersistBuildIndexIssue (db, buildInfo);
1485
+ ChangeState (buildInfo.Id , TIndexBuildInfo::EState::Rejection_Applying);
1486
+
1487
+ Progress (buildId);
1488
+ break ;
1489
+
1490
+ case NKikimrIndexBuilder::EBuildStatus::INVALID:
1491
+ case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
1492
+ case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
1493
+ Y_ABORT (" Unreachable" );
1494
+ }
1495
+ break ;
1496
+ }
1497
+ case TIndexBuildInfo::EState::AlterMainTable:
1498
+ case TIndexBuildInfo::EState::Invalid:
1499
+ case TIndexBuildInfo::EState::Locking:
1500
+ case TIndexBuildInfo::EState::GatheringStatistics:
1501
+ case TIndexBuildInfo::EState::Initiating:
1502
+ case TIndexBuildInfo::EState::DropBuild:
1503
+ case TIndexBuildInfo::EState::CreateBuild:
1504
+ case TIndexBuildInfo::EState::Applying:
1505
+ case TIndexBuildInfo::EState::Unlocking:
1506
+ case TIndexBuildInfo::EState::Done:
1507
+ Y_FAIL_S (" Unreachable " << Name (state));
1508
+ case TIndexBuildInfo::EState::Cancellation_Applying:
1509
+ case TIndexBuildInfo::EState::Cancellation_Unlocking:
1510
+ case TIndexBuildInfo::EState::Cancelled:
1511
+ case TIndexBuildInfo::EState::Rejection_Applying:
1512
+ case TIndexBuildInfo::EState::Rejection_Unlocking:
1513
+ case TIndexBuildInfo::EState::Rejected:
1514
+ LOG_D (" TTxReply : TEvLocalKMeansResponse"
1515
+ << " superfluous message " << record.ShortDebugString ());
1516
+ break ;
1517
+ }
1518
+
1519
+ return true ;
1520
+ }
1521
+ };
1522
+
1347
1523
struct TSchemeShard ::TIndexBuilder::TTxReplyReshuffleKMeans: public TSchemeShard::TIndexBuilder::TTxReply {
1348
1524
private:
1349
1525
TEvDataShard::TEvReshuffleKMeansResponse::TPtr Reshuffle;
@@ -2118,6 +2294,10 @@ ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvReshuffleKMeansRespon
2118
2294
return new TIndexBuilder::TTxReplyReshuffleKMeans (this , reshuffle);
2119
2295
}
2120
2296
2297
+ ITransaction* TSchemeShard::CreateTxReply (TEvDataShard::TEvLocalKMeansResponse::TPtr& local) {
2298
+ return new TIndexBuilder::TTxReplyLocalKMeans (this , local);
2299
+ }
2300
+
2121
2301
// Creates the transaction that processes an upload-sample-K response.
// The returned object is allocated with new and handed to the caller as a raw pointer.
ITransaction* TSchemeShard::CreateTxReply(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload) {
    using TReplyTx = TIndexBuilder::TTxReplyUpload;
    return new TReplyTx(this, upload);
}
0 commit comments