Skip to content

Commit 006158e

Browse files
committed
Fix worker_threads support
Signed-off-by: Stephen Belanger <[email protected]>
1 parent a950a56 commit 006158e

23 files changed

+790
-295
lines changed

binding.gyp

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
'src/connection.cc',
1616
'src/errors.cc',
1717
'src/kafka-consumer.cc',
18+
'src/per-isolate-data.cc',
1819
'src/producer.cc',
1920
'src/topic.cc',
2021
'src/workers.cc',

package-lock.json

+496-270
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/admin.cc

+8-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <vector>
1212
#include <math.h>
1313

14+
#include "src/per-isolate-data.h"
1415
#include "src/workers.h"
1516
#include "src/admin.h"
1617

@@ -33,6 +34,10 @@ AdminClient::AdminClient(Conf* gconfig):
3334
rkqu = NULL;
3435
}
3536

37+
void AdminClient::delete_instance(void* arg) {
38+
delete (static_cast<AdminClient*>(arg));
39+
}
40+
3641
AdminClient::~AdminClient() {
3742
Disconnect();
3843
}
@@ -90,8 +95,6 @@ Baton AdminClient::Disconnect() {
9095
return Baton(RdKafka::ERR_NO_ERROR);
9196
}
9297

93-
Nan::Persistent<v8::Function> AdminClient::constructor;
94-
9598
void AdminClient::Init(v8::Local<v8::Object> exports) {
9699
Nan::HandleScope scope;
97100

@@ -108,7 +111,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
108111
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
109112
Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken);
110113

111-
constructor.Reset(
114+
PerIsolateData::For(v8::Isolate::GetCurrent())->AdminClientConstructor().Reset(
112115
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
113116
Nan::Set(exports, Nan::New("AdminClient").ToLocalChecked(),
114117
tpl->GetFunction(Nan::GetCurrentContext()).ToLocalChecked());
@@ -155,7 +158,8 @@ v8::Local<v8::Object> AdminClient::NewInstance(v8::Local<v8::Value> arg) {
155158
const unsigned argc = 1;
156159

157160
v8::Local<v8::Value> argv[argc] = { arg };
158-
v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
161+
v8::Local<v8::Function> cons = Nan::New<v8::Function>(
162+
PerIsolateData::For(v8::Isolate::GetCurrent())->AdminClientConstructor());
159163
v8::Local<v8::Object> instance =
160164
Nan::NewInstance(cons, argc, argv).ToLocalChecked();
161165

src/admin.h

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class AdminClient : public Connection {
5959
explicit AdminClient(Conf* globalConfig);
6060
~AdminClient();
6161

62+
static void delete_instance(void* arg);
63+
6264
rd_kafka_queue_t* rkqu;
6365

6466
private:

src/binding.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,4 @@ void Init(v8::Local<v8::Object> exports, v8::Local<v8::Value> m_, void* v_) {
7171
Nan::New(RdKafka::version_str().c_str()).ToLocalChecked());
7272
}
7373

74-
NODE_MODULE(kafka, Init)
74+
NODE_MODULE_CONTEXT_AWARE(kafka, Init)

src/callbacks.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ Dispatcher::~Dispatcher() {
6767
void Dispatcher::Activate() {
6868
if (!async) {
6969
async = new uv_async_t;
70-
uv_async_init(uv_default_loop(), async, AsyncMessage_);
70+
uv_async_init(Nan::GetCurrentEventLoop(), async, AsyncMessage_);
7171

7272
async->data = this;
7373
}

src/config.cc

+4
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ Conf::~Conf() {
150150
if (m_rebalance_cb) {
151151
delete m_rebalance_cb;
152152
}
153+
154+
if (m_offset_commit_cb) {
155+
delete m_offset_commit_cb;
156+
}
153157
}
154158

155159
} // namespace NodeKafka

src/connection.cc

+7
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,16 @@ Connection::Connection(Conf* gconfig, Conf* tconfig):
5454
// Perhaps node new methods should report this as an error? But there
5555
// isn't anything the user can do about it.
5656
m_gconfig->set("event_cb", &m_event_cb, errstr);
57+
58+
node::AddEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
5759
}
5860

61+
void Connection::delete_instance(void* arg) {
62+
delete (static_cast<Connection*>(arg));
63+
}
64+
5965
Connection::~Connection() {
66+
node::RemoveEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
6067
uv_rwlock_destroy(&m_connection_lock);
6168

6269
if (m_tconfig) {

src/connection.h

+2
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class Connection : public Nan::ObjectWrap {
7878
Connection(Conf*, Conf*);
7979
~Connection();
8080

81+
static void delete_instance(void* arg);
82+
8183
static Nan::Persistent<v8::Function> constructor;
8284
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
8385
static Baton rdkafkaErrorToBaton(RdKafka::Error* error);

src/kafka-consumer.cc

+10-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <vector>
1212

1313
#include "src/kafka-consumer.h"
14+
#include "src/per-isolate-data.h"
1415
#include "src/workers.h"
1516

1617
using Nan::FunctionCallbackInfo;
@@ -36,6 +37,10 @@ KafkaConsumer::KafkaConsumer(Conf* gconfig, Conf* tconfig):
3637
m_consume_loop = nullptr;
3738
}
3839

40+
void KafkaConsumer::delete_instance(void* arg) {
41+
delete (static_cast<KafkaConsumer*>(arg));
42+
}
43+
3944
KafkaConsumer::~KafkaConsumer() {
4045
// We only want to run this if it hasn't been run already
4146
Disconnect();
@@ -558,8 +563,6 @@ std::string KafkaConsumer::RebalanceProtocol() {
558563
return consumer->rebalance_protocol();
559564
}
560565

561-
Nan::Persistent<v8::Function> KafkaConsumer::constructor;
562-
563566
void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
564567
Nan::HandleScope scope;
565568

@@ -620,7 +623,8 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
620623
Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
621624
Nan::SetPrototypeMethod(tpl, "offsetsStore", NodeOffsetsStore);
622625

623-
constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
626+
PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaConsumerConstructor()
627+
.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
624628
.ToLocalChecked());
625629
Nan::Set(exports, Nan::New("KafkaConsumer").ToLocalChecked(),
626630
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
@@ -680,7 +684,8 @@ v8::Local<v8::Object> KafkaConsumer::NewInstance(v8::Local<v8::Value> arg) {
680684
const unsigned argc = 1;
681685

682686
v8::Local<v8::Value> argv[argc] = { arg };
683-
v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
687+
v8::Local<v8::Function> cons = Nan::New<v8::Function>(
688+
PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaConsumerConstructor());
684689
v8::Local<v8::Object> instance =
685690
Nan::NewInstance(cons, argc, argv).ToLocalChecked();
686691

@@ -1395,7 +1400,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
13951400
// cleanup the async worker
13961401
consumeLoop->WorkComplete();
13971402
consumeLoop->Destroy();
1398-
1403+
13991404
consumer->m_consume_loop = nullptr;
14001405
}
14011406

src/kafka-consumer.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ class KafkaConsumer : public Connection {
9595
KafkaConsumer(Conf *, Conf *);
9696
~KafkaConsumer();
9797

98+
static void delete_instance(void* arg);
99+
98100
private:
99101
static void part_list_print(const std::vector<RdKafka::TopicPartition*>&);
100102

@@ -103,7 +105,7 @@ class KafkaConsumer : public Connection {
103105
bool m_is_subscribed = false;
104106

105107
void* m_consume_loop = nullptr;
106-
108+
107109
// Node methods
108110
static NAN_METHOD(NodeConnect);
109111
static NAN_METHOD(NodeSubscribe);

src/per-isolate-data.cc

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
3+
*
4+
* Copyright (c) 2016 Blizzard Entertainment
5+
*
6+
* This software may be modified and distributed under the terms
7+
* of the MIT license. See the LICENSE.txt file for details.
8+
*/
9+
10+
#include <mutex>
11+
#include <unordered_map>
12+
#include <utility>
13+
14+
#include "per-isolate-data.h"
15+
16+
namespace NodeKafka {
17+
18+
static std::unordered_map<v8::Isolate*, PerIsolateData> per_isolate_data_;
19+
static std::mutex mutex;
20+
21+
PerIsolateData* PerIsolateData::For(v8::Isolate* isolate) {
22+
const std::lock_guard<std::mutex> lock(mutex);
23+
auto maybe = per_isolate_data_.find(isolate);
24+
if (maybe != per_isolate_data_.end()) {
25+
return &maybe->second;
26+
}
27+
28+
per_isolate_data_.emplace(std::make_pair(isolate, PerIsolateData()));
29+
30+
auto pair = per_isolate_data_.find(isolate);
31+
auto perIsolateData = &pair->second;
32+
33+
node::AddEnvironmentCleanupHook(isolate, [](void* data) {
34+
const std::lock_guard<std::mutex> lock(mutex);
35+
per_isolate_data_.erase(static_cast<v8::Isolate*>(data));
36+
}, isolate);
37+
38+
return perIsolateData;
39+
}
40+
41+
Nan::Global<v8::Function>& PerIsolateData::AdminClientConstructor() {
42+
return admin_client_constructor;
43+
}
44+
45+
Nan::Global<v8::Function>& PerIsolateData::KafkaConsumerConstructor() {
46+
return kafka_consumer_constructor;
47+
}
48+
49+
Nan::Global<v8::Function>& PerIsolateData::KafkaProducerConstructor() {
50+
return kafka_producer_constructor;
51+
}
52+
53+
Nan::Global<v8::Function>& PerIsolateData::TopicConstructor() {
54+
return topic_constructor;
55+
}
56+
57+
} // namespace dd

src/per-isolate-data.h

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
3+
*
4+
* Copyright (c) 2016 Blizzard Entertainment
5+
*
6+
* This software may be modified and distributed under the terms
7+
* of the MIT license. See the LICENSE.txt file for details.
8+
*/
9+
10+
#ifndef SRC_PER_ISOLATE_DATA_H_
11+
#define SRC_PER_ISOLATE_DATA_H_
12+
13+
#include <node.h>
14+
#include <nan.h>
15+
#include <v8.h>
16+
17+
namespace NodeKafka {
18+
19+
class PerIsolateData {
20+
private:
21+
Nan::Global<v8::Function> admin_client_constructor;
22+
Nan::Global<v8::Function> kafka_consumer_constructor;
23+
Nan::Global<v8::Function> kafka_producer_constructor;
24+
Nan::Global<v8::Function> topic_constructor;
25+
26+
PerIsolateData() {}
27+
28+
public:
29+
static PerIsolateData* For(v8::Isolate* isolate);
30+
31+
Nan::Global<v8::Function>& AdminClientConstructor();
32+
Nan::Global<v8::Function>& KafkaConsumerConstructor();
33+
Nan::Global<v8::Function>& KafkaProducerConstructor();
34+
Nan::Global<v8::Function>& TopicConstructor();
35+
};
36+
37+
} // namespace NodeKafka
38+
39+
#endif // SRC_PER_ISOLATE_DATA_H_

src/producer.cc

+9-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <string>
1111
#include <vector>
1212

13+
#include "src/per-isolate-data.h"
1314
#include "src/producer.h"
1415
#include "src/kafka-consumer.h"
1516
#include "src/workers.h"
@@ -39,12 +40,14 @@ Producer::Producer(Conf* gconfig, Conf* tconfig):
3940
m_gconfig->set("dr_cb", &m_dr_cb, errstr);
4041
}
4142

43+
void Producer::delete_instance(void* arg) {
44+
delete (static_cast<Producer*>(arg));
45+
}
46+
4247
Producer::~Producer() {
4348
Disconnect();
4449
}
4550

46-
Nan::Persistent<v8::Function> Producer::constructor;
47-
4851
void Producer::Init(v8::Local<v8::Object> exports) {
4952
Nan::HandleScope scope;
5053

@@ -91,7 +94,8 @@ void Producer::Init(v8::Local<v8::Object> exports) {
9194
Nan::SetPrototypeMethod(tpl, "sendOffsetsToTransaction", NodeSendOffsetsToTransaction);
9295

9396
// connect. disconnect. resume. pause. get meta data
94-
constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
97+
PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaProducerConstructor()
98+
.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
9599
.ToLocalChecked());
96100

97101
Nan::Set(exports, Nan::New("Producer").ToLocalChecked(),
@@ -153,7 +157,8 @@ v8::Local<v8::Object> Producer::NewInstance(v8::Local<v8::Value> arg) {
153157
const unsigned argc = 1;
154158

155159
v8::Local<v8::Value> argv[argc] = { arg };
156-
v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
160+
v8::Local<v8::Function> cons = Nan::New<v8::Function>(
161+
PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaProducerConstructor());
157162
v8::Local<v8::Object> instance =
158163
Nan::NewInstance(cons, argc, argv).ToLocalChecked();
159164

src/producer.h

+2
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ class Producer : public Connection {
9999
Producer(Conf*, Conf*);
100100
~Producer();
101101

102+
static void delete_instance(void* arg);
103+
102104
private:
103105
static NAN_METHOD(NodeProduce);
104106
static NAN_METHOD(NodeSetPartitioner);

src/topic.cc

+11-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include "src/common.h"
1414
#include "src/connection.h"
15+
#include "src/per-isolate-data.h"
1516
#include "src/topic.h"
1617

1718
namespace NodeKafka {
@@ -33,9 +34,15 @@ Topic::Topic(std::string topic_name, RdKafka::Conf* config):
3334
m_topic_name(topic_name),
3435
m_config(config) {
3536
// We probably want to copy the config. May require refactoring if we do not
37+
node::AddEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
38+
}
39+
40+
void Topic::delete_instance(void* arg) {
41+
delete (static_cast<Topic*>(arg));
3642
}
3743

3844
Topic::~Topic() {
45+
node::RemoveEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
3946
if (m_config) {
4047
delete m_config;
4148
}
@@ -74,8 +81,6 @@ Baton offset_store (int32_t partition, int64_t offset) {
7481
7582
*/
7683

77-
Nan::Persistent<v8::Function> Topic::constructor;
78-
7984
void Topic::Init(v8::Local<v8::Object> exports) {
8085
Nan::HandleScope scope;
8186

@@ -86,7 +91,8 @@ void Topic::Init(v8::Local<v8::Object> exports) {
8691
Nan::SetPrototypeMethod(tpl, "name", NodeGetName);
8792

8893
// connect. disconnect. resume. pause. get meta data
89-
constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
94+
PerIsolateData::For(v8::Isolate::GetCurrent())->TopicConstructor()
95+
.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
9096
.ToLocalChecked());
9197

9298
Nan::Set(exports, Nan::New("Topic").ToLocalChecked(),
@@ -147,7 +153,8 @@ v8::Local<v8::Object> Topic::NewInstance(v8::Local<v8::Value> arg) {
147153
const unsigned argc = 1;
148154

149155
v8::Local<v8::Value> argv[argc] = { arg };
150-
v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
156+
v8::Local<v8::Function> cons = Nan::New<v8::Function>(
157+
PerIsolateData::For(v8::Isolate::GetCurrent())->TopicConstructor());
151158
v8::Local<v8::Object> instance =
152159
Nan::NewInstance(cons, argc, argv).ToLocalChecked();
153160

0 commit comments

Comments
 (0)