Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions src/node_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "tracing/traced_value.h"
#include "util-inl.h"

#include <atomic>
#include <memory>

struct node_napi_env__ : public napi_env__ {
Expand Down Expand Up @@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
*v8::String::Utf8Value(env_->isolate, name)),
thread_count(thread_count_),
is_closing(false),
dispatch_state(kDispatchIdle),
context(context_),
max_queue_size(max_queue_size_),
env(env_),
Expand Down Expand Up @@ -176,7 +178,7 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_closing;
}
} else {
if (uv_async_send(&async) != 0) {
if (Send() != 0) {
return napi_generic_failure;
}
queue.push(data);
Expand Down Expand Up @@ -211,7 +213,7 @@ class ThreadSafeFunction : public node::AsyncResource {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
if (uv_async_send(&async) != 0) {
if (Send() != 0) {
return napi_generic_failure;
}
}
Expand Down Expand Up @@ -275,9 +277,32 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_ok;
}

void DispatchOne() {
inline void* Context() {
return context;
}

protected:
void Dispatch() {
bool has_more = true;

// Limit maximum synchronous iteration count to prevent event loop
// starvation. See `src/node_messaging.cc` for an inspiration.
unsigned int iterations_left = kMaxIterationCount;
while (has_more && --iterations_left != 0) {
dispatch_state = kDispatchRunning;
has_more = DispatchOne();

// Send() was called while we were executing the JS function
if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
Comment thread
addaleax marked this conversation as resolved.
has_more = true;
}
}
}

bool DispatchOne() {
void* data = nullptr;
bool popped_value = false;
bool has_more = false;

{
node::Mutex::ScopedLock lock(this->mutex);
Expand Down Expand Up @@ -305,6 +330,8 @@ class ThreadSafeFunction : public node::AsyncResource {
} else {
CHECK_EQ(0, uv_idle_stop(&idle));
}
} else {
has_more = true;
}
}
}
Expand All @@ -322,6 +349,8 @@ class ThreadSafeFunction : public node::AsyncResource {
call_js_cb(env, js_callback, context, data);
});
}

return has_more;
}

void Finalize() {
Expand All @@ -335,10 +364,6 @@ class ThreadSafeFunction : public node::AsyncResource {
EmptyQueueAndDelete();
}

inline void* Context() {
return context;
}

void CloseHandlesAndMaybeDelete(bool set_closing = false) {
v8::HandleScope scope(env->isolate);
if (set_closing) {
Expand Down Expand Up @@ -370,6 +395,16 @@ class ThreadSafeFunction : public node::AsyncResource {
});
}

int Send() {
// Ask currently running Dispatch() to make one more iteration
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
if ((current_state & kDispatchRunning) == kDispatchRunning) {
return 0;
}

return uv_async_send(&async);
}

// Default way of calling into JavaScript. Used when ThreadSafeFunction is
// without a call_js_cb_.
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
Expand All @@ -396,7 +431,7 @@ class ThreadSafeFunction : public node::AsyncResource {
static void IdleCb(uv_idle_t* idle) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::idle, idle);
ts_fn->DispatchOne();
ts_fn->Dispatch();
}

static void AsyncCb(uv_async_t* async) {
Expand All @@ -411,6 +446,12 @@ class ThreadSafeFunction : public node::AsyncResource {
}

private:
static const unsigned char kDispatchIdle = 0;
static const unsigned char kDispatchRunning = 1 << 0;
static const unsigned char kDispatchPending = 1 << 1;

static const unsigned int kMaxIterationCount = 1000;

// These are variables protected by the mutex.
node::Mutex mutex;
std::unique_ptr<node::ConditionVariable> cond;
Expand All @@ -419,6 +460,7 @@ class ThreadSafeFunction : public node::AsyncResource {
uv_idle_t idle;
size_t thread_count;
bool is_closing;
std::atomic_uchar dispatch_state;

// These are variables set once, upon creation, and then never again, which
// means we don't need the mutex to read them.
Expand Down
4 changes: 2 additions & 2 deletions test/node-api/test_threadsafe_function/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ function testWithJSMarshaller({
binding[threadStarter](function testCallback(value) {
array.push(value);
if (array.length === quitAfter) {
setImmediate(() => {
process.nextTick(() => {
Comment thread
jasnell marked this conversation as resolved.
Outdated
binding.StopThread(common.mustCall(() => {
resolve(array);
}), !!abort);
Expand Down Expand Up @@ -85,7 +85,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// The default call-into-JS implementation passes no arguments.
assert.strictEqual(arguments.length, 0);
if (callCount === binding.ARRAY_LENGTH) {
setImmediate(() => {
process.nextTick(() => {
binding.StopThread(common.mustCall(() => {
resolve();
}), false);
Expand Down