Skip to content

Commit f7653f2

Browse files
committed
Add incremental assign and unassign support
1 parent 57d6fd7 commit f7653f2

File tree

4 files changed

+204
-0
lines changed

4 files changed

+204
-0
lines changed

index.d.ts

+4
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
223223
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
224224
consume(): void;
225225

// Incrementally add partitions to the current assignment (cooperative rebalancing).
incrementalAssign(assignments: Assignment[]): this;

// Incrementally remove partitions from the current assignment.
incrementalUnassign(assignments: Assignment[]): this;
226230
getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
227231

228232
offsetsStore(topicPartitions: TopicPartitionOffset[]): any;

lib/kafka-consumer.js

+25
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,19 @@ KafkaConsumer.prototype.assign = function(assignments) {
264264
return this;
265265
};
266266

267+
/**
 * Incrementally assign the consumer additional partitions and topics,
 * leaving its existing assignment untouched.
 *
 * @param {array} assignments - Assignments array. Should contain
 * objects with topic and partition set.
 * @return {Client} - Returns itself
 */
KafkaConsumer.prototype.incrementalAssign = function(assignments) {
  var toppars = TopicPartition.map(assignments);
  this._client.incrementalAssign(toppars);
  return this;
};
279+
267280
/**
268281
* Unassign the consumer from its assigned partitions and topics.
269282
*
@@ -275,6 +288,18 @@ KafkaConsumer.prototype.unassign = function() {
275288
return this;
276289
};
277290

291+
/**
 * Incrementally unassign specific partitions and topics from the consumer,
 * leaving the rest of its assignment untouched.
 *
 * @param {array} assignments - Assignments array. Should contain
 * objects with topic and partition set.
 * @return {Client} - Returns itself
 */
KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
  var toppars = TopicPartition.map(assignments);
  this._client.incrementalUnassign(toppars);
  return this;
};
278303

279304
/**
280305
* Get the assignments for the consumer

src/kafka-consumer.cc

+171
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,31 @@ Baton KafkaConsumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
177177
return Baton(errcode);
178178
}
179179

180+
/**
 * Incrementally add the given partitions to the consumer's current
 * assignment (cooperative/incremental rebalancing).
 *
 * On success, ownership of the TopicPartition pointers transfers to this
 * object (they are tracked in m_partitions and freed on Unassign). On
 * failure the caller retains ownership.
 *
 * @param partitions - partitions to add to the assignment.
 * @return Baton with ERR_NO_ERROR on success, or the librdkafka error code.
 */
Baton KafkaConsumer::IncrementalAssign(std::vector<RdKafka::TopicPartition*> partitions) {
  if (!IsConnected()) {
    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  RdKafka::Error *e = consumer->incremental_assign(partitions);

  if (e) {
    // incremental_assign returns an owned Error object; capture the code
    // and free it before returning.
    RdKafka::ErrorCode errcode = e->code();
    delete e;
    return Baton(errcode);
  }

  // Track the newly assigned partitions alongside the existing assignment.
  // (The original `partitions.clear()` acted on a by-value copy and had no
  // effect on the caller; it was dead code and has been dropped.)
  m_partition_cnt += partitions.size();
  m_partitions.insert(m_partitions.end(), partitions.begin(), partitions.end());

  return Baton(RdKafka::ERR_NO_ERROR);
}
204+
180205
Baton KafkaConsumer::Unassign() {
181206
if (!IsClosing() && !IsConnected()) {
182207
return Baton(RdKafka::ERR__STATE);
@@ -193,12 +218,41 @@ Baton KafkaConsumer::Unassign() {
193218

194219
// Destroy the old list of partitions since we are no longer using it
195220
RdKafka::TopicPartition::destroy(m_partitions);
221+
m_partitions.clear();
196222

197223
m_partition_cnt = 0;
198224

199225
return Baton(RdKafka::ERR_NO_ERROR);
200226
}
201227

228+
/**
 * Incrementally remove the given partitions from the consumer's current
 * assignment (cooperative/incremental rebalancing).
 *
 * The incoming vector holds freshly allocated TopicPartition objects built
 * by the Node binding, so they can never be pointer-equal to the objects
 * tracked in m_partitions. The original code pruned m_partitions by pointer
 * identity (after already destroying the incoming list), so nothing was ever
 * removed, m_partition_cnt drifted, the tracked objects leaked, and freed
 * pointer values were used in comparisons. Match by topic + partition
 * instead, free exactly what we stop tracking, and destroy the request list
 * last.
 *
 * @param partitions - partitions to remove from the assignment; this
 *                     function takes ownership and destroys them.
 * @return Baton with ERR_NO_ERROR on success, or the librdkafka error code.
 */
Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> partitions) {
  if (!IsClosing() && !IsConnected()) {
    return Baton(RdKafka::ERR__STATE);
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  RdKafka::Error *e = consumer->incremental_unassign(partitions);
  if (e) {
    RdKafka::ErrorCode errcode = e->code();
    delete e;
    // On failure the caller still owns `partitions`; do not destroy them.
    return Baton(errcode);
  }

  // Drop (and free) every tracked partition that matches a requested one,
  // comparing by topic name and partition id rather than pointer identity.
  size_t removed = 0;
  m_partitions.erase(
    std::remove_if(
      m_partitions.begin(), m_partitions.end(),
      [&partitions, &removed](RdKafka::TopicPartition *tracked) -> bool {
        for (const RdKafka::TopicPartition *requested : partitions) {
          if (requested->topic() == tracked->topic() &&
              requested->partition() == tracked->partition()) {
            delete tracked;  // no longer tracked; release it
            ++removed;
            return true;
          }
        }
        return false;
      }),
    m_partitions.end());

  // Subtract only what was actually removed so the count stays consistent
  // even if a requested partition was not part of the tracked assignment.
  m_partition_cnt -= removed;

  // Done matching by value; now destroy the request list we took ownership of.
  RdKafka::TopicPartition::destroy(partitions);

  return Baton(RdKafka::ERR_NO_ERROR);
}
255+
202256
Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
203257
if (!IsConnected()) {
204258
return Baton(RdKafka::ERR__STATE);
@@ -525,7 +579,9 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
525579
Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
526580
Nan::SetPrototypeMethod(tpl, "position", NodePosition);
527581
Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
582+
Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
528583
Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
584+
Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
529585
Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
530586

531587
Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
@@ -757,6 +813,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) {
757813
info.GetReturnValue().Set(Nan::True());
758814
}
759815

816+
/**
 * JS binding for consumer.incrementalAssign(assignments).
 *
 * Expects a single array of { topic, partition[, offset] } objects, converts
 * them to RdKafka::TopicPartition instances, and hands them to
 * IncrementalAssign. Throws a JS error on malformed input or librdkafka
 * failure; returns true on success.
 */
NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
  Nan::HandleScope scope;

  if (info.Length() < 1 || !info[0]->IsArray()) {
    // Just throw an exception
    return Nan::ThrowError("Need to specify an array of partitions");
  }

  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
  std::vector<RdKafka::TopicPartition*> topic_partitions;

  for (unsigned int i = 0; i < partitions->Length(); ++i) {
    v8::Local<v8::Value> partition_obj_value;
    if (!(
      Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
      partition_obj_value->IsObject())) {
      // Must bail out here: the original fell through after ThrowError and
      // then cast a possibly-empty/non-object handle. Free what we already
      // allocated before returning.
      RdKafka::TopicPartition::destroy(topic_partitions);
      return Nan::ThrowError("Must pass topic-partition objects");
    }

    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();

    // Got the object
    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");

    if (!topic.empty()) {
      RdKafka::TopicPartition* part;

      if (partition < 0) {
        part = Connection::GetPartition(topic);
      } else {
        part = Connection::GetPartition(topic, partition);
      }

      // Set the default value to offset invalid. If provided, we will not set
      // the offset.
      int64_t offset = GetParameter<int64_t>(
        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
      if (offset != RdKafka::Topic::OFFSET_INVALID) {
        part->set_offset(offset);
      }

      topic_partitions.push_back(part);
    }
  }

  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

  // Hand over the partitions to the consumer.
  Baton b = consumer->IncrementalAssign(topic_partitions);

  if (b.err() != RdKafka::ERR_NO_ERROR) {
    // On failure IncrementalAssign did not take ownership; free the
    // partitions to avoid leaking them, then surface the error.
    RdKafka::TopicPartition::destroy(topic_partitions);
    return Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
  }

  info.GetReturnValue().Set(Nan::True());
}
873+
760874
NAN_METHOD(KafkaConsumer::NodeUnassign) {
761875
Nan::HandleScope scope;
762876

@@ -777,6 +891,63 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
777891
info.GetReturnValue().Set(Nan::True());
778892
}
779893

894+
/**
 * JS binding for consumer.incrementalUnassign(assignments).
 *
 * Expects a single array of { topic, partition[, offset] } objects, converts
 * them to RdKafka::TopicPartition instances, and hands them to
 * IncrementalUnassign. Throws a JS error on malformed input or librdkafka
 * failure; returns true on success.
 */
NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
  Nan::HandleScope scope;

  if (info.Length() < 1 || !info[0]->IsArray()) {
    // Just throw an exception
    return Nan::ThrowError("Need to specify an array of partitions");
  }

  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
  std::vector<RdKafka::TopicPartition*> topic_partitions;

  for (unsigned int i = 0; i < partitions->Length(); ++i) {
    v8::Local<v8::Value> partition_obj_value;
    if (!(
      Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
      partition_obj_value->IsObject())) {
      // Must bail out here: the original fell through after ThrowError and
      // then cast a possibly-empty/non-object handle. Free what we already
      // allocated before returning.
      RdKafka::TopicPartition::destroy(topic_partitions);
      return Nan::ThrowError("Must pass topic-partition objects");
    }

    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();

    // Got the object
    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");

    if (!topic.empty()) {
      RdKafka::TopicPartition* part;

      if (partition < 0) {
        part = Connection::GetPartition(topic);
      } else {
        part = Connection::GetPartition(topic, partition);
      }

      // Set the default value to offset invalid. If provided, we will not set
      // the offset.
      int64_t offset = GetParameter<int64_t>(
        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
      if (offset != RdKafka::Topic::OFFSET_INVALID) {
        part->set_offset(offset);
      }

      topic_partitions.push_back(part);
    }
  }

  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

  // Hand over the partitions to the consumer; on success IncrementalUnassign
  // destroys them itself.
  Baton b = consumer->IncrementalUnassign(topic_partitions);

  if (b.err() != RdKafka::ERR_NO_ERROR) {
    // On failure IncrementalUnassign did not destroy the partitions; free
    // them here to avoid leaking, then surface the error.
    RdKafka::TopicPartition::destroy(topic_partitions);
    return Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
  }

  info.GetReturnValue().Set(Nan::True());
}
950+
780951
NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
781952
Nan::HandleScope scope;
782953

src/kafka-consumer.h

+4
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ class KafkaConsumer : public Connection {
7272
int AssignedPartitionCount();
7373

7474
Baton Assign(std::vector<RdKafka::TopicPartition*>);
// Adds partitions to the current assignment without revoking the rest
// (cooperative/incremental rebalancing).
Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
Baton Unassign();
// Removes only the given partitions from the current assignment.
Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);
7678

7779
Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);
7880

@@ -103,7 +105,9 @@ class KafkaConsumer : public Connection {
103105
static NAN_METHOD(NodeSubscribe);
104106
static NAN_METHOD(NodeDisconnect);
105107
static NAN_METHOD(NodeAssign);
// JS binding for incrementalAssign(assignments).
static NAN_METHOD(NodeIncrementalAssign);
static NAN_METHOD(NodeUnassign);
// JS binding for incrementalUnassign(assignments).
static NAN_METHOD(NodeIncrementalUnassign);
107111
static NAN_METHOD(NodeAssignments);
108112
static NAN_METHOD(NodeUnsubscribe);
109113
static NAN_METHOD(NodeCommit);

0 commit comments

Comments
 (0)