Skip to content

Commit c18370b

Browse files
authored
Merge de99498 into 57cf0e9
2 parents 57cf0e9 + de99498 commit c18370b

11 files changed

+65
-32
lines changed

ydb/core/persqueue/partition.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
748748
auto& userInfo = userInfoPair.second;
749749
if (!userInfo.LabeledCounters)
750750
continue;
751-
if (!userInfo.HasReadRule && !userInfo.Important)
751+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
752752
continue;
753753
auto* cac = ac->AddConsumerAggregatedCounters();
754754
cac->SetConsumer(userInfo.User);
@@ -1083,7 +1083,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
10831083
auto& userInfo = userInfoPair.second;
10841084
if (!userInfo.LabeledCounters)
10851085
continue;
1086-
if (!userInfo.HasReadRule && !userInfo.Important)
1086+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
10871087
continue;
10881088
bool haveChanges = false;
10891089
userInfo.EndOffset = EndOffset;
@@ -1805,7 +1805,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
18051805
} else {
18061806
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
18071807
}
1808-
} else {
1808+
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
18091809
auto ui = UsersInfoStorage->GetIfExists(user);
18101810
if (ui && ui->LabeledCounters) {
18111811
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());

ydb/core/persqueue/partition_init.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -886,7 +886,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
886886
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
887887
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
888888
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
889-
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
889+
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
890890
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
891891
new NKikimr::NPQ::TPercentileCounter(
892892
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
@@ -899,7 +899,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
899899
subgroups.pop_back();
900900
}
901901

902-
subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
902+
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
903903
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
904904
new NKikimr::NPQ::TPercentileCounter(
905905
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",

ydb/core/persqueue/read_balancer.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "read_balancer.h"
22

33
#include <ydb/core/persqueue/events/internal.h>
4+
#include "ydb/core/persqueue/user_info.h"
45
#include <ydb/core/protos/counters_pq.pb.h>
56
#include <ydb/core/base/feature_flags.h>
67
#include <ydb/core/tablet/tablet_exception.h>
@@ -65,6 +66,7 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
6566
for (const auto& rr : Self->TabletConfig.GetReadRules()) {
6667
Self->Consumers[rr];
6768
}
69+
Self->Consumers[CLIENTID_WITHOUT_CONSUMER];
6870
}
6971
Self->Inited = true;
7072
if (!dataRowset.Next())
@@ -536,7 +538,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
536538
Consumers[rr];
537539
}
538540
}
539-
541+
Consumers[CLIENTID_WITHOUT_CONSUMER];
542+
540543
TVector<std::pair<ui32, TPartInfo>> newPartitions;
541544
TVector<ui32> deletedPartitions;
542545
TVector<std::pair<ui64, TTabletInfo>> newTablets;

ydb/core/persqueue/ut/resources/counters_datastreams.html

+15-15
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,6 @@
2525
bin=60000: 0
2626
bin=999999: 0
2727

28-
name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
29-
bin=0: 30
30-
bin=1: 0
31-
bin=10: 0
32-
bin=100: 0
33-
bin=1000: 0
34-
bin=10000: 0
35-
bin=20: 0
36-
bin=2500: 0
37-
bin=5: 0
38-
bin=50: 0
39-
bin=500: 0
40-
bin=5000: 0
41-
bin=999999: 0
42-
4328
name=topic.write.lag_milliseconds:
4429
bin=100: 0
4530
bin=1000: 10
@@ -68,4 +53,19 @@
6853
bin=5242880: 0
6954
bin=67108864: 0
7055
bin=99999999: 0
56+
57+
name=topic.write.partition_throttled_milliseconds:
58+
bin=0: 30
59+
bin=1: 0
60+
bin=10: 0
61+
bin=100: 0
62+
bin=1000: 0
63+
bin=10000: 0
64+
bin=20: 0
65+
bin=2500: 0
66+
bin=5: 0
67+
bin=50: 0
68+
bin=500: 0
69+
bin=5000: 0
70+
bin=999999: 0
7171
</pre>

ydb/core/persqueue/ut/resources/counters_topics.html

+14
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,20 @@
3131
name=topic.reserve.used_bytes: 0
3232
name=topic.storage_bytes: 0
3333

34+
consumer=$without_consumer:
35+
name=topic.partition.alive_count: 0
36+
name=topic.partition.committed_end_to_end_lag_milliseconds_max: 0
37+
name=topic.partition.committed_lag_messages_max: 0
38+
name=topic.partition.committed_read_lag_milliseconds_max: 0
39+
name=topic.partition.end_to_end_lag_milliseconds_max: 0
40+
name=topic.partition.read.idle_milliseconds_max: 0
41+
name=topic.partition.read.lag_messages_max: 0
42+
name=topic.partition.read.lag_milliseconds_max: 0
43+
name=topic.partition.read.speed_limit_bytes_per_second: 0
44+
name=topic.partition.read.throttled_microseconds_max: 0
45+
name=topic.partition.write.lag_milliseconds_max: 0
46+
name=topic.read.lag_messages: 0
47+
3448
consumer=client:
3549
name=topic.partition.alive_count: 1
3650
name=topic.partition.committed_end_to_end_lag_milliseconds_max: 0

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
206206
.UseTopicCommit = OnlyTableInTx,
207207
.UseTableSelect = UseTableSelect && !OnlyTopicInTx,
208208
.UseTableUpsert = !OnlyTopicInTx,
209+
.ReadWithoutConsumer = ReadWithoutConsumer,
209210
.CommitPeriod = CommitPeriod,
210211
.CommitMessages = CommitMessages
211212
};

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class TTopicOperationsScenario {
7070
bool OnlyTopicInTx = false;
7171
bool OnlyTableInTx = false;
7272
bool UseTableSelect = true;
73+
bool ReadWithoutConsumer = false;
7374

7475
protected:
7576
void CreateTopic(const TString& database,

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp

+20-10
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,33 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
2424
auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(params.Driver);
2525
std::optional<TTransactionSupport> txSupport;
2626

27-
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
2827
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver);
29-
auto consumers = describeTopicResult.GetConsumers();
28+
NYdb::NTopic::TReadSessionSettings settings;
29+
30+
if (!params.ReadWithoutConsumer) {
31+
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
32+
auto consumers = describeTopicResult.GetConsumers();
3033

31-
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
32-
{
33-
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
34-
exit(EXIT_FAILURE);
34+
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
35+
{
36+
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
37+
exit(EXIT_FAILURE);
38+
}
39+
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
40+
} else {
41+
NYdb::NTopic::TTopicReadSettings topic = params.TopicName;
42+
auto partitions = describeTopicResult.GetPartitions();
43+
for(auto partition: partitions) {
44+
topic.AppendPartitionIds(partition.GetPartitionId());
45+
}
46+
settings.WithoutConsumer().AppendTopics(topic);
3547
}
3648

49+
3750
if (params.UseTransactions) {
3851
txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName);
3952
}
4053

41-
NYdb::NTopic::TReadSessionSettings settings;
42-
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
43-
4454
auto readSession = topicClient->CreateReadSession(settings);
4555
WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created.");
4656

@@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
93103
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
94104
}
95105

96-
if (!txSupport || params.UseTopicCommit) {
106+
if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) {
97107
dataEvent->Commit();
98108
}
99109
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ namespace NYdb {
3030
bool UseTopicCommit = false;
3131
bool UseTableSelect = true;
3232
bool UseTableUpsert = true;
33+
bool ReadWithoutConsumer = false;
3334
size_t CommitPeriod = 15;
3435
size_t CommitMessages = 1'000'000;
3536
};

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config)
3434
config.Opts->AddLongOption("topic", "Topic name.")
3535
.DefaultValue(TOPIC)
3636
.StoreResult(&Scenario.TopicName);
37+
config.Opts->AddLongOption("no-consumer", "Read without consumer")
38+
.Hidden()
39+
.StoreTrue(&Scenario.ReadWithoutConsumer);
3740

3841
// Specific params
3942
config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.")

ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ namespace NKikimr::NPersQueueTests {
463463
"topic.write.bytes",
464464
"topic.write.messages",
465465
"api.grpc.topic.stream_write.bytes",
466-
"api.grpc.topic.stream_write.partition_throttled_milliseconds",
466+
"topic.write.partition_throttled_milliseconds",
467467
"topic.write.message_size_bytes",
468468
"api.grpc.topic.stream_write.messages",
469469
"topic.write.lag_milliseconds",

0 commit comments

Comments
 (0)