Skip to content

Commit 0105df2

Browse files
nshestakovGazizonoki
authored andcommitted
Fixed errors of the distributed commit offset to the partition (#17423)
1 parent d9f48a3 commit 0105df2

File tree

2 files changed

+17
-7
lines changed

2 files changed

+17
-7
lines changed

src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,22 @@ void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId, con
9898
session->Close(TDuration::Seconds(5));
9999
}
100100

101-
TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout) {
101+
TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer,
102+
std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler,
103+
std::optional<size_t> partition, const TDuration timeout) {
102104
TTopicClient client(MakeDriver());
103105

104-
auto reader = client.CreateReadSession(
105-
TReadSessionSettings()
106-
.AutoPartitioningSupport(true)
107-
.AppendTopics(TTopicReadSettings(topic))
108-
.ConsumerName(consumer));
106+
auto topicSettings = TTopicReadSettings(topic);
107+
if (partition) {
108+
topicSettings.AppendPartitionIds(partition.value());
109+
}
110+
111+
auto settins = TReadSessionSettings()
112+
.AutoPartitioningSupport(true)
113+
.AppendTopics(topicSettings)
114+
.ConsumerName(consumer);
115+
116+
auto reader = client.CreateReadSession(settins);
109117

110118
TInstant deadlineTime = TInstant::Now() + timeout;
111119

src/client/topic/ut/ut_utils/topic_sdk_test_setup.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ class TTopicSdkTestSetup {
3232

3333
std::vector<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent> StartPartitionSessionEvents;
3434
};
35-
ReadResult Read(const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout = TDuration::Seconds(5));
35+
ReadResult Read(const std::string& topic, const std::string& consumer,
36+
std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler,
37+
std::optional<size_t> partition = std::nullopt, const TDuration timeout = TDuration::Seconds(5));
3638
TStatus Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId = std::nullopt);
3739

3840
TString GetEndpoint() const;

0 commit comments

Comments
 (0)