
Commit 3f378cb

Committed Dec 23, 2024
Fix worker_threads support
Signed-off-by: Stephen Belanger <[email protected]>
1 parent a950a56 · commit 3f378cb

25 files changed: +833 -328 lines
 

‎binding.gyp

+1

@@ -15,6 +15,7 @@
       'src/connection.cc',
       'src/errors.cc',
       'src/kafka-consumer.cc',
+      'src/per-isolate-data.cc',
       'src/producer.cc',
       'src/topic.cc',
       'src/workers.cc',

‎package-lock.json

+496 -270

Generated file; diff not rendered by default.

‎src/admin.cc

+8 -4

@@ -11,6 +11,7 @@
 #include <vector>
 #include <math.h>

+#include "src/per-isolate-data.h"
 #include "src/workers.h"
 #include "src/admin.h"

@@ -33,6 +34,10 @@ AdminClient::AdminClient(Conf* gconfig):
   rkqu = NULL;
 }

+void AdminClient::delete_instance(void* arg) {
+  delete (static_cast<AdminClient*>(arg));
+}
+
 AdminClient::~AdminClient() {
   Disconnect();
 }
@@ -90,8 +95,6 @@ Baton AdminClient::Disconnect() {
   return Baton(RdKafka::ERR_NO_ERROR);
 }

-Nan::Persistent<v8::Function> AdminClient::constructor;
-
 void AdminClient::Init(v8::Local<v8::Object> exports) {
   Nan::HandleScope scope;

@@ -108,7 +111,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
   Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken);

-  constructor.Reset(
+  PerIsolateData::For(v8::Isolate::GetCurrent())->AdminClientConstructor().Reset(
     (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
   Nan::Set(exports, Nan::New("AdminClient").ToLocalChecked(),
     tpl->GetFunction(Nan::GetCurrentContext()).ToLocalChecked());
@@ -155,7 +158,8 @@ v8::Local<v8::Object> AdminClient::NewInstance(v8::Local<v8::Value> arg) {
   const unsigned argc = 1;

   v8::Local<v8::Value> argv[argc] = { arg };
-  v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
+  v8::Local<v8::Function> cons = Nan::New<v8::Function>(
+    PerIsolateData::For(v8::Isolate::GetCurrent())->AdminClientConstructor());
   v8::Local<v8::Object> instance =
     Nan::NewInstance(cons, argc, argv).ToLocalChecked();

‎src/admin.h

+2 -1

@@ -12,7 +12,6 @@

 #include <nan.h>
 #include <uv.h>
-#include <iostream>
 #include <string>
 #include <vector>

@@ -59,6 +58,8 @@ class AdminClient : public Connection {
   explicit AdminClient(Conf* globalConfig);
   ~AdminClient();

+  static void delete_instance(void* arg);
+
   rd_kafka_queue_t* rkqu;

  private:

‎src/binding.cc

+1 -2

@@ -7,7 +7,6 @@
  * of the MIT license. See the LICENSE.txt file for details.
  */

-#include <iostream>
 #include "src/binding.h"

 using NodeKafka::Producer;
@@ -71,4 +70,4 @@ void Init(v8::Local<v8::Object> exports, v8::Local<v8::Value> m_, void* v_) {
     Nan::New(RdKafka::version_str().c_str()).ToLocalChecked());
 }

-NODE_MODULE(kafka, Init)
+NODE_MODULE_CONTEXT_AWARE(kafka, Init)
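
Note: switching from NODE_MODULE to NODE_MODULE_CONTEXT_AWARE registers the addon so Node.js may initialize it once per context, which is what allows it to be loaded from a worker thread as well as the main thread. A minimal sketch of a context-aware addon, with a hypothetical module name and the same Init signature used above:

#include <nan.h>
#include <node.h>

// Illustrative only: Init runs once for every context that loads the addon
// (main thread or worker), so it must not stash state in process-wide statics.
static void Init(v8::Local<v8::Object> exports,
                 v8::Local<v8::Value> /* module */,
                 void* /* priv */) {
  Nan::Set(exports,
           Nan::New("loaded").ToLocalChecked(),
           Nan::New(true));
}

NODE_MODULE_CONTEXT_AWARE(example_addon, Init)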

‎src/callbacks.cc

+3 -1

@@ -60,14 +60,16 @@ Dispatcher::~Dispatcher() {
     callbacks[i].Reset();
   }

+  Deactivate();
+
   uv_mutex_destroy(&async_lock);
 }

 // Only run this if we aren't already listening
 void Dispatcher::Activate() {
   if (!async) {
     async = new uv_async_t;
-    uv_async_init(uv_default_loop(), async, AsyncMessage_);
+    uv_async_init(Nan::GetCurrentEventLoop(), async, AsyncMessage_);

     async->data = this;
   }
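
Note: the uv_async_init change matters because each worker thread runs its own libuv event loop; a handle bound to uv_default_loop() from a worker would deliver callbacks to the main thread's loop rather than the one owning the isolate. A rough sketch of the pattern, using an illustrative class name:

#include <nan.h>
#include <uv.h>

class AsyncNotifier {
 public:
  AsyncNotifier() {
    async_ = new uv_async_t;
    // Nan::GetCurrentEventLoop() returns the loop of the calling isolate,
    // so this works on worker threads as well as the main thread.
    uv_async_init(Nan::GetCurrentEventLoop(), async_, OnAsync);
    async_->data = this;
  }

  ~AsyncNotifier() {
    // uv handles must be closed before their memory is released.
    uv_close(reinterpret_cast<uv_handle_t*>(async_),
             [](uv_handle_t* handle) {
               delete reinterpret_cast<uv_async_t*>(handle);
             });
  }

  void Notify() { uv_async_send(async_); }

 private:
  static void OnAsync(uv_async_t* /* handle */) {
    // Runs on the owning isolate's loop thread.
  }

  uv_async_t* async_;
};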

‎src/config.cc

+13 -1

@@ -7,6 +7,7 @@
  * of the MIT license. See the LICENSE.txt file for details.
  */

+#include <iostream>
 #include <string>
 #include <vector>
 #include <list>
@@ -36,7 +37,14 @@ void Conf::DumpConfig(std::list<std::string> *dump) {

 Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object, std::string &errstr) {  // NOLINT
   v8::Local<v8::Context> context = Nan::GetCurrentContext();
-  Conf* rdconf = static_cast<Conf*>(RdKafka::Conf::create(type));
+  Conf* rdconf = new Conf(type);
+
+  if (type == CONF_GLOBAL)
+    rdconf->rk_conf_ = rd_kafka_conf_new();
+  else
+    rdconf->rkt_conf_ = rd_kafka_topic_conf_new();
+
+  // Conf* rdconf = static_cast<Conf*>(RdKafka::Conf::create(type));

   v8::MaybeLocal<v8::Array> _property_names = object->GetOwnPropertyNames(
     Nan::GetCurrentContext());
@@ -150,6 +158,10 @@ Conf::~Conf() {
   if (m_rebalance_cb) {
     delete m_rebalance_cb;
   }
+
+  if (m_offset_commit_cb) {
+    delete m_offset_commit_cb;
+  }
 }

 }  // namespace NodeKafka

‎src/config.h

+4 -2

@@ -11,18 +11,20 @@
 #define SRC_CONFIG_H_

 #include <nan.h>
-#include <iostream>
 #include <vector>
 #include <list>
 #include <string>

 #include "rdkafkacpp.h"
+#include "rdkafkacpp_int.h"
 #include "src/common.h"
 #include "src/callbacks.h"

 namespace NodeKafka {

-class Conf : public RdKafka::Conf {
+class Conf : public RdKafka::ConfImpl {
+ private:
+  Conf(RdKafka::Conf::ConfType type) : RdKafka::ConfImpl(type) {}  // NOLINT
  public:
   ~Conf();

‎src/connection.cc

+7

@@ -54,9 +54,16 @@ Connection::Connection(Conf* gconfig, Conf* tconfig):
   // Perhaps node new methods should report this as an error? But there
   // isn't anything the user can do about it.
   m_gconfig->set("event_cb", &m_event_cb, errstr);
+
+  node::AddEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
 }

+void Connection::delete_instance(void* arg) {
+  delete (static_cast<Connection*>(arg));
+}
+
 Connection::~Connection() {
+  node::RemoveEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
   uv_rwlock_destroy(&m_connection_lock);

   if (m_tconfig) {
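
Note: the AddEnvironmentCleanupHook / RemoveEnvironmentCleanupHook pairing above is what keeps native objects from leaking, or outliving their isolate, when a worker thread shuts down. The general shape of the pattern, with an illustrative class name:

#include <node.h>
#include <v8.h>

class NativeResource {
 public:
  NativeResource() {
    // If the owning isolate (e.g. a worker thread's) tears down first,
    // Node.js calls DeleteInstance so the object is still freed.
    node::AddEnvironmentCleanupHook(v8::Isolate::GetCurrent(),
                                    DeleteInstance, this);
  }

  ~NativeResource() {
    // Deregister on normal destruction so the hook never sees a dangling pointer.
    node::RemoveEnvironmentCleanupHook(v8::Isolate::GetCurrent(),
                                       DeleteInstance, this);
  }

  static void DeleteInstance(void* arg) {
    delete static_cast<NativeResource*>(arg);
  }
};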

‎src/connection.h

+2 -1

@@ -11,7 +11,6 @@
 #define SRC_CONNECTION_H_

 #include <nan.h>
-#include <iostream>
 #include <string>
 #include <vector>

@@ -78,6 +77,8 @@ class Connection : public Nan::ObjectWrap {
   Connection(Conf*, Conf*);
   ~Connection();

+  static void delete_instance(void* arg);
+
   static Nan::Persistent<v8::Function> constructor;
   static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
   static Baton rdkafkaErrorToBaton(RdKafka::Error* error);

‎src/errors.h

-1

@@ -11,7 +11,6 @@
 #define SRC_ERRORS_H_

 #include <nan.h>
-#include <iostream>
 #include <string>

 #include "rdkafkacpp.h"

‎src/kafka-consumer.cc

+10 -5

@@ -11,6 +11,7 @@
 #include <vector>

 #include "src/kafka-consumer.h"
+#include "src/per-isolate-data.h"
 #include "src/workers.h"

 using Nan::FunctionCallbackInfo;
@@ -36,6 +37,10 @@ KafkaConsumer::KafkaConsumer(Conf* gconfig, Conf* tconfig):
   m_consume_loop = nullptr;
 }

+void KafkaConsumer::delete_instance(void* arg) {
+  delete (static_cast<KafkaConsumer*>(arg));
+}
+
 KafkaConsumer::~KafkaConsumer() {
   // We only want to run this if it hasn't been run already
   Disconnect();
@@ -558,8 +563,6 @@ std::string KafkaConsumer::RebalanceProtocol() {
   return consumer->rebalance_protocol();
 }

-Nan::Persistent<v8::Function> KafkaConsumer::constructor;
-
 void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
   Nan::HandleScope scope;

@@ -620,7 +623,8 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
   Nan::SetPrototypeMethod(tpl, "offsetsStore", NodeOffsetsStore);

-  constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
+  PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaConsumerConstructor()
+    .Reset((tpl->GetFunction(Nan::GetCurrentContext()))
     .ToLocalChecked());
   Nan::Set(exports, Nan::New("KafkaConsumer").ToLocalChecked(),
     (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
@@ -680,7 +684,8 @@ v8::Local<v8::Object> KafkaConsumer::NewInstance(v8::Local<v8::Value> arg) {
   const unsigned argc = 1;

   v8::Local<v8::Value> argv[argc] = { arg };
-  v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
+  v8::Local<v8::Function> cons = Nan::New<v8::Function>(
+    PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaConsumerConstructor());
   v8::Local<v8::Object> instance =
     Nan::NewInstance(cons, argc, argv).ToLocalChecked();

@@ -1395,7 +1400,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
     // cleanup the async worker
     consumeLoop->WorkComplete();
     consumeLoop->Destroy();
-
+
     consumer->m_consume_loop = nullptr;
   }

‎src/kafka-consumer.h

+3 -2

@@ -12,7 +12,6 @@

 #include <nan.h>
 #include <uv.h>
-#include <iostream>
 #include <string>
 #include <vector>

@@ -95,6 +94,8 @@ class KafkaConsumer : public Connection {
   KafkaConsumer(Conf *, Conf *);
   ~KafkaConsumer();

+  static void delete_instance(void* arg);
+
  private:
   static void part_list_print(const std::vector<RdKafka::TopicPartition*>&);

@@ -103,7 +104,7 @@ class KafkaConsumer : public Connection {
   bool m_is_subscribed = false;

   void* m_consume_loop = nullptr;
-
+
   // Node methods
   static NAN_METHOD(NodeConnect);
   static NAN_METHOD(NodeSubscribe);

‎src/per-isolate-data.cc

+57

@@ -0,0 +1,57 @@
+/*
+ * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
+ *
+ * Copyright (c) 2016 Blizzard Entertainment
+ *
+ * This software may be modified and distributed under the terms
+ * of the MIT license. See the LICENSE.txt file for details.
+ */
+
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+
+#include "per-isolate-data.h"
+
+namespace NodeKafka {
+
+static std::unordered_map<v8::Isolate*, PerIsolateData> per_isolate_data_;
+static std::mutex mutex;
+
+PerIsolateData* PerIsolateData::For(v8::Isolate* isolate) {
+  const std::lock_guard<std::mutex> lock(mutex);
+  auto maybe = per_isolate_data_.find(isolate);
+  if (maybe != per_isolate_data_.end()) {
+    return &maybe->second;
+  }
+
+  per_isolate_data_.emplace(std::make_pair(isolate, PerIsolateData()));
+
+  auto pair = per_isolate_data_.find(isolate);
+  auto perIsolateData = &pair->second;
+
+  node::AddEnvironmentCleanupHook(isolate, [](void* data) {
+    const std::lock_guard<std::mutex> lock(mutex);
+    per_isolate_data_.erase(static_cast<v8::Isolate*>(data));
+  }, isolate);
+
+  return perIsolateData;
+}
+
+Nan::Global<v8::Function>& PerIsolateData::AdminClientConstructor() {
+  return admin_client_constructor;
+}
+
+Nan::Global<v8::Function>& PerIsolateData::KafkaConsumerConstructor() {
+  return kafka_consumer_constructor;
+}
+
+Nan::Global<v8::Function>& PerIsolateData::KafkaProducerConstructor() {
+  return kafka_producer_constructor;
+}
+
+Nan::Global<v8::Function>& PerIsolateData::TopicConstructor() {
+  return topic_constructor;
+}
+
+}  // namespace NodeKafka

‎src/per-isolate-data.h

+39

@@ -0,0 +1,39 @@
+/*
+ * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
+ *
+ * Copyright (c) 2016 Blizzard Entertainment
+ *
+ * This software may be modified and distributed under the terms
+ * of the MIT license. See the LICENSE.txt file for details.
+ */
+
+#ifndef SRC_PER_ISOLATE_DATA_H_
+#define SRC_PER_ISOLATE_DATA_H_
+
+#include <node.h>
+#include <nan.h>
+#include <v8.h>
+
+namespace NodeKafka {
+
+class PerIsolateData {
+ private:
+  Nan::Global<v8::Function> admin_client_constructor;
+  Nan::Global<v8::Function> kafka_consumer_constructor;
+  Nan::Global<v8::Function> kafka_producer_constructor;
+  Nan::Global<v8::Function> topic_constructor;
+
+  PerIsolateData() {}
+
+ public:
+  static PerIsolateData* For(v8::Isolate* isolate);
+
+  Nan::Global<v8::Function>& AdminClientConstructor();
+  Nan::Global<v8::Function>& KafkaConsumerConstructor();
+  Nan::Global<v8::Function>& KafkaProducerConstructor();
+  Nan::Global<v8::Function>& TopicConstructor();
+};
+
+}  // namespace NodeKafka
+
+#endif  // SRC_PER_ISOLATE_DATA_H_
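
Note: PerIsolateData replaces the per-class static Nan::Persistent<v8::Function> constructor handles. A static handle is created in whichever isolate ran Init() first and is not valid in any other isolate, whereas the per-isolate map always hands back the handle owned by the calling isolate. An illustrative lookup helper, mirroring the AdminClient change earlier in this commit (the helper name is hypothetical):

#include <nan.h>
#include <v8.h>

#include "src/per-isolate-data.h"

// Resolve the AdminClient constructor stored for the isolate that is
// currently running, instead of relying on a process-wide static handle.
v8::Local<v8::Function> CurrentAdminClientConstructor() {
  return Nan::New<v8::Function>(
      NodeKafka::PerIsolateData::For(v8::Isolate::GetCurrent())
          ->AdminClientConstructor());
}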

‎src/producer.cc

+9 -4

@@ -10,6 +10,7 @@
 #include <string>
 #include <vector>

+#include "src/per-isolate-data.h"
 #include "src/producer.h"
 #include "src/kafka-consumer.h"
 #include "src/workers.h"
@@ -39,12 +40,14 @@ Producer::Producer(Conf* gconfig, Conf* tconfig):
   m_gconfig->set("dr_cb", &m_dr_cb, errstr);
 }

+void Producer::delete_instance(void* arg) {
+  delete (static_cast<Producer*>(arg));
+}
+
 Producer::~Producer() {
   Disconnect();
 }

-Nan::Persistent<v8::Function> Producer::constructor;
-
 void Producer::Init(v8::Local<v8::Object> exports) {
   Nan::HandleScope scope;

@@ -91,7 +94,8 @@ void Producer::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "sendOffsetsToTransaction", NodeSendOffsetsToTransaction);

   // connect. disconnect. resume. pause. get meta data
-  constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
+  PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaProducerConstructor()
+    .Reset((tpl->GetFunction(Nan::GetCurrentContext()))
     .ToLocalChecked());

   Nan::Set(exports, Nan::New("Producer").ToLocalChecked(),
@@ -153,7 +157,8 @@ v8::Local<v8::Object> Producer::NewInstance(v8::Local<v8::Value> arg) {
   const unsigned argc = 1;

   v8::Local<v8::Value> argv[argc] = { arg };
-  v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
+  v8::Local<v8::Function> cons = Nan::New<v8::Function>(
+    PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaProducerConstructor());
   v8::Local<v8::Object> instance =
     Nan::NewInstance(cons, argc, argv).ToLocalChecked();

‎src/producer.h

+2

@@ -99,6 +99,8 @@ class Producer : public Connection {
   Producer(Conf*, Conf*);
   ~Producer();

+  static void delete_instance(void* arg);
+
  private:
   static NAN_METHOD(NodeProduce);
   static NAN_METHOD(NodeSetPartitioner);

‎src/topic.cc

+11 -4

@@ -12,6 +12,7 @@

 #include "src/common.h"
 #include "src/connection.h"
+#include "src/per-isolate-data.h"
 #include "src/topic.h"

 namespace NodeKafka {
@@ -33,9 +34,15 @@ Topic::Topic(std::string topic_name, RdKafka::Conf* config):
   m_topic_name(topic_name),
   m_config(config) {
   // We probably want to copy the config. May require refactoring if we do not
+  node::AddEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
+}
+
+void Topic::delete_instance(void* arg) {
+  delete (static_cast<Topic*>(arg));
 }

 Topic::~Topic() {
+  node::RemoveEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
   if (m_config) {
     delete m_config;
   }
@@ -74,8 +81,6 @@ Baton offset_store (int32_t partition, int64_t offset) {

 */

-Nan::Persistent<v8::Function> Topic::constructor;
-
 void Topic::Init(v8::Local<v8::Object> exports) {
   Nan::HandleScope scope;

@@ -86,7 +91,8 @@ void Topic::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "name", NodeGetName);

   // connect. disconnect. resume. pause. get meta data
-  constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
+  PerIsolateData::For(v8::Isolate::GetCurrent())->TopicConstructor()
+    .Reset((tpl->GetFunction(Nan::GetCurrentContext()))
     .ToLocalChecked());

   Nan::Set(exports, Nan::New("Topic").ToLocalChecked(),
@@ -147,7 +153,8 @@ v8::Local<v8::Object> Topic::NewInstance(v8::Local<v8::Value> arg) {
   const unsigned argc = 1;

   v8::Local<v8::Value> argv[argc] = { arg };
-  v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
+  v8::Local<v8::Function> cons = Nan::New<v8::Function>(
+    PerIsolateData::For(v8::Isolate::GetCurrent())->TopicConstructor());
   v8::Local<v8::Object> instance =
     Nan::NewInstance(cons, argc, argv).ToLocalChecked();
‎src/topic.h

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class Topic : public Nan::ObjectWrap {
4141
Topic(std::string, RdKafka::Conf *);
4242
~Topic();
4343

44+
static void delete_instance(void* arg);
45+
4446
std::string m_topic_name;
4547
RdKafka::Conf * m_config;
4648

‎src/workers.cc

+26 -25

@@ -40,7 +40,7 @@ OffsetsForTimes::OffsetsForTimes(Nan::Callback *callback,
   Connection* handle,
   std::vector<RdKafka::TopicPartition*> & t,
   const int & timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:OffsetsForTimes"),
   m_handle(handle),
   m_topic_partitions(t),
   m_timeout_ms(timeout_ms) {}
@@ -82,7 +82,7 @@ void OffsetsForTimes::HandleErrorCallback() {
 ConnectionMetadata::ConnectionMetadata(
   Nan::Callback *callback, Connection* connection,
   std::string topic, int timeout_ms, bool all_topics) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ConnectionMetadata"),
   m_connection(connection),
   m_topic(topic),
   m_timeout_ms(timeout_ms),
@@ -139,7 +139,7 @@ void ConnectionMetadata::HandleErrorCallback() {
 ConnectionQueryWatermarkOffsets::ConnectionQueryWatermarkOffsets(
   Nan::Callback *callback, Connection* connection,
   std::string topic, int32_t partition, int timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ConnectionQueryWatermarkOffsets"),
   m_connection(connection),
   m_topic(topic),
   m_partition(partition),
@@ -193,7 +193,7 @@ void ConnectionQueryWatermarkOffsets::HandleErrorCallback() {
 */

 ProducerConnect::ProducerConnect(Nan::Callback *callback, Producer* producer):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ProducerConnect"),
   producer(producer) {}

 ProducerConnect::~ProducerConnect() {}
@@ -240,7 +240,7 @@ void ProducerConnect::HandleErrorCallback() {

 ProducerDisconnect::ProducerDisconnect(Nan::Callback *callback,
   Producer* producer):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ProducerDisconnect"),
   producer(producer) {}

 ProducerDisconnect::~ProducerDisconnect() {}
@@ -262,6 +262,7 @@ void ProducerDisconnect::HandleOKCallback() {
 }

 void ProducerDisconnect::HandleErrorCallback() {
+  Nan::HandleScope scope;
   // This should never run
   assert(0);
 }
@@ -274,7 +275,7 @@ void ProducerDisconnect::HandleErrorCallback() {

 ProducerFlush::ProducerFlush(Nan::Callback *callback,
   Producer* producer, int timeout_ms):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ProducerFlush"),
   producer(producer),
   timeout_ms(timeout_ms) {}

@@ -312,7 +313,7 @@ void ProducerFlush::HandleOKCallback() {

 ProducerInitTransactions::ProducerInitTransactions(Nan::Callback *callback,
   Producer* producer, const int & timeout_ms):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ProducerInitTransactions"),
   producer(producer),
   m_timeout_ms(timeout_ms) {}

@@ -357,7 +358,7 @@ void ProducerInitTransactions::HandleErrorCallback() {
 */

 ProducerBeginTransaction::ProducerBeginTransaction(Nan::Callback *callback, Producer* producer):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ProducerBeginTransaction"),
   producer(producer) {}

 ProducerBeginTransaction::~ProducerBeginTransaction() {}
@@ -403,7 +404,7 @@ void ProducerBeginTransaction::HandleErrorCallback() {

 ProducerCommitTransaction::ProducerCommitTransaction(Nan::Callback *callback,
   Producer* producer, const int & timeout_ms):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ProducerCommitTransaction"),
   producer(producer),
   m_timeout_ms(timeout_ms) {}

@@ -449,7 +450,7 @@ void ProducerCommitTransaction::HandleErrorCallback() {

 ProducerAbortTransaction::ProducerAbortTransaction(Nan::Callback *callback,
   Producer* producer, const int & timeout_ms):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ProducerAbortTransaction"),
   producer(producer),
   m_timeout_ms(timeout_ms) {}

@@ -499,7 +500,7 @@ ProducerSendOffsetsToTransaction::ProducerSendOffsetsToTransaction(
   std::vector<RdKafka::TopicPartition *> & t,
   KafkaConsumer* consumer,
   const int & timeout_ms):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:ProducerSendOffsetsToTransaction"),
   producer(producer),
   m_topic_partitions(t),
   consumer(consumer),
@@ -551,7 +552,7 @@ void ProducerSendOffsetsToTransaction::HandleErrorCallback() {

 KafkaConsumerConnect::KafkaConsumerConnect(Nan::Callback *callback,
   KafkaConsumer* consumer):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:KafkaConsumerConnect"),
   consumer(consumer) {}

 KafkaConsumerConnect::~KafkaConsumerConnect() {}
@@ -601,7 +602,7 @@ void KafkaConsumerConnect::HandleErrorCallback() {

 KafkaConsumerDisconnect::KafkaConsumerDisconnect(Nan::Callback *callback,
   KafkaConsumer* consumer):
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:KafkaConsumerDisconnect"),
   consumer(consumer) {}

 KafkaConsumerDisconnect::~KafkaConsumerDisconnect() {}
@@ -661,7 +662,7 @@ KafkaConsumerConsumeLoop::KafkaConsumerConsumeLoop(Nan::Callback *callback,
   KafkaConsumer* consumer,
   const int & timeout_ms,
   const int & timeout_sleep_delay_ms) :
-  MessageWorker(callback),
+  MessageWorker(callback, "node-rdkafka:KafkaConsumerConsumeLoop"),
   consumer(consumer),
   m_looping(true),
   m_timeout_ms(timeout_ms),
@@ -798,7 +799,7 @@ KafkaConsumerConsumeNum::KafkaConsumerConsumeNum(Nan::Callback *callback,
   KafkaConsumer* consumer,
   const uint32_t & num_messages,
   const int & timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:KafkaConsumerConsumeNum"),
   m_consumer(consumer),
   m_num_messages(num_messages),
   m_timeout_ms(timeout_ms) {}
@@ -824,7 +825,7 @@ void KafkaConsumerConsumeNum::Execute() {
       if (m_messages.size() > eof_event_count) {
         timeout_ms = 1;
       }
-
+
       // We will only go into this code path when `enable.partition.eof` is set to true
       // In this case, consumer is also interested in EOF messages, so we return an EOF message
       m_messages.push_back(message);
@@ -872,7 +873,7 @@ void KafkaConsumerConsumeNum::HandleOKCallback() {
     for (std::vector<RdKafka::Message*>::iterator it = m_messages.begin();
         it != m_messages.end(); ++it) {
       RdKafka::Message* message = *it;
-
+
       switch (message->err()) {
         case RdKafka::ERR_NO_ERROR:
           ++returnArrayIndex;
@@ -890,15 +891,15 @@ void KafkaConsumerConsumeNum::HandleOKCallback() {
             Nan::New<v8::Number>(message->offset()));
           Nan::Set(eofEvent, Nan::New<v8::String>("partition").ToLocalChecked(),
             Nan::New<v8::Number>(message->partition()));
-
+
           // also store index at which position in the message array this event was emitted
           // this way, we can later emit it at the right point in time
           Nan::Set(eofEvent, Nan::New<v8::String>("messageIndex").ToLocalChecked(),
             Nan::New<v8::Number>(returnArrayIndex));

           Nan::Set(eofEventsArray, eofEventsArrayIndex, eofEvent);
         }
-
+
         delete message;
       }
     }
@@ -940,7 +941,7 @@ void KafkaConsumerConsumeNum::HandleErrorCallback() {
 KafkaConsumerConsume::KafkaConsumerConsume(Nan::Callback *callback,
   KafkaConsumer* consumer,
   const int & timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:KafkaConsumerConsume"),
   consumer(consumer),
   m_timeout_ms(timeout_ms) {}

@@ -998,7 +999,7 @@ KafkaConsumerCommitted::KafkaConsumerCommitted(Nan::Callback *callback,
   KafkaConsumer* consumer,
   std::vector<RdKafka::TopicPartition*> & t,
   const int & timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:KafkaConsumerCommitted"),
   m_consumer(consumer),
   m_topic_partitions(t),
   m_timeout_ms(timeout_ms) {}
@@ -1052,7 +1053,7 @@ KafkaConsumerSeek::KafkaConsumerSeek(Nan::Callback *callback,
   KafkaConsumer* consumer,
   const RdKafka::TopicPartition * toppar,
   const int & timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:KafkaConsumerSeek"),
   m_consumer(consumer),
   m_toppar(toppar),
   m_timeout_ms(timeout_ms) {}
@@ -1108,7 +1109,7 @@ AdminClientCreateTopic::AdminClientCreateTopic(Nan::Callback *callback,
   AdminClient* client,
   rd_kafka_NewTopic_t* topic,
   const int & timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:AdminClientCreateTopic"),
   m_client(client),
   m_topic(topic),
   m_timeout_ms(timeout_ms) {}
@@ -1155,7 +1156,7 @@ AdminClientDeleteTopic::AdminClientDeleteTopic(Nan::Callback *callback,
   AdminClient* client,
   rd_kafka_DeleteTopic_t* topic,
   const int & timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:AdminClientDeleteTopic"),
   m_client(client),
   m_topic(topic),
   m_timeout_ms(timeout_ms) {}
@@ -1203,7 +1204,7 @@ AdminClientCreatePartitions::AdminClientCreatePartitions(
   AdminClient* client,
   rd_kafka_NewPartitions_t* partitions,
   const int & timeout_ms) :
-  ErrorAwareWorker(callback),
+  ErrorAwareWorker(callback, "node-rdkafka:AdminClientCreatePartitions"),
   m_client(client),
   m_partitions(partitions),
   m_timeout_ms(timeout_ms) {}

‎src/workers.h

+7 -5

@@ -26,8 +26,9 @@ namespace Workers {

 class ErrorAwareWorker : public Nan::AsyncWorker {
  public:
-  explicit ErrorAwareWorker(Nan::Callback* callback_) :
-    Nan::AsyncWorker(callback_),
+  explicit ErrorAwareWorker(Nan::Callback* callback_,
+      const char* resource_name = "node-rdkafka:ErrorAwareWorker") :
+    Nan::AsyncWorker(callback_, resource_name),
     m_baton(RdKafka::ERR_NO_ERROR) {}
   virtual ~ErrorAwareWorker() {}

@@ -68,11 +69,12 @@ class ErrorAwareWorker : public Nan::AsyncWorker {

 class MessageWorker : public ErrorAwareWorker {
  public:
-  explicit MessageWorker(Nan::Callback* callback_)
-    : ErrorAwareWorker(callback_), m_asyncdata() {
+  explicit MessageWorker(Nan::Callback* callback_,
+      const char* resource_name = "node-rdkafka:ErrorAwareWorker")
+    : ErrorAwareWorker(callback_, resource_name), m_asyncdata() {
     m_async = new uv_async_t;
     uv_async_init(
-      uv_default_loop(),
+      Nan::GetCurrentEventLoop(),
      m_async,
      m_async_message);
    m_async->data = this;
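
Note: forwarding a resource name to Nan::AsyncWorker labels each async operation for async_hooks and diagnostic tooling instead of every worker reporting the generic default; the worker classes above each supply their own node-rdkafka:* name. A minimal subclass following that pattern (hypothetical worker, not part of this commit):

#include <nan.h>

class ExampleWorker : public Nan::AsyncWorker {
 public:
  explicit ExampleWorker(Nan::Callback* callback)
      // The second argument becomes the async resource name seen by async_hooks.
      : Nan::AsyncWorker(callback, "node-rdkafka:ExampleWorker") {}

  void Execute() override {
    // Off-loop work goes here.
  }
};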

‎test/kafka-consumer-worker.js

+39

@@ -0,0 +1,39 @@
+var WorkerThreads = require('worker_threads');
+var Kafka = require('../');
+
+if (WorkerThreads.isMainThread) {
+  var worker = new WorkerThreads.Worker(__filename);
+
+  var timeout = setTimeout(function() {
+    worker.terminate();
+  }, 1000);
+
+  worker.on('message', function(report) {
+    console.log('received message', report);
+  });
+
+  worker.on('exit', function(code) {
+    clearTimeout(timeout);
+    process.exit(code);
+  });
+
+  return;
+}
+
+var stream = Kafka.KafkaConsumer.createReadStream({
+  'metadata.broker.list': 'localhost:9092',
+  'client.id': 'kafka-mocha-consumer',
+  'group.id': 'kafka-mocha-grp',
+  'allow.auto.create.topics': true,
+  'enable.auto.commit': false,
+  'rebalance_cb': true,
+}, {}, {
+  topics: ['topic']
+});
+
+stream.on('data', function(message) {
+  WorkerThreads.parentPort?.postMessage({ message });
+  stream.consumer.commitMessage(message);
+  stream.consumer.disconnect();
+  stream.close();
+});

‎test/kafka-consumer.spec.js

+35

@@ -8,7 +8,10 @@
 */

 var KafkaConsumer = require('../lib/kafka-consumer');
+var KafkaProducer = require('../lib/producer');
 var t = require('assert');
+var path = require('path');
+var worker_threads = require('worker_threads');

 var client;
 var defaultConfig = {
@@ -44,5 +47,37 @@ module.exports = {
     t.deepStrictEqual(client.topicConfig, {});
     t.notEqual(topicConfig, client.topicConfig);
   },
+  'does not crash in a worker': function (cb) {
+    var consumer = new worker_threads.Worker(
+      path.join(__dirname, 'kafka-consumer-worker.js')
+    );
+
+    var timeout = setTimeout(function() {
+      consumer.terminate();
+    }, 1000);
+
+    consumer.on('message', function(msg) {
+      t.strictEqual(msg.value.toString(), 'my message');
+      consumer.terminate();
+    });
+
+    consumer.on('exit', function(code) {
+      clearTimeout(timeout);
+      t.strictEqual(code, 0);
+      cb();
+    });
+
+    consumer.on('online', function() {
+      const stream = Kafka.Producer.createWriteStream({
+        'metadata.broker.list': 'localhost:9092',
+        'client.id': 'kafka-mocha-producer',
+        'dr_cb': true
+      }, {}, {
+        topic: 'topic'
+      });
+
+      stream.write(Buffer.from('my message'));
+    });
+  }
 },
 };

‎test/producer-worker.js

+36

@@ -0,0 +1,36 @@
+var WorkerThreads = require('worker_threads');
+var Kafka = require('../');
+
+if (WorkerThreads.isMainThread) {
+  var worker = new WorkerThreads.Worker(__filename);
+
+  var timeout = setTimeout(function() {
+    worker.terminate();
+  }, 1000);
+
+  worker.on('message', function(report) {
+    console.log('delivery report', report);
+  });
+
+  worker.on('exit', function(code) {
+    clearTimeout(timeout);
+    process.exit(code);
+  });
+
+  return;
+}
+
+const stream = Kafka.Producer.createWriteStream({
+  'metadata.broker.list': 'localhost:9092',
+  'client.id': 'kafka-mocha-producer',
+  'dr_cb': true
+}, {}, {
+  topic: 'topic'
+});
+
+stream.producer.on('delivery-report', function(err, report) {
+  WorkerThreads.parentPort?.postMessage(report);
+  stream.producer.disconnect();
+});
+
+stream.write(Buffer.from('my message'));

‎test/producer.spec.js

+20

@@ -94,6 +94,26 @@ module.exports = {
       };

       client.disconnect(next);
+    },
+    'does not crash in a worker': function (cb) {
+      var producer = new worker_threads.Worker(
+        path.join(__dirname, 'producer-worker.js')
+      );
+
+      var timeout = setTimeout(function() {
+        producer.terminate();
+      }, 1000);
+
+      producer.on('message', (report) => {
+        t.strictEqual(report.topic, 'topic');
+        producer.terminate();
+      });
+
+      producer.on('exit', function(code) {
+        clearTimeout(timeout);
+        t.strictEqual(code, 0);
+        cb();
+      });
     }
   }
 },
