Skip to content

Commit

Permalink
Fix worker_threads support
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Belanger <[email protected]>
  • Loading branch information
Qard committed Jan 10, 2025
1 parent a950a56 commit 2e2d8ab
Show file tree
Hide file tree
Showing 25 changed files with 834 additions and 328 deletions.
1 change: 1 addition & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
'src/connection.cc',
'src/errors.cc',
'src/kafka-consumer.cc',
'src/per-isolate-data.cc',
'src/producer.cc',
'src/topic.cc',
'src/workers.cc',
Expand Down
766 changes: 496 additions & 270 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions src/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <vector>
#include <math.h>

#include "src/per-isolate-data.h"
#include "src/workers.h"
#include "src/admin.h"

Expand All @@ -33,6 +34,10 @@ AdminClient::AdminClient(Conf* gconfig):
rkqu = NULL;
}

void AdminClient::delete_instance(void* arg) {
delete (static_cast<AdminClient*>(arg));
}

AdminClient::~AdminClient() {
Disconnect();
}
Expand Down Expand Up @@ -90,8 +95,6 @@ Baton AdminClient::Disconnect() {
return Baton(RdKafka::ERR_NO_ERROR);
}

Nan::Persistent<v8::Function> AdminClient::constructor;

void AdminClient::Init(v8::Local<v8::Object> exports) {
Nan::HandleScope scope;

Expand All @@ -108,7 +111,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken);

constructor.Reset(
PerIsolateData::For(v8::Isolate::GetCurrent())->AdminClientConstructor().Reset(
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
Nan::Set(exports, Nan::New("AdminClient").ToLocalChecked(),
tpl->GetFunction(Nan::GetCurrentContext()).ToLocalChecked());
Expand Down Expand Up @@ -155,7 +158,8 @@ v8::Local<v8::Object> AdminClient::NewInstance(v8::Local<v8::Value> arg) {
const unsigned argc = 1;

v8::Local<v8::Value> argv[argc] = { arg };
v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
v8::Local<v8::Function> cons = Nan::New<v8::Function>(
PerIsolateData::For(v8::Isolate::GetCurrent())->AdminClientConstructor());
v8::Local<v8::Object> instance =
Nan::NewInstance(cons, argc, argv).ToLocalChecked();

Expand Down
3 changes: 2 additions & 1 deletion src/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

#include <nan.h>
#include <uv.h>
#include <iostream>
#include <string>
#include <vector>

Expand Down Expand Up @@ -59,6 +58,8 @@ class AdminClient : public Connection {
explicit AdminClient(Conf* globalConfig);
~AdminClient();

static void delete_instance(void* arg);

rd_kafka_queue_t* rkqu;

private:
Expand Down
3 changes: 1 addition & 2 deletions src/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
* of the MIT license. See the LICENSE.txt file for details.
*/

#include <iostream>
#include "src/binding.h"

using NodeKafka::Producer;
Expand Down Expand Up @@ -71,4 +70,4 @@ void Init(v8::Local<v8::Object> exports, v8::Local<v8::Value> m_, void* v_) {
Nan::New(RdKafka::version_str().c_str()).ToLocalChecked());
}

NODE_MODULE(kafka, Init)
NODE_MODULE_CONTEXT_AWARE(kafka, Init)
4 changes: 3 additions & 1 deletion src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ Dispatcher::~Dispatcher() {
callbacks[i].Reset();
}

Deactivate();

uv_mutex_destroy(&async_lock);
}

// Only run this if we aren't already listening
void Dispatcher::Activate() {
if (!async) {
async = new uv_async_t;
uv_async_init(uv_default_loop(), async, AsyncMessage_);
uv_async_init(Nan::GetCurrentEventLoop(), async, AsyncMessage_);

async->data = this;
}
Expand Down
14 changes: 13 additions & 1 deletion src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* of the MIT license. See the LICENSE.txt file for details.
*/

#include <iostream>
#include <string>
#include <vector>
#include <list>
Expand Down Expand Up @@ -36,7 +37,14 @@ void Conf::DumpConfig(std::list<std::string> *dump) {

Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object, std::string &errstr) { // NOLINT
v8::Local<v8::Context> context = Nan::GetCurrentContext();
Conf* rdconf = static_cast<Conf*>(RdKafka::Conf::create(type));
Conf* rdconf = new Conf(type);

if (type == CONF_GLOBAL)
rdconf->rk_conf_ = rd_kafka_conf_new();
else
rdconf->rkt_conf_ = rd_kafka_topic_conf_new();

// Conf* rdconf = static_cast<Conf*>(RdKafka::Conf::create(type));

v8::MaybeLocal<v8::Array> _property_names = object->GetOwnPropertyNames(
Nan::GetCurrentContext());
Expand Down Expand Up @@ -150,6 +158,10 @@ Conf::~Conf() {
if (m_rebalance_cb) {
delete m_rebalance_cb;
}

if (m_offset_commit_cb) {
delete m_offset_commit_cb;
}
}

} // namespace NodeKafka
6 changes: 4 additions & 2 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@
#define SRC_CONFIG_H_

#include <nan.h>
#include <iostream>
#include <vector>
#include <list>
#include <string>

#include "rdkafkacpp.h"
#include "rdkafkacpp_int.h"
#include "src/common.h"
#include "src/callbacks.h"

namespace NodeKafka {

class Conf : public RdKafka::Conf {
class Conf : public RdKafka::ConfImpl {
private:
Conf(RdKafka::Conf::ConfType type) : RdKafka::ConfImpl(type) {} // NOLINT
public:
~Conf();

Expand Down
7 changes: 7 additions & 0 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,16 @@ Connection::Connection(Conf* gconfig, Conf* tconfig):
// Perhaps node new methods should report this as an error? But there
// isn't anything the user can do about it.
m_gconfig->set("event_cb", &m_event_cb, errstr);

node::AddEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
}

void Connection::delete_instance(void* arg) {
delete (static_cast<Connection*>(arg));
}

Connection::~Connection() {
node::RemoveEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
uv_rwlock_destroy(&m_connection_lock);

if (m_tconfig) {
Expand Down
3 changes: 2 additions & 1 deletion src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#define SRC_CONNECTION_H_

#include <nan.h>
#include <iostream>
#include <string>
#include <vector>

Expand Down Expand Up @@ -78,6 +77,8 @@ class Connection : public Nan::ObjectWrap {
Connection(Conf*, Conf*);
~Connection();

static void delete_instance(void* arg);

static Nan::Persistent<v8::Function> constructor;
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
static Baton rdkafkaErrorToBaton(RdKafka::Error* error);
Expand Down
1 change: 0 additions & 1 deletion src/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#define SRC_ERRORS_H_

#include <nan.h>
#include <iostream>
#include <string>

#include "rdkafkacpp.h"
Expand Down
15 changes: 10 additions & 5 deletions src/kafka-consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <vector>

#include "src/kafka-consumer.h"
#include "src/per-isolate-data.h"
#include "src/workers.h"

using Nan::FunctionCallbackInfo;
Expand All @@ -36,6 +37,10 @@ KafkaConsumer::KafkaConsumer(Conf* gconfig, Conf* tconfig):
m_consume_loop = nullptr;
}

void KafkaConsumer::delete_instance(void* arg) {
delete (static_cast<KafkaConsumer*>(arg));
}

KafkaConsumer::~KafkaConsumer() {
// We only want to run this if it hasn't been run already
Disconnect();
Expand Down Expand Up @@ -558,8 +563,6 @@ std::string KafkaConsumer::RebalanceProtocol() {
return consumer->rebalance_protocol();
}

Nan::Persistent<v8::Function> KafkaConsumer::constructor;

void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
Nan::HandleScope scope;

Expand Down Expand Up @@ -620,7 +623,8 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
Nan::SetPrototypeMethod(tpl, "offsetsStore", NodeOffsetsStore);

constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaConsumerConstructor()
.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
.ToLocalChecked());
Nan::Set(exports, Nan::New("KafkaConsumer").ToLocalChecked(),
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
Expand Down Expand Up @@ -680,7 +684,8 @@ v8::Local<v8::Object> KafkaConsumer::NewInstance(v8::Local<v8::Value> arg) {
const unsigned argc = 1;

v8::Local<v8::Value> argv[argc] = { arg };
v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
v8::Local<v8::Function> cons = Nan::New<v8::Function>(
PerIsolateData::For(v8::Isolate::GetCurrent())->KafkaConsumerConstructor());
v8::Local<v8::Object> instance =
Nan::NewInstance(cons, argc, argv).ToLocalChecked();

Expand Down Expand Up @@ -1395,7 +1400,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
// cleanup the async worker
consumeLoop->WorkComplete();
consumeLoop->Destroy();

consumer->m_consume_loop = nullptr;
}

Expand Down
5 changes: 3 additions & 2 deletions src/kafka-consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

#include <nan.h>
#include <uv.h>
#include <iostream>
#include <string>
#include <vector>

Expand Down Expand Up @@ -95,6 +94,8 @@ class KafkaConsumer : public Connection {
KafkaConsumer(Conf *, Conf *);
~KafkaConsumer();

static void delete_instance(void* arg);

private:
static void part_list_print(const std::vector<RdKafka::TopicPartition*>&);

Expand All @@ -103,7 +104,7 @@ class KafkaConsumer : public Connection {
bool m_is_subscribed = false;

void* m_consume_loop = nullptr;

// Node methods
static NAN_METHOD(NodeConnect);
static NAN_METHOD(NodeSubscribe);
Expand Down
57 changes: 57 additions & 0 deletions src/per-isolate-data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

#include <mutex>
#include <unordered_map>
#include <utility>

#include "per-isolate-data.h"

namespace NodeKafka {

static std::unordered_map<v8::Isolate*, PerIsolateData> per_isolate_data_;
static std::mutex mutex;

PerIsolateData* PerIsolateData::For(v8::Isolate* isolate) {
const std::lock_guard<std::mutex> lock(mutex);
auto maybe = per_isolate_data_.find(isolate);
if (maybe != per_isolate_data_.end()) {
return &maybe->second;
}

per_isolate_data_.emplace(std::make_pair(isolate, PerIsolateData()));

auto pair = per_isolate_data_.find(isolate);
auto perIsolateData = &pair->second;

node::AddEnvironmentCleanupHook(isolate, [](void* data) {
const std::lock_guard<std::mutex> lock(mutex);
per_isolate_data_.erase(static_cast<v8::Isolate*>(data));
}, isolate);

return perIsolateData;
}

Nan::Global<v8::Function>& PerIsolateData::AdminClientConstructor() {
return admin_client_constructor;
}

Nan::Global<v8::Function>& PerIsolateData::KafkaConsumerConstructor() {
return kafka_consumer_constructor;
}

Nan::Global<v8::Function>& PerIsolateData::KafkaProducerConstructor() {
return kafka_producer_constructor;
}

Nan::Global<v8::Function>& PerIsolateData::TopicConstructor() {
return topic_constructor;
}

} // namespace dd
39 changes: 39 additions & 0 deletions src/per-isolate-data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

#ifndef SRC_PER_ISOLATE_DATA_H_
#define SRC_PER_ISOLATE_DATA_H_

#include <node.h>
#include <nan.h>
#include <v8.h>

namespace NodeKafka {

class PerIsolateData {
private:
Nan::Global<v8::Function> admin_client_constructor;
Nan::Global<v8::Function> kafka_consumer_constructor;
Nan::Global<v8::Function> kafka_producer_constructor;
Nan::Global<v8::Function> topic_constructor;

PerIsolateData() {}

public:
static PerIsolateData* For(v8::Isolate* isolate);

Nan::Global<v8::Function>& AdminClientConstructor();
Nan::Global<v8::Function>& KafkaConsumerConstructor();
Nan::Global<v8::Function>& KafkaProducerConstructor();
Nan::Global<v8::Function>& TopicConstructor();
};

} // namespace NodeKafka

#endif // SRC_PER_ISOLATE_DATA_H_
Loading

0 comments on commit 2e2d8ab

Please sign in to comment.