Skip to content

Commit e144c69

Browse files
Merge branch 'stable-24-1' into nodedup-mode-fix
2 parents 4105093 + c9a38da commit e144c69

23 files changed

+162
-108
lines changed

ydb/core/kafka_proxy/actors/actors.h

+5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010

1111
namespace NKafka {
1212

13+
static constexpr int ProxyNodeId = 1;
14+
static constexpr char UnderlayPrefix[] = "u-";
15+
16+
static_assert(sizeof(UnderlayPrefix) == 3);
17+
1318
enum EAuthSteps {
1419
WAIT_HANDSHAKE,
1520
WAIT_AUTH,

ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp

+14-14
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,21 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
3030
TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
3131
response->ErrorCode = EKafkaErrors::NONE_ERROR;
3232

33-
AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3});
34-
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS);
35-
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA);
36-
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID);
37-
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE);
38-
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE);
39-
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS);
33+
AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3, .MaxVersion=9});
34+
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS, {.MaxVersion=2});
35+
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA, {.MaxVersion=9});
36+
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID, {.MaxVersion=4});
37+
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE, {.MaxVersion=1});
38+
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE, {.MaxVersion=2});
39+
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS, {.MinVersion=1, .MaxVersion=1});
4040
AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3});
41-
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP);
42-
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP);
43-
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
44-
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
45-
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
46-
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1});
47-
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);
41+
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP, {.MaxVersion=9});
42+
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP, {.MaxVersion=3});
43+
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP, {.MaxVersion=5});
44+
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT, {.MaxVersion=4});
45+
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR, {.MaxVersion=0});
46+
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=0});
47+
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH, {.MaxVersion=8});
4848

4949
return response;
5050
}

ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp

+15-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) {
2525

2626
bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
2727
if (withProxy) {
28-
SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), -1, ctx);
28+
SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), NKafka::ProxyNodeId, ctx);
2929
return;
3030
}
3131

@@ -49,6 +49,13 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
4949
response->Coordinators.push_back(coordinator);
5050
}
5151

52+
response->ErrorCode = NONE_ERROR;
53+
response->Host = host;
54+
response->Port = port;
55+
response->NodeId = nodeId;
56+
57+
KAFKA_LOG_D("FIND_COORDINATOR response. Host#: " << host << ", Port#: " << port << ", NodeId# " << nodeId);
58+
5259
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
5360
Die(ctx);
5461
}
@@ -66,15 +73,21 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons
6673

6774
response->Coordinators.push_back(coordinator);
6875
}
69-
76+
77+
response->ErrorCode = error;
78+
7079
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
7180
Die(ctx);
7281
}
7382

7483
void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
7584
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
7685
Y_ABORT_UNLESS(!iter.IsEnd());
86+
7787
auto host = (*ev->Get()->Nodes)[iter->second].Host;
88+
if (host.StartsWith(UnderlayPrefix)) {
89+
host = host.substr(sizeof(UnderlayPrefix) - 1);
90+
}
7891
KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host);
7992
SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx);
8093
}

ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp

-5
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,6 @@
66
namespace NKafka {
77
using namespace NKikimr::NGRpcProxy::V1;
88

9-
static constexpr int ProxyNodeId = 1;
10-
static constexpr char UnderlayPrefix[] = "u-";
11-
12-
static_assert(sizeof(UnderlayPrefix) == 3);
13-
149
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
1510
const ui64 correlationId,
1611
const TMessagePtr<TMetadataRequestData>& message) {

ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
204204
partition.CommittedOffset = sourcePartition.CommittedOffset;
205205
partition.PartitionIndex = sourcePartition.PartitionIndex;
206206
partition.ErrorCode = sourcePartition.ErrorCode;
207+
topic.Partitions.push_back(partition);
207208
}
208209
response->Topics.push_back(topic);
209210
}

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo
3939
for (auto& topicToPartitions: NewPartitionsToLockOnTime) {
4040
auto& partitions = topicToPartitions.second;
4141
for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) {
42-
if (partitionsIt->LockOn >= ctx.Now()) {
42+
if (partitionsIt->LockOn <= ctx.Now()) {
4343
TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId);
4444
NeedRebalance = true;
4545
partitionsIt = partitions.erase(partitionsIt);
@@ -86,8 +86,8 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
8686

8787
switch (ReadStep) {
8888
case WAIT_JOIN_GROUP: { // join first time
89-
if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
90-
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType);
89+
if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
90+
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value());
9191
CloseReadSession(ctx);
9292
return;
9393
}
@@ -156,8 +156,8 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
156156
return;
157157
}
158158

159-
if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
160-
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType);
159+
if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
160+
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value());
161161
CloseReadSession(ctx);
162162
return;
163163
}
@@ -361,9 +361,9 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
361361
for (auto part: finalPartitionsToRead) {
362362
KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part);
363363
topicPartition.Partitions.push_back(part);
364-
assignment.AssignedPartitions.push_back(topicPartition);
365364
partitions.ReadingNow.emplace(part);
366365
}
366+
assignment.AssignedPartitions.push_back(topicPartition);
367367
}
368368

369369
return assignment;

ydb/core/kafka_proxy/kafka_connection.cpp

+8-1
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,14 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
319319
KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize
320320
<< ", Size=" << Request->Size);
321321

322-
Request->Method = EApiKeyNames.find(static_cast<EApiKey>(Request->Header.RequestApiKey))->second;
322+
auto apiKeyNameIt = EApiKeyNames.find(static_cast<EApiKey>(Request->Header.RequestApiKey));
323+
if (apiKeyNameIt == EApiKeyNames.end()) {
324+
KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
325+
PassAway();
326+
return false;
327+
}
328+
329+
Request->Method = apiKeyNameIt->second;
323330

324331
PendingRequestsQueue.push_back(Request);
325332
PendingRequests[Request->Header.CorrelationId] = Request;

ydb/core/kafka_proxy/kafka_consumer_protocol.cpp

+9-1
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,20 @@ void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion
143143
}
144144

145145
void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
146+
auto useVarintSize = _version > 3;
146147
_version = ASSIGNMENT_VERSION;
148+
147149
if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
148150
ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment";
149151
}
150152

151-
_writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1);
153+
if (useVarintSize) {
154+
_writable.writeUnsignedVarint(Size(_version) + 1);
155+
} else {
156+
TKafkaInt32 size = Size(_version);
157+
_writable << size;
158+
}
159+
152160
_writable << _version;
153161
NPrivate::TWriteCollector _collector;
154162
NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions);

ydb/core/kafka_proxy/kafka_messages.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -4517,7 +4517,12 @@ i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const {
45174517
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
45184518
_collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
45194519
}
4520-
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
4520+
auto useVarintSize = _version > 3;
4521+
if (useVarintSize) {
4522+
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
4523+
} else {
4524+
return _collector.Size + sizeof(TKafkaInt32);
4525+
}
45214526
}
45224527

45234528

ydb/core/kafka_proxy/ut/ut_protocol.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,12 @@ class TTestClient {
532532
TReadInfo readInfo;
533533
for (;;) {
534534
readInfo = JoinAndSyncGroup(topics, groupId);
535-
if (readInfo.Partitions.size() == expectedPartitionsCount) {
535+
ui32 partitionsCount = 0;
536+
for (auto topicPartitions: readInfo.Partitions) {
537+
partitionsCount += topicPartitions.Partitions.size();
538+
}
539+
540+
if (partitionsCount == expectedPartitionsCount) {
536541
break;
537542
}
538543
WaitRebalance(readInfo.MemberId, readInfo.GenerationId, groupId);

ydb/core/persqueue/partition.cpp

+18-4
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
748748
auto& userInfo = userInfoPair.second;
749749
if (!userInfo.LabeledCounters)
750750
continue;
751-
if (!userInfo.HasReadRule && !userInfo.Important)
751+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
752752
continue;
753753
auto* cac = ac->AddConsumerAggregatedCounters();
754754
cac->SetConsumer(userInfo.User);
@@ -996,7 +996,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
996996
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp->ByteSize());
997997
}
998998
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
999-
OnReadRequestFinished(cookie, answer.Size, info.User, ctx);
999+
OnReadRequestFinished(info.Destination, answer.Size, info.User, ctx);
10001000
}
10011001

10021002
void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {
@@ -1083,7 +1083,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
10831083
auto& userInfo = userInfoPair.second;
10841084
if (!userInfo.LabeledCounters)
10851085
continue;
1086-
if (!userInfo.HasReadRule && !userInfo.Important)
1086+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
10871087
continue;
10881088
bool haveChanges = false;
10891089
userInfo.EndOffset = EndOffset;
@@ -1187,6 +1187,12 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
11871187
userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage);
11881188
}
11891189
}
1190+
1191+
if (userInfoPair.first == CLIENTID_WITHOUT_CONSUMER ) {
1192+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get());
1193+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_USAGE].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get());
1194+
}
1195+
11901196
if (haveChanges) {
11911197
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters));
11921198
}
@@ -1299,6 +1305,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
12991305
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
13001306
}
13011307
}
1308+
1309+
if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Get()) {
1310+
ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60;
1311+
if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) {
1312+
haveChanges = true;
1313+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
1314+
}
1315+
}
13021316
return haveChanges;
13031317
}
13041318

@@ -1805,7 +1819,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
18051819
} else {
18061820
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
18071821
}
1808-
} else {
1822+
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
18091823
auto ui = UsersInfoStorage->GetIfExists(user);
18101824
if (ui && ui->LabeledCounters) {
18111825
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());

ydb/core/persqueue/partition_init.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -885,8 +885,8 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
885885
5000, 10'000, 30'000, 99'999'999});
886886
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
887887
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
888-
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
889-
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
888+
if (IsQuotingEnabled()) {
889+
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
890890
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
891891
new NKikimr::NPQ::TPercentileCounter(
892892
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
@@ -899,7 +899,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
899899
subgroups.pop_back();
900900
}
901901

902-
subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
902+
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
903903
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
904904
new NKikimr::NPQ::TPercentileCounter(
905905
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",

ydb/core/persqueue/partition_read.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -973,7 +973,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
973973
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp->ByteSize());
974974

975975
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
976-
OnReadRequestFinished(cookie, answer.Size, info.User, ctx);
976+
OnReadRequestFinished(info.Destination, answer.Size, info.User, ctx);
977977
return;
978978
}
979979

ydb/core/persqueue/ut/resources/counters_datastreams.html

+15-15
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,6 @@
2525
bin=60000: 0
2626
bin=999999: 0
2727

28-
name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
29-
bin=0: 30
30-
bin=1: 0
31-
bin=10: 0
32-
bin=100: 0
33-
bin=1000: 0
34-
bin=10000: 0
35-
bin=20: 0
36-
bin=2500: 0
37-
bin=5: 0
38-
bin=50: 0
39-
bin=500: 0
40-
bin=5000: 0
41-
bin=999999: 0
42-
4328
name=topic.write.lag_milliseconds:
4429
bin=100: 0
4530
bin=1000: 10
@@ -68,4 +53,19 @@
6853
bin=5242880: 0
6954
bin=67108864: 0
7055
bin=99999999: 0
56+
57+
name=topic.write.partition_throttled_milliseconds:
58+
bin=0: 30
59+
bin=1: 0
60+
bin=10: 0
61+
bin=100: 0
62+
bin=1000: 0
63+
bin=10000: 0
64+
bin=20: 0
65+
bin=2500: 0
66+
bin=5: 0
67+
bin=50: 0
68+
bin=500: 0
69+
bin=5000: 0
70+
bin=999999: 0
7171
</pre>

ydb/core/persqueue/ut/resources/counters_topics.html

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
name=topic.partition.read.inflight_throttled_microseconds_max: 0
1717
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
1818
name=topic.partition.read.throttled_microseconds_max: 0
19+
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
20+
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
1921
name=topic.partition.storage_bytes_max: 0
2022
name=topic.partition.total_count: 2
2123
name=topic.partition.uptime_milliseconds_min: 30000

0 commit comments

Comments
 (0)