@@ -443,8 +443,8 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
443
443
if (ForceTraversalOperationId == operationId) {
444
444
outRecord.SetStatus (NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
445
445
} else {
446
- if ( std::any_of (ForceTraversals. begin (), ForceTraversals. end (),
447
- [&operationId]( const TForceTraversal& elem) { return elem. OperationId == operationId;}) ) {
446
+ auto forceTraversalOperation = ForceTraversalOperation (operationId);
447
+ if (forceTraversalOperation ) {
448
448
outRecord.SetStatus (NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
449
449
} else {
450
450
outRecord.SetStatus (NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
@@ -580,22 +580,35 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
580
580
581
581
TPathId pathId;
582
582
583
- if (!ForceTraversals. empty () && ! LastTraversalWasForce) {
583
+ if (!LastTraversalWasForce) {
584
584
LastTraversalWasForce = true ;
585
585
586
- TForceTraversal& operation = ForceTraversals.front ();
587
- pathId = operation.PathId ;
586
+ for (TForceTraversalOperation& operation : ForceTraversals) {
587
+ for (TForceTraversalTable& operationTable : operation.Tables ) {
588
+ if (operationTable.Status == TForceTraversalTable::EStatus::None) {
589
+ operationTable.Status = TForceTraversalTable::EStatus::RequestSent;
590
+ db.Table <Schema::ForceTraversalTables>().Key (operation.OperationId , operationTable.PathId .OwnerId , operationTable.PathId .LocalPathId )
591
+ .Update (NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)operationTable.Status ));
588
592
589
- ForceTraversalOperationId = operation.OperationId ;
590
- ForceTraversalColumnTags = operation.ColumnTags ;
591
- ForceTraversalTypes = operation.Types ;
592
- ForceTraversalReplyToActorId = operation.ReplyToActorId ;
593
+ pathId = operationTable.PathId ;
594
+ break ;
595
+ }
596
+ }
597
+
598
+ if (!pathId) {
599
+ SA_LOG_D (" [" << TabletID () << " ] All the force traversal tables sent the requests. OperationId=" << operation.OperationId );
600
+ continue ;
601
+ }
593
602
594
- PersistForceTraversal (db);
603
+ ForceTraversalOperationId = operation.OperationId ;
604
+ }
595
605
596
- // db.Table<Schema::ForceTraversals>().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete();
597
- ForceTraversals.pop_front ();
598
- } else if (!ScheduleTraversalsByTime.Empty ()){
606
+ if (!pathId) {
607
+ SA_LOG_D (" [" << TabletID () << " ] All the force traversal operations sent the requests." );
608
+ }
609
+ }
610
+
611
+ if (!pathId && !ScheduleTraversalsByTime.Empty ()){
599
612
LastTraversalWasForce = false ;
600
613
601
614
auto * oldestTable = ScheduleTraversalsByTime.Top ();
@@ -606,8 +619,10 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
606
619
}
607
620
608
621
pathId = oldestTable->PathId ;
609
- } else {
610
- SA_LOG_E (" [" << TabletID () << " ] No schedule traversal from schemeshard." );
622
+ }
623
+
624
+ if (!pathId) {
625
+ SA_LOG_E (" [" << TabletID () << " ] No traversal from schemeshard." );
611
626
return ;
612
627
}
613
628
@@ -653,13 +668,66 @@ void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) {
653
668
}
654
669
}
655
670
671
+ auto forceTraversalOperation = CurrentForceTraversalOperation ();
672
+ if (forceTraversalOperation) {
673
+ bool tablesRemained = std::any_of (forceTraversalOperation->Tables .begin (), forceTraversalOperation->Tables .end (),
674
+ [](const TForceTraversalTable& elem) { return elem.Status == TForceTraversalTable::EStatus::None;});
675
+ if (!tablesRemained) {
676
+ DeleteForceTraversalOperation (ForceTraversalOperationId, db);
677
+ }
678
+ }
679
+
656
680
ResetTraversalState (db);
657
681
}
658
682
659
683
TString TStatisticsAggregator::LastTraversalWasForceString () const {
660
684
return LastTraversalWasForce ? " force" : " schedule" ;
661
685
}
662
686
687
+ TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::CurrentForceTraversalOperation () {
688
+ return ForceTraversalOperation (ForceTraversalOperationId);
689
+ }
690
+
691
+ TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::ForceTraversalOperation (const TString& operationId) {
692
+ auto forceTraversalOperation = std::find_if (ForceTraversals.begin (), ForceTraversals.end (),
693
+ [operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;});
694
+
695
+ if (forceTraversalOperation == ForceTraversals.end ()) {
696
+ return nullptr ;
697
+ } else {
698
+ return &*forceTraversalOperation;
699
+ }
700
+ }
701
+
702
+ void TStatisticsAggregator::DeleteForceTraversalOperation (const TString& operationId, NIceDb::TNiceDb& db) {
703
+ db.Table <Schema::ForceTraversalOperations>().Key (ForceTraversalOperationId).Delete ();
704
+
705
+ auto operation = ForceTraversalOperation (operationId);
706
+ for (const TForceTraversalTable& table : operation->Tables ) {
707
+ db.Table <Schema::ForceTraversalTables>().Key (operationId, table.PathId .OwnerId , table.PathId .LocalPathId ).Delete ();
708
+ }
709
+
710
+ ForceTraversals.remove_if ([operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;});
711
+ }
712
+
713
+ TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::ForceTraversalTable (const TString& operationId, const TPathId& pathId) {
714
+ for (TForceTraversalOperation& operation : ForceTraversals) {
715
+ if (operation.OperationId == operationId) {
716
+ for (TForceTraversalTable& operationTable : operation.Tables ) {
717
+ if (operationTable.PathId == pathId) {
718
+ return &operationTable;
719
+ }
720
+ }
721
+ }
722
+ }
723
+
724
+ return nullptr ;
725
+ }
726
+
727
+ TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::CurrentForceTraversalTable () {
728
+ return ForceTraversalTable (ForceTraversalOperationId, TraversalTableId.PathId );
729
+ }
730
+
663
731
void TStatisticsAggregator::PersistSysParam (NIceDb::TNiceDb& db, ui64 id, const TString& value) {
664
732
db.Table <Schema::SysParams>().Key (id).Update (
665
733
NIceDb::TUpdate<Schema::SysParams::Value>(value));
@@ -676,30 +744,19 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
676
744
PersistSysParam (db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer ());
677
745
}
678
746
679
- void TStatisticsAggregator::PersistForceTraversal (NIceDb::TNiceDb& db) {
680
- PersistSysParam (db, Schema::SysParam_ForceTraversalOperationId, ToString (ForceTraversalOperationId));
681
- PersistSysParam (db, Schema::SysParam_ForceTraversalCookie, ForceTraversalOperationId);
682
- PersistSysParam (db, Schema::SysParam_ForceTraversalColumnTags, ToString (ForceTraversalColumnTags));
683
- PersistSysParam (db, Schema::SysParam_ForceTraversalTypes, ToString (ForceTraversalTypes));
684
- }
685
-
686
747
void TStatisticsAggregator::PersistGlobalTraversalRound (NIceDb::TNiceDb& db) {
687
748
PersistSysParam (db, Schema::SysParam_GlobalTraversalRound, ToString (GlobalTraversalRound));
688
749
}
689
750
690
751
void TStatisticsAggregator::ResetTraversalState (NIceDb::TNiceDb& db) {
691
752
ForceTraversalOperationId.clear ();
692
753
TraversalTableId.PathId = TPathId ();
693
- ForceTraversalColumnTags.clear ();
694
- ForceTraversalTypes.clear ();
695
754
TraversalStartTime = TInstant::MicroSeconds (0 );
696
755
PersistTraversal (db);
697
756
698
757
TraversalStartKey = TSerializedCellVec ();
699
758
PersistStartKey (db);
700
759
701
- ForceTraversalReplyToActorId = {};
702
-
703
760
for (auto & [tag, _] : CountMinSketches) {
704
761
db.Table <Schema::ColumnStatistics>().Key (tag).Delete ();
705
762
}
0 commit comments