Skip to content

additional signals for commit timings control #8729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
const ui32 TabletTxNo;
std::optional<NOlap::TSnapshot> LastCompletedTx;
std::optional<TTxController::TPlanQueueItem> PlannedQueueItem;
std::optional<TMonotonic> StartExecution;

public:
TTxProgressTx(TColumnShard* self)
Expand Down Expand Up @@ -54,6 +55,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
} else {
Self->ProgressTxController->PopFirstPlannedTx();
}
StartExecution = TMonotonic::Now();

LastCompletedTx = NOlap::TSnapshot(step, txId);
if (LastCompletedTx > Self->LastCompletedTx) {
Expand Down Expand Up @@ -84,11 +86,17 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
Self->RescheduleWaitingReads();
}
if (PlannedQueueItem) {
AFL_VERIFY(TxOperator);
Self->GetProgressTxController().GetCounters().OnTxProgressLag(
TxOperator->GetOpType(), TMonotonic::Now() - TMonotonic::MilliSeconds(PlannedQueueItem->Step));
Self->GetProgressTxController().ProgressOnComplete(*PlannedQueueItem);
}
if (LastCompletedTx) {
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
}
if (StartExecution) {
Self->GetProgressTxController().GetCounters().OnTxProgressDuration(TxOperator->GetOpType(), TMonotonic::Now() - *StartExecution);
}
Self->SetupIndexation();
}
};
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/counters/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ TCSCounters::TCSCounters()
HistogramSuccessWriteMiddle6PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle6PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));

WritePutBlobsCount = TBase::GetValue("WritePutBlobs");
WriteRequests = TBase::GetValue("WriteRequests");

Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class TCSCounters: public TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;

public:
const TCSInitialization Initialization;
TTxProgressCounters TxProgress;
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/tx/columnshard/counters/tx_progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class TTxProgressCounters: public TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr FinishProposeOnComplete;
NMonitoring::TDynamicCounters::TCounterPtr FinishPlannedTx;
NMonitoring::TDynamicCounters::TCounterPtr AbortTx;
NMonitoring::THistogramPtr HistogramTxProgressDuration;
NMonitoring::THistogramPtr HistogramTxProgressLag;

TProgressCounters(const TCommonCountersOwner& owner)
: TBase(owner)
Expand All @@ -34,13 +36,23 @@ class TTxProgressCounters: public TCommonCountersOwner {
, FinishProposeOnExecute(TBase::GetDeriviative("FinishProposeOnExecute"))
, FinishProposeOnComplete(TBase::GetDeriviative("FinishProposeOnComplete"))
, FinishPlannedTx(TBase::GetDeriviative("FinishPlannedTx"))
, AbortTx(TBase::GetDeriviative("AbortTx")) {
, AbortTx(TBase::GetDeriviative("AbortTx"))
, HistogramTxProgressDuration(TBase::GetHistogram("TxProgress/Execution/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)))
, HistogramTxProgressLag(TBase::GetHistogram("TxProgress/LagOnComplete/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 5))) {
}
};

THashMap<TOpType, TProgressCounters> CountersByOpType;

public:
// Records the duration `d`, converted to milliseconds, into the
// HistogramTxProgressDuration histogram of the counter group that
// GetSubGroup() returns for `opType`.
void OnTxProgressDuration(const TString& opType, const TDuration d) {
    auto&& group = GetSubGroup(opType);
    const auto durationMs = d.MilliSeconds();
    group.HistogramTxProgressDuration->Collect(durationMs);
}

// Records the lag `d`, converted to milliseconds, into the
// HistogramTxProgressLag histogram of the counter group that
// GetSubGroup() returns for `opType`.
void OnTxProgressLag(const TString& opType, const TDuration d) {
    auto&& group = GetSubGroup(opType);
    const auto lagMs = d.MilliSeconds();
    group.HistogramTxProgressLag->Collect(lagMs);
}

// Adds one to the RegisterTx counter of the counter group that
// GetSubGroup() returns for `opType`.
void OnRegisterTx(const TOpType& opType) {
    auto&& group = GetSubGroup(opType);
    group.RegisterTx->Add(1);
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/transactions/tx_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ class TTxController {
DoOnTabletInit(owner);
}
};
// Returns a mutable reference to the Counters member so that callers can
// report tx-progress signals through it.
TTxProgressCounters& GetCounters() {
return Counters;
}

private:
const TDuration MaxCommitTxDelay = TDuration::Seconds(30);
Expand Down
16 changes: 4 additions & 12 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ namespace NKikimr {
RowsCount = TBase::GetDeriviative("Rows/Count");
PackageSize = TBase::GetHistogram("Rows/PackageSize", NMonitoring::ExponentialHistogram(15, 2, 10));

DurationToStartCommit = TBase::GetHistogram("ToStartCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToFinishCommit = TBase::GetHistogram("ToFinishCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToStartWriting = TBase::GetHistogram("ToStartWriting/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToTxStarted = TBase::GetHistogram("ToTxStarted/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
PreparingDuration = TBase::GetHistogram("Preparing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
WritingDuration = TBase::GetHistogram("Writing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
CommitDuration = TBase::GetHistogram("Commit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
PrepareReplyDuration = TBase::GetHistogram("ToReply/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));

const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor();
for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) {
Expand All @@ -24,12 +24,4 @@ namespace NKikimr {
}
}

// Accounts for a finished request:
//  - increments the counter stored in CodesCount under the textual name of `code`
//    (aborts via Y_ABORT_UNLESS when no counter is stored for that name);
//  - records `d`, in milliseconds, into the ReplyDuration histogram.
void TUploadCounters::OnReply(const TDuration d, const ::Ydb::StatusIds::StatusCode code) const {
    const auto counterIt = CodesCount.find(::Ydb::StatusIds::StatusCode_Name(code));
    Y_ABORT_UNLESS(counterIt != CodesCount.end());
    counterIt->second->Add(1);

    const auto durationMs = d.MilliSeconds();
    ReplyDuration->Collect(durationMs);
}

}
80 changes: 58 additions & 22 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,64 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
NMonitoring::THistogramPtr PackageSize;

NMonitoring::THistogramPtr DurationToStartCommit;
NMonitoring::THistogramPtr DurationToFinishCommit;
NMonitoring::THistogramPtr DurationToStartWriting;
NMonitoring::THistogramPtr DurationToTxStarted;
NMonitoring::THistogramPtr PreparingDuration;
NMonitoring::THistogramPtr WritingDuration;
NMonitoring::THistogramPtr CommitDuration;
NMonitoring::THistogramPtr PrepareReplyDuration;

THashMap<TString, NMonitoring::TDynamicCounters::TCounterPtr> CodesCount;
public:
TUploadCounters();

void OnTxStarted(const TDuration d) const {
DurationToTxStarted->Collect(d.MilliSeconds());
}
class TGuard: TMoveOnly {
private:
TMonotonic Start = TMonotonic::Now();
std::optional<TMonotonic> WritingStarted;
std::optional<TMonotonic> CommitStarted;
std::optional<TMonotonic> CommitFinished;
std::optional<TMonotonic> ReplyFinished;
TUploadCounters& Owner;
public:
TGuard(const TMonotonic start, TUploadCounters& owner)
: Start(start)
, Owner(owner)
{

void OnWritingStarted(const TDuration d) const {
DurationToStartWriting->Collect(d.MilliSeconds());
}
}

void OnStartCommit(const TDuration d) const {
DurationToStartCommit->Collect(d.MilliSeconds());
}
void OnWritingStarted() {
WritingStarted = TMonotonic::Now();
Owner.PreparingDuration->Collect((*WritingStarted - Start).MilliSeconds());
}

void OnCommitStarted() {
CommitStarted = TMonotonic::Now();
AFL_VERIFY(WritingStarted);
Owner.WritingDuration->Collect((*CommitStarted - *WritingStarted).MilliSeconds());
}

void OnFinishCommit(const TDuration d) const {
DurationToFinishCommit->Collect(d.MilliSeconds());
void OnCommitFinished() {
CommitFinished = TMonotonic::Now();
AFL_VERIFY(CommitStarted);
Owner.CommitDuration->Collect((*CommitFinished - *CommitStarted).MilliSeconds());
}

// Accounts for the reply being sent:
//  - when the commit stage was reached (CommitFinished is set), reports the time
//    from commit finish to now into the owner's PrepareReplyDuration histogram;
//  - always reports the time from Start to now into the owner's ReplyDuration histogram;
//  - increments the owner's counter stored under the textual name of `code`
//    (aborts via Y_ABORT_UNLESS when no counter is stored for that name).
// All durations are collected in milliseconds.
void OnReply(const ::Ydb::StatusIds::StatusCode code) {
    const TMonotonic now = TMonotonic::Now();
    ReplyFinished = now;
    if (CommitFinished) {
        const auto prepareReplyMs = (now - *CommitFinished).MilliSeconds();
        Owner.PrepareReplyDuration->Collect(prepareReplyMs);
    }
    const auto totalMs = (now - Start).MilliSeconds();
    Owner.ReplyDuration->Collect(totalMs);

    const auto counterIt = Owner.CodesCount.find(::Ydb::StatusIds::StatusCode_Name(code));
    Y_ABORT_UNLESS(counterIt != Owner.CodesCount.end());
    counterIt->second->Add(1);
}
};

// Creates a TGuard whose Start timestamp is `start` and whose Owner is this
// counters object. The guard keeps a reference to *this (see TGuard::Owner).
TGuard BuildGuard(const TMonotonic start) {
return TGuard(start, *this);
}

void OnRequest(const ui64 rowsCount) const {
Expand All @@ -76,7 +111,7 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
PackageSize->Collect(rowsCount);
}

void OnReply(const TDuration d, const ::Ydb::StatusIds::StatusCode code) const;
void OnReply(const TDuration dFull, const TDuration dDelta, const ::Ydb::StatusIds::StatusCode code) const;
};


Expand Down Expand Up @@ -169,6 +204,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
TActorId LeaderPipeCache;
TDuration Timeout;
TInstant StartTime;
std::optional<TInstant> StartCommitTime;
TActorId TimeoutTimerActorId;

TAutoPtr<NSchemeCache::TSchemeCacheRequest> ResolvePartitionsResult;
Expand All @@ -185,7 +221,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>();
NLongTxService::TLongTxId LongTxId;
TUploadCounters UploadCounters;

TUploadCounters::TGuard UploadCountersGuard;
protected:
enum class EUploadSource {
ProtoValues = 0,
Expand Down Expand Up @@ -237,6 +273,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
, LeaderPipeCache(MakePipePerNodeCacheID(false))
, Timeout((timeout && timeout <= DEFAULT_TIMEOUT) ? timeout : DEFAULT_TIMEOUT)
, Status(Ydb::StatusIds::SUCCESS)
, UploadCountersGuard(UploadCounters.BuildGuard(TMonotonic::Now()))
, DiskQuotaExceeded(diskQuotaExceeded)
, Span(std::move(span))
{}
Expand Down Expand Up @@ -762,7 +799,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void WriteToColumnTable(const NActors::TActorContext& ctx) {
UploadCounters.OnWritingStarted(TAppData::TimeProvider->Now() - StartTime);
UploadCountersGuard.OnWritingStarted();
TString accessCheckError;
if (!CheckAccess(accessCheckError)) {
return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx);
Expand All @@ -787,7 +824,6 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit

void Handle(NLongTxService::TEvLongTxService::TEvBeginTxResult::TPtr& ev, const TActorContext& ctx) {
const auto* msg = ev->Get();
UploadCounters.OnTxStarted(TAppData::TimeProvider->Now() - StartTime);

if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) {
NYql::TIssues issues;
Expand Down Expand Up @@ -917,7 +953,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void CommitLongTx(const TActorContext& ctx) {
UploadCounters.OnStartCommit(TAppData::TimeProvider->Now() - StartTime);
UploadCountersGuard.OnCommitStarted();
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId), 0, 0, Span.GetTraceId());
TBase::Become(&TThis::StateWaitCommitLongTx);
Expand All @@ -932,7 +968,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void Handle(NLongTxService::TEvLongTxService::TEvCommitTxResult::TPtr& ev, const NActors::TActorContext& ctx) {
UploadCounters.OnFinishCommit(TAppData::TimeProvider->Now() - StartTime);
UploadCountersGuard.OnCommitFinished();
const auto* msg = ev->Get();

if (msg->Record.GetStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down Expand Up @@ -1288,7 +1324,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void ReplyWithResult(::Ydb::StatusIds::StatusCode status, const TActorContext& ctx) {
UploadCounters.OnReply(TAppData::TimeProvider->Now() - StartTime, status);
UploadCountersGuard.OnReply(status);
SendResult(ctx, status);

LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "completed with status " << status);
Expand Down
Loading