Skip to content

Commit be2fb17

Browse files
fix broken counters in YMQ (#4241)
1 parent eae74a3 commit be2fb17

File tree

3 files changed

+36
-5
lines changed

3 files changed

+36
-5
lines changed

ydb/core/ymq/actor/action.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ class TActionActor
252252
auto* detailedCounters = UserCounters_ ? UserCounters_->GetDetailedCounters() : nullptr;
253253
const size_t errors = ErrorsCount(Response_, detailedCounters ? &detailedCounters->APIStatuses : nullptr);
254254

255+
FinishTs_ = TActivationContext::Now();
256+
255257
const TDuration duration = GetRequestDuration();
256258
const TDuration workingDuration = GetRequestWorkingDuration();
257259
if (QueueLeader_ && (IsActionForQueue(Action_) || IsActionForQueueYMQ(Action_))) {
@@ -287,7 +289,6 @@ class TActionActor
287289
}
288290
}
289291

290-
FinishTs_ = TActivationContext::Now();
291292
if (IsRequestSlow()) {
292293
PrintSlowRequestWarning();
293294
}

ydb/core/ymq/actor/queue_leader.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,12 +1189,13 @@ void TQueueLeader::WaitAddMessagesToInflyOrTryAnotherShard(TReceiveMessageBatchR
11891189
void TQueueLeader::Reply(TReceiveMessageBatchRequestProcessing& reqInfo) {
11901190
const ui64 shard = reqInfo.GetCurrentShard();
11911191
if (!reqInfo.Answer->Failed && !reqInfo.Answer->OverLimit) {
1192-
int receiveCount = 0;
11931192
int messageCount = 0;
11941193
ui64 bytesRead = 0;
11951194

11961195
for (auto& message : reqInfo.Answer->Messages) {
1197-
receiveCount += message.ReceiveCount;
1196+
COLLECT_HISTOGRAM_COUNTER(Counters_, MessageReceiveAttempts, message.ReceiveCount);
1197+
COLLECT_HISTOGRAM_COUNTER(Counters_, receive_attempts_count_rate, message.ReceiveCount);
1198+
11981199
messageCount++;
11991200
bytesRead += message.Data.size();
12001201

@@ -1204,8 +1205,6 @@ void TQueueLeader::Reply(TReceiveMessageBatchRequestProcessing& reqInfo) {
12041205
}
12051206

12061207
if (messageCount > 0) {
1207-
COLLECT_HISTOGRAM_COUNTER(Counters_, MessageReceiveAttempts, receiveCount);
1208-
COLLECT_HISTOGRAM_COUNTER(Counters_, receive_attempts_count_rate, receiveCount);
12091208
ADD_COUNTER_COUPLE(Counters_, ReceiveMessage_Count, received_count_per_second, messageCount);
12101209
ADD_COUNTER_COUPLE(Counters_, ReceiveMessage_BytesRead, received_bytes_per_second, bytesRead);
12111210
}

ydb/tests/functional/sqs/common/test_queue_counters.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# -*- coding: utf-8 -*-
33
from ydb.tests.library.sqs.test_base import KikimrSqsTestBase
44

5+
from ydb.tests.library.sqs.requests_client import SqsSendMessageParams
6+
57

68
class TestSqsGettingCounters(KikimrSqsTestBase):
79

@@ -153,3 +155,32 @@ def test_purge_queue_counters(self):
153155
'sensor': 'MessagesPurged',
154156
})
155157
assert purged_derivative > 0
158+
159+
def test_action_duration_being_not_immediate(self):
160+
queue_url = self._create_queue_and_assert(self.queue_name, False, True)
161+
162+
for i in range(100):
163+
message_payload = "foobar" + str(i)
164+
self._sqs_api.send_message(queue_url, message_payload)
165+
self._read_while_not_empty(queue_url, 1)
166+
167+
sqs_counters = self._get_sqs_counters()
168+
169+
durations = self._get_counter(sqs_counters, {
170+
'queue': self.queue_name,
171+
'sensor': 'ReceiveMessage_Duration',
172+
})
173+
buckets_longer_than_5ms = durations['hist']['buckets'][1:]
174+
assert any(map(lambda x: x > 0, buckets_longer_than_5ms))
175+
176+
def test_receive_attempts_are_counted_separately_for_messages_in_one_batch(self):
177+
queue_url = self._create_queue_and_assert(self.queue_name, False, True)
178+
self._sqs_api.send_message_batch(queue_url, [SqsSendMessageParams('data0'), SqsSendMessageParams('data1')])
179+
self._read_while_not_empty(queue_url, 2)
180+
181+
sqs_counters = self._get_sqs_counters()
182+
message_receive_attempts = self._get_counter(sqs_counters, {
183+
'queue': self.queue_name,
184+
'sensor': 'MessageReceiveAttempts',
185+
})
186+
assert message_receive_attempts['hist']['buckets'][0] == 2

0 commit comments

Comments
 (0)