Skip to content

Statistics: Two force traversal collections in local DB #7765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 78 additions & 26 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
if (ForceTraversalOperationId == operationId) {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
} else {
if (std::any_of(ForceTraversals.begin(), ForceTraversals.end(),
[&operationId](const TForceTraversal& elem) { return elem.OperationId == operationId;})) {
auto forceTraversalOperation = ForceTraversalOperation(operationId);
if (forceTraversalOperation) {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
} else {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
Expand Down Expand Up @@ -580,22 +580,36 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {

TPathId pathId;

if (!ForceTraversals.empty() && !LastTraversalWasForce) {
if (!LastTraversalWasForce) {
LastTraversalWasForce = true;

TForceTraversal& operation = ForceTraversals.front();
pathId = operation.PathId;
for (TForceTraversalOperation& operation : ForceTraversals) {
for (TForceTraversalTable& operationTable : operation.Tables) {
if (operationTable.Status == TForceTraversalTable::EStatus::None) {
operationTable.Status = TForceTraversalTable::EStatus::RequestSent;
// TODO delete it later, after EStatus::ResponseReceived. Only update status in local db
// db.Table<Schema::ForceTraversalTables>().Key(operation.OperationId, operationTable.PathId.OwnerId, operationTable.PathId.LocalPathId)
// .Update(NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)operationTable.Status));
db.Table<Schema::ForceTraversalTables>().Key(operation.OperationId, operationTable.PathId.OwnerId, operationTable.PathId.LocalPathId).Delete();

pathId = operationTable.PathId;
}
}

if (!pathId) {
SA_LOG_D("[" << TabletID() << "] All the force traversal tables sent the requests. OperationId=" << operation.OperationId);
continue;
}

ForceTraversalOperationId = operation.OperationId;
ForceTraversalColumnTags = operation.ColumnTags;
ForceTraversalTypes = operation.Types;
ForceTraversalReplyToActorId = operation.ReplyToActorId;
ForceTraversalOperationId = operation.OperationId;
}

PersistForceTraversal(db);
if (!pathId) {
SA_LOG_D("[" << TabletID() << "] All the force traversal operations sent the requests.");
}
}

// db.Table<Schema::ForceTraversals>().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete();
ForceTraversals.pop_front();
} else if (!ScheduleTraversalsByTime.Empty()){
if (!pathId && !ScheduleTraversalsByTime.Empty()){
LastTraversalWasForce = false;

auto* oldestTable = ScheduleTraversalsByTime.Top();
Expand All @@ -606,8 +620,10 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
}

pathId = oldestTable->PathId;
} else {
SA_LOG_E("[" << TabletID() << "] No schedule traversal from schemeshard.");
}

if (!pathId) {
SA_LOG_E("[" << TabletID() << "] No traversal from schemeshard.");
return;
}

Expand Down Expand Up @@ -653,13 +669,60 @@ void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) {
}
}

auto forceTraversalOperation = CurrentForceTraversalOperation();
if (forceTraversalOperation) {
bool tablesRemained = std::any_of(forceTraversalOperation->Tables.begin(), forceTraversalOperation->Tables.end(),
[](const TForceTraversalTable& elem) { return elem.Status == TForceTraversalTable::EStatus::None;});
if (!tablesRemained) {
DeleteForceTraversalOperation(ForceTraversalOperationId);
db.Table<Schema::ForceTraversalOperations>().Key(ForceTraversalOperationId).Delete();
}
}

ResetTraversalState(db);
}

// Returns a human-readable label for the kind of the most recently scheduled
// traversal: "force" when LastTraversalWasForce is set, "schedule" otherwise.
TString TStatisticsAggregator::LastTraversalWasForceString() const {
    if (LastTraversalWasForce) {
        return "force";
    }
    return "schedule";
}

// Convenience lookup: finds the force traversal operation whose id equals the
// currently stored ForceTraversalOperationId. The result is whatever
// ForceTraversalOperation() returns for that id.
TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::CurrentForceTraversalOperation() {
    const auto& currentOperationId = ForceTraversalOperationId;
    return ForceTraversalOperation(currentOperationId);
}

TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::ForceTraversalOperation(const TString& operationId) {
auto forceTraversalOperation = std::find_if(ForceTraversals.begin(), ForceTraversals.end(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В будущем было бы неплохо сделать хеш-таблицу, чтобы не делать линейный поиск.

[operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;});

if (forceTraversalOperation == ForceTraversals.end()) {
return nullptr;
} else {
return &*forceTraversalOperation;
}
}

// Removes from the in-memory ForceTraversals list every operation whose
// OperationId equals operationId. Only the list is touched here; this function
// performs no local-db writes.
void TStatisticsAggregator::DeleteForceTraversalOperation(const TString& operationId) {
    // The id is captured by value, so the predicate holds its own copy of it.
    const auto hasRequestedId = [operationId](const TForceTraversalOperation& candidate) {
        return candidate.OperationId == operationId;
    };

    ForceTraversals.remove_if(hasRequestedId);
}

// Finds the table entry with the given pathId among the tables of the force
// traversal operation(s) whose OperationId equals operationId.
//
// Returns a pointer to the entry stored inside ForceTraversals, or nullptr when
// no matching operation/table pair exists.
TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::ForceTraversalTable(const TString& operationId, const TPathId& pathId) {
    for (auto& candidate : ForceTraversals) {
        if (candidate.OperationId != operationId) {
            continue;
        }

        // Keep scanning later operations if this one does not hold the table.
        auto& tables = candidate.Tables;
        const auto found = std::find_if(tables.begin(), tables.end(),
            [&pathId](const TForceTraversalTable& entry) { return entry.PathId == pathId; });
        if (found != tables.end()) {
            return &*found;
        }
    }

    return nullptr;
}

// Convenience lookup: finds the force traversal table entry identified by the
// currently stored ForceTraversalOperationId and TraversalTableId.PathId.
// The result is whatever ForceTraversalTable() returns for that pair.
TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::CurrentForceTraversalTable() {
    const auto& currentPathId = TraversalTableId.PathId;
    return ForceTraversalTable(ForceTraversalOperationId, currentPathId);
}

void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {
db.Table<Schema::SysParams>().Key(id).Update(
NIceDb::TUpdate<Schema::SysParams::Value>(value));
Expand All @@ -676,30 +739,19 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer());
}

// Writes the force traversal fields into SysParams via PersistSysParam:
// the operation id (under both the OperationId and the Cookie keys),
// the column tags and the types.
void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) {
    const auto& operationId = ForceTraversalOperationId;

    PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(operationId));
    PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, operationId);
    PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags));
    PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes));
}

// Stores the current GlobalTraversalRound value, converted with ToString,
// under the SysParam_GlobalTraversalRound key of SysParams.
void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
    const auto serializedRound = ToString(GlobalTraversalRound);
    PersistSysParam(db, Schema::SysParam_GlobalTraversalRound, serializedRound);
}

void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
ForceTraversalOperationId.clear();
TraversalTableId.PathId = TPathId();
ForceTraversalColumnTags.clear();
ForceTraversalTypes.clear();
TraversalStartTime = TInstant::MicroSeconds(0);
PersistTraversal(db);

TraversalStartKey = TSerializedCellVec();
PersistStartKey(db);

ForceTraversalReplyToActorId = {};

for (auto& [tag, _] : CountMinSketches) {
db.Table<Schema::ColumnStatistics>().Key(tag).Delete();
}
Expand Down
30 changes: 22 additions & 8 deletions ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);
void PersistTraversal(NIceDb::TNiceDb& db);
void PersistForceTraversal(NIceDb::TNiceDb& db);
void PersistStartKey(NIceDb::TNiceDb& db);
void PersistGlobalTraversalRound(NIceDb::TNiceDb& db);

Expand Down Expand Up @@ -240,8 +239,6 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
std::queue<TEvStatistics::TEvRequestStats::TPtr> PendingRequests;
bool ProcessUrgentInFlight = false;

TActorId ForceTraversalReplyToActorId = {};

bool IsSchemeshardSeen = false;
bool IsStatisticsTableCreated = false;
bool PendingSaveStatistics = false;
Expand Down Expand Up @@ -306,8 +303,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
private: // stored in local db

TString ForceTraversalOperationId;
TString ForceTraversalColumnTags;
TString ForceTraversalTypes;

TTableId TraversalTableId;
bool TraversalIsColumnTable = false;
TSerializedCellVec TraversalStartKey;
Expand All @@ -323,14 +319,32 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
TTraversalsByTime;
TTraversalsByTime ScheduleTraversalsByTime;

struct TForceTraversal {
TString OperationId;
// One table that belongs to a force traversal operation
// (elements of TForceTraversalOperation::Tables).
struct TForceTraversalTable {
// Path of the table to traverse.
TPathId PathId;
// Column tags for this table, stored as a single string
// (the analyze transaction builds it by joining the tags with ",").
TString ColumnTags;

// Progress of the traversal request for this table.
enum class EStatus : ui8 {
// Initial state of a newly created entry.
None,
// Assigned by ScheduleNextTraversal when it picks an entry that was in None.
RequestSent,
// NOTE(review): not assigned anywhere in the code visible here.
ResponseReceived,
};
EStatus Status = EStatus::None;
};
// A force traversal (analyze) operation: one operation id plus the list of
// tables it covers. Instances are kept in the ForceTraversals list.
struct TForceTraversalOperation {
// Identifier of the operation; used as the lookup key by ForceTraversalOperation().
TString OperationId;
// Tables of this operation, each with its own status.
std::list<TForceTraversalTable> Tables;
// Requested types, stored as a single string
// (the analyze transaction builds it by joining the values with ",").
TString Types;
// Actor to reply to; refreshed when a request with the same OperationId arrives again.
// NOTE(review): Schema::ForceTraversalOperations has no column for it, so it
// appears to live in memory only — confirm.
TActorId ReplyToActorId;
};
std::list<TForceTraversal> ForceTraversals;
std::list<TForceTraversalOperation> ForceTraversals;

private:
TForceTraversalOperation* CurrentForceTraversalOperation();
TForceTraversalOperation* ForceTraversalOperation(const TString& operationId);
void DeleteForceTraversalOperation(const TString& operationId);

TForceTraversalTable* ForceTraversalTable(const TString& operationId, const TPathId& pathId);
TForceTraversalTable* CurrentForceTraversalTable();
};

} // NKikimr::NStat
43 changes: 28 additions & 15 deletions ydb/core/statistics/aggregator/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,45 @@ struct TAggregatorSchema : NIceDb::Schema {
IsColumnTable
>;
};
/*
struct ForceTraversals : Table<5> {
struct OperationId : Column<1, NScheme::NTypeIds::Uint64> {};

// struct ForceTraversals : Table<5>

// Local-db table (id 6) with one row per force traversal operation,
// keyed by the operation id.
struct ForceTraversalOperations : Table<6> {
// Operation identifier (string); the only key column.
struct OperationId : Column<1, NScheme::NTypeIds::String> {};
// Types of the operation, stored as a string.
struct Types : Column<2, NScheme::NTypeIds::String> {};

using TKey = TableKey<OperationId>;
using TColumns = TableColumns<
OperationId,
Types
>;
};

struct ForceTraversalTables : Table<7> {
struct OperationId : Column<1, NScheme::NTypeIds::String> {};
struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {};
struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {};
struct Cookie : Column<4, NScheme::NTypeIds::String> {};
struct ColumnTags : Column<5, NScheme::NTypeIds::String> {};
struct Types : Column<6, NScheme::NTypeIds::String> {};
struct ColumnTags : Column<4, NScheme::NTypeIds::String> {};
struct Status : Column<5, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<OperationId, OwnerId, LocalPathId>;
using TColumns = TableColumns<
OperationId,
OwnerId,
LocalPathId,
Cookie,
ColumnTags,
Types
Status
>;
};
*/

using TTables = SchemaTables<
SysParams,
BaseStatistics,
ColumnStatistics,
ScheduleTraversals
// ForceTraversals
ScheduleTraversals,
// ForceTraversals,
ForceTraversalOperations,
ForceTraversalTables
>;

using TSettings = SchemaSettings<
Expand All @@ -80,12 +93,12 @@ struct TAggregatorSchema : NIceDb::Schema {

static constexpr ui64 SysParam_Database = 1;
static constexpr ui64 SysParam_TraversalStartKey = 2;
static constexpr ui64 SysParam_ForceTraversalOperationId = 3;
// deprecated 3
static constexpr ui64 SysParam_TraversalTableOwnerId = 4;
static constexpr ui64 SysParam_TraversalTableLocalPathId = 5;
static constexpr ui64 SysParam_ForceTraversalCookie = 6;
static constexpr ui64 SysParam_ForceTraversalColumnTags = 7;
static constexpr ui64 SysParam_ForceTraversalTypes = 8;
// deprecated 6
// deprecated 7
// deprecated 8
static constexpr ui64 SysParam_TraversalStartTime = 9;
// deprecated 10
static constexpr ui64 SysParam_TraversalIsColumnTable = 11;
Expand Down
76 changes: 46 additions & 30 deletions ydb/core/statistics/aggregator/tx_analyze_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,64 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
NIceDb::TNiceDb db(txc.DB);

const TString operationId = Record.GetOperationId();
const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");

for (const auto& table : Record.GetTables()) {
const TPathId pathId = PathIdFromPathId(table.GetPathId());
const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");

// check existing force traversal with the same cookie and path
auto forceTraversal = std::find_if(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
[&pathId, &operationId](const TForceTraversal& elem) {
return elem.PathId == pathId
&& elem.OperationId == operationId;});
// check existing force traversal with the same OperationId
const auto existingOperation = Self->ForceTraversalOperation(operationId);

// update existing force traversal
if (forceTraversal != Self->ForceTraversals.end()) {
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. PathId " << pathId << " , ReplyToActorId " << ReplyToActorId);
forceTraversal->ReplyToActorId = ReplyToActorId;
// update existing force traversal
if (existingOperation) {
if (existingOperation->Tables.size() == Record.TablesSize()) {
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. OperationId " << operationId << " , ReplyToActorId " << ReplyToActorId);
existingOperation->ReplyToActorId = ReplyToActorId;
return true;
} else {
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Delete broken force traversal. OperationId " << operationId << " , ReplyToActorId " << ReplyToActorId);
Self->DeleteForceTraversalOperation(operationId);
}
}

SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation, OperationId=" << operationId);
const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");

SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation for pathId " << pathId);
// create new force traversal
TForceTraversalOperation operation {
.OperationId = operationId,
.Tables = {},
.Types = types,
.ReplyToActorId = ReplyToActorId
};

// create new force traversal
TForceTraversal operation {
.OperationId = operationId,
for (const auto& table : Record.GetTables()) {
const TPathId pathId = PathIdFromPathId(table.GetPathId());
const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");
const TForceTraversalTable::EStatus status = TForceTraversalTable::EStatus::None;

SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation, OperationId=" << operationId << " , PathId " << pathId);

// create new force traversal
TForceTraversalTable operationTable {
.PathId = pathId,
.ColumnTags = columnTags,
.Types = types,
.ReplyToActorId = ReplyToActorId
.Status = status
};
Self->ForceTraversals.emplace_back(operation);
/*
db.Table<Schema::ForceTraversals>().Key(Self->NextForceTraversalOperationId, pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::ForceTraversals::OperationId>(Self->NextForceTraversalOperationId),
NIceDb::TUpdate<Schema::ForceTraversals::OwnerId>(pathId.OwnerId),
NIceDb::TUpdate<Schema::ForceTraversals::LocalPathId>(pathId.LocalPathId),
NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie),
NIceDb::TUpdate<Schema::ForceTraversals::ColumnTags>(columnTags),
NIceDb::TUpdate<Schema::ForceTraversals::Types>(types)
operation.Tables.emplace_back(operationTable);

db.Table<Schema::ForceTraversalTables>().Key(operationId, pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::ForceTraversalTables::OperationId>(operationId),
NIceDb::TUpdate<Schema::ForceTraversalTables::OwnerId>(pathId.OwnerId),
NIceDb::TUpdate<Schema::ForceTraversalTables::LocalPathId>(pathId.LocalPathId),
NIceDb::TUpdate<Schema::ForceTraversalTables::ColumnTags>(columnTags),
NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)status)
);
*/
}

Self->ForceTraversals.emplace_back(operation);

db.Table<Schema::ForceTraversalOperations>().Key(operationId).Update(
NIceDb::TUpdate<Schema::ForceTraversalOperations::OperationId>(operationId),
NIceDb::TUpdate<Schema::ForceTraversalOperations::Types>(types)
);

return true;
}

Expand Down
Loading
Loading