Skip to content

Kafka read with balance ut #3732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 114 additions & 13 deletions ydb/core/kafka_proxy/ut/ut_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ void Print(const TBuffer& buffer) {
Cerr << ">>>>> Packet sent: " << sb << Endl;
}

// Aggregated result of one consumer-group join+sync round for a single member:
// the topic partitions assigned to the member, plus the identifiers needed for
// subsequent Heartbeat/LeaveGroup requests.
struct TReadInfo {
    std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions; // partitions assigned to this member, grouped by topic
    TString MemberId;    // member id issued by the group coordinator on join
    i32 GenerationId;    // group generation this assignment belongs to
};

template <class TKikimr, bool secure>
class TTestServer {
public:
Expand Down Expand Up @@ -268,6 +274,23 @@ void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent
UNIT_ASSERT_C(false, "Field " << field << " not found in message meta");
}

void AssertPartitionsIsUniqueAndCountIsExpected(std::vector<TReadInfo> readInfos, ui32 expectedPartitionsCount, TString topic) {
std::unordered_set<int> partitions;
ui32 partitionsCount = 0;
for (TReadInfo readInfo: readInfos) {
for (auto topicPartitions: readInfo.Partitions) {
if (topicPartitions.Topic == topic) {
for (auto partition: topicPartitions.Partitions) {
partitions.emplace(partition);
partitionsCount++;
}
}
}
}
UNIT_ASSERT_VALUES_EQUAL(partitionsCount, expectedPartitionsCount);
UNIT_ASSERT_VALUES_EQUAL(partitions.size(), expectedPartitionsCount);
}

std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<NYdb::NTopic::IReadSession> reader) {
std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> result;
while (true) {
Expand Down Expand Up @@ -487,12 +510,6 @@ class TTestClient {
return WriteAndRead<TSyncGroupResponseData>(header, request);
}

struct TReadInfo {
std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions;
TString MemberId;
i32 GenerationId;
};

TReadInfo JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, i32 heartbeatTimeout = 1000000) {
auto joinResponse = JoinGroup(topics, groupId, heartbeatTimeout);
auto memberId = joinResponse->MemberId;
Expand Down Expand Up @@ -1202,7 +1219,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {

TString notExistsTopicName = "/Root/not-exists";

ui64 minActivePartitions = 10;
ui64 minActivePartitions = 12;

TString group = "consumer-0";
TString notExistsGroup = "consumer-not-exists";
Expand Down Expand Up @@ -1236,6 +1253,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {

TTestClient clientA(testServer.Port);
TTestClient clientB(testServer.Port);
TTestClient clientC(testServer.Port);
TTestClient clientD(testServer.Port);

{
auto msg = clientA.ApiVersions();
Expand Down Expand Up @@ -1272,26 +1291,108 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
auto readInfoB = clientB.JoinAndSyncGroup(topics, group);
UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0);

// clientA gets REBALANCE status, because of new reader. We need to release some partitions
// clientA gets REBALANCE status, because of new reader. We need to release some partitions for new client
clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group);

// clientA now gets half of partitions
readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

// some partitions now released, and we can give them to clientB
// some partitions now released, and we can give them to clientB. clientB now gets half of partitions
clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

// cleintA leave group and all partitions goes to clientB
AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB}, minActivePartitions, topicName);

// clientC joins the group and gets 0 partitions, because they are all held by clientA and clientB
UNIT_ASSERT_VALUES_EQUAL(clientC.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientC.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto readInfoC = clientC.JoinAndSyncGroup(topics, group);
UNIT_ASSERT_VALUES_EQUAL(readInfoC.Partitions.size(), 0);

// all clients get REBALANCE status, because of new reader. We need to release some partitions for the new client
clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group);
clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);

// all clients now gets minActivePartitions/3 partitions
readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB, readInfoC}, minActivePartitions, topicName);

// clientD joins the group and gets 0 partitions, because they are all held by clientA, clientB and clientC
UNIT_ASSERT_VALUES_EQUAL(clientD.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientD.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto readInfoD = clientD.JoinAndSyncGroup(topics, group);
UNIT_ASSERT_VALUES_EQUAL(readInfoD.Partitions.size(), 0);

// all clients get REBALANCE status, because of new reader. We need to release some partitions
clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group);
clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
clientC.WaitRebalance(readInfoC.MemberId, readInfoC.GenerationId, group);

// all clients now gets minActivePartitions/4 partitions
readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4);
UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoD = clientD.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4);
UNIT_ASSERT_VALUES_EQUAL(clientD.Heartbeat(readInfoD.MemberId, readInfoD.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB, readInfoC, readInfoD}, minActivePartitions, topicName);


// clientA leaves the group and all its partitions go to clientB, clientC and clientD
UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

// all other clients get REBALANCE status, because clientA left the group.
clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions);
clientC.WaitRebalance(readInfoC.MemberId, readInfoC.GenerationId, group);
clientD.WaitRebalance(readInfoD.MemberId, readInfoD.GenerationId, group);

// all other clients now gets minActivePartitions/3 partitions
readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

// clientB leave group
UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoD = clientD.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientD.Heartbeat(readInfoD.MemberId, readInfoD.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

AssertPartitionsIsUniqueAndCountIsExpected({readInfoB, readInfoC, readInfoD}, minActivePartitions, topicName);


// all other clients leaves the group
UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoB.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientC.LeaveGroup(readInfoC.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientD.LeaveGroup(readInfoD.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}

//release partition before lock
{
std::vector<TString> topics;
topics.push_back(topicName);

auto readInfoA = clientA.JoinGroup(topics, group);
Sleep(TDuration::MilliSeconds(200));
auto readInfoB = clientB.JoinGroup(topics, group);
Sleep(TDuration::MilliSeconds(200));

UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoB->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}

{
Expand Down
Loading