Skip to content

Commit 6bb22a0

Browse files
Deduplicated messages sensors: LOGBROKER-8733
1 parent 78a0cbd commit 6bb22a0

File tree

6 files changed

+54
-12
lines changed

6 files changed

+54
-12
lines changed

ydb/core/persqueue/partition.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
762762
TPartitionCounterWrapper MsgsWrittenTotal;
763763
TPartitionCounterWrapper MsgsWrittenGrpc;
764764

765+
NKikimr::NPQ::TMultiCounter MsgsDiscarded;;
766+
NKikimr::NPQ::TMultiCounter BytesDiscarded;;
767+
765768
// Writing blob with topic quota variables
766769
ui64 TopicQuotaRequestCookie = 0;
767770
ui64 NextTopicWriteQuotaRequestCookie = 1;

ydb/core/persqueue/partition_init.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,10 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
802802
MsgsWrittenTotal.Setup(
803803
IsSupportive(), true,
804804
NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + txSuffix}, true));
805+
if (IsLocalDC) {
806+
MsgsDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedMessages"}, true);
807+
BytesDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedBytes"}, true);
808+
}
805809

806810
TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
807811
ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
@@ -904,6 +908,16 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
904908
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
905909
{"topic.write." + messagesSuffix}, true, "name"));
906910

911+
MsgsDiscarded = NKikimr::NPQ::TMultiCounter(
912+
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
913+
{"topic.write.discarded_messages"}, true, "name");
914+
BytesDiscarded = NKikimr::NPQ::TMultiCounter(
915+
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
916+
{"topic.write.discarded_bytes"} , true, "name");
917+
918+
919+
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
920+
907921
BytesWrittenUncompressed.Setup(
908922
IsSupportive(), false,
909923
NKikimr::NPQ::TMultiCounter(

ydb/core/persqueue/partition_write.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,7 +939,9 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
939939
);
940940

941941
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
942+
MsgsDiscarded.Inc();
942943
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
944+
BytesDiscarded.Inc(p.Msg.Data.size());
943945
} else {
944946
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
945947
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ Y_UNIT_TEST(Partition) {
8484
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
8585
CmdWrite(0, "sourceid1", TestData(), tc, false);
8686
CmdWrite(0, "sourceid2", TestData(), tc, false);
87+
CmdWrite(0, "sourceid1", TestData(), tc, false);
88+
CmdWrite(0, "sourceid2", TestData(), tc, false);
8789
PQGetPartInfo(0, 30, tc);
8890

8991

@@ -94,15 +96,15 @@ Y_UNIT_TEST(Partition) {
9496
dbGroup->OutputHtml(countersStr);
9597
TString referenceCounters = NResource::Find(TStringBuf("counters_pqproxy.html"));
9698

97-
UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters);
99+
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters);
98100
}
99101

100102
{
101103
auto counters = tc.Runtime->GetAppData(0).Counters;
102104
auto dbGroup = GetServiceCounters(counters, "datastreams");
103105
TStringStream countersStr;
104106
dbGroup->OutputHtml(countersStr);
105-
UNIT_ASSERT_EQUAL(countersStr.Str(), "<pre></pre>");
107+
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str(), "<pre></pre>");
106108
}
107109
}
108110

@@ -173,6 +175,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
173175
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
174176
CmdWrite(0, "sourceid1", TestData(), tc, false);
175177
CmdWrite(0, "sourceid2", TestData(), tc, false);
178+
CmdWrite(0, "sourceid0", TestData(), tc, false);
176179
PQGetPartInfo(0, 30, tc);
177180

178181
{
@@ -194,6 +197,14 @@ Y_UNIT_TEST(PartitionFirstClass) {
194197
const TString referenceCounters = NResource::Find(TStringBuf("counters_datastreams.html"));
195198
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters);
196199
}
200+
// CmdWrite(0, "sourceid2", TestData(), tc, false);
201+
// {
202+
// auto counters = tc.Runtime->GetAppData(0).Counters;
203+
// auto dbGroup = GetServiceCounters(counters, "pqproxy");
204+
// TStringStream countersStr;
205+
// dbGroup->OutputHtml(countersStr);
206+
// Cerr << "Counters: ==========================\n" << countersStr.Str() << "==================================" << Endl;;
207+
// }
197208
}
198209

199210
Y_UNIT_TEST(SupportivePartitionCountersPersist) {

ydb/core/persqueue/ut/resources/counters_datastreams.html

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
name=api.grpc.topic.stream_write.bytes: 540
44
name=api.grpc.topic.stream_write.messages: 30
55
name=topic.write.bytes: 540
6+
name=topic.write.discarded_bytes: 90
7+
name=topic.write.discarded_messages: 10
68
name=topic.write.messages: 30
79
name=topic.write.uncompressed_bytes: 270
810

@@ -55,7 +57,7 @@
5557
bin=99999999: 0
5658

5759
name=topic.write.partition_throttled_milliseconds:
58-
bin=0: 30
60+
bin=0: 40
5961
bin=1: 0
6062
bin=10: 0
6163
bin=100: 0

ydb/core/persqueue/ut/resources/counters_pqproxy.html

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
OriginDC=Dc1:
5151

5252
sensor=PartitionWriteQuotaWaitOriginal:
53-
Interval=0ms: 30
53+
Interval=0ms: 50
5454
Interval=10000ms: 0
5555
Interval=1000ms: 0
5656
Interval=100ms: 0
@@ -67,7 +67,7 @@
6767
OriginDC=cluster:
6868

6969
sensor=PartitionWriteQuotaWaitOriginal:
70-
Interval=0ms: 30
70+
Interval=0ms: 50
7171
Interval=10000ms: 0
7272
Interval=1000ms: 0
7373
Interval=100ms: 0
@@ -88,7 +88,7 @@
8888
OriginDC=cluster:
8989

9090
sensor=PartitionWriteQuotaWaitOriginal:
91-
Interval=0ms: 30
91+
Interval=0ms: 50
9292
Interval=10000ms: 0
9393
Interval=1000ms: 0
9494
Interval=100ms: 0
@@ -111,7 +111,7 @@
111111
OriginDC=cluster:
112112

113113
sensor=PartitionWriteQuotaWaitOriginal:
114-
Interval=0ms: 30
114+
Interval=0ms: 50
115115
Interval=10000ms: 0
116116
Interval=1000ms: 0
117117
Interval=100ms: 0
@@ -136,7 +136,7 @@
136136
OriginDC=cluster:
137137

138138
sensor=PartitionWriteQuotaWaitOriginal:
139-
Interval=0ms: 30
139+
Interval=0ms: 50
140140
Interval=10000ms: 0
141141
Interval=1000ms: 0
142142
Interval=100ms: 0
@@ -470,17 +470,21 @@
470470
TopicPath=asdfgs/topic:
471471

472472
ClientDC=Unknown:
473-
sensor=BytesWrittenFromDC: 1560
473+
sensor=BytesWrittenFromDC: 2600
474474

475475
OriginDC=Dc1:
476476
sensor=BytesWrittenOriginal: 540
477477
sensor=CompactedBytesWrittenOriginal: 747
478+
sensor=DiscardedBytes: 180
479+
sensor=DiscardedMessages: 20
478480
sensor=MessagesWrittenOriginal: 30
479481
sensor=UncompressedBytesWrittenOriginal: 270
480482

481483
OriginDC=cluster:
482484
sensor=BytesWrittenOriginal: 540
483485
sensor=CompactedBytesWrittenOriginal: 747
486+
sensor=DiscardedBytes: 180
487+
sensor=DiscardedMessages: 20
484488
sensor=MessagesWrittenOriginal: 30
485489
sensor=UncompressedBytesWrittenOriginal: 270
486490

@@ -489,11 +493,13 @@
489493
TopicPath=total:
490494

491495
ClientDC=Unknown:
492-
sensor=BytesWrittenFromDC: 1560
496+
sensor=BytesWrittenFromDC: 2600
493497

494498
OriginDC=cluster:
495499
sensor=BytesWrittenOriginal: 540
496500
sensor=CompactedBytesWrittenOriginal: 747
501+
sensor=DiscardedBytes: 180
502+
sensor=DiscardedMessages: 20
497503
sensor=MessagesWrittenOriginal: 30
498504
sensor=UncompressedBytesWrittenOriginal: 270
499505

@@ -504,11 +510,13 @@
504510
TopicPath=total:
505511

506512
ClientDC=Unknown:
507-
sensor=BytesWrittenFromDC: 1560
513+
sensor=BytesWrittenFromDC: 2600
508514

509515
OriginDC=cluster:
510516
sensor=BytesWrittenOriginal: 540
511517
sensor=CompactedBytesWrittenOriginal: 747
518+
sensor=DiscardedBytes: 180
519+
sensor=DiscardedMessages: 20
512520
sensor=MessagesWrittenOriginal: 30
513521
sensor=UncompressedBytesWrittenOriginal: 270
514522

@@ -521,11 +529,13 @@
521529
TopicPath=total:
522530

523531
ClientDC=Unknown:
524-
sensor=BytesWrittenFromDC: 1560
532+
sensor=BytesWrittenFromDC: 2600
525533

526534
OriginDC=cluster:
527535
sensor=BytesWrittenOriginal: 540
528536
sensor=CompactedBytesWrittenOriginal: 747
537+
sensor=DiscardedBytes: 180
538+
sensor=DiscardedMessages: 20
529539
sensor=MessagesWrittenOriginal: 30
530540
sensor=UncompressedBytesWrittenOriginal: 270
531541

0 commit comments

Comments
 (0)