Skip to content

Commit 5ead379

Browse files
authored
Refactoring statistics: scan -> traversal (#7210)
1 parent e661cda commit 5ead379

19 files changed

+403
-402
lines changed

ydb/core/protos/counters_statistics_aggregator.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ enum ETxTypes {
1111
TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}];
1212
TXTYPE_CONFIGURE = 2 [(TxTypeOpts) = {Name: "TxConfigure"}];
1313
TXTYPE_SCHEMESHARD_STATS = 3 [(TxTypeOpts) = {Name: "TxSchemeShardStats"}];
14-
TXTYPE_SCAN_TABLE = 4 [(TxTypeOpts) = {Name: "TxScanTable"}];
14+
TXTYPE_ANALYZE_TABLE = 4 [(TxTypeOpts) = {Name: "TxAnalyzeTable"}];
1515
TXTYPE_NAVIGATE = 5 [(TxTypeOpts) = {Name: "TxNavigate"}];
1616
TXTYPE_RESOLVE = 6 [(TxTypeOpts) = {Name: "TxResolve"}];
1717
TXTYPE_SCAN_RESPONSE = 7 [(TxTypeOpts) = {Name: "TxScanResponse"}];
1818
TXTYPE_SAVE_QUERY_RESPONSE = 8 [(TxTypeOpts) = {Name: "TxSaveQueryResponse"}];
19-
TXTYPE_SCHEDULE_SCAN = 9 [(TxTypeOpts) = {Name: "TxScheduleScan"}];
19+
TXTYPE_SCHEDULE_TRAVERSAL = 9 [(TxTypeOpts) = {Name: "TxScheduleTraversal"}];
2020
TXTYPE_DELETE_QUERY_RESPONSE = 10 [(TxTypeOpts) = {Name: "TxDeleteQueryResponse"}];
2121
TXTYPE_AGGR_STAT_RESPONSE = 11 [(TxTypeOpts) = {Name: "TxAggregateStatisticsResponse"}];
2222
TXTYPE_RESPONSE_TABLET_DISTRIBUTION = 12 [(TxTypeOpts) = {Name: "TxResponseTabletDistribution"}];

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 93 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,8 @@ size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds,
383383
auto ssId = ssIds[index];
384384
auto* entry = record->AddEntries();
385385
entry->SetSchemeShardId(ssId);
386-
auto itStats = BaseStats.find(ssId);
387-
if (itStats != BaseStats.end()) {
386+
auto itStats = BaseStatistics.find(ssId);
387+
if (itStats != BaseStatistics.end()) {
388388
entry->SetStats(itStats->second);
389389
size += itStats->second.size();
390390
} else {
@@ -398,21 +398,21 @@ size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds,
398398
}
399399

400400
void TStatisticsAggregator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
401-
if (!ScanTableId.PathId) {
401+
if (!TraversalTableId.PathId) {
402402
return;
403403
}
404404
auto tabletId = ev->Get()->TabletId;
405-
if (IsColumnTable) {
405+
if (TraversalIsColumnTable) {
406406
if (tabletId == HiveId) {
407407
Schedule(HiveRetryInterval, new TEvPrivate::TEvRequestDistribution);
408408
} else {
409409
SA_LOG_CRIT("[" << TabletID() << "] TEvDeliveryProblem with unexpected tablet " << tabletId);
410410
}
411411
} else {
412-
if (ShardRanges.empty()) {
412+
if (DatashardRanges.empty()) {
413413
return;
414414
}
415-
auto& range = ShardRanges.front();
415+
auto& range = DatashardRanges.front();
416416
if (tabletId != range.DataShardId) {
417417
return;
418418
}
@@ -439,11 +439,11 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
439439
auto response = std::make_unique<TEvStatistics::TEvAnalyzeStatusResponse>();
440440
auto& outRecord = response->Record;
441441

442-
if (ScanTableId.PathId == pathId) {
442+
if (TraversalTableId.PathId == pathId) {
443443
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
444444
} else {
445-
auto it = ScanOperationsByPathId.find(pathId);
446-
if (it != ScanOperationsByPathId.end()) {
445+
auto it = ForceTraversalsByPathId.find(pathId);
446+
if (it != ForceTraversalsByPathId.end()) {
447447
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
448448
} else {
449449
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
@@ -486,7 +486,7 @@ void TStatisticsAggregator::InitializeStatisticsTable() {
486486
void TStatisticsAggregator::Navigate() {
487487
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
488488
TNavigate::TEntry entry;
489-
entry.TableId = ScanTableId;
489+
entry.TableId = TraversalTableId;
490490
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
491491
entry.Operation = TNavigate::OpTable;
492492

@@ -500,29 +500,29 @@ void TStatisticsAggregator::Resolve() {
500500
++ResolveRound;
501501

502502
TVector<TCell> plusInf;
503-
TTableRange range(StartKey.GetCells(), true, plusInf, true, false);
503+
TTableRange range(TraversalStartKey.GetCells(), true, plusInf, true, false);
504504
auto keyDesc = MakeHolder<TKeyDesc>(
505-
ScanTableId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, Columns);
505+
TraversalTableId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, Columns);
506506

507507
auto request = std::make_unique<NSchemeCache::TSchemeCacheRequest>();
508508
request->ResultSet.emplace_back(std::move(keyDesc));
509509

510510
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request.release()));
511511
}
512512

513-
void TStatisticsAggregator::NextRange() {
514-
if (ShardRanges.empty()) {
513+
void TStatisticsAggregator::ScanNextDatashardRange() {
514+
if (DatashardRanges.empty()) {
515515
SaveStatisticsToTable();
516516
return;
517517
}
518518

519-
auto& range = ShardRanges.front();
519+
auto& range = DatashardRanges.front();
520520
auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
521521
auto& record = request->Record;
522522
auto* path = record.MutableTable()->MutablePathId();
523-
path->SetOwnerId(ScanTableId.PathId.OwnerId);
524-
path->SetLocalId(ScanTableId.PathId.LocalPathId);
525-
record.SetStartKey(StartKey.GetBuffer());
523+
path->SetOwnerId(TraversalTableId.PathId.OwnerId);
524+
path->SetLocalId(TraversalTableId.PathId.LocalPathId);
525+
record.SetStartKey(TraversalStartKey.GetBuffer());
526526

527527
Send(MakePipePerNodeCacheID(false),
528528
new TEvPipeCache::TEvForward(request.release(), range.DataShardId, true),
@@ -556,7 +556,7 @@ void TStatisticsAggregator::SaveStatisticsToTable() {
556556
data.push_back(strSketch);
557557
}
558558

559-
Register(CreateSaveStatisticsQuery(ScanTableId.PathId, EStatType::COUNT_MIN_SKETCH,
559+
Register(CreateSaveStatisticsQuery(TraversalTableId.PathId, EStatType::COUNT_MIN_SKETCH,
560560
std::move(columnTags), std::move(data)));
561561
}
562562

@@ -568,118 +568,118 @@ void TStatisticsAggregator::DeleteStatisticsFromTable() {
568568

569569
PendingDeleteStatistics = false;
570570

571-
Register(CreateDeleteStatisticsQuery(ScanTableId.PathId));
571+
Register(CreateDeleteStatisticsQuery(TraversalTableId.PathId));
572572
}
573573

574-
void TStatisticsAggregator::ScheduleNextScan(NIceDb::TNiceDb& db) {
575-
if (!ScanOperations.Empty()) {
576-
auto* operation = ScanOperations.Front();
574+
void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
575+
if (!ForceTraversals.Empty()) {
576+
auto* operation = ForceTraversals.Front();
577577
ReplyToActorIds.swap(operation->ReplyToActorIds);
578578

579-
bool doStartScan = true;
579+
bool doStartAnalyze = true;
580580
bool isColumnTable = false;
581581
auto pathId = operation->PathId;
582-
auto itPath = ScanTables.find(pathId);
583-
if (itPath != ScanTables.end()) {
582+
auto itPath = ScheduleTraversals.find(pathId);
583+
if (itPath != ScheduleTraversals.end()) {
584584
isColumnTable = itPath->second.IsColumnTable;
585585
} else {
586-
doStartScan = false;
586+
doStartAnalyze = false;
587587
}
588-
if (doStartScan) {
589-
StartScan(db, pathId, isColumnTable);
588+
if (doStartAnalyze) {
589+
StartTraversal(db, pathId, isColumnTable);
590590
}
591-
db.Table<Schema::ScanOperations>().Key(operation->OperationId).Delete();
592-
ScanOperations.PopFront();
593-
ScanOperationsByPathId.erase(pathId);
594-
return;
595-
}
596-
if (ScanTablesByTime.Empty()) {
597-
return;
598-
}
599-
auto* topTable = ScanTablesByTime.Top();
600-
if (TInstant::Now() < topTable->LastUpdateTime + ScanIntervalTime) {
601-
return;
602-
}
603-
bool isColumnTable = false;
604-
auto itPath = ScanTables.find(topTable->PathId);
605-
if (itPath != ScanTables.end()) {
606-
isColumnTable = itPath->second.IsColumnTable;
607-
} else {
608-
return;
591+
db.Table<Schema::ForceTraversals>().Key(operation->OperationId).Delete();
592+
ForceTraversals.PopFront();
593+
ForceTraversalsByPathId.erase(pathId);
594+
} else { // ForceTraversals is empty, then go to ScheduleTraversals
595+
if (ScheduleTraversalsByTime.Empty()) {
596+
return;
597+
}
598+
auto* oldestTable = ScheduleTraversalsByTime.Top();
599+
if (TInstant::Now() < oldestTable->LastUpdateTime + ScheduleTraversalPeriod) {
600+
return;
601+
}
602+
bool isColumnTable = false;
603+
auto itPath = ScheduleTraversals.find(oldestTable->PathId);
604+
if (itPath != ScheduleTraversals.end()) {
605+
isColumnTable = itPath->second.IsColumnTable;
606+
} else {
607+
return;
608+
}
609+
StartTraversal(db, oldestTable->PathId, isColumnTable);
609610
}
610-
StartScan(db, topTable->PathId, isColumnTable);
611611
}
612612

613-
void TStatisticsAggregator::StartScan(NIceDb::TNiceDb& db, TPathId pathId, bool isColumnTable) {
614-
ScanTableId.PathId = pathId;
615-
ScanStartTime = TInstant::Now();
616-
IsColumnTable = isColumnTable;
617-
PersistCurrentScan(db);
613+
void TStatisticsAggregator::StartTraversal(NIceDb::TNiceDb& db, TPathId pathId, bool isColumnTable) {
614+
TraversalTableId.PathId = pathId;
615+
TraversalStartTime = TInstant::Now();
616+
TraversalIsColumnTable = isColumnTable;
617+
PersistTraversal(db);
618618

619-
StartKey = TSerializedCellVec();
619+
TraversalStartKey = TSerializedCellVec();
620620
PersistStartKey(db);
621621

622622
Navigate();
623623
}
624624

625-
void TStatisticsAggregator::FinishScan(NIceDb::TNiceDb& db) {
626-
auto pathId = ScanTableId.PathId;
625+
void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) {
626+
auto pathId = TraversalTableId.PathId;
627627

628-
auto pathIt = ScanTables.find(pathId);
629-
if (pathIt != ScanTables.end()) {
630-
auto& scanTable = pathIt->second;
631-
scanTable.LastUpdateTime = ScanStartTime;
632-
db.Table<Schema::ScanTables>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
633-
NIceDb::TUpdate<Schema::ScanTables::LastUpdateTime>(ScanStartTime.MicroSeconds()));
628+
auto pathIt = ScheduleTraversals.find(pathId);
629+
if (pathIt != ScheduleTraversals.end()) {
630+
auto& traversalTable = pathIt->second;
631+
traversalTable.LastUpdateTime = TraversalStartTime;
632+
db.Table<Schema::ScheduleTraversals>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
633+
NIceDb::TUpdate<Schema::ScheduleTraversals::LastUpdateTime>(TraversalStartTime.MicroSeconds()));
634634

635-
if (ScanTablesByTime.Has(&scanTable)) {
636-
ScanTablesByTime.Update(&scanTable);
635+
if (ScheduleTraversalsByTime.Has(&traversalTable)) {
636+
ScheduleTraversalsByTime.Update(&traversalTable);
637637
}
638638
}
639639

640-
ResetScanState(db);
640+
ResetTraversalState(db);
641641
}
642642

643643
void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {
644644
db.Table<Schema::SysParams>().Key(id).Update(
645645
NIceDb::TUpdate<Schema::SysParams::Value>(value));
646646
}
647647

648-
void TStatisticsAggregator::PersistCurrentScan(NIceDb::TNiceDb& db) {
649-
PersistSysParam(db, Schema::SysParam_ScanTableOwnerId, ToString(ScanTableId.PathId.OwnerId));
650-
PersistSysParam(db, Schema::SysParam_ScanTableLocalPathId, ToString(ScanTableId.PathId.LocalPathId));
651-
PersistSysParam(db, Schema::SysParam_ScanStartTime, ToString(ScanStartTime.MicroSeconds()));
652-
PersistSysParam(db, Schema::SysParam_IsColumnTable, ToString(IsColumnTable));
648+
void TStatisticsAggregator::PersistTraversal(NIceDb::TNiceDb& db) {
649+
PersistSysParam(db, Schema::SysParam_TraversalTableOwnerId, ToString(TraversalTableId.PathId.OwnerId));
650+
PersistSysParam(db, Schema::SysParam_TraversalTableLocalPathId, ToString(TraversalTableId.PathId.LocalPathId));
651+
PersistSysParam(db, Schema::SysParam_TraversalStartTime, ToString(TraversalStartTime.MicroSeconds()));
652+
PersistSysParam(db, Schema::SysParam_TraversalIsColumnTable, ToString(TraversalIsColumnTable));
653653
}
654654

655655
void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
656-
PersistSysParam(db, Schema::SysParam_StartKey, StartKey.GetBuffer());
656+
PersistSysParam(db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer());
657657
}
658658

659-
void TStatisticsAggregator::PersistLastScanOperationId(NIceDb::TNiceDb& db) {
660-
PersistSysParam(db, Schema::SysParam_LastScanOperationId, ToString(LastScanOperationId));
659+
void TStatisticsAggregator::PersistLastForceTraversalOperationId(NIceDb::TNiceDb& db) {
660+
PersistSysParam(db, Schema::SysParam_LastForceTraversalOperationId, ToString(LastForceTraversalOperationId));
661661
}
662662

663663
void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
664664
PersistSysParam(db, Schema::SysParam_GlobalTraversalRound, ToString(GlobalTraversalRound));
665665
}
666666

667-
void TStatisticsAggregator::ResetScanState(NIceDb::TNiceDb& db) {
668-
ScanTableId.PathId = TPathId();
669-
ScanStartTime = TInstant::MicroSeconds(0);
670-
PersistCurrentScan(db);
667+
void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
668+
TraversalTableId.PathId = TPathId();
669+
TraversalStartTime = TInstant::MicroSeconds(0);
670+
PersistTraversal(db);
671671

672-
StartKey = TSerializedCellVec();
672+
TraversalStartKey = TSerializedCellVec();
673673
PersistStartKey(db);
674674

675675
ReplyToActorIds.clear();
676676

677677
for (auto& [tag, _] : CountMinSketches) {
678-
db.Table<Schema::Statistics>().Key(tag).Delete();
678+
db.Table<Schema::ColumnStatistics>().Key(tag).Delete();
679679
}
680680
CountMinSketches.clear();
681681

682-
ShardRanges.clear();
682+
DatashardRanges.clear();
683683

684684
KeyColumnTypes.clear();
685685
Columns.clear();
@@ -728,7 +728,7 @@ bool TStatisticsAggregator::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev
728728
PRE() {
729729
str << "---- StatisticsAggregator ----" << Endl << Endl;
730730
str << "Database: " << Database << Endl;
731-
str << "BaseStats: " << BaseStats.size() << Endl;
731+
str << "BaseStatistics: " << BaseStatistics.size() << Endl;
732732
str << "SchemeShards: " << SchemeShards.size() << Endl;
733733
{
734734
std::function<TSSId(const std::pair<const TSSId, size_t>&)> extr =
@@ -773,24 +773,24 @@ bool TStatisticsAggregator::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev
773773
str << "PendingRequests: " << PendingRequests.size() << Endl;
774774
str << "ProcessUrgentInFlight: " << ProcessUrgentInFlight << Endl << Endl;
775775

776-
str << "ScanTableId: " << ScanTableId << Endl;
776+
str << "TraversalTableId: " << TraversalTableId << Endl;
777777
str << "Columns: " << Columns.size() << Endl;
778-
str << "ShardRanges: " << ShardRanges.size() << Endl;
778+
str << "DatashardRanges: " << DatashardRanges.size() << Endl;
779779
str << "CountMinSketches: " << CountMinSketches.size() << Endl << Endl;
780780

781-
str << "ScanTablesByTime: " << ScanTablesByTime.Size() << Endl;
782-
if (!ScanTablesByTime.Empty()) {
783-
auto* scanTable = ScanTablesByTime.Top();
784-
str << " top: " << scanTable->PathId
785-
<< ", last update time: " << scanTable->LastUpdateTime << Endl;
781+
str << "ScheduleTraversalsByTime: " << ScheduleTraversalsByTime.Size() << Endl;
782+
if (!ScheduleTraversalsByTime.Empty()) {
783+
auto* oldestTable = ScheduleTraversalsByTime.Top();
784+
str << " oldest table: " << oldestTable->PathId
785+
<< ", ordest table update time: " << oldestTable->LastUpdateTime << Endl;
786786
}
787-
str << "ScanTablesBySchemeShard: " << ScanTablesBySchemeShard.size() << Endl;
788-
if (!ScanTablesBySchemeShard.empty()) {
789-
str << " " << ScanTablesBySchemeShard.begin()->first << Endl;
787+
str << "ScheduleTraversalsBySchemeShard: " << ScheduleTraversalsBySchemeShard.size() << Endl;
788+
if (!ScheduleTraversalsBySchemeShard.empty()) {
789+
str << " " << ScheduleTraversalsBySchemeShard.begin()->first << Endl;
790790
std::function<TPathId(const TPathId&)> extr = [](const auto& x) { return x; };
791-
PrintContainerStart(ScanTablesBySchemeShard.begin()->second, 2, str, extr);
791+
PrintContainerStart(ScheduleTraversalsBySchemeShard.begin()->second, 2, str, extr);
792792
}
793-
str << "ScanStartTime: " << ScanStartTime << Endl;
793+
str << "TraversalStartTime: " << TraversalStartTime << Endl;
794794

795795
}
796796
}

0 commit comments

Comments
 (0)