@@ -132,4 +132,208 @@ def expect_even_assignments(topics, assignments)
132
132
end
133
133
end
134
134
end
135
+
136
+ context 'one consumer no subscriptions or topics / partitions' do
137
+ it 'returns empty assignments' do
138
+ members = { 'member1' => nil }
139
+ partitions = [ ]
140
+
141
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
142
+
143
+ expect ( assignments ) . to eq ( { } )
144
+ end
145
+ end
146
+
147
+ context 'one consumer with subscription but no matching topic partition' do
148
+ it 'returns empty assignments' do
149
+ members = { 'member1' => double ( topics : [ 'topic1' ] ) }
150
+ partitions = [ ]
151
+
152
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
153
+
154
+ expect ( assignments ) . to eq ( { } )
155
+ end
156
+ end
157
+
158
+ context 'one consumer subscribed to one topic with one partition' do
159
+ it 'assigns the partition to the consumer' do
160
+ members = { 'member1' => double ( topics : [ 'topic1' ] ) }
161
+ partitions = [
162
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
163
+ ]
164
+
165
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
166
+
167
+ expect ( assignments ) . to eq ( {
168
+ 'member1' => [ t1p0 ]
169
+ } )
170
+ end
171
+ end
172
+
173
+ context 'one consumer subscribed to one topic with multiple partitions' do
174
+ it 'assigns all partitions to the consumer' do
175
+ members = { 'member1' => double ( topics : [ 'topic1' ] ) }
176
+ partitions = [
177
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
178
+ t1p1 = double ( :"t1p1" , topic : "topic1" , partition_id : 1 ) ,
179
+ ]
180
+
181
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
182
+
183
+ expect ( assignments ) . to eq ( {
184
+ 'member1' => [ t1p0 , t1p1 ]
185
+ } )
186
+ end
187
+ end
188
+
189
+ context 'one consumer subscribed to one topic but with multiple different topic partitions' do
190
+ it 'only assigns partitions for the subscribed topic' do
191
+ members = { 'member1' => double ( topics : [ 'topic1' ] ) }
192
+ partitions = [
193
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
194
+ t1p1 = double ( :"t1p1" , topic : "topic1" , partition_id : 1 ) ,
195
+ t2p0 = double ( :"t2p0" , topic : "topic2" , partition_id : 0 ) ,
196
+ ]
197
+
198
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
199
+
200
+ expect ( assignments ) . to eq ( {
201
+ 'member1' => [ t1p0 , t1p1 ]
202
+ } )
203
+ end
204
+ end
205
+
206
+ context 'one consumer subscribed to multiple topics' do
207
+ it 'assigns all the topics partitions to the consumer' do
208
+ members = { 'member1' => double ( topics : [ 'topic1' , 'topic2' ] ) }
209
+
210
+ partitions = [
211
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
212
+ t1p1 = double ( :"t1p1" , topic : "topic1" , partition_id : 1 ) ,
213
+ t2p0 = double ( :"t2p0" , topic : "topic2" , partition_id : 0 ) ,
214
+ ]
215
+
216
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
217
+
218
+ expect ( assignments ) . to eq ( {
219
+ 'member1' => [ t1p0 , t1p1 , t2p0 ]
220
+ } )
221
+ end
222
+ end
223
+
224
+ context 'two consumers with one topic and only one partition' do
225
+ it 'only assigns the partition to one consumer' do
226
+ members = {
227
+ 'member1' => double ( topics : [ 'topic1' ] ) ,
228
+ 'member2' => double ( topics : [ 'topic1' ] )
229
+ }
230
+ partitions = [
231
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
232
+ ]
233
+
234
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
235
+
236
+ expect ( assignments ) . to eq ( {
237
+ 'member1' => [ t1p0 ]
238
+ } )
239
+ end
240
+ end
241
+
242
+ context 'two consumers subscribed to one topic with two partitions' do
243
+ it 'assigns a partition to each consumer' do
244
+ members = {
245
+ 'member1' => double ( topics : [ 'topic1' ] ) ,
246
+ 'member2' => double ( topics : [ 'topic1' ] )
247
+ }
248
+ partitions = [
249
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
250
+ t1p1 = double ( :"t1p1" , topic : "topic1" , partition_id : 1 ) ,
251
+ ]
252
+
253
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
254
+
255
+ expect ( assignments ) . to eq ( {
256
+ 'member1' => [ t1p0 ] ,
257
+ 'member2' => [ t1p1 ]
258
+ } )
259
+ end
260
+ end
261
+
262
+ context 'multiple consumers with mixed topics subscriptions' do
263
+ it 'creates a balanced assignment' do
264
+ members = {
265
+ 'member1' => double ( topics : [ 'topic1' ] ) ,
266
+ 'member2' => double ( topics : [ 'topic1' , 'topic2' ] ) ,
267
+ 'member3' => double ( topics : [ 'topic1' ] )
268
+ }
269
+ partitions = [
270
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
271
+ t1p1 = double ( :"t1p1" , topic : "topic1" , partition_id : 1 ) ,
272
+ t1p2 = double ( :"t1p2" , topic : "topic1" , partition_id : 2 ) ,
273
+ t2p0 = double ( :"t2p0" , topic : "topic2" , partition_id : 0 ) ,
274
+ t2p1 = double ( :"t2p1" , topic : "topic2" , partition_id : 1 ) ,
275
+ ]
276
+
277
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
278
+
279
+ expect ( assignments ) . to eq ( {
280
+ 'member1' => [ t1p0 ] ,
281
+ 'member2' => [ t1p1 , t2p0 , t2p1 ] ,
282
+ 'member3' => [ t1p2 ]
283
+ } )
284
+ end
285
+ end
286
+
287
+ context 'two consumers subscribed to two topics with three partitions each' do
288
+ it 'creates a balanced assignment' do
289
+ members = {
290
+ 'member1' => double ( topics : [ 'topic1' , 'topic2' ] ) ,
291
+ 'member2' => double ( topics : [ 'topic1' , 'topic2' ] )
292
+ }
293
+ partitions = [
294
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
295
+ t1p1 = double ( :"t1p1" , topic : "topic1" , partition_id : 1 ) ,
296
+ t1p2 = double ( :"t1p2" , topic : "topic1" , partition_id : 2 ) ,
297
+ t2p0 = double ( :"t2p0" , topic : "topic2" , partition_id : 0 ) ,
298
+ t2p1 = double ( :"t2p1" , topic : "topic2" , partition_id : 1 ) ,
299
+ t2p2 = double ( :"t2p2" , topic : "topic2" , partition_id : 2 ) ,
300
+ ]
301
+
302
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
303
+
304
+ expect ( assignments ) . to eq ( {
305
+ 'member1' => [ t1p0 , t1p2 , t2p1 ] ,
306
+ 'member2' => [ t1p1 , t2p0 , t2p2 ]
307
+ } )
308
+ end
309
+ end
310
+
311
+ context 'many consumers subscribed to one topic with partitions given out of order' do
312
+ it 'produces balanced assignments' do
313
+ members = {
314
+ 'member1' => double ( topics : [ 'topic1' ] ) ,
315
+ 'member2' => double ( topics : [ 'topic1' ] ) ,
316
+ 'member3' => double ( topics : [ 'topic2' ] ) ,
317
+ }
318
+
319
+ partitions = [
320
+ t2p0 = double ( :"t2p0" , topic : "topic2" , partition_id : 0 ) ,
321
+ t1p0 = double ( :"t1p0" , topic : "topic1" , partition_id : 0 ) ,
322
+ t2p1 = double ( :"t2p1" , topic : "topic2" , partition_id : 1 ) ,
323
+ t1p1 = double ( :"t1p1" , topic : "topic1" , partition_id : 1 ) ,
324
+ ]
325
+
326
+ assignments = strategy . call ( cluster : nil , members : members , partitions : partitions )
327
+
328
+ # Without sorting the partitions by topic this input would produce a non balanced assignment:
329
+ # member1 => [t1p0, t1p1]
330
+ # member2 => []
331
+ # member3 => [t2p0, t2p1]
332
+ expect ( assignments ) . to eq ( {
333
+ 'member1' => [ t1p0 ] ,
334
+ 'member2' => [ t1p1 ] ,
335
+ 'member3' => [ t2p0 , t2p1 ]
336
+ } )
337
+ end
338
+ end
135
339
end
0 commit comments