Skip to content

Commit 1e31594

Browse files
nshestakovGazizonoki
authored andcommitted
Added max_committed_time_lag for DescribeConsumer (#16857)
1 parent 37113e9 commit 1e31594

File tree

5 files changed

+35
-0
lines changed

5 files changed

+35
-0
lines changed

include/ydb-cpp-sdk/client/topic/control_plane.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class TPartitionConsumerStats {
111111
const TInstant& GetLastReadTime() const;
112112
const TDuration& GetMaxReadTimeLag() const;
113113
const TDuration& GetMaxWriteTimeLag() const;
114+
const TDuration& GetMaxCommittedTimeLag() const;
114115

115116
private:
116117
uint64_t CommittedOffset_;
@@ -120,6 +121,7 @@ class TPartitionConsumerStats {
120121
TInstant LastReadTime_;
121122
TDuration MaxReadTimeLag_;
122123
TDuration MaxWriteTimeLag_;
124+
TDuration MaxCommittedTimeLag_;
123125
};
124126

125127
// Topic partition location

src/api/protos/ydb_topic.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,8 @@ message Consumer {
810810
google.protobuf.Duration max_read_time_lag = 2;
811811
// Maximum of differences between write timestamp and create timestamp for all messages, read during last minute.
812812
google.protobuf.Duration max_write_time_lag = 3;
813+
// The difference between the write timestamp of the last commited message and the current time.
814+
google.protobuf.Duration max_committed_time_lag = 5;
813815
// Bytes read statistics.
814816
MultipleWindowsStat bytes_read = 4;
815817
}
@@ -1214,6 +1216,8 @@ message DescribeConsumerResult {
12141216
google.protobuf.Duration max_read_time_lag = 6;
12151217
// Maximum of differences between write timestamp and create timestamp for all messages, read during last minute.
12161218
google.protobuf.Duration max_write_time_lag = 7;
1219+
// The difference between the write timestamp of the last commited message and the current time.
1220+
google.protobuf.Duration max_committed_time_lag = 13;
12171221

12181222
// How much bytes were read during several windows statistics from this partition.
12191223
MultipleWindowsStat bytes_read = 8;

src/client/topic/impl/topic.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsu
376376
, LastReadTime_(TInstant::Seconds(partitionStats.last_read_time().seconds()))
377377
, MaxReadTimeLag_(TDuration::Seconds(partitionStats.max_read_time_lag().seconds()))
378378
, MaxWriteTimeLag_(TDuration::Seconds(partitionStats.max_write_time_lag().seconds()))
379+
, MaxCommittedTimeLag_(TDuration::Seconds(partitionStats.max_committed_time_lag().seconds()))
379380
{}
380381

381382
uint64_t TPartitionConsumerStats::GetCommittedOffset() const {
@@ -406,6 +407,10 @@ const TDuration& TPartitionConsumerStats::GetMaxWriteTimeLag() const {
406407
return MaxWriteTimeLag_;
407408
}
408409

410+
const TDuration& TPartitionConsumerStats::GetMaxCommittedTimeLag() const {
411+
return MaxCommittedTimeLag_;
412+
}
413+
409414
TPartitionLocation::TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation)
410415
: NodeId_(partitionLocation.node_id())
411416
, Generation_(partitionLocation.generation())

src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,27 @@ TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path)
6666
return status.GetTopicDescription();
6767
}
6868

69+
TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, const TString& consumer)
70+
{
71+
TTopicClient client(MakeDriver());
72+
73+
TDescribeConsumerSettings settings;
74+
settings.IncludeStats(true);
75+
settings.IncludeLocation(true);
76+
77+
auto status = client.DescribeConsumer(path, consumer, settings).GetValueSync();
78+
UNIT_ASSERT(status.IsSuccess());
79+
80+
return status.GetConsumerDescription();
81+
}
82+
83+
TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset) {
84+
TTopicClient client(MakeDriver());
85+
86+
return client.CommitOffset(path, partitionId, consumerName, offset).GetValueSync();
87+
}
88+
89+
6990
TString TTopicSdkTestSetup::GetEndpoint() const {
7091
return "localhost:" + ToString(Server.GrpcPort);
7192
}

src/client/topic/ut/ut_utils/topic_sdk_test_setup.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ class TTopicSdkTestSetup {
2222
size_t maxPartitionCount = 100);
2323

2424
TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC});
25+
TConsumerDescription DescribeConsumer(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TString{TEST_CONSUMER});
26+
27+
TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset);
2528

2629
TString GetEndpoint() const;
2730
TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const;

0 commit comments

Comments
 (0)