mirror of
https://github.com/uNetworking/uWebSockets.js.git
synced 2026-03-03 05:00:41 -05:00
New WorkerThread.js apporoach using addChildApp
This commit is contained in:
50
examples/WorkerThreads.js
vendored
50
examples/WorkerThreads.js
vendored
@@ -1,33 +1,61 @@
|
||||
/* This example spawns two worker threads, each with their own
|
||||
* server listening to the same port (Linux feature). */
|
||||
/* This example shows two different approaches to multi-core load balancing.
|
||||
* The first approach (the oldest) requires Linux and will only work on Linux.
|
||||
* This approach listens to port 4000 on all CPUs. That's it. That's all you do.
|
||||
* Listening to the same port from many worker threads will work on Linux.
|
||||
* The second approach will work on all platforms; you set up a main acceptorApp and register all child apps
|
||||
* (worker apps) with it. The acceptorApp will listen to port 9001 and move sockets in round-robin fashion to
|
||||
* the registered child apps.
|
||||
* Note that, in this example we only create 2 worker threads. Ideally you should create as many as there are CPUs
|
||||
* in your system. But by only creating 2 here, it is simple to see the perf. gain on a system of 4 cores, as you can then
|
||||
* run the client side on the remaining 2 cores without interfering with the server side. */
|
||||
|
||||
const uWS = require('../dist/uws.js');
|
||||
const port = 9001;
|
||||
const { Worker, isMainThread, threadId } = require('worker_threads');
|
||||
const { Worker, isMainThread, threadId, parentPort } = require('worker_threads');
|
||||
const os = require('os');
|
||||
|
||||
if (isMainThread) {
|
||||
|
||||
/* The acceptorApp only listens, but must be SSL if worker apps are SSL and likewise opposite */
|
||||
const acceptorApp = uWS./*SSL*/App({
|
||||
key_file_name: 'misc/key.pem',
|
||||
cert_file_name: 'misc/cert.pem',
|
||||
passphrase: '1234'
|
||||
}).listen(port, (token) => {
|
||||
if (token) {
|
||||
console.log('Listening to port ' + port + ' from thread ' + threadId + ' as main acceptor');
|
||||
} else {
|
||||
console.log('Failed to listen to port ' + port + ' from thread ' + threadId);
|
||||
}
|
||||
});
|
||||
|
||||
/* Main thread loops over all CPUs */
|
||||
/* In this case we only spawn two (hardcoded) */
|
||||
/*os.cpus()*/[0, 1].forEach(() => {
|
||||
|
||||
/* Spawn a new thread running this source file */
|
||||
new Worker(__filename);
|
||||
new Worker(__filename).on("message", (workerAppDescriptor) => {
|
||||
acceptorApp.addChildAppDescriptor(workerAppDescriptor);
|
||||
});
|
||||
});
|
||||
|
||||
/* I guess main thread joins by default? */
|
||||
} else {
|
||||
/* Here we are inside a worker thread */
|
||||
const app = uWS.SSLApp({
|
||||
const app = uWS./*SSL*/App({
|
||||
key_file_name: 'misc/key.pem',
|
||||
cert_file_name: 'misc/cert.pem',
|
||||
passphrase: '1234'
|
||||
}).get('/*', (res, req) => {
|
||||
res.end('Hello Worker!');
|
||||
}).listen(port, (token) => {
|
||||
if (token) {
|
||||
console.log('Listening to port ' + port + ' from thread ' + threadId);
|
||||
} else {
|
||||
console.log('Failed to listen to port ' + port + ' from thread ' + threadId);
|
||||
}
|
||||
}).listen(4000, (token) => {
|
||||
if (token) {
|
||||
console.log('Listening to port ' + 4000 + ' from thread ' + threadId);
|
||||
} else {
|
||||
console.log('Failed to listen to port ' + 4000 + ' from thread ' + threadId);
|
||||
}
|
||||
});
|
||||
|
||||
/* The worker sends back its descriptor to the main acceptor */
|
||||
parentPort.postMessage(app.getDescriptor());
|
||||
}
|
||||
|
||||
@@ -713,6 +713,54 @@ std::pair<uWS::SocketContextOptions, bool> readOptionsObject(const FunctionCallb
|
||||
return {options, true};
|
||||
}
|
||||
|
||||
template <typename APP>
|
||||
void uWS_App_addChildApp(const FunctionCallbackInfo<Value> &args) {
|
||||
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
|
||||
|
||||
Isolate *isolate = args.GetIsolate();
|
||||
|
||||
double descriptor = args[0]->NumberValue(isolate->GetCurrentContext()).ToChecked();
|
||||
|
||||
|
||||
APP *receivingApp;// = (APP *) args[0]->ToObject(isolate->GetCurrentContext()).ToLocalChecked()->GetAlignedPointerFromInternalField(0);
|
||||
|
||||
memcpy(&receivingApp, &descriptor, sizeof(receivingApp));
|
||||
|
||||
/* Todo: check the class type of args[0] must match class type of args.Holder() */
|
||||
//if (args[0])
|
||||
|
||||
//std::cout << "addChildApp: " << receivingApp << std::endl;
|
||||
|
||||
app->addChildApp(receivingApp);
|
||||
|
||||
args.GetReturnValue().Set(args.Holder());
|
||||
}
|
||||
|
||||
template <typename APP>
|
||||
void uWS_App_getDescriptor(const FunctionCallbackInfo<Value> &args) {
|
||||
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
|
||||
|
||||
Isolate *isolate = args.GetIsolate();
|
||||
|
||||
static_assert(sizeof(double) >= sizeof(app));
|
||||
|
||||
//static thread_local std::unordered_set<UniquePersistent<Object>> persistentApps;
|
||||
|
||||
UniquePersistent<Object> *persistentApp = new UniquePersistent<Object>;
|
||||
persistentApp->Reset(args.GetIsolate(), args.Holder());
|
||||
|
||||
//persistentApps.emplace(persistentApp);
|
||||
|
||||
double descriptor = 0;
|
||||
memcpy(&descriptor, &app, sizeof(app));
|
||||
|
||||
//std::cout << "getDescriptor: " << app << std::endl;
|
||||
|
||||
//std::cout << "Loop: " << app->getLoop() << std::endl;
|
||||
|
||||
args.GetReturnValue().Set(Number::New(isolate, descriptor));
|
||||
}
|
||||
|
||||
template <typename APP>
|
||||
void uWS_App_addServerName(const FunctionCallbackInfo<Value> &args) {
|
||||
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
|
||||
@@ -920,6 +968,11 @@ void uWS_App(const FunctionCallbackInfo<Value> &args) {
|
||||
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "listen_unix", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_listen_unix<APP>, args.Data()));
|
||||
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "filter", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_filter<APP>, args.Data()));
|
||||
|
||||
/* load balancing */
|
||||
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "addChildAppDescriptor", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_addChildApp<APP>, args.Data()));
|
||||
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getDescriptor", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_getDescriptor<APP>, args.Data()));
|
||||
|
||||
|
||||
/* ws, listen */
|
||||
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "ws", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_ws<APP>, args.Data()));
|
||||
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "publish", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_publish<APP>, args.Data()));
|
||||
|
||||
Submodule uWebSockets updated: 9bbf161a72...9cca5d68e0
Reference in New Issue
Block a user