Skip to content

Commit 6d94c69

Browse files
authored
Support for autopartitioning topics for CLI (#10832)
1 parent e3f0e18 commit 6d94c69

File tree

3 files changed

+17
-3
lines changed

3 files changed

+17
-3
lines changed

ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,7 @@ namespace {
835835

836836
NTopic::TReadSessionSettings TCommandTopicRead::PrepareReadSessionSettings() {
837837
NTopic::TReadSessionSettings settings;
838+
settings.AutoPartitioningSupport(true);
838839
settings.ConsumerName(Consumer_);
839840
// settings.ReadAll(); // TODO(shmel1k@): change to read only original?
840841
if (Timestamp_.Defined()) {

ydb/public/lib/ydb_cli/topic/topic_read.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,16 @@ namespace NYdb::NConsoleClient {
283283
return EXIT_SUCCESS;
284284
}
285285

286+
int TTopicReader::HandleEndPartitionSessionEvent(NTopic::TReadSessionEvent::TEndPartitionSessionEvent *event) {
287+
if (!HasSession(event->GetPartitionSession()->GetPartitionSessionId())) {
288+
return EXIT_SUCCESS;
289+
}
290+
291+
event->Confirm();
292+
293+
return EXIT_SUCCESS;
294+
}
295+
286296
int TTopicReader::HandleEvent(NYdb::NTopic::TReadSessionEvent::TEvent& event, IOutputStream& output) {
287297
if (auto* dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
288298
return HandleDataReceivedEvent(dataEvent, output);
@@ -292,10 +302,12 @@ namespace NYdb::NConsoleClient {
292302
return HandleCommitOffsetAcknowledgementEvent(commitEvent);
293303
} else if (auto* partitionStatusEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
294304
return HandlePartitionSessionStatusEvent(partitionStatusEvent);
295-
} else if (auto* stopPartitionSessionEvent = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
305+
} else if (auto* stopPartitionSessionEvent = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
296306
return HandleStopPartitionSessionEvent(stopPartitionSessionEvent);
297307
} else if (auto* partitionSessionClosedEvent = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
298308
return HandlePartitionSessionClosedEvent(partitionSessionClosedEvent);
309+
} else if (auto* endPartitionSessionEvent = std::get_if<NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
310+
return HandleEndPartitionSessionEvent(endPartitionSessionEvent);
299311
} else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
300312
ThrowOnError(*sessionClosedEvent);
301313
return 1;
@@ -339,4 +351,4 @@ namespace NYdb::NConsoleClient {
339351
}
340352
return EXIT_SUCCESS;
341353
}
342-
} // namespace NYdb::NConsoleClient
354+
} // namespace NYdb::NConsoleClient

ydb/public/lib/ydb_cli/topic/topic_read.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ namespace NYdb::NConsoleClient {
7171
int HandlePartitionSessionStatusEvent(NTopic::TReadSessionEvent::TPartitionSessionStatusEvent*);
7272
int HandleStopPartitionSessionEvent(NTopic::TReadSessionEvent::TStopPartitionSessionEvent*);
7373
int HandlePartitionSessionClosedEvent(NTopic::TReadSessionEvent::TPartitionSessionClosedEvent*);
74+
int HandleEndPartitionSessionEvent(NTopic::TReadSessionEvent::TEndPartitionSessionEvent*);
7475
int HandleDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent*, IOutputStream&);
7576
int HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent*);
7677
int HandleEvent(NTopic::TReadSessionEvent::TEvent&, IOutputStream&);
@@ -104,4 +105,4 @@ namespace NYdb::NConsoleClient {
104105

105106
THashMap<ui64, std::pair<NTopic::TPartitionSession::TPtr, EReadingStatus>> ActivePartitionSessions_;
106107
};
107-
} // namespace NYdb::NConsoleClient
108+
} // namespace NYdb::NConsoleClient

0 commit comments

Comments
 (0)