@@ -344,7 +344,7 @@ namespace NKikimr::NDataShard {
344
344
std::vector<TVolatileTxInfo*> byCommitOrder;
345
345
byCommitOrder.reserve (VolatileTxs.size ());
346
346
347
- auto postProcessTxInfo = [this , &byCommitOrder ](TVolatileTxInfo* info) {
347
+ auto postProcessTxInfo = [& ](TVolatileTxInfo* info) {
348
348
switch (info->State ) {
349
349
case EVolatileTxState::Waiting:
350
350
case EVolatileTxState::Committed: {
@@ -399,6 +399,28 @@ namespace NKikimr::NDataShard {
399
399
VolatileTxByCommitOrder.PushBack (info);
400
400
}
401
401
402
+ ui64 numWaiting = 0 ;
403
+ ui64 numCommitted = 0 ;
404
+ ui64 numAborting = 0 ;
405
+ for (auto & pr : VolatileTxs) {
406
+ switch (pr.second ->State ) {
407
+ case EVolatileTxState::Waiting:
408
+ ++numWaiting;
409
+ break ;
410
+ case EVolatileTxState::Committed:
411
+ ++numCommitted;
412
+ break ;
413
+ case EVolatileTxState::Aborting:
414
+ ++numAborting;
415
+ break ;
416
+ }
417
+ }
418
+
419
+ Self->SetCounter (COUNTER_VOLATILE_TX_INFLIGHT, VolatileTxs.size ());
420
+ Self->SetCounter (COUNTER_VOLATILE_TX_WAITING_COUNT, numWaiting);
421
+ Self->SetCounter (COUNTER_VOLATILE_TX_COMMITTED_COUNT, numCommitted);
422
+ Self->SetCounter (COUNTER_VOLATILE_TX_ABORTING_COUNT, numAborting);
423
+
402
424
return true ;
403
425
}
404
426
@@ -554,6 +576,8 @@ namespace NKikimr::NDataShard {
554
576
db.Table <Schema::TxVolatileParticipants>().Key (info->TxId , shardId).Update ();
555
577
}
556
578
579
+ UpdateCountersAdd (info);
580
+
557
581
txc.DB .OnRollback ([this , txId]() {
558
582
RollbackAddVolatileTx (txId);
559
583
});
@@ -593,7 +617,10 @@ namespace NKikimr::NDataShard {
593
617
594
618
// FIXME: do we need to handle WaitingSnapshotEvents somehow?
595
619
620
+ // Note: not counting latency (this is a rollback)
621
+
596
622
// This will also unlink from linked lists
623
+ UpdateCountersRemove (info);
597
624
VolatileTxs.erase (txId);
598
625
}
599
626
@@ -632,6 +659,10 @@ namespace NKikimr::NDataShard {
632
659
VolatileTxByCommitTxId.erase (commitTxId);
633
660
}
634
661
VolatileTxByVersion.erase (info);
662
+
663
+ Self->IncCounter (COUNTER_VOLATILE_TX_TOTAL_LATENCY_MS, info->LatencyTimer .Passed () * 1000 );
664
+
665
+ UpdateCountersRemove (info);
635
666
VolatileTxs.erase (txId);
636
667
637
668
if (prevUncertain < GetMinUncertainVersion ()) {
@@ -728,7 +759,7 @@ namespace NKikimr::NDataShard {
728
759
ui64 txId = info->TxId ;
729
760
730
761
// Move tx to aborting, but don't persist yet, we need a separate transaction for that
731
- info-> State = EVolatileTxState::Aborting;
762
+ ChangeState ( info, EVolatileTxState::Aborting) ;
732
763
733
764
// Aborted transactions don't have dependencies
734
765
for (ui64 dependencyTxId : info->Dependencies ) {
@@ -842,7 +873,7 @@ namespace NKikimr::NDataShard {
842
873
// Move tx to committed.
843
874
// Note that we don't need to wait until the new state is committed (it's repeatable),
844
875
// but we need to wait until the initial effects are committed and persisted.
845
- info-> State = EVolatileTxState::Committed;
876
+ ChangeState ( info, EVolatileTxState::Committed) ;
846
877
db.Table <Schema::TxVolatileDetails>().Key (txId).Update (
847
878
NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State ));
848
879
@@ -1030,4 +1061,43 @@ namespace NKikimr::NDataShard {
1030
1061
return false ;
1031
1062
}
1032
1063
1064
+ void TVolatileTxManager::UpdateCountersAdd (TVolatileTxInfo* info) {
1065
+ Self->IncCounter (COUNTER_VOLATILE_TX_INFLIGHT);
1066
+ switch (info->State ) {
1067
+ case EVolatileTxState::Waiting:
1068
+ Self->IncCounter (COUNTER_VOLATILE_TX_WAITING_COUNT);
1069
+ break ;
1070
+ case EVolatileTxState::Committed:
1071
+ Self->IncCounter (COUNTER_VOLATILE_TX_COMMITTED_COUNT);
1072
+ break ;
1073
+ case EVolatileTxState::Aborting:
1074
+ Self->IncCounter (COUNTER_VOLATILE_TX_ABORTING_COUNT);
1075
+ break ;
1076
+ }
1077
+ }
1078
+
1079
+ void TVolatileTxManager::UpdateCountersRemove (TVolatileTxInfo* info) {
1080
+ Self->DecCounter (COUNTER_VOLATILE_TX_INFLIGHT);
1081
+ switch (info->State ) {
1082
+ case EVolatileTxState::Waiting:
1083
+ Self->DecCounter (COUNTER_VOLATILE_TX_WAITING_COUNT);
1084
+ break ;
1085
+ case EVolatileTxState::Committed:
1086
+ Self->DecCounter (COUNTER_VOLATILE_TX_COMMITTED_COUNT);
1087
+ break ;
1088
+ case EVolatileTxState::Aborting:
1089
+ Self->DecCounter (COUNTER_VOLATILE_TX_ABORTING_COUNT);
1090
+ break ;
1091
+ }
1092
+ }
1093
+
1094
+ void TVolatileTxManager::ChangeState (TVolatileTxInfo* info, EVolatileTxState state) {
1095
+ if (info->State == EVolatileTxState::Waiting) {
1096
+ Self->IncCounter (COUNTER_VOLATILE_TX_WAIT_LATENCY_MS, info->LatencyTimer .Passed () * 1000 );
1097
+ }
1098
+ UpdateCountersRemove (info);
1099
+ info->State = state;
1100
+ UpdateCountersAdd (info);
1101
+ }
1102
+
1033
1103
} // namespace NKikimr::NDataShard
0 commit comments