Skip to content

Commit ce9d5e4

Browse files
Deduplicated messages sensors: LOGBROKER-8733
1 parent 2a904bd commit ce9d5e4

File tree

6 files changed

+51
-12
lines changed

6 files changed

+51
-12
lines changed

ydb/core/persqueue/partition.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
756756
NKikimr::NPQ::TMultiCounter MsgsWrittenTotal;
757757
NKikimr::NPQ::TMultiCounter MsgsWrittenGrpc;;
758758

759+
NKikimr::NPQ::TMultiCounter MsgsDiscarded;;
760+
NKikimr::NPQ::TMultiCounter BytesDiscarded;;
761+
759762
// Writing blob with topic quota variables
760763
ui64 TopicQuotaRequestCookie = 0;
761764
ui64 NextTopicWriteQuotaRequestCookie = 1;

ydb/core/persqueue/partition_init.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,10 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
773773
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true);
774774
BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true);
775775
MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true);
776+
if (IsLocalDC) {
777+
MsgsDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedMessages"}, true);
778+
BytesDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedBytes"}, true);
779+
}
776780

777781
TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
778782
ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
@@ -859,6 +863,13 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
859863
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
860864
{"topic.write.messages"}, true, "name");
861865

866+
MsgsDiscarded = NKikimr::NPQ::TMultiCounter(
867+
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
868+
{"topic.write.discarded_messages"}, true, "name");
869+
BytesDiscarded = NKikimr::NPQ::TMultiCounter(
870+
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
871+
{"topic.write.discarded_bytes"} , true, "name");
872+
862873

863874
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
864875

ydb/core/persqueue/partition_write.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,9 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
929929
);
930930

931931
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
932+
MsgsDiscarded.Inc();
932933
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
934+
BytesDiscarded.Inc(p.Msg.Data.size());
933935
} else {
934936
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
935937
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ Y_UNIT_TEST(Partition) {
8383
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
8484
CmdWrite(0, "sourceid1", TestData(), tc, false);
8585
CmdWrite(0, "sourceid2", TestData(), tc, false);
86+
CmdWrite(0, "sourceid1", TestData(), tc, false);
87+
CmdWrite(0, "sourceid2", TestData(), tc, false);
8688
PQGetPartInfo(0, 30, tc);
8789

8890

@@ -93,15 +95,15 @@ Y_UNIT_TEST(Partition) {
9395
dbGroup->OutputHtml(countersStr);
9496
TString referenceCounters = NResource::Find(TStringBuf("counters_pqproxy.html"));
9597

96-
UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters);
98+
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters);
9799
}
98100

99101
{
100102
auto counters = tc.Runtime->GetAppData(0).Counters;
101103
auto dbGroup = GetServiceCounters(counters, "datastreams");
102104
TStringStream countersStr;
103105
dbGroup->OutputHtml(countersStr);
104-
UNIT_ASSERT_EQUAL(countersStr.Str(), "<pre></pre>");
106+
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str(), "<pre></pre>");
105107
}
106108
}
107109

@@ -173,6 +175,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
173175
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
174176
CmdWrite(0, "sourceid1", TestData(), tc, false);
175177
CmdWrite(0, "sourceid2", TestData(), tc, false);
178+
CmdWrite(0, "sourceid0", TestData(), tc, false);
176179
PQGetPartInfo(0, 30, tc);
177180

178181
{
@@ -194,6 +197,14 @@ Y_UNIT_TEST(PartitionFirstClass) {
194197
const TString referenceCounters = NResource::Find(TStringBuf("counters_datastreams.html"));
195198
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters);
196199
}
200+
// CmdWrite(0, "sourceid2", TestData(), tc, false);
201+
// {
202+
// auto counters = tc.Runtime->GetAppData(0).Counters;
203+
// auto dbGroup = GetServiceCounters(counters, "pqproxy");
204+
// TStringStream countersStr;
205+
// dbGroup->OutputHtml(countersStr);
206+
// Cerr << "Counters: ==========================\n" << countersStr.Str() << "==================================" << Endl;;
207+
// }
197208
}
198209

199210
} // Y_UNIT_TEST_SUITE(PQCountersSimple)

ydb/core/persqueue/ut/resources/counters_datastreams.html

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
name=api.grpc.topic.stream_write.bytes: 540
44
name=api.grpc.topic.stream_write.messages: 30
55
name=topic.write.bytes: 540
6+
name=topic.write.discarded_bytes: 90
7+
name=topic.write.discarded_messages: 10
68
name=topic.write.messages: 30
79
name=topic.write.uncompressed_bytes: 270
810

@@ -55,7 +57,7 @@
5557
bin=99999999: 0
5658

5759
name=topic.write.partition_throttled_milliseconds:
58-
bin=0: 30
60+
bin=0: 40
5961
bin=1: 0
6062
bin=10: 0
6163
bin=100: 0

ydb/core/persqueue/ut/resources/counters_pqproxy.html

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
OriginDC=Dc1:
5151

5252
sensor=PartitionWriteQuotaWaitOriginal:
53-
Interval=0ms: 30
53+
Interval=0ms: 50
5454
Interval=10000ms: 0
5555
Interval=1000ms: 0
5656
Interval=100ms: 0
@@ -67,7 +67,7 @@
6767
OriginDC=cluster:
6868

6969
sensor=PartitionWriteQuotaWaitOriginal:
70-
Interval=0ms: 30
70+
Interval=0ms: 50
7171
Interval=10000ms: 0
7272
Interval=1000ms: 0
7373
Interval=100ms: 0
@@ -88,7 +88,7 @@
8888
OriginDC=cluster:
8989

9090
sensor=PartitionWriteQuotaWaitOriginal:
91-
Interval=0ms: 30
91+
Interval=0ms: 50
9292
Interval=10000ms: 0
9393
Interval=1000ms: 0
9494
Interval=100ms: 0
@@ -111,7 +111,7 @@
111111
OriginDC=cluster:
112112

113113
sensor=PartitionWriteQuotaWaitOriginal:
114-
Interval=0ms: 30
114+
Interval=0ms: 50
115115
Interval=10000ms: 0
116116
Interval=1000ms: 0
117117
Interval=100ms: 0
@@ -136,7 +136,7 @@
136136
OriginDC=cluster:
137137

138138
sensor=PartitionWriteQuotaWaitOriginal:
139-
Interval=0ms: 30
139+
Interval=0ms: 50
140140
Interval=10000ms: 0
141141
Interval=1000ms: 0
142142
Interval=100ms: 0
@@ -470,17 +470,21 @@
470470
TopicPath=asdfgs/topic:
471471

472472
ClientDC=Unknown:
473-
sensor=BytesWrittenFromDC: 1560
473+
sensor=BytesWrittenFromDC: 2600
474474

475475
OriginDC=Dc1:
476476
sensor=BytesWrittenOriginal: 540
477477
sensor=CompactedBytesWrittenOriginal: 747
478+
sensor=DiscardedBytes: 180
479+
sensor=DiscardedMessages: 20
478480
sensor=MessagesWrittenOriginal: 30
479481
sensor=UncompressedBytesWrittenOriginal: 270
480482

481483
OriginDC=cluster:
482484
sensor=BytesWrittenOriginal: 540
483485
sensor=CompactedBytesWrittenOriginal: 747
486+
sensor=DiscardedBytes: 180
487+
sensor=DiscardedMessages: 20
484488
sensor=MessagesWrittenOriginal: 30
485489
sensor=UncompressedBytesWrittenOriginal: 270
486490

@@ -489,11 +493,13 @@
489493
TopicPath=total:
490494

491495
ClientDC=Unknown:
492-
sensor=BytesWrittenFromDC: 1560
496+
sensor=BytesWrittenFromDC: 2600
493497

494498
OriginDC=cluster:
495499
sensor=BytesWrittenOriginal: 540
496500
sensor=CompactedBytesWrittenOriginal: 747
501+
sensor=DiscardedBytes: 180
502+
sensor=DiscardedMessages: 20
497503
sensor=MessagesWrittenOriginal: 30
498504
sensor=UncompressedBytesWrittenOriginal: 270
499505

@@ -504,11 +510,13 @@
504510
TopicPath=total:
505511

506512
ClientDC=Unknown:
507-
sensor=BytesWrittenFromDC: 1560
513+
sensor=BytesWrittenFromDC: 2600
508514

509515
OriginDC=cluster:
510516
sensor=BytesWrittenOriginal: 540
511517
sensor=CompactedBytesWrittenOriginal: 747
518+
sensor=DiscardedBytes: 180
519+
sensor=DiscardedMessages: 20
512520
sensor=MessagesWrittenOriginal: 30
513521
sensor=UncompressedBytesWrittenOriginal: 270
514522

@@ -521,11 +529,13 @@
521529
TopicPath=total:
522530

523531
ClientDC=Unknown:
524-
sensor=BytesWrittenFromDC: 1560
532+
sensor=BytesWrittenFromDC: 2600
525533

526534
OriginDC=cluster:
527535
sensor=BytesWrittenOriginal: 540
528536
sensor=CompactedBytesWrittenOriginal: 747
537+
sensor=DiscardedBytes: 180
538+
sensor=DiscardedMessages: 20
529539
sensor=MessagesWrittenOriginal: 30
530540
sensor=UncompressedBytesWrittenOriginal: 270
531541

0 commit comments

Comments
 (0)