Skip to content

Commit 1f04df4

Browse files
Deduplicated messages sensors: LOGBROKER-8733 (ydb-platform#3147)
1 parent 3e4dc18 commit 1f04df4

File tree

8 files changed

+56
-22
lines changed

8 files changed

+56
-22
lines changed

ydb/core/persqueue/partition.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
742742
NKikimr::NPQ::TMultiCounter MsgsWrittenTotal;
743743
NKikimr::NPQ::TMultiCounter MsgsWrittenGrpc;;
744744

745+
NKikimr::NPQ::TMultiCounter MsgsDiscarded;
746+
NKikimr::NPQ::TMultiCounter BytesDiscarded;
747+
745748
// Writing blob with topic quota variables
746749
ui64 TopicQuotaRequestCookie = 0;
747750

ydb/core/persqueue/partition_init.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,10 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
785785
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true);
786786
BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true);
787787
MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true);
788+
if (IsLocalDC) {
789+
MsgsDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedMessages"}, true);
790+
BytesDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedBytes"}, true);
791+
}
788792

789793
TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
790794
ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
@@ -872,8 +876,14 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
872876
{"topic.write.messages"}, true, "name");
873877

874878

875-
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
879+
MsgsDiscarded = NKikimr::NPQ::TMultiCounter(
880+
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
881+
{"topic.write.discarded_messages"}, true, "name");
882+
BytesDiscarded = NKikimr::NPQ::TMultiCounter(
883+
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
884+
{"topic.write.discarded_bytes"} , true, "name");
876885

886+
BytesWrittenUncompressed = TMultiCounter(
877887
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
878888
{"topic.write.uncompressed_bytes"}, true, "name");
879889

ydb/core/persqueue/partition_write.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,9 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
902902
);
903903

904904
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
905+
MsgsDiscarded.Inc();
905906
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
907+
BytesDiscarded.Inc(p.Msg.Data.size());
906908
} else {
907909
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
908910
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ Y_UNIT_TEST(Partition) {
8383
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
8484
CmdWrite(0, "sourceid1", TestData(), tc, false);
8585
CmdWrite(0, "sourceid2", TestData(), tc, false);
86+
CmdWrite(0, "sourceid1", TestData(), tc, false);
87+
CmdWrite(0, "sourceid2", TestData(), tc, false);
8688
PQGetPartInfo(0, 30, tc);
8789

8890

@@ -93,15 +95,15 @@ Y_UNIT_TEST(Partition) {
9395
dbGroup->OutputHtml(countersStr);
9496
TString referenceCounters = NResource::Find(TStringBuf("counters_pqproxy.html"));
9597

96-
UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters);
98+
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters);
9799
}
98100

99101
{
100102
auto counters = tc.Runtime->GetAppData(0).Counters;
101103
auto dbGroup = GetServiceCounters(counters, "datastreams");
102104
TStringStream countersStr;
103105
dbGroup->OutputHtml(countersStr);
104-
UNIT_ASSERT_EQUAL(countersStr.Str(), "<pre></pre>");
106+
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str(), "<pre></pre>");
105107
}
106108
}
107109

@@ -116,6 +118,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
116118
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
117119
CmdWrite(0, "sourceid1", TestData(), tc, false);
118120
CmdWrite(0, "sourceid2", TestData(), tc, false);
121+
CmdWrite(0, "sourceid0", TestData(), tc, false);
119122
PQGetPartInfo(0, 30, tc);
120123

121124
{

ydb/core/persqueue/ut/resources/counters_datastreams.html

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
name=api.grpc.topic.stream_write.bytes: 540
44
name=api.grpc.topic.stream_write.messages: 30
55
name=topic.write.bytes: 540
6+
name=topic.write.discarded_bytes: 90
7+
name=topic.write.discarded_messages: 10
68
name=topic.write.messages: 30
79
name=topic.write.uncompressed_bytes: 270
810

@@ -55,7 +57,7 @@
5557
bin=99999999: 0
5658

5759
name=topic.write.partition_throttled_milliseconds:
58-
bin=0: 30
60+
bin=0: 40
5961
bin=1: 0
6062
bin=10: 0
6163
bin=100: 0

ydb/core/persqueue/ut/resources/counters_pqproxy.html

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
OriginDC=Dc1:
5151

5252
sensor=PartitionWriteQuotaWaitOriginal:
53-
Interval=0ms: 30
53+
Interval=0ms: 50
5454
Interval=10000ms: 0
5555
Interval=1000ms: 0
5656
Interval=100ms: 0
@@ -67,7 +67,7 @@
6767
OriginDC=cluster:
6868

6969
sensor=PartitionWriteQuotaWaitOriginal:
70-
Interval=0ms: 30
70+
Interval=0ms: 50
7171
Interval=10000ms: 0
7272
Interval=1000ms: 0
7373
Interval=100ms: 0
@@ -88,7 +88,7 @@
8888
OriginDC=cluster:
8989

9090
sensor=PartitionWriteQuotaWaitOriginal:
91-
Interval=0ms: 30
91+
Interval=0ms: 50
9292
Interval=10000ms: 0
9393
Interval=1000ms: 0
9494
Interval=100ms: 0
@@ -111,7 +111,7 @@
111111
OriginDC=cluster:
112112

113113
sensor=PartitionWriteQuotaWaitOriginal:
114-
Interval=0ms: 30
114+
Interval=0ms: 50
115115
Interval=10000ms: 0
116116
Interval=1000ms: 0
117117
Interval=100ms: 0
@@ -136,7 +136,7 @@
136136
OriginDC=cluster:
137137

138138
sensor=PartitionWriteQuotaWaitOriginal:
139-
Interval=0ms: 30
139+
Interval=0ms: 50
140140
Interval=10000ms: 0
141141
Interval=1000ms: 0
142142
Interval=100ms: 0
@@ -470,17 +470,21 @@
470470
TopicPath=asdfgs/topic:
471471

472472
ClientDC=Unknown:
473-
sensor=BytesWrittenFromDC: 1560
473+
sensor=BytesWrittenFromDC: 2600
474474

475475
OriginDC=Dc1:
476476
sensor=BytesWrittenOriginal: 540
477477
sensor=CompactedBytesWrittenOriginal: 747
478+
sensor=DiscardedBytes: 180
479+
sensor=DiscardedMessages: 20
478480
sensor=MessagesWrittenOriginal: 30
479481
sensor=UncompressedBytesWrittenOriginal: 270
480482

481483
OriginDC=cluster:
482484
sensor=BytesWrittenOriginal: 540
483485
sensor=CompactedBytesWrittenOriginal: 747
486+
sensor=DiscardedBytes: 180
487+
sensor=DiscardedMessages: 20
484488
sensor=MessagesWrittenOriginal: 30
485489
sensor=UncompressedBytesWrittenOriginal: 270
486490

@@ -489,11 +493,13 @@
489493
TopicPath=total:
490494

491495
ClientDC=Unknown:
492-
sensor=BytesWrittenFromDC: 1560
496+
sensor=BytesWrittenFromDC: 2600
493497

494498
OriginDC=cluster:
495499
sensor=BytesWrittenOriginal: 540
496500
sensor=CompactedBytesWrittenOriginal: 747
501+
sensor=DiscardedBytes: 180
502+
sensor=DiscardedMessages: 20
497503
sensor=MessagesWrittenOriginal: 30
498504
sensor=UncompressedBytesWrittenOriginal: 270
499505

@@ -504,11 +510,13 @@
504510
TopicPath=total:
505511

506512
ClientDC=Unknown:
507-
sensor=BytesWrittenFromDC: 1560
513+
sensor=BytesWrittenFromDC: 2600
508514

509515
OriginDC=cluster:
510516
sensor=BytesWrittenOriginal: 540
511517
sensor=CompactedBytesWrittenOriginal: 747
518+
sensor=DiscardedBytes: 180
519+
sensor=DiscardedMessages: 20
512520
sensor=MessagesWrittenOriginal: 30
513521
sensor=UncompressedBytesWrittenOriginal: 270
514522

@@ -521,11 +529,13 @@
521529
TopicPath=total:
522530

523531
ClientDC=Unknown:
524-
sensor=BytesWrittenFromDC: 1560
532+
sensor=BytesWrittenFromDC: 2600
525533

526534
OriginDC=cluster:
527535
sensor=BytesWrittenOriginal: 540
528536
sensor=CompactedBytesWrittenOriginal: 747
537+
sensor=DiscardedBytes: 180
538+
sensor=DiscardedMessages: 20
529539
sensor=MessagesWrittenOriginal: 30
530540
sensor=UncompressedBytesWrittenOriginal: 270
531541

ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ namespace NKikimr::NPersQueueTests {
223223
Sleep(TDuration::MilliSeconds(10));
224224
}
225225

226-
// Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages.
226+
// Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages.
227227
// Each will contains shifts from the message: before, equals and after.
228228
// It allow check reading from different shift. First iteration read from zero.
229229
TVector<TInstant> ts { TInstant::Zero() };
@@ -254,10 +254,10 @@ namespace NKikimr::NPersQueueTests {
254254
ui32 lastOffset = 0;
255255

256256
settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable {
257-
Cerr << ">>>>> Iteration: " << i << " TDataReceivedEvent: " << event.DebugString(false)
257+
Cerr << ">>>>> Iteration: " << i << " TDataReceivedEvent: " << event.DebugString(false)
258258
<< " size=" << event.GetMessages().size() << Endl << Flush;
259259
for (const auto& msg : event.GetMessages()) {
260-
Cerr << ">>>>> Iteration: " << i << " Got message: " << msg.GetData().substr(0, 16)
260+
Cerr << ">>>>> Iteration: " << i << " Got message: " << msg.GetData().substr(0, 16)
261261
<< " :: " << msg.DebugString(false) << Endl << Flush;
262262

263263
auto count = ++map[msg.GetData()];
@@ -281,12 +281,12 @@ namespace NKikimr::NPersQueueTests {
281281
} else {
282282
if (map.size() == 1) {
283283
auto expectedOffset = firstOffset[i];
284-
UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, "Iteration: " << i
285-
<< " Expected first message offset " << expectedOffset
284+
UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, "Iteration: " << i
285+
<< " Expected first message offset " << expectedOffset
286286
<< " but got " << msg.GetOffset());
287287
} else {
288-
UNIT_ASSERT_C(lastOffset < msg.GetOffset(), "Iteration: " << i
289-
<< " unexpected offset order. Last offset " << lastOffset
288+
UNIT_ASSERT_C(lastOffset < msg.GetOffset(), "Iteration: " << i
289+
<< " unexpected offset order. Last offset " << lastOffset
290290
<< " Message offset " << msg.GetOffset());
291291
}
292292

@@ -310,8 +310,8 @@ namespace NKikimr::NPersQueueTests {
310310

311311
if (i == 0) {
312312
for (ui32 j = 1; j < ts.size(); ++j) {
313-
Cerr << ">>>>> Planed iteration: " << j
314-
<< ". Start reading from time: " << ts[j]
313+
Cerr << ">>>>> Planed iteration: " << j
314+
<< ". Start reading from time: " << ts[j]
315315
<< ". Expected first message offset: " << firstOffset[j]
316316
<< ". Expected message quantity: " << expectingQuantities[j] << Endl;
317317
}
@@ -462,6 +462,8 @@ namespace NKikimr::NPersQueueTests {
462462
"topic.read.lag_milliseconds",
463463
"topic.write.bytes",
464464
"topic.write.messages",
465+
"topic.write.discarded_bytes",
466+
"topic.write.discarded_messages",
465467
"api.grpc.topic.stream_write.bytes",
466468
"topic.write.partition_throttled_milliseconds",
467469
"topic.write.message_size_bytes",

ydb/services/persqueue_v1/persqueue_ut.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3676,6 +3676,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
36763676
{
36773677
"BytesWrittenOriginal",
36783678
"CompactedBytesWrittenOriginal",
3679+
"DiscardedBytes",
3680+
"DiscardedMessages",
36793681
"MessagesWrittenOriginal",
36803682
"UncompressedBytesWrittenOriginal"
36813683
},

0 commit comments

Comments
 (0)