Skip to content

Commit 650e966

Browse files
qyryqGazizonoki
authored andcommitted
Fix TStartPartitionSessionEvent::Confirm (#16734)
1 parent d122ca9 commit 650e966

File tree

2 files changed

+62
-1
lines changed

2 files changed

+62
-1
lines changed

src/client/topic/impl/read_session_impl.ipp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ void TPartitionStreamImpl<UseMigrationProtocol>::RequestStatus() {
7373
template<bool UseMigrationProtocol>
7474
void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmCreate(std::optional<ui64> readOffset, std::optional<ui64> commitOffset) {
7575
if (auto sessionShared = CbContext->LockShared()) {
76+
if (commitOffset.has_value()) {
77+
SetFirstNotReadOffset(commitOffset.value());
78+
}
7679
sessionShared->ConfirmPartitionStreamCreate(this, readOffset, commitOffset);
7780
}
7881
}

src/client/topic/ut/basic_usage_ut.cpp

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include <src/client/persqueue_public/ut/ut_utils/ut_utils.h>
44

55
#include <ydb-cpp-sdk/client/topic/client.h>
6-
6+
77
#include <src/client/persqueue_public/persqueue.h>
88

99
#include <src/client/topic/impl/common.h>
@@ -661,6 +661,64 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
661661
Sleep(TDuration::Seconds(5));
662662
}
663663

664+
Y_UNIT_TEST(ConfirmPartitionSessionWithCommitOffset) {
665+
// TStartPartitionSessionEvent::Confirm(readOffset, commitOffset) should work,
666+
// if commitOffset passed to Confirm is greater than the offset committed previously by the consumer.
667+
// https://st.yandex-team.ru/KIKIMR-23015
668+
669+
auto setup = TTopicSdkTestSetup(TEST_CASE_NAME);
670+
671+
{
672+
// Write 2 messages:
673+
auto settings = NTopic::TWriteSessionSettings()
674+
.Path(setup.GetTopicPath())
675+
.MessageGroupId(TEST_MESSAGE_GROUP_ID)
676+
.ProducerId(TEST_MESSAGE_GROUP_ID);
677+
auto client = setup.MakeClient();
678+
auto writer = client.CreateSimpleBlockingWriteSession(settings);
679+
writer->Write("message");
680+
writer->Write("message");
681+
writer->Close();
682+
}
683+
684+
{
685+
// Read messages:
686+
auto settings = NTopic::TReadSessionSettings()
687+
.ConsumerName(TEST_CONSUMER)
688+
.AppendTopics(std::string(setup.GetTopicPath()));
689+
690+
auto client = setup.MakeClient();
691+
auto reader = client.CreateReadSession(settings);
692+
693+
{
694+
// Start partition session and request to read from offset 1 and commit offset 1:
695+
auto event = reader->GetEvent(true);
696+
UNIT_ASSERT(event.has_value());
697+
UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(*event));
698+
auto& startPartitionSession = std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event);
699+
startPartitionSession.Confirm(/*readOffset=*/ 1, /*commitOffset=*/ 1);
700+
}
701+
702+
{
703+
// Receive a message with offset 1 and commit it:
704+
auto event = reader->GetEvent(true);
705+
UNIT_ASSERT(event.has_value());
706+
UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TDataReceivedEvent>(*event));
707+
auto& dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
708+
709+
// Here we should commit range [1, 2), not [0, 2):
710+
dataReceived.Commit();
711+
}
712+
713+
{
714+
// And then get a TCommitOffsetAcknowledgementEvent:
715+
auto event = reader->GetEvent(true);
716+
UNIT_ASSERT(event.has_value());
717+
UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(*event));
718+
}
719+
}
720+
}
721+
664722
Y_UNIT_TEST(ConflictingWrites) {
665723

666724
TTopicSdkTestSetup setup(TEST_CASE_NAME);

0 commit comments

Comments
 (0)