Skip to content

EvWrite counters #874

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits on
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ enum ECumulativeCounters {
COUNTER_CHANGE_EXCHANGE_REJECTED_APPLY = 99 [(CounterOpts) = {Name: "ChangeExchangeRejectedApply"}];
COUNTER_CHANGE_RECORDS_SENT = 100 [(CounterOpts) = {Name: "ChangeRecordsSent"}];
COUNTER_CHANGE_RECORDS_FORGOTTEN = 101 [(CounterOpts) = {Name: "ChangeRecordsForgotten"}];
COUNTER_WRITE_REQUEST = 102 [(CounterOpts) = {Name: "WriteRequests"}];
COUNTER_WRITE_IMMEDIATE = 103 [(CounterOpts) = {Name: "WriteImmediate"}];
COUNTER_WRITE_COMPLETE = 104 [(CounterOpts) = {Name: "WriteComplete"}];
COUNTER_WRITE_SUCCESS = 105 [(CounterOpts) = {Name: "WriteSuccess"}];
COUNTER_WRITE_ERROR = 106 [(CounterOpts) = {Name: "WriteError"}];
COUNTER_WRITE_OVERLOADED = 107 [(CounterOpts) = {Name: "WriteOverloaded"}];
COUNTER_WRITE_OUT_OF_SPACE = 108 [(CounterOpts) = {Name: "WriteOutOfSpace"}];
COUNTER_WRITE_CANCELLED = 109 [(CounterOpts) = {Name: "WriteCancelled"}];
COUNTER_WRITE_ROWS = 110 [(CounterOpts) = {Name: "WriteRows"}];
COUNTER_WRITE_BYTES = 111 [(CounterOpts) = {Name: "WriteBytes"}];
}

enum EPercentileCounters {
Expand Down Expand Up @@ -381,6 +391,14 @@ enum EPercentileCounters {
Ranges: { Value: 5 Name: "5"},
Ranges: { Value: 10 Name: "10"},
}];

COUNTER_WRITE_SUCCESS_COMPLETE_LATENCY = 20 [(CounterOpts) = {
Name: "WriteSuccessCompleteLatency"
}];

COUNTER_WRITE_EXEC_LATENCY = 21 [(CounterOpts) = {
Name: "WriteExecLatency"
}];
}

enum ETxTypes {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/check_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
<< "Cannot perform transaction: out of disk space at tablet "
<< DataShard.TabletID() << " txId " << op->GetTxId();

DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
op->Abort(EExecutionUnitKind::FinishProposeWrite);
Expand All @@ -88,7 +88,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
// Updates are not allowed when database is out of space
TString err = "Cannot perform writes: database is out of disk space";

DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
op->Abort(EExecutionUnitKind::FinishProposeWrite);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2662,8 +2662,8 @@ bool TDataShard::CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite:
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription);

ctx.Send(ev->Sender, result.release());
IncCounter(COUNTER_PREPARE_OVERLOADED);
IncCounter(COUNTER_PREPARE_COMPLETE);
IncCounter(COUNTER_WRITE_OVERLOADED);
IncCounter(COUNTER_WRITE_COMPLETE);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void TDataShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorCo
}
}

IncCounter(COUNTER_PREPARE_REQUEST);
IncCounter(COUNTER_WRITE_REQUEST);

if (CheckDataTxRejectAndReply(ev, ctx)) {
return;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3153,10 +3153,10 @@ class TDataShard
ev->Record.MutableTableStats()->SetLastAccessTime(ti.Stats.AccessTime.MilliSeconds());
ev->Record.MutableTableStats()->SetLastUpdateTime(ti.Stats.UpdateTime.MilliSeconds());

ev->Record.MutableTableStats()->SetImmediateTxCompleted(TabletCounters->Cumulative()[COUNTER_PREPARE_IMMEDIATE].Get());
ev->Record.MutableTableStats()->SetImmediateTxCompleted(TabletCounters->Cumulative()[COUNTER_PREPARE_IMMEDIATE].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_IMMEDIATE].Get());
ev->Record.MutableTableStats()->SetPlannedTxCompleted(TabletCounters->Cumulative()[COUNTER_PLANNED_TX_COMPLETE].Get());
ev->Record.MutableTableStats()->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_PREPARE_OVERLOADED].Get());
ev->Record.MutableTableStats()->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_PREPARE_OUT_OF_SPACE].Get());
ev->Record.MutableTableStats()->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_PREPARE_OVERLOADED].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOADED].Get());
ev->Record.MutableTableStats()->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_PREPARE_OUT_OF_SPACE].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_OUT_OF_SPACE].Get());
ev->Record.MutableTableStats()->SetTxCompleteLagMsec(TabletCounters->Simple()[COUNTER_TX_COMPLETE_LAG].Get());
ev->Record.MutableTableStats()->SetInFlightTxCount(TabletCounters->Simple()[COUNTER_TX_IN_FLY].Get()
+ TabletCounters->Simple()[COUNTER_IMMEDIATE_TX_IN_FLY].Get());
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 704u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetImmediateTxCompleted(), 1u);
}

CompactTable(runtime, shard1, tableId1, false);
Expand All @@ -111,6 +112,16 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetDataSize(), 65u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetIndexSize(), 54u);
}

Write(runtime, sender, shard1, tableId1.PathId.LocalPathId, TShardedTableOptions().Columns_, 1, 100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);

{
Cerr << "... waiting for stats after write" << Endl;
auto stats = WaitTableStats(runtime);
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 4u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetImmediateTxCompleted(), 2u);
}
}

Y_UNIT_TEST(MultipleChannelsStatsCorrect) {
Expand Down
15 changes: 6 additions & 9 deletions ydb/core/tx/datashard/finish_propose_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);

if (!op->HasResultSentFlag()) {
DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
DataShard.IncCounter(COUNTER_WRITE_COMPLETE);

if (writeOp->GetWriteResult())
CompleteRequest(op, ctx);
Expand Down Expand Up @@ -167,7 +167,7 @@ void TFinishProposeWriteUnit::CompleteRequest(TOperation::TPtr op, const TActorC
}

if (res->IsPrepared()) {
DataShard.IncCounter(COUNTER_PREPARE_SUCCESS_COMPLETE_LATENCY, duration);
DataShard.IncCounter(COUNTER_WRITE_SUCCESS_COMPLETE_LATENCY, duration);
} else {
DataShard.CheckSplitCanStart(ctx);
DataShard.CheckMvccStateChangeCanStart(ctx);
Expand Down Expand Up @@ -203,20 +203,17 @@ void TFinishProposeWriteUnit::UpdateCounters(const TWriteOperation* writeOp, con
{
const auto& res = writeOp->GetWriteResult();
auto execLatency = TAppData::TimeProvider->Now() - writeOp->GetReceivedAt();
DataShard.IncCounter(COUNTER_PREPARE_EXEC_LATENCY, execLatency);
DataShard.IncCounter(COUNTER_WRITE_EXEC_LATENCY, execLatency);
if (res->IsPrepared()) {
DataShard.IncCounter(COUNTER_PREPARE_SUCCESS);
DataShard.IncCounter(COUNTER_WRITE_SUCCESS);
} else {
if (writeOp->IsDirty())
DataShard.IncCounter(COUNTER_PREPARE_DIRTY);

if (res->IsError()) {
DataShard.IncCounter(COUNTER_PREPARE_ERROR);
DataShard.IncCounter(COUNTER_WRITE_ERROR);
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::FinishProposeUnit_UpdateCounters), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD,
"Prepare transaction failed. txid " << writeOp->GetTxId()
<< " at tablet " << DataShard.TabletID() << " errors: " << res->GetError());
} else {
DataShard.IncCounter(COUNTER_PREPARE_IMMEDIATE);
DataShard.IncCounter(COUNTER_WRITE_IMMEDIATE);
}
}
}
Expand Down
14 changes: 5 additions & 9 deletions ydb/core/tx/datashard/write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ class TWriteUnit : public TExecutionUnit {

writeTx->GetEngineHost()->UpdateRow(fullTableId, keyCells, commands);
}
//TODO: Counters
// self->IncCounter(COUNTER_UPLOAD_ROWS, rowCount);
// self->IncCounter(COUNTER_UPLOAD_ROWS_BYTES, matrix.GetBuffer().size());

self->IncCounter(COUNTER_WRITE_ROWS, matrix.GetRowCount());
self->IncCounter(COUNTER_WRITE_BYTES, matrix.GetBuffer().size());

writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(self->TabletID(), writeOp->GetTxId()));

Expand Down Expand Up @@ -170,12 +170,8 @@ class TWriteUnit : public TExecutionUnit {
const auto& status = writeOp->GetWriteResult()->Record.status();
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Completed write operation for " << *op << " at " << DataShard.TabletID() << ", status " << status);

//TODO: Counters
// if (WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED || WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) {
// self->IncCounter(COUNTER_WRITE_SUCCESS);
// } else {
// self->IncCounter(COUNTER_WRITE_ERROR);
// }
DataShard.IncCounter(writeOp->GetWriteResult()->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED ?
COUNTER_WRITE_SUCCESS : COUNTER_WRITE_ERROR);

ctx.Send(writeOp->GetEv()->Sender, writeOp->ReleaseWriteResult().release(), 0, writeOp->GetEv()->Cookie);
}
Expand Down