Worker thread vs child process fork in Node.js
Published on: 7th Jun 2021
Updated on: 7th Jun 2021
Overview
Node.js uses a single thread to run the user code and a thread pool (in libuv) to handle IO, network and database API calls. This architecture works well if you have lots of tasks that each need only a short amount of processing.
But if you run a task that involves heavy data processing and therefore high CPU usage, it will block the main thread and cause Node.js to become unresponsive.
With Node.js, you have two choices to solve this issue:
-
Partition the task into smaller tasks and run it.
To do this, you may use
setTimeout()
and process part of the data every 200ms, or whatever interval suits the workload best. On every call, it will process a small portion of the big task. You will see how we use
setTimeout()
in our test script to run the test cases which have been partitioned into smaller tasks without blocking the main thread. -
Move the task to another thread or process so that the main thread is not blocked. Please take note that moving the task to another thread or process does not mean that the CPU usage drops. You are merely moving the load somewhere else, where it can still slow down other programs.
In order to move the task to somewhere else to process, you have two choices in Node.js: worker thread and child process.
There is a third choice (which is not a Node.js library). You may have to develop another program to process this task. We are not going to discuss this method here.
The script to be run with Worker thread
Let's create a script and name it 8101-test-worker-child.js. Then, copy the code below into this file.
For the Worker thread, it creates a new thread, loads the script and runs it. The script is now running outside of the Node.js main thread.
// file: 8101-test-worker-child.js
const { parentPort, workerData } = require('worker_threads');

// Convert a process.hrtime() tuple ([seconds, nanoseconds]) into milliseconds.
function hrtime_to_ms(hr) {
  return hr[0] * 1000 + hr[1] / 1000000;
}

// Answer every message from the main thread with a 'pong' that reports how long
// the message took to arrive. msg.hstart is the hrtime tuple the sender attached
// to the message when it was posted.
parentPort.on('message', function (msg) {
  const hrend = process.hrtime(msg.hstart);
  parentPort.postMessage({
    msg: 'pong',
    // include the seconds part so that a delay of 1s or more is not under-reported
    thr_recd_msg_ms: hrtime_to_ms(hrend)
  });
});

// workerData carries the number of threads that the main script starts.
setTimeout(() => {
  // end the worker thread
  process.exit(0);
},
  // keep the worker thread in memory until all processes & threads have been started.
  (workerData * 1200) - (workerData * 100));
The script to be forked
Let's create another script and name it 8102-test-fork-child.js. Then, copy the code below into this file.
For child_process.fork(), it runs a new node.exe instance (i.e., a new process). Then, it loads the script and runs it.
// file: 8102-test-fork-child.js

// Number of processes under test; the parent passes it as the first CLI argument.
// Parse it once, with an explicit radix.
const thread_count = Number.parseInt(process.argv[2], 10);

// Convert a process.hrtime() tuple ([seconds, nanoseconds]) into milliseconds.
function hrtime_to_ms(hr) {
  return hr[0] * 1000 + hr[1] / 1000000;
}

// Answer every message from the parent process with a 'pong' that reports how
// long the message took to arrive. msg.hstart is the hrtime tuple the sender
// attached to the message when it was sent.
process.on('message', function (msg) {
  const hrend = process.hrtime(msg.hstart);
  process.send({
    msg: 'pong',
    // include the seconds part so that a delay of 1s or more is not under-reported
    thr_recd_msg_ms: hrtime_to_ms(hrend)
  });
});

setTimeout(() => {
  // end the fork process
  process.exit(0);
},
  // keep the child process in memory until all processes & threads have been started.
  (thread_count * 1200) - (thread_count * 100));
The main testing script
In this test, we want to find out the following performance factors:
- What is the time required to load the script for execution.
- How long does it take for the first message received by the thread/process.
- How long does it take for the first message to be responded to by the thread/process.
- What is the memory usage for the cold start (i.e., the first startup without using the cache).
- What is the memory usage for the hot start.
Here is our test plan:
- Create 10 worker threads with Worker(), one every second.
- Create 10 child processes with fork(), one every second, after all worker threads have exited.
- Finally, compute the statistics.
Let's create the third script and name it 8100-test-thread-main.js. Then, copy the code below into this file.
// file: 8100-test-thread-main.js
const { fork } = require('child_process');
const { Worker } = require('worker_threads');
// how many worker threads / child processes each round of the test creates
let MAX_THREAD = 10;
// index handed to the next fork / worker that gets started
let LAST_ID_FORK = 0;
let LAST_ID_WORKER = 0;

// Build one blank statistics record. Each measurement stays null until it has
// been taken, and `done` is set once the thread/process has exited.
const create_stat_record = () => ({
  startup_time_ms: null,
  rss_mem_kb: null,
  thr_recd_msg_ms: null,
  msg_resp_ms: null,
  done: false
});

// one independent record per fork and per worker, addressed by its index
let fork_stat = Array.from({ length: MAX_THREAD }, create_stat_record);
let worker_stat = Array.from({ length: MAX_THREAD }, create_stat_record);
/**
 * Fork one child process running 8102-test-fork-child.js and record its
 * statistics in fork_stat[ID].
 *
 * Recorded values: the time spent inside fork(), the change in this process's
 * RSS across the fork() call, and (once the child has answered the 'ping' sent
 * 500ms later) the delivery time reported by the child plus the round-trip time.
 * While fewer than MAX_THREAD forks have been started, the next one is
 * scheduled one second later.
 */
function start_fork() {
  // [seconds, nanoseconds] -> milliseconds, keeping the seconds part
  const hrtime_to_ms = (hr) => hr[0] * 1000 + hr[1] / 1_000_000;

  // hrtime at which the 'ping' was sent; falsy while no reply is pending
  let track_msg_resp_ms_fork;
  const ID = LAST_ID_FORK++;

  const mem0 = process.memoryUsage().rss;
  const hstart = process.hrtime();
  // argv entries are strings, so pass the count as a string explicitly
  const th1 = fork('./8102-test-fork-child.js', [String(MAX_THREAD)]);
  const hrend = process.hrtime(hstart);
  const mem2 = process.memoryUsage().rss;

  fork_stat[ID].startup_time_ms = hrtime_to_ms(hrend);
  fork_stat[ID].rss_mem_kb = (mem2 - mem0) / 1_024;
  console.log('\x1b[35mfork #' + ID + '==> startup time (hr): %ds %dms\x1b[0m', hrend[0], hrend[1] / 1_000_000);
  // mem0 is already the rss byte count (not a memoryUsage() object), so use it directly
  console.log('\x1b[35mfork #' + ID + '==> mem usage: rss changes=%dkb, rss=%dmb\x1b[0m', (mem2 - mem0) / 1_024, mem0 / 1_024 / 1_024);

  th1.on('message', function (msg) {
    if (track_msg_resp_ms_fork) {
      // the duration of the first message received by the child process
      fork_stat[ID].thr_recd_msg_ms = msg.thr_recd_msg_ms;
      // the duration of the first message from send until response.
      const resp = process.hrtime(track_msg_resp_ms_fork);
      fork_stat[ID].msg_resp_ms = hrtime_to_ms(resp);
      console.log('\x1b[35mfork #' + ID + '==> msg time : %ds %dms\x1b[0m', resp[0], resp[1] / 1_000_000);
      // only the first reply is measured
      track_msg_resp_ms_fork = null;
    }
  });

  th1.on('exit', function () {
    fork_stat[ID].done = true;
  });

  //-------------------------
  // start the next child process one second from now
  if (LAST_ID_FORK < MAX_THREAD) {
    setTimeout(() => {
      start_fork();
    }, 1000);
  }

  //-------------------------
  // ping the child after 500ms and remember when the ping was sent
  setTimeout(() => {
    track_msg_resp_ms_fork = process.hrtime();
    th1.send({
      msg: 'ping',
      hstart: track_msg_resp_ms_fork
    });
  }, 500);
}
/**
 * Start one worker thread running 8101-test-worker-child.js and record its
 * statistics in worker_stat[ID].
 *
 * Recorded values: the time spent constructing the Worker, the change in this
 * process's RSS across the constructor call, and (once the worker has answered
 * the 'ping' posted 500ms later) the delivery time reported by the worker plus
 * the round-trip time.
 * While fewer than MAX_THREAD workers have been started, the next one is
 * scheduled one second later. After the last worker has been started, this
 * function polls every 500ms until all workers have exited and then begins
 * the fork test.
 */
function start_worker() {
  // [seconds, nanoseconds] -> milliseconds, keeping the seconds part
  const hrtime_to_ms = (hr) => hr[0] * 1000 + hr[1] / 1_000_000;

  // hrtime at which the 'ping' was posted; falsy while no reply is pending
  let track_msg_resp_ms0;
  const ID = LAST_ID_WORKER++;

  const mem0 = process.memoryUsage().rss;
  const hstart = process.hrtime();
  const th1 = new Worker('./8101-test-worker-child.js', { workerData: MAX_THREAD });
  const hrend = process.hrtime(hstart);
  const mem2 = process.memoryUsage().rss;

  worker_stat[ID].startup_time_ms = hrtime_to_ms(hrend);
  worker_stat[ID].rss_mem_kb = (mem2 - mem0) / 1_024;
  console.log('\x1b[32mworker #' + ID + '==> startup time (hr): %ds %dms\x1b[0m', hrend[0], hrend[1] / 1_000_000);
  // mem0 is already the rss byte count (not a memoryUsage() object), so use it directly
  console.log('\x1b[32mworker #' + ID + '==> mem usage: rss changes=%dkb, rss=%dmb\x1b[0m', (mem2 - mem0) / 1_024, mem0 / 1_024 / 1_024);

  th1.on('message', function (msg) {
    if (track_msg_resp_ms0) {
      // the duration of the first message received by the worker thread
      worker_stat[ID].thr_recd_msg_ms = msg.thr_recd_msg_ms;
      // the duration of the first message from send until response.
      const resp = process.hrtime(track_msg_resp_ms0);
      worker_stat[ID].msg_resp_ms = hrtime_to_ms(resp);
      console.log('\x1b[32mworker #' + ID + '==> msg time : %ds %dms\x1b[0m', resp[0], resp[1] / 1_000_000);
      // only the first reply is measured
      track_msg_resp_ms0 = null;
    }
  });

  th1.on('exit', function () {
    worker_stat[ID].done = true;
  });

  //-------------------------
  if (LAST_ID_WORKER < MAX_THREAD) {
    // start the next worker thread one second from now
    setTimeout(() => {
      start_worker();
    }, 1000);
  }
  else if (LAST_ID_WORKER === MAX_THREAD) {
    // This was the last worker: poll until every worker has exited.
    const check_worker = () => {
      const all_exited = worker_stat.slice(0, MAX_THREAD).every((stat) => stat.done);
      if (!all_exited) {
        // check again
        setTimeout(check_worker, 500);
        return;
      }

      // if all workers have completed their job, start fork testing.
      setTimeout(() => {
        start_fork();
      }, 1000);
    };

    check_worker();
  }

  //-------------------------
  // ping the worker after 500ms and remember when the ping was posted
  setTimeout(() => {
    track_msg_resp_ms0 = process.hrtime();
    th1.postMessage({
      msg: 'ping',
      hstart: track_msg_resp_ms0
    });
  }, 500);
}
/**
 * Poll every 500ms until every fork and every worker has exited, then print
 * the raw statistics tables followed by one summary table per mechanism.
 *
 * Each summary holds the average of the warm runs (every record except the
 * first) plus the RSS change of the first, cold run. Averages are formatted
 * with three decimals. An average is null when no warm record qualifies,
 * instead of the 'NaN' that a division by zero would produce.
 */
function wait_for_result() {
  const all_done = (stats) => stats.every((stat) => stat.done);

  // make sure that all threads and processes have exited.
  if (!all_done(fork_stat) || !all_done(worker_stat)) {
    setTimeout(wait_for_result, 500);
    return;
  }

  // dump the data
  console.log('fork stat');
  console.table(fork_stat);
  console.log('worker stat');
  console.table(worker_stat);

  // Build the summary record for one list of statistics.
  const summarize = (stats) => {
    // Skip the first item (cold start requires more memory and cpu to JIT the script).
    // GC might kick in and cause the RSS to reduce, so records whose RSS change
    // is not positive are excluded entirely.
    const warm = stats.slice(1).filter((stat) => stat.rss_mem_kb > 0);

    const average = (field) => {
      if (warm.length === 0) {
        return null;
      }
      const total = warm.reduce((sum, stat) => sum + stat[field], 0);
      return (total / warm.length).toFixed(3);
    };

    return {
      startup_time_ms: average('startup_time_ms'),
      thr_recd_msg_ms: average('thr_recd_msg_ms'),
      msg_resp_ms: average('msg_resp_ms'),
      rss_mem_kb_cold: stats[0].rss_mem_kb,
      rss_mem_kb: average('rss_mem_kb'),
    };
  };

  // show the summary
  console.log('fork summary:');
  console.table(summarize(fork_stat));
  console.log('worker summary:');
  console.table(summarize(worker_stat));
}
// Kick off the benchmark with the worker-thread round; start_worker() itself
// moves on to the fork round once every worker has exited.
start_worker();

// First completion check after half a second; wait_for_result() re-schedules
// itself until every thread and process has finished.
const FIRST_RESULT_CHECK_MS = 500;
setTimeout(wait_for_result, FIRST_RESULT_CHECK_MS);
The output from the test script
The following output was generated by the script. Please take note that the numbers shown below will be slightly different from time to time.
fork stat
┌─────────┬─────────────────┬────────────┬─────────────────┬─────────────┬──────┐
│ (index) │ startup_time_ms │ rss_mem_kb │ thr_recd_msg_ms │ msg_resp_ms │ done │
├─────────┼─────────────────┼────────────┼─────────────────┼─────────────┼──────┤
│ 0 │ 5.1524 │ 232 │ 1.5234 │ 2.3242 │ true │
│ 1 │ 3.5835 │ 32 │ 1.07 │ 1.5506 │ true │
│ 2 │ 4.7324 │ 40 │ 1.1318 │ 1.6314 │ true │
│ 3 │ 3.4796 │ 36 │ 1.0982 │ 1.5729 │ true │
│ 4 │ 3.4885 │ 36 │ 1.0856 │ 1.5664 │ true │
│ 5 │ 4.4636 │ 40 │ 1.1289 │ 1.6012 │ true │
│ 6 │ 4.6732 │ 32 │ 1.1305 │ 1.6056 │ true │
│ 7 │ 4.4914 │ -880 │ 1.0849 │ 1.5322 │ true │
│ 8 │ 3.5824 │ 36 │ 1.1107 │ 1.5372 │ true │
│ 9 │ 3.6338 │ 44 │ 1.1048 │ 1.5723 │ true │
└─────────┴─────────────────┴────────────┴─────────────────┴─────────────┴──────┘
worker stat
┌─────────┬─────────────────┬────────────┬─────────────────┬─────────────┬──────┐
│ (index) │ startup_time_ms │ rss_mem_kb │ thr_recd_msg_ms │ msg_resp_ms │ done │
├─────────┼─────────────────┼────────────┼─────────────────┼─────────────┼──────┤
│ 0 │ 7.4274 │ 164 │ 0.2243 │ 0.4045 │ true │
│ 1 │ 1.6053 │ 68 │ 0.1714 │ 0.2598 │ true │
│ 2 │ 1.317 │ 56 │ 0.1671 │ 0.2536 │ true │
│ 3 │ 1.2227 │ 48 │ 0.1965 │ 0.2862 │ true │
│ 4 │ 0.8986 │ 56 │ 0.1846 │ 0.2742 │ true │
│ 5 │ 0.7919 │ 60 │ 0.1832 │ 0.2697 │ true │
│ 6 │ 0.8934 │ 60 │ 0.2213 │ 0.3164 │ true │
│ 7 │ 0.8894 │ 60 │ 0.1925 │ 0.2883 │ true │
│ 8 │ 1.2315 │ 48 │ 0.1788 │ 0.24 │ true │
│ 9 │ 0.8906 │ 60 │ 0.1693 │ 0.2617 │ true │
└─────────┴─────────────────┴────────────┴─────────────────┴─────────────┴──────┘
fork summary:
┌─────────────────┬──────────┐
│ (index) │ Values │
├─────────────────┼──────────┤
│ startup_time_ms │ '3.955' │
│ thr_recd_msg_ms │ '1.108' │
│ msg_resp_ms │ '1.580' │
│ rss_mem_kb_cold │ 232 │
│ rss_mem_kb │ '37.000' │
└─────────────────┴──────────┘
worker summary:
┌─────────────────┬──────────┐
│ (index) │ Values │
├─────────────────┼──────────┤
│ startup_time_ms │ '1.082' │
│ thr_recd_msg_ms │ '0.185' │
│ msg_resp_ms │ '0.272' │
│ rss_mem_kb_cold │ 164 │
│ rss_mem_kb │ '57.333' │
└─────────────────┴──────────┘
Summary from the test
We summarize the above statistics into the following table:
Item | Worker | fork |
---|---|---|
Startup time | 1.082ms | 3.955ms |
Duration for receiving the first message | 0.185ms | 1.108ms |
Duration for receiving the first response | 0.272ms | 1.580ms |
Memory usage (cold start) | 164kb | 232kb |
Memory usage (hot start) | 57kb | 37kb + 32MB (node.exe) |
-
Forking a script requires running a new node.exe (v8) instance which requires 32MB for the process. On Windows, run Task Manager and switch to the Details tab to see the additional node.exe that was created when forking script.
-
In the
rss_mem_kb
column, the negative value means the GC (garbage collection) has kicked in and is affecting the statistics. If this happens, the entire record will be excluded from the calculation.
Conclusion from the statistics
- Worker has a shorter startup time as compared to fork.
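- Worker exchanges messages faster than fork: a worker's messages stay inside one process, while fork has to use IPC (inter-process communication) between two processes.
- Worker uses less memory, while fork needs an additional 32MB for the new node.exe process.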
Things to take note
What if there is a bug in the script
Let's create a new file and name it 8111-crashing-script.js
. Then, copy the codes below into this file.
Basically, the script will crash because the variable a has never been declared.
// file: 8111-crashing-script.js
// let's crash the app with undefined variable
// `a` is never declared, so the assignment below throws a ReferenceError.
a.b = 'c';
// Never reached: the statement above has already thrown.
console.log(a);
Let's create a new file and name it 8110-crash-main.js
. Then, copy the codes below into this file.
// file: 8110-crash-main.js
const { fork } = require('child_process');
const { Worker } = require('worker_threads');
// Launch the crashing script in a child process straight away.
const f = fork('./8111-crashing-script.js');

// Launch the same crashing script in a worker thread.
// No 'error' listener is attached to the worker here.
function run_worker() {
  const w = new Worker('./8111-crashing-script.js');
}

// number of timestamps to print before the worker is started
const MAX_RUN = 5;
// number of timestamps printed so far
let RUN_IDX = 0;

// Print a timestamp once per second so that we can see the main thread is
// still alive; after the last timestamp, start the crashing worker.
function show_something() {
  console.log(new Date());
  RUN_IDX += 1;

  if (RUN_IDX < MAX_RUN) {
    setTimeout(show_something, 1000);
    return;
  }

  // let's see what will happen
  run_worker();

  // This callback is scheduled, but per the article's result the main thread
  // is brought down by the worker before it gets the chance to run.
  setTimeout(() => {
    console.log('exit from main thread');
  }, 1000);
}

// the main thread stays alive as long as it keeps printing every second
show_something();
Upon running 8110-crash-main.js
, the following output will appear.
2021-06-08T02:30:17.559Z
...
ReferenceError: a is not defined
...
2021-06-08T02:30:18.567Z
2021-06-08T02:30:19.573Z
2021-06-08T02:30:20.585Z
2021-06-08T02:30:21.594Z
...
ReferenceError [Error]: a is not defined
From the above output, if the fork was running a crappy script, the child process will crash but it will not affect the main thread. On the other hand, if the Worker is running a crappy script, it will bring down the main thread.
To prevent the main thread from crashing, the main thread must listen for the worker's error event.
/**
 * Run the crashing script in a worker thread and listen for its 'error' event,
 * so that an error thrown inside the worker is reported here.
 */
function run_worker() {
  const w = new Worker('./8111-crashing-script.js');
  w.on('error', function (err) {
    // report the actual error instead of discarding it
    console.error('something bad happened in the worker thread..', err);
  });
}
How to terminate the thread/process
-
Both worker and fork will keep the main process from shutting down. To allow the main process to shut down properly, you have to call one of the following APIs.
-
Worker
- Call
worker.unref()
to indicate that the main process allows shutting down. - Call
worker.terminate()
to kill the worker. - Call
process.exit()
in the worker script to stop the thread.
- Call
-
Fork
- Call
fork.unref()
to indicate that the main process allows shutting down. - Call
fork.kill()
to kill the process. - Call
process.exit()
in the forked script to stop the process.
- Call
Let's create a new script file that prints the current time five times before the script ends.
// file: 8121-working-hard.js
// how many timestamps to print in total
const MAX_RUN = 5;
// how many timestamps have been printed so far
let RUN_IDX = 0;

// Print the current time, then re-schedule itself one second later until
// MAX_RUN timestamps have been printed.
function show_something() {
  console.log(new Date());
  RUN_IDX += 1;

  if (RUN_IDX >= MAX_RUN) {
    return;
  }
  setTimeout(show_something, 1000);
}

// printing once per second keeps this script busy for a few seconds
show_something();
Let's create the main script to run the test.
//file: 8120-working-hard-main.js
const { fork } = require('child_process');
const { Worker } = require('worker_threads');
// First run: execute the script in a child process.
let f = fork('./8121-working-hard.js');
// Second run: disable the fork line above and enable the Worker line below instead.
// let w = new Worker('./8121-working-hard.js');
// Printed as soon as the last statement of this main script has executed.
console.log('exit from main script');
In the first run, the Worker line is commented out, and the script produces the following output. The main script has reached its last line while the child process is still running.
C:\Program Files\nodejs\node.exe .\8120-working-hard-main.js
Debugger attached.
exit from main thread
Debugger attached.
2021-06-08T02:52:43.637Z
2021-06-08T02:52:44.655Z
2021-06-08T02:52:45.657Z
2021-06-08T02:52:46.661Z
2021-06-08T02:52:47.668Z
Waiting for the debugger to disconnect...
Waiting for the debugger to disconnect...
In the next test, we disable the fork and enable the worker and see if there is any difference.
// Second run: the fork is disabled...
//let f = fork('./8121-working-hard.js');
// ...and the same script runs in a worker thread instead.
let w = new Worker('./8121-working-hard.js');
The output will look like this:
C:\Program Files\nodejs\node.exe .\8120-working-hard-main.js
Debugger attached.
exit from main thread
2021-06-08T02:55:35.408Z
2021-06-08T02:55:36.422Z
2021-06-08T02:55:37.425Z
2021-06-08T02:55:38.429Z
2021-06-08T02:55:39.436Z
Waiting for the debugger to disconnect...
This means if the worker or child process is busy, the program will continue to run even though the main thread has reached the last line.
If you want the worker and child process to die together with the main thread, you have to call process.exit()
as shown below.
console.log('exit from main script');

// Call process.exit() two seconds from now instead of waiting for pending work to finish.
const EXIT_DELAY_MS = 2000;
setTimeout(() => process.exit(), EXIT_DELAY_MS);
Please take note that process.exit()
will stop the worker and child process without waiting for it to complete its task. This could create a mess if it was terminated during the data processing.
Related posts
- In case you want to avoid high CPU usage, you may have to design your program to be stoppable and process the data batch by batch. Please refer to this MSSQL post on how to Design a stoppable long running process
Jump to #NODEJS blog
Jump to #MSSQL blog
Author
Lau Hon Wan, software developer.