
Commit 1bf8159: cooperative incremental rebalance

Author: Robin Fehr
Committed: Jul 12, 2022
Parent: 29106d5
14 files changed: +733 / -73 lines

The commit adds cooperative (incremental) rebalance support to the consumer: new incrementalAssign(), incrementalUnassign(), and assignmentLost() methods wired through the JavaScript API, the TypeScript typings, and the C++ binding, plus a built-in cooperative rebalance callback that is selected automatically when partition.assignment.strategy is set to cooperative-sticky.
 

docker-compose.yml (+50 -22)

```diff
@@ -1,23 +1,51 @@
 ---
-zookeeper:
-  image: confluentinc/cp-zookeeper
-  ports:
-    - "2181:2181"
-  environment:
-    ZOOKEEPER_CLIENT_PORT: 2181
-    ZOOKEEPER_TICK_TIME: 2000
-kafka:
-  image: confluentinc/cp-kafka
-  links:
-    - zookeeper
-  ports:
-    - "9092:9092"
-  environment:
-    KAFKA_BROKER_ID: 1
-    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
-    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
-    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
-    KAFKA_DEFAULT_REPLICATION_FACTOR: 1
-    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+version: '2'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper
+    ports:
+      - "2181:2181"
+    networks:
+      - localnet
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+  kafka:
+    image: confluentinc/cp-kafka
+    ports:
+      - 9092:9092
+      - 9997:9997
+    networks:
+      - localnet
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      # KAFKA_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+  kafka-ui:
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - 8080:8080
+    networks:
+      - localnet
+    depends_on:
+      - zookeeper
+      - kafka
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
+networks:
+  localnet:
+    attachable: true
```
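The notable change is the split listener setup: containers on the localnet network (kafka-ui, for instance) reach the broker as kafka:29092, while host-side clients use the published PLAINTEXT_HOST listener at localhost:9092. A minimal host-side connectivity check with node-rdkafka might look like this (a sketch; the topic name is illustrative):

```js
var Kafka = require('node-rdkafka');

// Host clients connect through the PLAINTEXT_HOST listener (localhost:9092);
// clients running inside the compose network would use kafka:29092 instead.
var producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092'
});

producer.connect();

producer.on('ready', function() {
  producer.produce('test1', null, Buffer.from('hello'), null);
  producer.disconnect();
});

producer.on('event.error', function(err) {
  console.error('broker not reachable:', err);
});
```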

e2e/both.spec.js (+2 -2)

```diff
@@ -228,7 +228,7 @@ describe('Consumer/Producer', function() {

       setTimeout(function() {
         producer.produce(topic, null, buffer, null);
-      }, 500)
+      }, 500);
       consumer.setDefaultConsumeTimeout(2000);
       consumer.consume(1000, function(err, messages) {
         t.ifError(err);
@@ -261,7 +261,7 @@ describe('Consumer/Producer', function() {

       setTimeout(function() {
         producer.produce(topic, null, buffer, null);
-      }, 2000)
+      }, 2000);
       consumer.setDefaultConsumeTimeout(3000);
       consumer.consume(1000, function(err, messages) {
         t.ifError(err);
```

e2e/consumer.spec.js (+350 -1)

Large diffs are not rendered by default.

e2e/producer-transaction.spec.js (+4 -4)

```diff
@@ -43,7 +43,7 @@ describe('Transactional Producer', function () {
       producerInput.disconnect(function (err) {
         consumerTrans.subscribe([topicIn]);
         done(err);
-      })
+      });
     }
   }
   producerInput = Kafka.Producer({
@@ -76,7 +76,7 @@ describe('Transactional Producer', function () {
   });

   after(function (done) {
-    let connected = 2;
+    var connected = 2;
     function execDisconnect(client) {
       if (!client.isConnected) {
         connected--;
@@ -225,7 +225,7 @@ describe('Transactional Producer', function () {
           return;
         }
         done();
-      })
+      });
     }
   });
 });
@@ -261,7 +261,7 @@ describe('Transactional Producer', function () {
           return;
         }
         done();
-      })
+      });
     } else {
       done('Expected only B');
       return;
```

index.d.ts (+6)

```diff
@@ -223,6 +223,12 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
   consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
   consume(): void;

+  incrementalAssign(assigments: Assignment[]): this;
+
+  incrementalUnassign(assignments: Assignment[]): this;
+
+  assignmentLost(): boolean;
+
   getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;

   offsetsStore(topicPartitions: TopicPartitionOffset[]): any;
```
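These typings mirror the three new consumer methods (the "assigments" spelling in the first parameter name is carried over verbatim from the commit). A sketch of handling a cooperative rebalance manually with them, using the error-code constants node-rdkafka already exports; the group and broker settings are illustrative:

```js
var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
  'partition.assignment.strategy': 'cooperative-sticky',
  'rebalance_cb': function(err, assignment) {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      // Under the cooperative protocol, `assignment` is a delta:
      // only the partitions being added, never the full assignment.
      this.incrementalAssign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      if (this.assignmentLost()) {
        // e.g. max.poll.interval.ms exceeded: another member may already
        // own these partitions, so committing their offsets is unsafe.
        console.warn('assignment lost; skipping commit');
      }
      this.incrementalUnassign(assignment);
    }
  }
}, {});
```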

lib/client.js (+2 -2)

```diff
@@ -62,12 +62,12 @@ function Client(globalConf, SubClientType, topicConf) {
       }
     }
     return obj2;
-  }
+  };
   this._cb_configs = {
     global: extractFunctions(globalConf),
     topic: extractFunctions(topicConf),
     event: {},
-  }
+  };

   if (!no_event_cb) {
     this._cb_configs.event.event_cb = function(eventType, eventData) {
```

lib/error.js (+3 -3)

```diff
@@ -446,9 +446,9 @@ function LibrdKafkaError(e) {

   }

-  if (e.hasOwnProperty('isFatal')) this.isFatal = e.isFatal;
-  if (e.hasOwnProperty('isRetriable')) this.isRetriable = e.isRetriable;
-  if (e.hasOwnProperty('isTxnRequiresAbort')) this.isTxnRequiresAbort = e.isTxnRequiresAbort;
+  if (e.hasOwnProperty('isFatal')) { this.isFatal = e.isFatal; }
+  if (e.hasOwnProperty('isRetriable')) { this.isRetriable = e.isRetriable; }
+  if (e.hasOwnProperty('isTxnRequiresAbort')) { this.isTxnRequiresAbort = e.isTxnRequiresAbort; }

 }
```

lib/index.js (+1 -1)

```diff
@@ -7,7 +7,7 @@
  * of the MIT license. See the LICENSE.txt file for details.
  */

-var KafkaConsumer = require('./kafka-consumer');
+var KafkaConsumer = require('./kafka-consumer').KafkaConsumer;
 var Producer = require('./producer');
 var HighLevelProducer = require('./producer/high-level-producer');
 var error = require('./error');
```
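Since lib/kafka-consumer.js now exports an object instead of the constructor itself (see its diff below), every direct require has to destructure; the package's public surface is unchanged. A quick sketch of both import paths (config values are illustrative):

```js
// Public API, unchanged by this commit:
var Kafka = require('node-rdkafka');
var consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092'
}, {});

// Direct requires of the module now destructure the named export:
var KafkaConsumer = require('./lib/kafka-consumer').KafkaConsumer;
```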

lib/kafka-consumer.js (+103 -31)

```diff
@@ -8,8 +8,6 @@
  */
 'use strict';

-module.exports = KafkaConsumer;
-
 var Client = require('./client');
 var util = require('util');
 var Kafka = require('../librdkafka');
@@ -21,6 +19,48 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
 var DEFAULT_CONSUME_TIME_OUT = 1000;
 util.inherits(KafkaConsumer, Client);

+var eagerRebalanceCallback = function(err, assignment) {
+  // Create the librdkafka error
+  err = LibrdKafkaError.create(err);
+  // Emit the event
+  this.emit('rebalance', err, assignment);
+
+  // That's it
+  try {
+    if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
+      this.assign(assignment);
+    } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) {
+      this.unassign();
+    }
+  } catch (e) {
+    // Ignore exceptions if we are not connected
+    if (this.isConnected()) {
+      this.emit('rebalance.error', e);
+    }
+  }
+};
+
+var cooperativeRebalanceCallback = function(err, assignment) {
+  // Create the librdkafka error
+  err = LibrdKafkaError.create(err);
+  // Emit the event
+  this.emit('rebalance', err, assignment);
+
+  // That's it
+  try {
+    if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
+      this.incrementalAssign(assignment);
+    } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) {
+      this.incrementalUnassign(assignment);
+    }
+  } catch (e) {
+    // Ignore exceptions if we are not connected
+    if (this.isConnected()) {
+      this.emit('rebalance.error', e);
+    }
+  }
+};
+
 /**
  * KafkaConsumer class for reading messages from Kafka
  *
@@ -51,41 +91,25 @@ function KafkaConsumer(conf, topicConf) {
   var self = this;

   // If rebalance is undefined we don't want any part of this
-  if (onRebalance && typeof onRebalance === 'boolean') {
-    conf.rebalance_cb = function(err, assignment) {
-      // Create the librdkafka error
-      err = LibrdKafkaError.create(err);
-      // Emit the event
-      self.emit('rebalance', err, assignment);
-
-      // That's it
-      try {
-        if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
-          self.assign(assignment);
-        } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
-          self.unassign();
-        }
-      } catch (e) {
-        // Ignore exceptions if we are not connected
-        if (self.isConnected()) {
-          self.emit('rebalance.error', e);
-        }
-      }
-    };
-  } else if (onRebalance && typeof onRebalance === 'function') {
+  if (onRebalance && typeof onRebalance === "boolean") {
+    conf.rebalance_cb =
+      conf["partition.assignment.strategy"] === "cooperative-sticky" ?
+        cooperativeRebalanceCallback.bind(this) :
+        eagerRebalanceCallback.bind(this);
+  } else if (onRebalance && typeof onRebalance === "function") {
     /*
      * Once this is opted in to, that's it. It's going to manually rebalance
      * forever. There is no way to unset config values in librdkafka, just
      * a way to override them.
      */

-    conf.rebalance_cb = function(err, assignment) {
-      // Create the librdkafka error
-      err = err ? LibrdKafkaError.create(err) : undefined;
+    conf.rebalance_cb = function (err, assignment) {
+      // Create the librdkafka error
+      err = err ? LibrdKafkaError.create(err) : undefined;

-      self.emit('rebalance', err, assignment);
-      onRebalance.call(self, err, assignment);
-    };
+      self.emit("rebalance", err, assignment);
+      onRebalance.call(self, err, assignment);
+    };
   }

   // Same treatment for offset_commit_cb
@@ -264,6 +288,19 @@ KafkaConsumer.prototype.assign = function(assignments) {
   return this;
 };

+/**
+ * Incremental assign the consumer specific partitions and topics
+ *
+ * @param {array} assignments - Assignments array. Should contain
+ * objects with topic and partition set.
+ * @return {Client} - Returns itself
+ */
+
+KafkaConsumer.prototype.incrementalAssign = function(assignments) {
+  this._client.incrementalAssign(TopicPartition.map(assignments));
+  return this;
+};
+
 /**
  * Unassign the consumer from its assigned partitions and topics.
  *
@@ -275,6 +312,34 @@ KafkaConsumer.prototype.unassign = function() {
   return this;
 };

+/**
+ * Incremental unassign the consumer from specific partitions and topics
+ *
+ * @param {array} assignments - Assignments array. Should contain
+ * objects with topic and partition set.
+ * @return {Client} - Returns itself
+ */
+
+KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
+  this._client.incrementalUnassign(TopicPartition.map(assignments));
+  return this;
+};
+
+/**
+ * Get the assignment lost state.
+ * Examples for an assignment to be lost:
+ *  - Unsuccessful heartbeats
+ *  - Unknown member id during heartbeats
+ *  - Illegal generation during heartbeats
+ *  - Static consumer fenced by other consumer with same group.instance.id
+ *  - Max. poll interval exceeded
+ *  - Subscribed topic(s) no longer exist during meta data updates
+ * @return {boolean} - Returns true if the assignment is lost
+ */
+
+KafkaConsumer.prototype.assignmentLost = function() {
+  return this._client.assignmentLost();
+};

 /**
  * Get the assignments for the consumer
@@ -477,7 +542,7 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
   function emitEofEventsFor(messageIndex) {
     while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) {
       delete eofEvents[currentEofEventsIndex].messageIndex;
-      self.emit('partition.eof', eofEvents[currentEofEventsIndex])
+      self.emit('partition.eof', eofEvents[currentEofEventsIndex]);
       ++currentEofEventsIndex;
     }
   }
@@ -654,3 +719,10 @@ KafkaConsumer.prototype.pause = function(topicPartitions) {

   return this._errorWrap(this._client.pause(topicPartitions), true);
 };
+
+module.exports = {
+  KafkaConsumer: KafkaConsumer,
+  eagerRebalanceCallback: eagerRebalanceCallback,
+  cooperativeRebalanceCallback: cooperativeRebalanceCallback
+};
```
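With this refactor, opting in with rebalance_cb: true no longer hard-codes the eager assign/unassign path: the constructor installs cooperativeRebalanceCallback when partition.assignment.strategy is cooperative-sticky and eagerRebalanceCallback otherwise. A minimal sketch of the automatic path (broker, group, and topic names are illustrative):

```js
var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
  'partition.assignment.strategy': 'cooperative-sticky',
  // `true` selects the built-in handler; with cooperative-sticky that is
  // cooperativeRebalanceCallback, any other strategy gets the eager one.
  'rebalance_cb': true
}, {});

// Emitted before the built-in handler applies the incremental change.
consumer.on('rebalance', function(err, assignment) {
  console.log(err.code, assignment);
});

consumer.connect();
consumer.on('ready', function() {
  consumer.subscribe(['test7']);
  consumer.consume();
});
```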

run_docker.sh (+3 -1)

```diff
@@ -21,14 +21,16 @@ topics=(
   "test4"
   "test5"
   "test6"
+  "test7"
 )

 # Run docker-compose exec to make them
 for topic in "${topics[@]}"
 do
   echo "Making topic $topic"
+  [[ "$topic" != "test7" ]] && partitions=1 || partitions=2
   until docker-compose exec kafka \
-    kafka-topics --create --topic $topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
+    kafka-topics --create --topic $topic --partitions $partitions --replication-factor 1 --if-not-exists --bootstrap-server localhost:9092
   do
     topic_result="$?"
     if [ "$topic_result" == "1" ]; then
```

src/kafka-consumer.cc (+199 -3)

```diff
@@ -177,6 +177,32 @@ Baton KafkaConsumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
   return Baton(errcode);
 }

+Baton KafkaConsumer::IncrementalAssign(
+  std::vector<RdKafka::TopicPartition *> partitions) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  RdKafka::Error *e = consumer->incremental_assign(partitions);
+
+  if (e) {
+    RdKafka::ErrorCode errcode = e->code();
+    delete e;
+    return Baton(errcode);
+  }
+
+  m_partition_cnt += partitions.size();
+  for (auto i = partitions.begin(); i != partitions.end(); ++i) {
+    m_partitions.push_back(*i);
+  }
+  partitions.clear();
+
+  return Baton(RdKafka::ERR_NO_ERROR);
+}
+
 Baton KafkaConsumer::Unassign() {
   if (!IsClosing() && !IsConnected()) {
     return Baton(RdKafka::ERR__STATE);
@@ -193,12 +219,46 @@ Baton KafkaConsumer::Unassign() {

   // Destroy the old list of partitions since we are no longer using it
   RdKafka::TopicPartition::destroy(m_partitions);
+  m_partitions.clear();

   m_partition_cnt = 0;

   return Baton(RdKafka::ERR_NO_ERROR);
 }

+Baton KafkaConsumer::IncrementalUnassign(
+  std::vector<RdKafka::TopicPartition*> partitions) {
+  if (!IsClosing() && !IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  RdKafka::Error *e = consumer->incremental_unassign(partitions);
+  if (e) {
+    RdKafka::ErrorCode errcode = e->code();
+    delete e;
+    return Baton(errcode);
+  }
+
+  // Destroy the old list of partitions since we are no longer using it
+  RdKafka::TopicPartition::destroy(partitions);
+
+  m_partitions.erase(
+    std::remove_if(
+      m_partitions.begin(),
+      m_partitions.end(),
+      [&partitions](RdKafka::TopicPartition *x) -> bool {
+        return std::find(
+          partitions.begin(),
+          partitions.end(), x) != partitions.end();
+      }),
+    m_partitions.end());
+  m_partition_cnt -= partitions.size();
+  return Baton(RdKafka::ERR_NO_ERROR);
+}
+
 Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
   if (!IsConnected()) {
     return Baton(RdKafka::ERR__STATE);
@@ -467,6 +527,12 @@ Baton KafkaConsumer::RefreshAssignments() {
   }
 }

+bool KafkaConsumer::AssignmentLost() {
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+  return consumer->assignment_lost();
+}
+
 std::string KafkaConsumer::Name() {
   if (!IsConnected()) {
     return std::string("");
@@ -525,8 +591,11 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
   Nan::SetPrototypeMethod(tpl, "position", NodePosition);
   Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
+  Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
   Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
-  Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
+  Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
+  Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignment);
+  Nan::SetPrototypeMethod(tpl, "assignmentLost", NodeAssignmentLost);

   Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
   Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
@@ -682,7 +751,7 @@ NAN_METHOD(KafkaConsumer::NodePosition) {
   RdKafka::TopicPartition::destroy(toppars);
 }

-NAN_METHOD(KafkaConsumer::NodeAssignments) {
+NAN_METHOD(KafkaConsumer::NodeAssignment) {
   Nan::HandleScope scope;

   KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
@@ -757,6 +826,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) {
   info.GetReturnValue().Set(Nan::True());
 }

+NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 1 || !info[0]->IsArray()) {
+    // Just throw an exception
+    return Nan::ThrowError("Need to specify an array of partitions");
+  }
+
+  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
+  std::vector<RdKafka::TopicPartition*> topic_partitions;
+
+  for (unsigned int i = 0; i < partitions->Length(); ++i) {
+    v8::Local<v8::Value> partition_obj_value;
+    if (!(
+          Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
+          partition_obj_value->IsObject())) {
+      Nan::ThrowError("Must pass topic-partition objects");
+    }
+
+    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
+
+    // Got the object
+    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
+    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
+
+    if (!topic.empty()) {
+      RdKafka::TopicPartition* part;
+
+      if (partition < 0) {
+        part = Connection::GetPartition(topic);
+      } else {
+        part = Connection::GetPartition(topic, partition);
+      }
+
+      // Set the default value to offset invalid. If provided, we will not set
+      // the offset.
+      int64_t offset = GetParameter<int64_t>(
+        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
+      if (offset != RdKafka::Topic::OFFSET_INVALID) {
+        part->set_offset(offset);
+      }
+
+      topic_partitions.push_back(part);
+    }
+  }
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  // Hand over the partitions to the consumer.
+  Baton b = consumer->IncrementalAssign(topic_partitions);
+
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
+  }
+
+  info.GetReturnValue().Set(Nan::True());
+}
+
 NAN_METHOD(KafkaConsumer::NodeUnassign) {
   Nan::HandleScope scope;

@@ -777,6 +904,71 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
   info.GetReturnValue().Set(Nan::True());
 }

+NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 1 || !info[0]->IsArray()) {
+    // Just throw an exception
+    return Nan::ThrowError("Need to specify an array of partitions");
+  }
+
+  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
+  std::vector<RdKafka::TopicPartition*> topic_partitions;
+
+  for (unsigned int i = 0; i < partitions->Length(); ++i) {
+    v8::Local<v8::Value> partition_obj_value;
+    if (!(
+          Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
+          partition_obj_value->IsObject())) {
+      Nan::ThrowError("Must pass topic-partition objects");
+    }
+
+    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
+
+    // Got the object
+    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
+    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
+
+    if (!topic.empty()) {
+      RdKafka::TopicPartition* part;
+
+      if (partition < 0) {
+        part = Connection::GetPartition(topic);
+      } else {
+        part = Connection::GetPartition(topic, partition);
+      }
+
+      // Set the default value to offset invalid. If provided, we will not set
+      // the offset.
+      int64_t offset = GetParameter<int64_t>(
+        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
+      if (offset != RdKafka::Topic::OFFSET_INVALID) {
+        part->set_offset(offset);
+      }
+
+      topic_partitions.push_back(part);
+    }
+  }
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+  // Hand over the partitions to the consumer.
+  Baton b = consumer->IncrementalUnassign(topic_partitions);
+
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
+  }
+
+  info.GetReturnValue().Set(Nan::True());
+}
+
+NAN_METHOD(KafkaConsumer::NodeAssignmentLost) {
+  Nan::HandleScope scope;
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+  bool b = consumer->AssignmentLost();
+  info.GetReturnValue().Set(Nan::New<v8::Boolean>(b));
+}
+
 NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
   Nan::HandleScope scope;

@@ -1087,7 +1279,11 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {

   Nan::Callback *callback = new Nan::Callback(cb);
   Nan::AsyncQueueWorker(
-    new Workers::KafkaConsumerConsumeLoop(callback, consumer, timeout_ms, retry_read_ms));
+    new Workers::KafkaConsumerConsumeLoop(
+      callback,
+      consumer,
+      timeout_ms,
+      retry_read_ms));

   info.GetReturnValue().Set(Nan::Null());
 }
```
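One detail worth calling out in NodeIncrementalAssign and NodeIncrementalUnassign: set_offset() is invoked only when the topic-partition object carries an offset field; otherwise the partition stays at OFFSET_INVALID and librdkafka resolves the start position itself. In JavaScript terms (topic and offset values are illustrative):

```js
consumer.incrementalAssign([
  { topic: 'test7', partition: 0, offset: 42 }, // explicit start offset
  { topic: 'test7', partition: 1 }              // OFFSET_INVALID: resume from
                                                // committed position / auto.offset.reset
]);
```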

src/kafka-consumer.h (+7 -1)

```diff
@@ -72,7 +72,10 @@ class KafkaConsumer : public Connection {
   int AssignedPartitionCount();

   Baton Assign(std::vector<RdKafka::TopicPartition*>);
+  Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
   Baton Unassign();
+  Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);
+  bool AssignmentLost();

   Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);

@@ -103,8 +106,11 @@ class KafkaConsumer : public Connection {
   static NAN_METHOD(NodeSubscribe);
   static NAN_METHOD(NodeDisconnect);
   static NAN_METHOD(NodeAssign);
+  static NAN_METHOD(NodeIncrementalAssign);
   static NAN_METHOD(NodeUnassign);
-  static NAN_METHOD(NodeAssignments);
+  static NAN_METHOD(NodeIncrementalUnassign);
+  static NAN_METHOD(NodeAssignmentLost);
+  static NAN_METHOD(NodeAssignment);
   static NAN_METHOD(NodeUnsubscribe);
   static NAN_METHOD(NodeCommit);
   static NAN_METHOD(NodeCommitSync);
```

test/consumer.spec.js (+1 -1)

```diff
@@ -77,7 +77,7 @@ module.exports = {
     });
   },
   'has necessary bindings for librdkafka 1:1 binding': function() {
-    var methods = ['assign', 'unassign', 'subscribe'];
+    var methods = ['assign', 'unassign', 'subscribe', 'incrementalAssign', 'incrementalUnassign', 'assignmentLost'];
     methods.forEach(function(m) {
       t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method');
     });
```

test/kafka-consumer.spec.js (+2 -1)

```diff
@@ -7,7 +7,8 @@
  * of the MIT license. See the LICENSE.txt file for details.
  */

-var KafkaConsumer = require('../lib/kafka-consumer');
+var KafkaConsumer = require('../lib/kafka-consumer').KafkaConsumer;
+
 var t = require('assert');

 var client;
```
