Skip to content

Commit f7ef27e

Browse files
authored
Merge 9958322 into 2cb82f8
2 parents 2cb82f8 + 9958322 commit f7ef27e

11 files changed

+66
-31
lines changed

ydb/core/persqueue/partition.cpp

+17-3
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
755755
auto& userInfo = userInfoPair.second;
756756
if (!userInfo.LabeledCounters)
757757
continue;
758-
if (!userInfo.HasReadRule && !userInfo.Important)
758+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
759759
continue;
760760
auto* cac = ac->AddConsumerAggregatedCounters();
761761
cac->SetConsumer(userInfo.User);
@@ -1110,7 +1110,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
11101110
auto& userInfo = userInfoPair.second;
11111111
if (!userInfo.LabeledCounters)
11121112
continue;
1113-
if (!userInfo.HasReadRule && !userInfo.Important)
1113+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
11141114
continue;
11151115
bool haveChanges = false;
11161116
userInfo.EndOffset = EndOffset;
@@ -1214,6 +1214,12 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
12141214
userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage);
12151215
}
12161216
}
1217+
1218+
if (userInfoPair.first == CLIENTID_WITHOUT_CONSUMER ) {
1219+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get());
1220+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_USAGE].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get());
1221+
}
1222+
12171223
if (haveChanges) {
12181224
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters));
12191225
}
@@ -1325,6 +1331,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
13251331
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
13261332
}
13271333
}
1334+
1335+
if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Get()) {
1336+
ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60;
1337+
if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) {
1338+
haveChanges = true;
1339+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
1340+
}
1341+
}
13281342
return haveChanges;
13291343
}
13301344

@@ -1831,7 +1845,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
18311845
} else {
18321846
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
18331847
}
1834-
} else {
1848+
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
18351849
auto ui = UsersInfoStorage->GetIfExists(user);
18361850
if (ui && ui->LabeledCounters) {
18371851
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());

ydb/core/persqueue/partition_init.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -883,7 +883,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
883883
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
884884
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
885885
if (IsQuotingEnabled()) {
886-
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
886+
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
887887
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
888888
new NKikimr::NPQ::TPercentileCounter(
889889
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
@@ -896,7 +896,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
896896
subgroups.pop_back();
897897
}
898898

899-
subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
899+
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
900900
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
901901
new NKikimr::NPQ::TPercentileCounter(
902902
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",

ydb/core/persqueue/ut/resources/counters_datastreams.html

+15-15
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,6 @@
2525
bin=60000: 0
2626
bin=999999: 0
2727

28-
name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
29-
bin=0: 30
30-
bin=1: 0
31-
bin=10: 0
32-
bin=100: 0
33-
bin=1000: 0
34-
bin=10000: 0
35-
bin=20: 0
36-
bin=2500: 0
37-
bin=5: 0
38-
bin=50: 0
39-
bin=500: 0
40-
bin=5000: 0
41-
bin=999999: 0
42-
4328
name=topic.write.lag_milliseconds:
4429
bin=100: 0
4530
bin=1000: 10
@@ -68,4 +53,19 @@
6853
bin=5242880: 0
6954
bin=67108864: 0
7055
bin=99999999: 0
56+
57+
name=topic.write.partition_throttled_milliseconds:
58+
bin=0: 30
59+
bin=1: 0
60+
bin=10: 0
61+
bin=100: 0
62+
bin=1000: 0
63+
bin=10000: 0
64+
bin=20: 0
65+
bin=2500: 0
66+
bin=5: 0
67+
bin=50: 0
68+
bin=500: 0
69+
bin=5000: 0
70+
bin=999999: 0
7171
</pre>

ydb/core/persqueue/ut/resources/counters_topics.html

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
name=topic.partition.read.inflight_throttled_microseconds_max: 0
1717
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
1818
name=topic.partition.read.throttled_microseconds_max: 0
19+
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
20+
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
1921
name=topic.partition.storage_bytes_max: 0
2022
name=topic.partition.total_count: 2
2123
name=topic.partition.uptime_milliseconds_min: 30000

ydb/core/protos/counters_pq.proto

+3
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,7 @@ enum EPartitionLabeledCounters {
237237
METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES = 38 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}];
238238

239239
METRIC_READ_INFLIGHT_LIMIT_THROTTLED = 39 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read.inflight_throttled_microseconds_max"}];
240+
241+
METRIC_READ_QUOTA_NO_CONSUMER_BYTES = 40 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read_without_consumer.speed_limit_bytes_per_second"}];
242+
METRIC_READ_QUOTA_NO_CONSUMER_USAGE = 41 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read_without_consumer.throttled_microseconds_max"}];
240243
}

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
206206
.UseTopicCommit = OnlyTableInTx,
207207
.UseTableSelect = UseTableSelect && !OnlyTopicInTx,
208208
.UseTableUpsert = !OnlyTopicInTx,
209+
.ReadWithoutConsumer = ReadWithoutConsumer,
209210
.CommitPeriod = CommitPeriod,
210211
.CommitMessages = CommitMessages
211212
};

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class TTopicOperationsScenario {
7070
bool OnlyTopicInTx = false;
7171
bool OnlyTableInTx = false;
7272
bool UseTableSelect = true;
73+
bool ReadWithoutConsumer = false;
7374

7475
protected:
7576
void CreateTopic(const TString& database,

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp

+20-10
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,33 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
2424
auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(params.Driver);
2525
std::optional<TTransactionSupport> txSupport;
2626

27-
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
2827
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver);
29-
auto consumers = describeTopicResult.GetConsumers();
28+
NYdb::NTopic::TReadSessionSettings settings;
29+
30+
if (!params.ReadWithoutConsumer) {
31+
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
32+
auto consumers = describeTopicResult.GetConsumers();
3033

31-
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
32-
{
33-
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
34-
exit(EXIT_FAILURE);
34+
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
35+
{
36+
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
37+
exit(EXIT_FAILURE);
38+
}
39+
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
40+
} else {
41+
NYdb::NTopic::TTopicReadSettings topic = params.TopicName;
42+
auto partitions = describeTopicResult.GetPartitions();
43+
for(auto partition: partitions) {
44+
topic.AppendPartitionIds(partition.GetPartitionId());
45+
}
46+
settings.WithoutConsumer().AppendTopics(topic);
3547
}
3648

49+
3750
if (params.UseTransactions) {
3851
txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName);
3952
}
4053

41-
NYdb::NTopic::TReadSessionSettings settings;
42-
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
43-
4454
auto readSession = topicClient->CreateReadSession(settings);
4555
WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created.");
4656

@@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
93103
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
94104
}
95105

96-
if (!txSupport || params.UseTopicCommit) {
106+
if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) {
97107
dataEvent->Commit();
98108
}
99109
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ namespace NYdb {
3030
bool UseTopicCommit = false;
3131
bool UseTableSelect = true;
3232
bool UseTableUpsert = true;
33+
bool ReadWithoutConsumer = false;
3334
size_t CommitPeriod = 15;
3435
size_t CommitMessages = 1'000'000;
3536
};

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config)
3434
config.Opts->AddLongOption("topic", "Topic name.")
3535
.DefaultValue(TOPIC)
3636
.StoreResult(&Scenario.TopicName);
37+
config.Opts->AddLongOption("no-consumer", "Read without consumer")
38+
.Hidden()
39+
.StoreTrue(&Scenario.ReadWithoutConsumer);
3740

3841
// Specific params
3942
config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.")

ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ namespace NKikimr::NPersQueueTests {
463463
"topic.write.bytes",
464464
"topic.write.messages",
465465
"api.grpc.topic.stream_write.bytes",
466-
"api.grpc.topic.stream_write.partition_throttled_milliseconds",
466+
"topic.write.partition_throttled_milliseconds",
467467
"topic.write.message_size_bytes",
468468
"api.grpc.topic.stream_write.messages",
469469
"topic.write.lag_milliseconds",

0 commit comments

Comments
 (0)