Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3bd9fc0
PTP: Use uv_executor_queue_work and add options for zlib, crypto
davisjam Aug 31, 2018
bc0f42e
PTP: First pass at node::threadpool
davisjam Sep 3, 2018
7200111
pTP: rewire NodePlatform to use Threadpool
davisjam Sep 6, 2018
662825f
PTP: Refactor out a LibuvExecutor
davisjam Sep 6, 2018
a894655
PTP: linting
davisjam Sep 6, 2018
8771ad8
PTP: Add state tracking for Tasks
davisjam Sep 6, 2018
3a2bbb6
PTP: Bugfix: start and stop DelayedTaskScheduler
davisjam Sep 6, 2018
8f6df02
PTP: BlockingDrain for Threadpool and TaskQueue
davisjam Sep 6, 2018
5892ef7
PTP: CreatePlatform creates its own TP
davisjam Sep 7, 2018
95bdaad
PTP: linting, comments, etc.
davisjam Sep 7, 2018
76ca768
PTP: More unit tests
davisjam Sep 7, 2018
83e91b4
PTP: Threadpool RAII; UV_THREADPOOL_SIZE
davisjam Sep 7, 2018
a654044
PTP: Cancel Tasks Post'ed to the Threadpool
davisjam Sep 7, 2018
706f2c6
PTP: Add WorkerGroup class to manage Workers
davisjam Sep 9, 2018
d79920f
PTP: Expose NodeThreadpool as public TP face
davisjam Sep 9, 2018
223cfc4
PTP: Subclass NodeThreadpool for CPU/IO split
davisjam Sep 9, 2018
f462078
PTP: Generalize subclass type
davisjam Sep 9, 2018
153dcdb
PTP: ByTaskOriginPartitonedNodeThreadpool
davisjam Sep 9, 2018
8cf91cd
PTP: ByTaskOriginAndTypePartitonedNodeThreadpool
davisjam Sep 10, 2018
21a931a
PTP: Track Task queue and run times
davisjam Sep 10, 2018
076ccca
PTP: Refactor to DRY in PartitionedNodeThreadpool
davisjam Sep 11, 2018
ec5e624
PTP: Profiling: Sample TaskQueue lengths
davisjam Sep 11, 2018
2547f52
PTP: Add UnpartitionedPartitionedNodeThreadpool
davisjam Sep 11, 2018
61b4cd3
PTP: Visualize threadpool behavior
davisjam Sep 11, 2018
0c40738
PTP: change env vars for consistency
davisjam Sep 16, 2018
003c147
PTP: dump stats in either [Signal]Exit or Start
davisjam Sep 16, 2018
c289dcd
PTP: PrintStats tweaks for easy parsing
davisjam Sep 17, 2018
c98c8d6
PTP: address changes in the libuv PR
davisjam Sep 30, 2018
64ce6e1
REMOVE ME: PTP: include libuv changes so people can try prototype
davisjam Sep 30, 2018
883d7fc
PTP: make linter happy
davisjam Sep 30, 2018
83e4214
REMOVE ME: PTP: include libuv changes so people can try prototype
davisjam Sep 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@
'src/node_util.cc',
'src/node_v8.cc',
'src/node_stat_watcher.cc',
'src/node_threadpool.cc',
'src/node_watchdog.cc',
'src/node_worker.cc',
'src/node_zlib.cc',
Expand Down Expand Up @@ -421,6 +422,7 @@
'src/node_persistent.h',
'src/node_platform.h',
'src/node_root_certs.h',
'src/node_threadpool.h',
'src/node_version.h',
'src/node_watchdog.h',
'src/node_wrap.h',
Expand Down Expand Up @@ -963,6 +965,7 @@
'test/cctest/test_node_postmortem_metadata.cc',
'test/cctest/test_environment.cc',
'test/cctest/test_platform.cc',
'test/cctest/test_threadpool.cc',
'test/cctest/test_traced_value.cc',
'test/cctest/test_util.cc',
'test/cctest/test_url.cc'
Expand Down
17 changes: 17 additions & 0 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "node_javascript.h"
#include "node_code_cache.h"
#include "node_platform.h"
#include "node_threadpool.h"
#include "node_version.h"
#include "node_internals.h"
#include "node_revert.h"
Expand Down Expand Up @@ -283,8 +284,20 @@ class NodeTraceStateObserver :
v8::TracingController* controller_;
};

static struct {
// Returns zero on success
int Initialize(void) {
tp_.reset(new threadpool::Threadpool());
tp_->Initialize();
return uv_replace_executor(tp_->GetExecutor());
}

std::unique_ptr<threadpool::Threadpool> tp_;
} node_threadpool;

static struct {
#if NODE_USE_V8_PLATFORM
// TODO(davisjam): Pass in the node_threadpool.
void Initialize(int thread_pool_size) {
tracing_agent_.reset(new tracing::Agent());
auto controller = tracing_agent_->GetTracingController();
Expand Down Expand Up @@ -3338,6 +3351,10 @@ int Start(int argc, char** argv) {
V8::SetEntropySource(crypto::EntropySource);
#endif // HAVE_OPENSSL

// This needs to run before any work is queued to the libuv executor.
CHECK_EQ(0, node_threadpool.Initialize());

// TODO(davisjam): Pass the v8_platform the node_threadpool.
v8_platform.Initialize(
per_process_opts->v8_thread_pool_size);
V8::Initialize();
Expand Down
4 changes: 3 additions & 1 deletion src/node_crypto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4584,7 +4584,9 @@ bool ECDH::IsKeyPairValid() {
struct CryptoJob : public ThreadPoolWork {
Environment* const env;
std::unique_ptr<AsyncWrap> async_wrap;
inline explicit CryptoJob(Environment* env) : ThreadPoolWork(env), env(env) {}
inline explicit CryptoJob(Environment* env) : ThreadPoolWork(env), env(env) {
SetOptionsType(UV_WORK_USER_CPU);
}
inline void AfterThreadPoolWork(int status) final;
virtual void AfterThreadPoolWork() = 0;
static inline void Run(std::unique_ptr<CryptoJob> job, Local<Value> wrap);
Expand Down
29 changes: 27 additions & 2 deletions src/node_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,11 +502,30 @@ class InternalCallbackScope {
bool closed_ = false;
};

// TODO(davisjam): Update to use node_threadpool.
class ThreadPoolWork {
public:
explicit inline ThreadPoolWork(Environment* env) : env_(env) {}
explicit inline ThreadPoolWork(Environment* env) : env_(env) {
work_req_options_.type = UV_WORK_UNKNOWN;
work_req_options_.priority = -1;
work_req_options_.cancelable = 0;
work_req_options_.data = nullptr;
}
inline virtual ~ThreadPoolWork() = default;

inline void SetOptionsType(uv_work_type type) {
work_req_options_.type = type;
}
inline void SetOptionsPriority(int priority) {
work_req_options_.priority = priority;
}
inline void SetOptionsCancelable(int cancelable) {
work_req_options_.cancelable = cancelable;
}
inline void SetOptionsData(void* data) {
work_req_options_.data = data;
}

inline void ScheduleWork();
inline int CancelWork();

Expand All @@ -516,13 +535,19 @@ class ThreadPoolWork {
private:
Environment* env_;
uv_work_t work_req_;
uv_work_options_t work_req_options_;
};

void ThreadPoolWork::ScheduleWork() {
env_->IncreaseWaitingRequestCounter();
int status = uv_queue_work(
// TODO(davisjam): Should we route to node TP instead?
// I don't think so.
// These are pending user requests with a CB for the event loop.
// So it makes sense for libuv to handle them start-to-finish.
int status = uv_executor_queue_work(
env_->event_loop(),
&work_req_,
&work_req_options_,
[](uv_work_t* req) {
ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
self->DoThreadPoolWork();
Expand Down
4 changes: 4 additions & 0 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
static void RunTask(uv_timer_t* timer) {
DelayedTaskScheduler* scheduler =
ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
// This adds the Task to the TP queue.
// TODO(davisjam): Plug in TP implementation.
scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
}

Expand Down Expand Up @@ -330,6 +332,7 @@ void NodePlatform::DrainTasks(Isolate* isolate) {

do {
// Worker tasks aren't associated with an Isolate.
// TODO(davisjam): This will require some dancing with the TP.
worker_thread_task_runner_->BlockingDrain();
} while (per_isolate->FlushForegroundTasksInternal());
}
Expand Down Expand Up @@ -371,6 +374,7 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
}

void NodePlatform::CallOnWorkerThread(std::unique_ptr<v8::Task> task) {
// TODO(davisjam): Plug in TP implementation
worker_thread_task_runner_->PostTask(std::move(task));
}

Expand Down
4 changes: 4 additions & 0 deletions src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ class PerIsolatePlatformData :
};

// This acts as the single worker thread task runner for all Isolates.
// API is modeled on v8::TaskRunner.
class WorkerThreadsTaskRunner {
public:
explicit WorkerThreadsTaskRunner(int thread_pool_size);

// Add task to queue for eventual Run()
void PostTask(std::unique_ptr<v8::Task> task);
// Add task to queue after at least delay_in_seconds
void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds);

Expand All @@ -110,6 +113,7 @@ class WorkerThreadsTaskRunner {
int NumberOfWorkerThreads() const;

private:
// Push'd directly by PostTask() and indirectly by PostDelayedTask.
TaskQueue<v8::Task> pending_worker_tasks_;

class DelayedTaskScheduler;
Expand Down
Loading