Skip to content

Commit 999638f

Browse files
authored
Merge 1790bcd into c26bd91
2 parents c26bd91 + 1790bcd commit 999638f

File tree

8 files changed

+240
-125
lines changed

8 files changed

+240
-125
lines changed

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 77 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -443,8 +443,8 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
443443
if (ForceTraversalOperationId == operationId) {
444444
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
445445
} else {
446-
if (std::any_of(ForceTraversals.begin(), ForceTraversals.end(),
447-
[&operationId](const TForceTraversal& elem) { return elem.OperationId == operationId;})) {
446+
auto forceTraversalOperation = ForceTraversalOperation(operationId);
447+
if (forceTraversalOperation) {
448448
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
449449
} else {
450450
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
@@ -580,22 +580,35 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
580580

581581
TPathId pathId;
582582

583-
if (!ForceTraversals.empty() && !LastTraversalWasForce) {
583+
if (!LastTraversalWasForce) {
584584
LastTraversalWasForce = true;
585585

586-
TForceTraversal& operation = ForceTraversals.front();
587-
pathId = operation.PathId;
586+
for (TForceTraversalOperation& operation : ForceTraversals) {
587+
for (TForceTraversalTable& operationTable : operation.Tables) {
588+
if (operationTable.Status == TForceTraversalTable::EStatus::None) {
589+
operationTable.Status = TForceTraversalTable::EStatus::RequestSent;
590+
db.Table<Schema::ForceTraversalTables>().Key(operation.OperationId, operationTable.PathId.OwnerId, operationTable.PathId.LocalPathId)
591+
.Update(NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)operationTable.Status));
588592

589-
ForceTraversalOperationId = operation.OperationId;
590-
ForceTraversalColumnTags = operation.ColumnTags;
591-
ForceTraversalTypes = operation.Types;
592-
ForceTraversalReplyToActorId = operation.ReplyToActorId;
593+
pathId = operationTable.PathId;
594+
break;
595+
}
596+
}
597+
598+
if (!pathId) {
599+
SA_LOG_D("[" << TabletID() << "] All the force traversal tables sent the requests. OperationId=" << operation.OperationId);
600+
continue;
601+
}
593602

594-
PersistForceTraversal(db);
603+
ForceTraversalOperationId = operation.OperationId;
604+
}
595605

596-
// db.Table<Schema::ForceTraversals>().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete();
597-
ForceTraversals.pop_front();
598-
} else if (!ScheduleTraversalsByTime.Empty()){
606+
if (!pathId) {
607+
SA_LOG_D("[" << TabletID() << "] All the force traversal operations sent the requests.");
608+
}
609+
}
610+
611+
if (!pathId && !ScheduleTraversalsByTime.Empty()){
599612
LastTraversalWasForce = false;
600613

601614
auto* oldestTable = ScheduleTraversalsByTime.Top();
@@ -606,8 +619,10 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
606619
}
607620

608621
pathId = oldestTable->PathId;
609-
} else {
610-
SA_LOG_E("[" << TabletID() << "] No schedule traversal from schemeshard.");
622+
}
623+
624+
if (!pathId) {
625+
SA_LOG_E("[" << TabletID() << "] No traversal from schemeshard.");
611626
return;
612627
}
613628

@@ -653,13 +668,60 @@ void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) {
653668
}
654669
}
655670

671+
auto forceTraversalOperation = CurrentForceTraversalOperation();
672+
if (forceTraversalOperation) {
673+
bool tablesRemained = std::any_of(forceTraversalOperation->Tables.begin(), forceTraversalOperation->Tables.end(),
674+
[](const TForceTraversalTable& elem) { return elem.Status == TForceTraversalTable::EStatus::None;});
675+
if (!tablesRemained) {
676+
DeleteForceTraversalOperation(ForceTraversalOperationId);
677+
db.Table<Schema::ForceTraversalOperations>().Key(ForceTraversalOperationId).Delete();
678+
}
679+
}
680+
656681
ResetTraversalState(db);
657682
}
658683

659684
TString TStatisticsAggregator::LastTraversalWasForceString() const {
660685
return LastTraversalWasForce ? "force" : "schedule";
661686
}
662687

688+
TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::CurrentForceTraversalOperation() {
689+
return ForceTraversalOperation(ForceTraversalOperationId);
690+
}
691+
692+
TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::ForceTraversalOperation(const TString& operationId) {
693+
auto forceTraversalOperation = std::find_if(ForceTraversals.begin(), ForceTraversals.end(),
694+
[operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;});
695+
696+
if (forceTraversalOperation == ForceTraversals.end()) {
697+
return nullptr;
698+
} else {
699+
return &*forceTraversalOperation;
700+
}
701+
}
702+
703+
void TStatisticsAggregator::DeleteForceTraversalOperation(const TString& operationId) {
704+
ForceTraversals.remove_if([operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;});
705+
}
706+
707+
TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::ForceTraversalTable(const TString& operationId, const TPathId& pathId) {
708+
for (TForceTraversalOperation& operation : ForceTraversals) {
709+
if (operation.OperationId == operationId) {
710+
for (TForceTraversalTable& operationTable : operation.Tables) {
711+
if (operationTable.PathId == pathId) {
712+
return &operationTable;
713+
}
714+
}
715+
}
716+
}
717+
718+
return nullptr;
719+
}
720+
721+
TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::CurrentForceTraversalTable() {
722+
return ForceTraversalTable(ForceTraversalOperationId, TraversalTableId.PathId);
723+
}
724+
663725
void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {
664726
db.Table<Schema::SysParams>().Key(id).Update(
665727
NIceDb::TUpdate<Schema::SysParams::Value>(value));
@@ -676,30 +738,19 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
676738
PersistSysParam(db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer());
677739
}
678740

679-
void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) {
680-
PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(ForceTraversalOperationId));
681-
PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ForceTraversalOperationId);
682-
PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags));
683-
PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes));
684-
}
685-
686741
void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
687742
PersistSysParam(db, Schema::SysParam_GlobalTraversalRound, ToString(GlobalTraversalRound));
688743
}
689744

690745
void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
691746
ForceTraversalOperationId.clear();
692747
TraversalTableId.PathId = TPathId();
693-
ForceTraversalColumnTags.clear();
694-
ForceTraversalTypes.clear();
695748
TraversalStartTime = TInstant::MicroSeconds(0);
696749
PersistTraversal(db);
697750

698751
TraversalStartKey = TSerializedCellVec();
699752
PersistStartKey(db);
700753

701-
ForceTraversalReplyToActorId = {};
702-
703754
for (auto& [tag, _] : CountMinSketches) {
704755
db.Table<Schema::ColumnStatistics>().Key(tag).Delete();
705756
}

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
145145

146146
void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);
147147
void PersistTraversal(NIceDb::TNiceDb& db);
148-
void PersistForceTraversal(NIceDb::TNiceDb& db);
149148
void PersistStartKey(NIceDb::TNiceDb& db);
150149
void PersistGlobalTraversalRound(NIceDb::TNiceDb& db);
151150

@@ -240,8 +239,6 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
240239
std::queue<TEvStatistics::TEvRequestStats::TPtr> PendingRequests;
241240
bool ProcessUrgentInFlight = false;
242241

243-
TActorId ForceTraversalReplyToActorId = {};
244-
245242
bool IsSchemeshardSeen = false;
246243
bool IsStatisticsTableCreated = false;
247244
bool PendingSaveStatistics = false;
@@ -306,8 +303,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
306303
private: // stored in local db
307304

308305
TString ForceTraversalOperationId;
309-
TString ForceTraversalColumnTags;
310-
TString ForceTraversalTypes;
306+
311307
TTableId TraversalTableId;
312308
bool TraversalIsColumnTable = false;
313309
TSerializedCellVec TraversalStartKey;
@@ -323,14 +319,32 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
323319
TTraversalsByTime;
324320
TTraversalsByTime ScheduleTraversalsByTime;
325321

326-
struct TForceTraversal {
327-
TString OperationId;
322+
struct TForceTraversalTable {
328323
TPathId PathId;
329324
TString ColumnTags;
325+
326+
enum class EStatus : ui8 {
327+
None,
328+
RequestSent,
329+
ResponseReceived,
330+
};
331+
EStatus Status = EStatus::None;
332+
};
333+
struct TForceTraversalOperation {
334+
TString OperationId;
335+
std::list<TForceTraversalTable> Tables;
330336
TString Types;
331337
TActorId ReplyToActorId;
332338
};
333-
std::list<TForceTraversal> ForceTraversals;
339+
std::list<TForceTraversalOperation> ForceTraversals;
340+
341+
private:
342+
TForceTraversalOperation* CurrentForceTraversalOperation();
343+
TForceTraversalOperation* ForceTraversalOperation(const TString& operationId);
344+
void DeleteForceTraversalOperation(const TString& operationId);
345+
346+
TForceTraversalTable* ForceTraversalTable(const TString& operationId, const TPathId& pathId);
347+
TForceTraversalTable* CurrentForceTraversalTable();
334348
};
335349

336350
} // NKikimr::NStat

ydb/core/statistics/aggregator/schema.h

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,32 +45,45 @@ struct TAggregatorSchema : NIceDb::Schema {
4545
IsColumnTable
4646
>;
4747
};
48-
/*
49-
struct ForceTraversals : Table<5> {
50-
struct OperationId : Column<1, NScheme::NTypeIds::Uint64> {};
48+
49+
// struct ForceTraversals : Table<5>
50+
51+
struct ForceTraversalOperations : Table<6> {
52+
struct OperationId : Column<1, NScheme::NTypeIds::String> {};
53+
struct Types : Column<2, NScheme::NTypeIds::String> {};
54+
55+
using TKey = TableKey<OperationId>;
56+
using TColumns = TableColumns<
57+
OperationId,
58+
Types
59+
>;
60+
};
61+
62+
struct ForceTraversalTables : Table<7> {
63+
struct OperationId : Column<1, NScheme::NTypeIds::String> {};
5164
struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {};
5265
struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {};
53-
struct Cookie : Column<4, NScheme::NTypeIds::String> {};
54-
struct ColumnTags : Column<5, NScheme::NTypeIds::String> {};
55-
struct Types : Column<6, NScheme::NTypeIds::String> {};
66+
struct ColumnTags : Column<4, NScheme::NTypeIds::String> {};
67+
struct Status : Column<5, NScheme::NTypeIds::Uint64> {};
5668

5769
using TKey = TableKey<OperationId, OwnerId, LocalPathId>;
5870
using TColumns = TableColumns<
5971
OperationId,
6072
OwnerId,
6173
LocalPathId,
62-
Cookie,
6374
ColumnTags,
64-
Types
75+
Status
6576
>;
6677
};
67-
*/
78+
6879
using TTables = SchemaTables<
6980
SysParams,
7081
BaseStatistics,
7182
ColumnStatistics,
72-
ScheduleTraversals
73-
// ForceTraversals
83+
ScheduleTraversals,
84+
// ForceTraversals,
85+
ForceTraversalOperations,
86+
ForceTraversalTables
7487
>;
7588

7689
using TSettings = SchemaSettings<
@@ -80,12 +93,12 @@ struct TAggregatorSchema : NIceDb::Schema {
8093

8194
static constexpr ui64 SysParam_Database = 1;
8295
static constexpr ui64 SysParam_TraversalStartKey = 2;
83-
static constexpr ui64 SysParam_ForceTraversalOperationId = 3;
96+
// deprecated 3
8497
static constexpr ui64 SysParam_TraversalTableOwnerId = 4;
8598
static constexpr ui64 SysParam_TraversalTableLocalPathId = 5;
86-
static constexpr ui64 SysParam_ForceTraversalCookie = 6;
87-
static constexpr ui64 SysParam_ForceTraversalColumnTags = 7;
88-
static constexpr ui64 SysParam_ForceTraversalTypes = 8;
99+
// deprecated 6
100+
// deprecated 7
101+
// deprecated 8
89102
static constexpr ui64 SysParam_TraversalStartTime = 9;
90103
// deprecated 10
91104
static constexpr ui64 SysParam_TraversalIsColumnTable = 11;

ydb/core/statistics/aggregator/tx_analyze_table.cpp

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,48 +28,64 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
2828
NIceDb::TNiceDb db(txc.DB);
2929

3030
const TString operationId = Record.GetOperationId();
31-
const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");
32-
33-
for (const auto& table : Record.GetTables()) {
34-
const TPathId pathId = PathIdFromPathId(table.GetPathId());
35-
const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");
3631

37-
// check existing force traversal with the same cookie and path
38-
auto forceTraversal = std::find_if(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
39-
[&pathId, &operationId](const TForceTraversal& elem) {
40-
return elem.PathId == pathId
41-
&& elem.OperationId == operationId;});
32+
// check existing force traversal with the same OperationId
33+
const auto existingOperation = Self->ForceTraversalOperation(operationId);
4234

43-
// update existing force traversal
44-
if (forceTraversal != Self->ForceTraversals.end()) {
45-
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. PathId " << pathId << " , ReplyToActorId " << ReplyToActorId);
46-
forceTraversal->ReplyToActorId = ReplyToActorId;
35+
// update existing force traversal
36+
if (existingOperation) {
37+
if (existingOperation->Tables.size() == Record.TablesSize()) {
38+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. OperationId " << operationId << " , ReplyToActorId " << ReplyToActorId);
39+
existingOperation->ReplyToActorId = ReplyToActorId;
4740
return true;
41+
} else {
42+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Delete broken force traversal. OperationId " << operationId << " , ReplyToActorId " << ReplyToActorId);
43+
Self->DeleteForceTraversalOperation(operationId);
4844
}
45+
}
46+
47+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation, OperationId=" << operationId);
48+
const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");
4949

50-
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation for pathId " << pathId);
50+
// create new force trasersal
51+
TForceTraversalOperation operation {
52+
.OperationId = operationId,
53+
.Tables = {},
54+
.Types = types,
55+
.ReplyToActorId = ReplyToActorId
56+
};
5157

52-
// create new force trasersal
53-
TForceTraversal operation {
54-
.OperationId = operationId,
58+
for (const auto& table : Record.GetTables()) {
59+
const TPathId pathId = PathIdFromPathId(table.GetPathId());
60+
const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");
61+
const TForceTraversalTable::EStatus status = TForceTraversalTable::EStatus::None;
62+
63+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation, OperationId=" << operationId << " , PathId " << pathId);
64+
65+
// create new force traversal
66+
TForceTraversalTable operationTable {
5567
.PathId = pathId,
5668
.ColumnTags = columnTags,
57-
.Types = types,
58-
.ReplyToActorId = ReplyToActorId
69+
.Status = status
5970
};
60-
Self->ForceTraversals.emplace_back(operation);
61-
/*
62-
db.Table<Schema::ForceTraversals>().Key(Self->NextForceTraversalOperationId, pathId.OwnerId, pathId.LocalPathId).Update(
63-
NIceDb::TUpdate<Schema::ForceTraversals::OperationId>(Self->NextForceTraversalOperationId),
64-
NIceDb::TUpdate<Schema::ForceTraversals::OwnerId>(pathId.OwnerId),
65-
NIceDb::TUpdate<Schema::ForceTraversals::LocalPathId>(pathId.LocalPathId),
66-
NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie),
67-
NIceDb::TUpdate<Schema::ForceTraversals::ColumnTags>(columnTags),
68-
NIceDb::TUpdate<Schema::ForceTraversals::Types>(types)
71+
operation.Tables.emplace_back(operationTable);
72+
73+
db.Table<Schema::ForceTraversalTables>().Key(operationId, pathId.OwnerId, pathId.LocalPathId).Update(
74+
NIceDb::TUpdate<Schema::ForceTraversalTables::OperationId>(operationId),
75+
NIceDb::TUpdate<Schema::ForceTraversalTables::OwnerId>(pathId.OwnerId),
76+
NIceDb::TUpdate<Schema::ForceTraversalTables::LocalPathId>(pathId.LocalPathId),
77+
NIceDb::TUpdate<Schema::ForceTraversalTables::ColumnTags>(columnTags),
78+
NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)status)
6979
);
70-
*/
7180
}
7281

82+
Self->ForceTraversals.emplace_back(operation);
83+
84+
db.Table<Schema::ForceTraversalOperations>().Key(operationId).Update(
85+
NIceDb::TUpdate<Schema::ForceTraversalOperations::OperationId>(operationId),
86+
NIceDb::TUpdate<Schema::ForceTraversalOperations::Types>(types)
87+
);
88+
7389
return true;
7490
}
7591

0 commit comments

Comments
 (0)