Skip to content

Commit 21a2226

Browse files
committed
Provide a rebalance callback for cooperative rebalance strategy
1 parent f7653f2 commit 21a2226

File tree

1 file changed

+46
-20
lines changed

1 file changed

+46
-20
lines changed

lib/kafka-consumer.js

+46-20
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,49 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
2121
var DEFAULT_CONSUME_TIME_OUT = 1000;
2222
util.inherits(KafkaConsumer, Client);
2323

24+
var eagerRebalanceCallback = function(err, assignment) {
25+
// Create the librdkafka error
26+
err = LibrdKafkaError.create(err);
27+
// Emit the event
28+
self.emit('rebalance', err, assignment);
29+
30+
// That's it
31+
try {
32+
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
33+
self.assign(assignment);
34+
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
35+
self.unassign();
36+
}
37+
} catch (e) {
38+
// Ignore exceptions if we are not connected
39+
if (self.isConnected()) {
40+
self.emit('rebalance.error', e);
41+
}
42+
}
43+
}
44+
45+
var cooperativeRebalanceCallback = function(err, assignment) {
46+
// Create the librdkafka error
47+
err = LibrdKafkaError.create(err);
48+
// Emit the event
49+
self.emit('rebalance', err, assignment);
50+
51+
// That's it
52+
try {
53+
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
54+
self.incrementalAssign(assignment);
55+
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
56+
self.incrementalUnassign(assignment);
57+
}
58+
} catch (e) {
59+
// Ignore exceptions if we are not connected
60+
if (self.isConnected()) {
61+
self.emit('rebalance.error', e);
62+
}
63+
}
64+
}
65+
66+
2467
/**
2568
* KafkaConsumer class for reading messages from Kafka
2669
*
@@ -52,26 +95,9 @@ function KafkaConsumer(conf, topicConf) {
5295

5396
// If rebalance is undefined we don't want any part of this
5497
if (onRebalance && typeof onRebalance === 'boolean') {
55-
conf.rebalance_cb = function(err, assignment) {
56-
// Create the librdkafka error
57-
err = LibrdKafkaError.create(err);
58-
// Emit the event
59-
self.emit('rebalance', err, assignment);
60-
61-
// That's it
62-
try {
63-
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
64-
self.assign(assignment);
65-
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
66-
self.unassign();
67-
}
68-
} catch (e) {
69-
// Ignore exceptions if we are not connected
70-
if (self.isConnected()) {
71-
self.emit('rebalance.error', e);
72-
}
73-
}
74-
};
98+
conf.rebalance_cb = conf['partition.assignment.strategy'] === 'cooperative-sticky'
99+
? cooperativeRebalanceCallback
100+
: eagerRebalanceCallback;
75101
} else if (onRebalance && typeof onRebalance === 'function') {
76102
/*
77103
* Once this is opted in to, that's it. It's going to manually rebalance

0 commit comments

Comments
 (0)