Worker thread vs child process fork in Node.js

Published on: 7th Jun 2021

Updated on: 11th Jun 2021

Overview

Node.js runs user code on a single thread and relies on libuv (an event loop plus a thread pool) to handle I/O such as file, network and database calls. This architecture works well when you have lots of short tasks.

However, a task that involves heavy data processing, and therefore high CPU usage, will block the main thread and cause Node.js to become unresponsive.

With Node.js, you have two choices to solve this issue:

  1. Partition the task into smaller tasks and run them one at a time.

    To do this, you may use setTimeout() to process part of the data every 200ms, or at whatever interval suits you best. Each call processes a small portion of the big task.

    You will see how we use setTimeout() in our test script to run the test cases which have been partitioned into smaller tasks without blocking the main thread.

  2. Move the task to another thread or process so that the main thread is not blocked. Please take note that moving the task to another thread or process does not reduce the CPU usage. You are merely moving the load somewhere else, where it can still slow down other programs.

    In order to move the task to somewhere else to process, you have two choices in Node.js: worker thread and child process.

    There is a third choice (which is not a Node.js library): develop a separate program to process this task. We are not going to discuss this method here.
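
The first option can be sketched as follows. This is a minimal illustration, not the test script used later: sumInChunks and its parameters are hypothetical names, and summing numbers stands in for real data processing.

```javascript
// Hypothetical helper: sums a large array in small slices so that the
// event loop can handle other work between slices.
function sumInChunks(data, chunkSize, delayMs, onDone) {
    let index = 0;
    let total = 0;

    function runChunk() {
        // process at most chunkSize items in this call
        const end = Math.min(index + chunkSize, data.length);
        for (; index < end; index++) {
            total += data[index];
        }

        if (index < data.length) {
            // more data left: give the main thread a break, then continue
            setTimeout(runChunk, delayMs);
        } else {
            onDone(total);
        }
    }

    // the first slice runs immediately, the rest run from timers
    runChunk();
}

const numbers = Array.from({ length: 1000 }, (_, i) => i + 1);
sumInChunks(numbers, 100, 10, (total) => {
    console.log('total =', total); // prints: total = 500500
});
```

A smaller chunk size keeps the main thread more responsive but makes the whole task take longer, so the values above are only placeholders to tune.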

The script to be run with Worker thread

Let's create a script and name it 8101-test-worker-child.js. Then, copy the code below into this file.

new Worker() creates a new thread, loads the script and runs it. The script then runs outside of the Node.js main thread.

// file: 8101-test-worker-child.js
const { parentPort, workerData } = require('worker_threads');

parentPort.on('message', function (msg) {
    let hrend = process.hrtime(msg.hstart);
    parentPort.postMessage({
        msg: 'pong',
        // hrend is [seconds, nanoseconds]; include both parts
        thr_recd_msg_ms: (hrend[0] * 1000) + (hrend[1] / 1000000)
    });
});

// End the worker thread after a delay that is long enough for all
// threads to have been started (workerData is the number of threads).
setTimeout(() => {
    process.exit(0);
}, workerData * 1100);

The script to be forked

Let's create another script and name it 8102-test-fork-child.js. Then, copy the code below into this file.

child_process.fork() starts a new node.exe instance (i.e., a new process). Then, it loads the script and runs it.

// file: 8102-test-fork-child.js
process.on('message', function (msg) {
    let hrend = process.hrtime(msg.hstart);
    process.send({
        msg: 'pong',
        // hrend is [seconds, nanoseconds]; include both parts
        thr_recd_msg_ms: (hrend[0] * 1000) + (hrend[1] / 1000000)
    });
});

// End the forked process after a delay that is long enough for all
// processes to have been started (argv[2] is the number of processes).
setTimeout(() => {
    process.exit(0);
}, parseInt(process.argv[2], 10) * 1100);

The main testing script

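In this test, we want to measure the following performance factors: the startup time, the memory usage (RSS), the time taken for the thread or process to receive the first message, and the time taken to get the first response back.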

Here is our test plan:

  1. Create 10 worker threads with Worker(), one every second.
  2. Create 10 child processes with fork(), one every second, after all worker threads have exited.
  3. Finally, compute the statistics.
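
The timings in this test come from process.hrtime(), which returns a [seconds, nanoseconds] pair. A minimal sketch of turning such a pair into milliseconds is shown below; hrtimeToMs and measureMs are hypothetical helper names and are not part of the test scripts.

```javascript
// Hypothetical helper: converts a process.hrtime() pair to milliseconds.
function hrtimeToMs(pair) {
    return pair[0] * 1000 + pair[1] / 1_000_000;
}

// Hypothetical helper: measures how long a synchronous function takes.
function measureMs(fn) {
    const start = process.hrtime();
    fn();
    // passing the earlier reading returns the difference since then
    return hrtimeToMs(process.hrtime(start));
}

const elapsed = measureMs(() => {
    let x = 0;
    for (let i = 0; i < 1_000_000; i++) {
        x += i;
    }
});
console.log('elapsed: %sms', elapsed.toFixed(3));
```

Using only the nanoseconds part is fine for sub-second durations, but it silently drops whole seconds once an operation takes longer than that.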

Let's create the third script and name it 8100-test-thread-main.js. Then, copy the code below into this file.

// file: 8100-test-thread-main.js
const { fork } = require('child_process');
const { Worker } = require('worker_threads');

// # of threads to be created
let MAX_THREAD = 10;

// the thread index
let LAST_ID_FORK = 0;
let LAST_ID_WORKER = 0;

// create the data store to hold the counter.
let fork_stat = new Array(MAX_THREAD);
let worker_stat = new Array(MAX_THREAD);

for (let i = 0; i < MAX_THREAD; i++) {
    fork_stat[i] = {
        startup_time_ms: null,
        rss_mem_kb: null,
        thr_recd_msg_ms: null,
        msg_resp_ms: null,
        done: false
    };
    worker_stat[i] = {
        startup_time_ms: null,
        rss_mem_kb: null,
        thr_recd_msg_ms: null,
        msg_resp_ms: null,
        done: false
    };
}

function start_fork() {
    let track_msg_resp_ms_fork;
    let ID = LAST_ID_FORK++;

    let mem0 = process.memoryUsage().rss;
    let hstart = process.hrtime();

    let th1 = fork('./8102-test-fork-child.js', [MAX_THREAD]);

    let hrend = process.hrtime(hstart);
    let mem2 = process.memoryUsage().rss;

    // hrend is [seconds, nanoseconds]; include both parts
    fork_stat[ID].startup_time_ms = hrend[0] * 1_000 + hrend[1] / 1_000_000;
    fork_stat[ID].rss_mem_kb = (mem2 - mem0) / 1_024;
    console.log('\x1b[35mfork #' + ID + '==> startup time (hr): %ds %dms\x1b[0m', hrend[0], hrend[1] / 1_000_000);
    // mem0 already holds the RSS value in bytes
    console.log('\x1b[35mfork #' + ID + '==> mem usage: rss changes=%dkb, rss=%dmb\x1b[0m', (mem2 - mem0) / 1_024, mem0 / 1_024 / 1_024);

    th1.on('message', function (msg) {
        if (track_msg_resp_ms_fork) {
            // the duration of the first message received by the child process
            fork_stat[ID].thr_recd_msg_ms = msg.thr_recd_msg_ms;

            // the duration of the first message from send until response.
            let hrend = process.hrtime(track_msg_resp_ms_fork);
            fork_stat[ID].msg_resp_ms = hrend[0] * 1_000 + hrend[1] / 1_000_000;
            console.log('\x1b[35mfork #' + ID + '==> msg time : %ds %dms\x1b[0m', hrend[0], hrend[1] / 1_000_000);
            track_msg_resp_ms_fork = null;
        }
    });

    th1.on('exit', function () {
        fork_stat[ID].done = true;
    });

    //-------------------------
    // start the next child process one second later
    if (LAST_ID_FORK < MAX_THREAD) {
        setTimeout(() => {
            start_fork();
        }, 1000);
    }

    //-------------------------
    setTimeout(() => {
        track_msg_resp_ms_fork = process.hrtime();
        th1.send({
            msg: 'ping',
            hstart: track_msg_resp_ms_fork
        });
    }, 500);
}

function start_worker() {
    let track_msg_resp_ms0;
    let ID = LAST_ID_WORKER++;

    let mem0 = process.memoryUsage().rss;
    let hstart = process.hrtime();

    let th1 = new Worker('./8101-test-worker-child.js', { workerData: MAX_THREAD });

    let hrend = process.hrtime(hstart);
    let mem2 = process.memoryUsage().rss;

    // hrend is [seconds, nanoseconds]; include both parts
    worker_stat[ID].startup_time_ms = hrend[0] * 1_000 + hrend[1] / 1_000_000;
    worker_stat[ID].rss_mem_kb = (mem2 - mem0) / 1_024;
    console.log('\x1b[32mworker #' + ID + '==> startup time (hr): %ds %dms\x1b[0m', hrend[0], hrend[1] / 1_000_000);
    // mem0 already holds the RSS value in bytes
    console.log('\x1b[32mworker #' + ID + '==> mem usage: rss changes=%dkb, rss=%dmb\x1b[0m', (mem2 - mem0) / 1_024, mem0 / 1_024 / 1_024);

    th1.on('message', function (msg) {
        if (track_msg_resp_ms0) {
            // the duration of the first message received by the child process
            worker_stat[ID].thr_recd_msg_ms = msg.thr_recd_msg_ms;

            // the duration of the first message from send until response.
            let hrend = process.hrtime(track_msg_resp_ms0);
            worker_stat[ID].msg_resp_ms = hrend[0] * 1_000 + hrend[1] / 1_000_000;
            console.log('\x1b[32mworker #' + ID + '==> msg time : %ds %dms\x1b[0m', hrend[0], hrend[1] / 1_000_000);
            track_msg_resp_ms0 = null;
        }
    });

    th1.on('exit', function (exitCode) {
        worker_stat[ID].done = true;
    });

    //-------------------------
    // start the next worker thread one second later
    if (LAST_ID_WORKER < MAX_THREAD) {
        setTimeout(() => {
            start_worker();
        }, 1000);
    }
    else if (LAST_ID_WORKER == MAX_THREAD) {
        // check the worker status.
        function check_worker() {
            for (let i = 0; i < MAX_THREAD; i++) {
                if (!worker_stat[i].done
                ) {
                    // check again
                    setTimeout(check_worker, 500);
                    // exit
                    return;
                }
            }

            // if all workers have completed their job, start fork testing.
            setTimeout(() => {
                start_fork();
            }, 1000);
        }

        check_worker();
    }

    //-------------------------
    setTimeout(() => {
        track_msg_resp_ms0 = process.hrtime();
        th1.postMessage({
            msg: 'ping',
            hstart: track_msg_resp_ms0
        });
    }, 500);
}

function wait_for_result() {
    // make sure that all threads have exited.
    for (let i = 0; i < MAX_THREAD; i++) {
        if (!fork_stat[i].done
            || !worker_stat[i].done
        ) {
            setTimeout(wait_for_result, 500);
            return;
        }
    }

    // dump the data
    console.log('fork stat');
    console.table(fork_stat);
    console.log('worker stat');
    console.table(worker_stat);

    let fork_summary = {
        startup_time_ms: null,
        thr_recd_msg_ms: null,
        msg_resp_ms: null,
        rss_mem_kb_cold: null,
        rss_mem_kb: null,
    };
    let worker_summary = {
        startup_time_ms: null,
        thr_recd_msg_ms: null,
        msg_resp_ms: null,
        rss_mem_kb_cold: null,
        rss_mem_kb: null,
    };
    let fork_summary_cnt = 0;
    let worker_summary_cnt = 0;

    // skip the first item (cold start requires more memory and cpu to JIT the script).    
    for (let i = 1; i < MAX_THREAD; i++) {
        // GC might kick in and cause the RSS to reduce.
        // Exclude the negative RSS value.
        if (fork_stat[i].rss_mem_kb > 0) {
            fork_summary.startup_time_ms += fork_stat[i].startup_time_ms;
            fork_summary.thr_recd_msg_ms += fork_stat[i].thr_recd_msg_ms;
            fork_summary.msg_resp_ms += fork_stat[i].msg_resp_ms;
            fork_summary.rss_mem_kb += fork_stat[i].rss_mem_kb;
            fork_summary_cnt++;
        }

        // GC might kick in and cause the RSS to reduce.
        // Exclude the negative RSS value.
        if (worker_stat[i].rss_mem_kb > 0) {
            worker_summary.startup_time_ms += worker_stat[i].startup_time_ms;
            worker_summary.thr_recd_msg_ms += worker_stat[i].thr_recd_msg_ms;
            worker_summary.msg_resp_ms += worker_stat[i].msg_resp_ms;
            worker_summary.rss_mem_kb += worker_stat[i].rss_mem_kb;
            worker_summary_cnt++;
        }
    }

    // compute the summary
    fork_summary.startup_time_ms = (fork_summary.startup_time_ms / fork_summary_cnt).toFixed(3);
    fork_summary.thr_recd_msg_ms = (fork_summary.thr_recd_msg_ms / fork_summary_cnt).toFixed(3);
    fork_summary.msg_resp_ms = (fork_summary.msg_resp_ms / fork_summary_cnt).toFixed(3);
    fork_summary.rss_mem_kb_cold = fork_stat[0].rss_mem_kb;
    fork_summary.rss_mem_kb = (fork_summary.rss_mem_kb / fork_summary_cnt).toFixed(3);

    worker_summary.startup_time_ms = (worker_summary.startup_time_ms / worker_summary_cnt).toFixed(3);
    worker_summary.thr_recd_msg_ms = (worker_summary.thr_recd_msg_ms / worker_summary_cnt).toFixed(3);
    worker_summary.msg_resp_ms = (worker_summary.msg_resp_ms / worker_summary_cnt).toFixed(3);
    worker_summary.rss_mem_kb_cold = worker_stat[0].rss_mem_kb;
    worker_summary.rss_mem_kb = (worker_summary.rss_mem_kb / worker_summary_cnt).toFixed(3);

    // show the summary
    console.log('fork summary:');
    console.table(fork_summary);
    console.log('worker summary:');
    console.table(worker_summary);
}

// start the test now
start_worker();
// wait for completion
setTimeout(wait_for_result, 500);

The output from the test script

The following output was generated by the script. Please take note that the numbers will vary slightly from run to run.

fork stat
┌─────────┬─────────────────┬────────────┬─────────────────┬─────────────┬──────┐
│ (index) │ startup_time_ms │ rss_mem_kb │ thr_recd_msg_ms │ msg_resp_ms │ done │
├─────────┼─────────────────┼────────────┼─────────────────┼─────────────┼──────┤
│    0    │     5.1524      │    232     │     1.5234      │   2.3242    │ true │
│    1    │     3.5835      │     32     │      1.07       │   1.5506    │ true │
│    2    │     4.7324      │     40     │     1.1318      │   1.6314    │ true │
│    3    │     3.4796      │     36     │     1.0982      │   1.5729    │ true │
│    4    │     3.4885      │     36     │     1.0856      │   1.5664    │ true │
│    5    │     4.4636      │     40     │     1.1289      │   1.6012    │ true │
│    6    │     4.6732      │     32     │     1.1305      │   1.6056    │ true │
│    7    │     4.4914      │    -880    │     1.0849      │   1.5322    │ true │
│    8    │     3.5824      │     36     │     1.1107      │   1.5372    │ true │
│    9    │     3.6338      │     44     │     1.1048      │   1.5723    │ true │
└─────────┴─────────────────┴────────────┴─────────────────┴─────────────┴──────┘
worker stat
┌─────────┬─────────────────┬────────────┬─────────────────┬─────────────┬──────┐
│ (index) │ startup_time_ms │ rss_mem_kb │ thr_recd_msg_ms │ msg_resp_ms │ done │
├─────────┼─────────────────┼────────────┼─────────────────┼─────────────┼──────┤
│    0    │     7.4274      │    164     │     0.2243      │   0.4045    │ true │
│    1    │     1.6053      │     68     │     0.1714      │   0.2598    │ true │
│    2    │      1.317      │     56     │     0.1671      │   0.2536    │ true │
│    3    │     1.2227      │     48     │     0.1965      │   0.2862    │ true │
│    4    │     0.8986      │     56     │     0.1846      │   0.2742    │ true │
│    5    │     0.7919      │     60     │     0.1832      │   0.2697    │ true │
│    6    │     0.8934      │     60     │     0.2213      │   0.3164    │ true │
│    7    │     0.8894      │     60     │     0.1925      │   0.2883    │ true │
│    8    │     1.2315      │     48     │     0.1788      │    0.24     │ true │
│    9    │     0.8906      │     60     │     0.1693      │   0.2617    │ true │
└─────────┴─────────────────┴────────────┴─────────────────┴─────────────┴──────┘
fork summary:
┌─────────────────┬──────────┐
│     (index)     │  Values  │
├─────────────────┼──────────┤
│ startup_time_ms │ '3.955'  │
│ thr_recd_msg_ms │ '1.108'  │
│   msg_resp_ms   │ '1.580'  │
│ rss_mem_kb_cold │   232    │
│   rss_mem_kb    │ '37.000' │
└─────────────────┴──────────┘
worker summary:
┌─────────────────┬──────────┐
│     (index)     │  Values  │
├─────────────────┼──────────┤
│ startup_time_ms │ '1.082'  │
│ thr_recd_msg_ms │ '0.185'  │
│   msg_resp_ms   │ '0.272'  │
│ rss_mem_kb_cold │   164    │
│   rss_mem_kb    │ '57.333' │
└─────────────────┴──────────┘

Summary from the test

We summarize the above statistics into the following table:

Item                                      | Worker  | fork
------------------------------------------|---------|-----------------------
Startup time                              | 1.082ms | 3.955ms
Duration for receiving the first message  | 0.185ms | 1.108ms
Duration for receiving the first response | 0.272ms | 1.580ms
Memory usage (cold start)                 | 164KB   | 232KB
Memory usage (hot start)                  | 57KB    | 37KB + 32MB (node.exe)
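
The hot start figures skip the first (cold start) row as well as any row with a negative RSS change. The sketch below reproduces that rule for the rss_mem_kb column of the "fork stat" output; hotStartAverage is a hypothetical helper name, and the test script itself applies the same filter to the whole row rather than to a single column.

```javascript
// Hypothetical helper: averages the hot start values, skipping row 0
// (cold start) and any non-positive value (RSS shrank, e.g. after a GC).
function hotStartAverage(values) {
    const kept = values.slice(1).filter((v) => v > 0);
    const sum = kept.reduce((acc, v) => acc + v, 0);
    return kept.length > 0 ? sum / kept.length : null;
}

// rss_mem_kb column taken from the "fork stat" output
const forkRssKb = [232, 32, 40, 36, 36, 40, 32, -880, 36, 44];
console.log(hotStartAverage(forkRssKb).toFixed(3)); // prints: 37.000
```

Row 7 (-880) is dropped, so the average is taken over 8 rows instead of 9.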

Conclusion from the statistics

From the figures above, in this run a Worker started roughly 3.7 times faster than fork() (1.082ms vs 3.955ms) and completed the first message round trip roughly 6 times faster (0.272ms vs 1.580ms). The RSS growth measured in the main process was slightly higher for a Worker (57KB vs 37KB), but each forked child also runs its own node.exe instance of about 32MB.

Things to take note of

What if there is a bug in the script

Let's create a new file and name it 8111-crashing-script.js. Then, copy the code below into this file.

Basically, the script will crash because the variable a has not been declared or initialized.

// file: 8111-crashing-script.js
// let's crash the app with undefined variable
a.b = 'c';
console.log(a);

Let's create a new file and name it 8110-crash-main.js. Then, copy the code below into this file.

// file: 8110-crash-main.js
const { fork } = require('child_process');
const { Worker } = require('worker_threads');

// run the crappy child script
let f = fork('./8111-crashing-script.js');

// run the crappy worker script
function run_worker() {
    let w = new Worker('./8111-crashing-script.js');
}

let MAX_RUN = 5;
let RUN_IDX = 0;
function show_something() {
    // print something so that we know the main thread is still alive.
    console.log(new Date());

    RUN_IDX++;
    if (RUN_IDX < MAX_RUN) {
        setTimeout(() => {
            show_something();
        }, 1000);
    }
    else if (RUN_IDX >= MAX_RUN) {
        // let's see what will happen
        run_worker();

        // at runtime, the main thread will not reach the following line.
        setTimeout(() => {
            console.log('exit from main thread');    
        }, 1000);        
    }
}

// to delay exit from the main thread, we need to print something every second.
show_something();

Upon running 8110-crash-main.js, the following output will appear.

2021-06-08T02:30:17.559Z
...
ReferenceError: a is not defined
...
2021-06-08T02:30:18.567Z
2021-06-08T02:30:19.573Z
2021-06-08T02:30:20.585Z
2021-06-08T02:30:21.594Z
...
ReferenceError [Error]: a is not defined

From the above output, a crashing script that is run through fork() brings down only the child process; the main thread keeps running. A crashing script that is run in a Worker, on the other hand, brings down the main thread: the uncaught exception is emitted as an 'error' event on the Worker object, and since nothing listens for it, it is thrown in the main thread.

To prevent the main thread from crashing, the main thread must handle the worker's error event.

function run_worker() {
    let w = new Worker('./8111-crashing-script.js');

    w.on('error', function(err) {
        console.log('something bad happened in the worker thread..');
    });
}
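
To see what the error handler receives, the sketch below runs the same faulty statement in an inline worker and reports the error message together with the exit code. The eval: true option is used here only to keep the example in one file, and runCrashingWorker is a hypothetical name.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper: runs a faulty inline script in a worker and
// resolves with what the main thread observes.
function runCrashingWorker() {
    return new Promise((resolve) => {
        // same faulty statement as in 8111-crashing-script.js
        const w = new Worker("a.b = 'c';", { eval: true });
        let message = null;

        // without this listener, the error would be thrown in the main thread
        w.on('error', (err) => {
            message = err.message;
        });

        // 'exit' fires once the worker thread has stopped
        w.on('exit', (code) => {
            resolve({ message, code });
        });
    });
}

runCrashingWorker().then((result) => {
    console.log('worker failed:', result.message, '(exit code ' + result.code + ')');
    console.log('main thread is still alive');
});
```

Logging err.message (or err.stack) in the handler is usually more useful than a fixed string, because it tells you which script and which line went wrong.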

How to terminate the thread/process

Let's create a new script file that prints the current time 5 times, once per second, before it ends.

// file: 8121-working-hard.js
let MAX_RUN = 5;
let RUN_IDX = 0;

function show_something() {
    // print something so that we know the main thread is still alive.
    console.log(new Date());

    RUN_IDX++;
    if (RUN_IDX < MAX_RUN) {
        setTimeout(() => {
            show_something();
        }, 1000);
    }   
}

// to delay exit from the main thread, we need to print something every second.
show_something();

Let's create the main script to run the test.

//file: 8120-working-hard-main.js
const { fork } = require('child_process');
const { Worker } = require('worker_threads');

let f = fork('./8121-working-hard.js');
// let w = new Worker('./8121-working-hard.js');

console.log('exit from main thread');

In the first run, the Worker line is commented out and the script shows the following output. The main script has run to its last line while the child process is still running.

C:\Program Files\nodejs\node.exe .\8120-working-hard-main.js
Debugger attached.
exit from main thread
Debugger attached.
2021-06-08T02:52:43.637Z
2021-06-08T02:52:44.655Z
2021-06-08T02:52:45.657Z
2021-06-08T02:52:46.661Z
2021-06-08T02:52:47.668Z
Waiting for the debugger to disconnect...
Waiting for the debugger to disconnect...

In the next test, we disable the fork line and enable the Worker line to see if there is any difference.

//let f = fork('./8121-working-hard.js');
let w = new Worker('./8121-working-hard.js');

The output will look like this:

C:\Program Files\nodejs\node.exe .\8120-working-hard-main.js
Debugger attached.
exit from main thread
2021-06-08T02:55:35.408Z
2021-06-08T02:55:36.422Z
2021-06-08T02:55:37.425Z
2021-06-08T02:55:38.429Z
2021-06-08T02:55:39.436Z
Waiting for the debugger to disconnect...

This means that while a worker or child process is still busy, the program keeps running even though the main thread has reached its last line.

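If you want the worker and child process to die together with the main thread, you have to call process.exit() as shown below. Please take note that the output above was captured on Windows; on non-Windows platforms a forked child may keep running after the parent exits, so you may need to stop it explicitly, for example with f.kill().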

console.log('exit from main thread');

setTimeout(() => {
    process.exit();
}, 2000);

Please take note that process.exit() will stop the worker and child process without waiting for them to complete their tasks. This can leave the data in an inconsistent state if the termination happens in the middle of processing.
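
One way to reduce that risk is to ask the worker to stop first and to force it only after a grace period. The sketch below is a minimal illustration under a few assumptions: the 'shutdown' message, the stopWorker helper and the 2 second grace period are all hypothetical, and the worker body is inlined with eval: true only to keep the example in one file.

```javascript
const { Worker } = require('worker_threads');

// Inline worker body (assumption: a real worker would live in its own file).
const workerSource = `
const { parentPort } = require('worker_threads');
// stand-in for ongoing work that keeps the worker alive
const timer = setInterval(() => {}, 1000);
parentPort.on('message', (msg) => {
    if (msg === 'shutdown') {
        // finish or save the current piece of work here, then leave
        clearInterval(timer);
        process.exit(0); // inside a worker, this ends the worker thread only
    }
});
`;

// Hypothetical helper: asks the worker to stop and falls back to
// terminate() if it has not exited within graceMs.
function stopWorker(worker, graceMs) {
    return new Promise((resolve) => {
        const timer = setTimeout(() => worker.terminate(), graceMs);
        worker.once('exit', (code) => {
            clearTimeout(timer);
            resolve(code);
        });
        worker.postMessage('shutdown');
    });
}

const w = new Worker(workerSource, { eval: true });
stopWorker(w, 2000).then((code) => {
    console.log('worker exited with code', code);
});
```

The same idea applies to a forked child: send it a message with f.send(), wait for its 'exit' event, and call f.kill() only if it does not exit in time.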

Author

Lau Hon Wan - the developer and business owner.