@@ -60,6 +60,12 @@ void Print(const TBuffer& buffer) {
60
60
Cerr << " >>>>> Packet sent: " << sb << Endl;
61
61
}
62
62
63
+ struct TReadInfo {
64
+ std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions;
65
+ TString MemberId;
66
+ i32 GenerationId;
67
+ };
68
+
63
69
template <class TKikimr , bool secure>
64
70
class TTestServer {
65
71
public:
@@ -268,6 +274,23 @@ void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent
268
274
UNIT_ASSERT_C (false , " Field " << field << " not found in message meta" );
269
275
}
270
276
277
+ void AssertPartitionsIsUniqueAndCountIsExpected (std::vector<TReadInfo> readInfos, ui32 expectedPartitionsCount, TString topic) {
278
+ std::unordered_set<int > partitions;
279
+ ui32 partitionsCount = 0 ;
280
+ for (TReadInfo readInfo: readInfos) {
281
+ for (auto topicPartitions: readInfo.Partitions ) {
282
+ if (topicPartitions.Topic == topic) {
283
+ for (auto partition: topicPartitions.Partitions ) {
284
+ partitions.emplace (partition);
285
+ partitionsCount++;
286
+ }
287
+ }
288
+ }
289
+ }
290
+ UNIT_ASSERT_VALUES_EQUAL (partitionsCount, expectedPartitionsCount);
291
+ UNIT_ASSERT_VALUES_EQUAL (partitions.size (), expectedPartitionsCount);
292
+ }
293
+
271
294
std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read (std::shared_ptr<NYdb::NTopic::IReadSession> reader) {
272
295
std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> result;
273
296
while (true ) {
@@ -487,12 +510,6 @@ class TTestClient {
487
510
return WriteAndRead<TSyncGroupResponseData>(header, request);
488
511
}
489
512
490
- struct TReadInfo {
491
- std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions;
492
- TString MemberId;
493
- i32 GenerationId;
494
- };
495
-
496
513
TReadInfo JoinAndSyncGroup (std::vector<TString>& topics, TString& groupId, i32 heartbeatTimeout = 1000000 ) {
497
514
auto joinResponse = JoinGroup (topics, groupId, heartbeatTimeout);
498
515
auto memberId = joinResponse->MemberId ;
@@ -1202,7 +1219,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
1202
1219
1203
1220
TString notExistsTopicName = " /Root/not-exists" ;
1204
1221
1205
- ui64 minActivePartitions = 10 ;
1222
+ ui64 minActivePartitions = 12 ;
1206
1223
1207
1224
TString group = " consumer-0" ;
1208
1225
TString notExistsGroup = " consumer-not-exists" ;
@@ -1236,6 +1253,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
1236
1253
1237
1254
TTestClient clientA (testServer.Port );
1238
1255
TTestClient clientB (testServer.Port );
1256
+ TTestClient clientC (testServer.Port );
1257
+ TTestClient clientD (testServer.Port );
1239
1258
1240
1259
{
1241
1260
auto msg = clientA.ApiVersions ();
@@ -1272,26 +1291,108 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
1272
1291
auto readInfoB = clientB.JoinAndSyncGroup (topics, group);
1273
1292
UNIT_ASSERT_VALUES_EQUAL (readInfoB.Partitions .size (), 0 );
1274
1293
1275
- // clientA gets RABALANCE status, because of new reader. We need to release some partitions
1294
+ // clientA gets RABALANCE status, because of new reader. We need to release some partitions for new client
1276
1295
clientA.WaitRebalance (readInfoA.MemberId , readInfoA.GenerationId , group);
1277
1296
1278
1297
// clientA now gets half of partitions
1279
1298
readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/2 );
1280
1299
UNIT_ASSERT_VALUES_EQUAL (clientA.Heartbeat (readInfoA.MemberId , readInfoA.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1281
1300
1282
- // some partitions now released, and we can give them to clientB
1301
+ // some partitions now released, and we can give them to clientB. clientB now gets half of partitions
1283
1302
clientB.WaitRebalance (readInfoB.MemberId , readInfoB.GenerationId , group);
1284
1303
readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/2 );
1285
1304
UNIT_ASSERT_VALUES_EQUAL (clientB.Heartbeat (readInfoB.MemberId , readInfoB.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1286
1305
1287
- // cleintA leave group and all partitions goes to clientB
1306
+ AssertPartitionsIsUniqueAndCountIsExpected ({readInfoA, readInfoB}, minActivePartitions, topicName);
1307
+
1308
+ // clientC join group, and get 0 partitions, becouse it's all at clientA and clientB
1309
+ UNIT_ASSERT_VALUES_EQUAL (clientC.SaslHandshake ()->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1310
+ UNIT_ASSERT_VALUES_EQUAL (clientC.SaslAuthenticate (" ouruser@/Root" , " ourUserPassword" )->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1311
+ auto readInfoC = clientC.JoinAndSyncGroup (topics, group);
1312
+ UNIT_ASSERT_VALUES_EQUAL (readInfoC.Partitions .size (), 0 );
1313
+
1314
+ // all clients gets RABALANCE status, because of new reader. We need to release some partitions for new client
1315
+ clientA.WaitRebalance (readInfoA.MemberId , readInfoA.GenerationId , group);
1316
+ clientB.WaitRebalance (readInfoB.MemberId , readInfoB.GenerationId , group);
1317
+
1318
+ // all clients now gets minActivePartitions/3 partitions
1319
+ readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/3 );
1320
+ UNIT_ASSERT_VALUES_EQUAL (clientA.Heartbeat (readInfoA.MemberId , readInfoA.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1321
+
1322
+ readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/3 );
1323
+ UNIT_ASSERT_VALUES_EQUAL (clientB.Heartbeat (readInfoB.MemberId , readInfoB.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1324
+
1325
+ readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/3 );
1326
+ UNIT_ASSERT_VALUES_EQUAL (clientC.Heartbeat (readInfoC.MemberId , readInfoC.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1327
+
1328
+ AssertPartitionsIsUniqueAndCountIsExpected ({readInfoA, readInfoB, readInfoC}, minActivePartitions, topicName);
1329
+
1330
+ // clientD join group, and get 0 partitions, becouse it's all at clientA, clientB and clientC
1331
+ UNIT_ASSERT_VALUES_EQUAL (clientD.SaslHandshake ()->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1332
+ UNIT_ASSERT_VALUES_EQUAL (clientD.SaslAuthenticate (" ouruser@/Root" , " ourUserPassword" )->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1333
+ auto readInfoD = clientD.JoinAndSyncGroup (topics, group);
1334
+ UNIT_ASSERT_VALUES_EQUAL (readInfoD.Partitions .size (), 0 );
1335
+
1336
+ // all clients gets RABALANCE status, because of new reader. We need to release some partitions
1337
+ clientA.WaitRebalance (readInfoA.MemberId , readInfoA.GenerationId , group);
1338
+ clientB.WaitRebalance (readInfoB.MemberId , readInfoB.GenerationId , group);
1339
+ clientC.WaitRebalance (readInfoC.MemberId , readInfoC.GenerationId , group);
1340
+
1341
+ // all clients now gets minActivePartitions/4 partitions
1342
+ readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/4 );
1343
+ UNIT_ASSERT_VALUES_EQUAL (clientA.Heartbeat (readInfoA.MemberId , readInfoA.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1344
+
1345
+ readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/4 );
1346
+ UNIT_ASSERT_VALUES_EQUAL (clientB.Heartbeat (readInfoB.MemberId , readInfoB.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1347
+
1348
+ readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/4 );
1349
+ UNIT_ASSERT_VALUES_EQUAL (clientC.Heartbeat (readInfoC.MemberId , readInfoC.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1350
+
1351
+ readInfoD = clientD.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/4 );
1352
+ UNIT_ASSERT_VALUES_EQUAL (clientD.Heartbeat (readInfoD.MemberId , readInfoD.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1353
+
1354
+ AssertPartitionsIsUniqueAndCountIsExpected ({readInfoA, readInfoB, readInfoC, readInfoD}, minActivePartitions, topicName);
1355
+
1356
+
1357
+ // cleintA leave group and all partitions goes to clientB, clientB and clientD
1288
1358
UNIT_ASSERT_VALUES_EQUAL (clientA.LeaveGroup (readInfoA.MemberId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1359
+
1360
+ // all other clients gets RABALANCE status, because one clientA leave group.
1289
1361
clientB.WaitRebalance (readInfoB.MemberId , readInfoB.GenerationId , group);
1290
- readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions);
1362
+ clientC.WaitRebalance (readInfoC.MemberId , readInfoC.GenerationId , group);
1363
+ clientD.WaitRebalance (readInfoD.MemberId , readInfoD.GenerationId , group);
1364
+
1365
+ // all other clients now gets minActivePartitions/3 partitions
1366
+ readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/3 );
1291
1367
UNIT_ASSERT_VALUES_EQUAL (clientB.Heartbeat (readInfoB.MemberId , readInfoB.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1292
1368
1293
- // clientB leave group
1294
- UNIT_ASSERT_VALUES_EQUAL (clientB.LeaveGroup (readInfoA.MemberId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1369
+ readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/3 );
1370
+ UNIT_ASSERT_VALUES_EQUAL (clientC.Heartbeat (readInfoC.MemberId , readInfoC.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1371
+
1372
+ readInfoD = clientD.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions/3 );
1373
+ UNIT_ASSERT_VALUES_EQUAL (clientD.Heartbeat (readInfoD.MemberId , readInfoD.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1374
+
1375
+ AssertPartitionsIsUniqueAndCountIsExpected ({readInfoB, readInfoC, readInfoD}, minActivePartitions, topicName);
1376
+
1377
+
1378
+ // all other clients leaves the group
1379
+ UNIT_ASSERT_VALUES_EQUAL (clientB.LeaveGroup (readInfoB.MemberId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1380
+ UNIT_ASSERT_VALUES_EQUAL (clientC.LeaveGroup (readInfoC.MemberId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1381
+ UNIT_ASSERT_VALUES_EQUAL (clientD.LeaveGroup (readInfoD.MemberId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1382
+ }
1383
+
1384
+ // release partition before lock
1385
+ {
1386
+ std::vector<TString> topics;
1387
+ topics.push_back (topicName);
1388
+
1389
+ auto readInfoA = clientA.JoinGroup (topics, group);
1390
+ Sleep (TDuration::MilliSeconds (200 ));
1391
+ auto readInfoB = clientB.JoinGroup (topics, group);
1392
+ Sleep (TDuration::MilliSeconds (200 ));
1393
+
1394
+ UNIT_ASSERT_VALUES_EQUAL (clientA.LeaveGroup (readInfoA->MemberId .value (), group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1395
+ UNIT_ASSERT_VALUES_EQUAL (clientB.LeaveGroup (readInfoB->MemberId .value (), group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1295
1396
}
1296
1397
1297
1398
{
0 commit comments