Tinypool
A minimal and tiny Node.js Worker Thread Pool implementation (38KB)
piscina - the node.js worker pool
✔ Fast communication between threads
✔ Covers both fixed-task and variable-task scenarios
✔ Supports flexible pool sizes
✔ Proper async tracking integration
✔ Tracking statistics for run and wait times
✔ Cancellation Support
✔ Supports enforcing memory resource limits
✔ Supports CommonJS, ESM, and TypeScript
✔ Custom task queues
✔ Optional CPU scheduling priorities on Linux
Written in TypeScript.
For Node.js 12.x and higher.
Piscina API
Example
In main.js:
```js
const path = require('path');
const Piscina = require('piscina');

const piscina = new Piscina({
  filename: path.resolve(__dirname, 'worker.js')
});

(async function() {
  const result = await piscina.runTask({ a: 4, b: 6 });
  console.log(result); // Prints 10
})();
```
In worker.js:
```js
module.exports = ({ a, b }) => {
  return a + b;
};
```
The worker may also be an async function or may return a Promise:
```js
const { promisify } = require('util');
const sleep = promisify(setTimeout);

module.exports = async ({ a, b }) => {
  // Fake some async activity
  await sleep(100);
  return a + b;
};
```
ESM is also supported for both Piscina and workers:
```js
import { Piscina } from 'piscina';

const piscina = new Piscina({
  // The URL must be a file:// URL
  filename: new URL('./worker.mjs', import.meta.url).href
});

(async function () {
  const result = await piscina.runTask({ a: 4, b: 6 });
  console.log(result); // Prints 10
})();
```
In worker.mjs:
```js
export default ({ a, b }) => {
  return a + b;
};
```
Cancelable Tasks
Submitted tasks may be canceled using either an AbortController or
an EventEmitter:
```js
'use strict';

const Piscina = require('piscina');
const { AbortController } = require('abort-controller');
const { resolve } = require('path');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

(async function() {
  const abortController = new AbortController();
  try {
    const task = piscina.runTask({ a: 4, b: 6 }, abortController.signal);
    abortController.abort();
    await task;
  } catch (err) {
    console.log('The task was canceled');
  }
})();
```
To use AbortController, you will need to npm i abort-controller
(or yarn add abort-controller).
Alternatively, any EventEmitter that emits an 'abort' event
may be used as an abort controller:
```js
'use strict';

const Piscina = require('piscina');
const EventEmitter = require('events');
const { resolve } = require('path');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

(async function() {
  const ee = new EventEmitter();
  try {
    const task = piscina.runTask({ a: 4, b: 6 }, ee);
    ee.emit('abort');
    await task;
  } catch (err) {
    console.log('The task was canceled');
  }
})();
```
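To make the cancellation pattern more concrete, here is a minimal sketch of how an 'abort' event can reject a pending promise. It is not Piscina's internal implementation: makeCancelable and its state object are hypothetical names used only for illustration, and the slow task is simulated with a timer instead of a worker thread.

```js
'use strict';
const EventEmitter = require('events');

// Hypothetical helper (not part of Piscina): wraps a pending promise so that
// an 'abort' event emitted by `abortEmitter` rejects the wrapper promise.
function makeCancelable(pending, abortEmitter) {
  const state = { status: 'pending' };
  const promise = new Promise((resolve, reject) => {
    const onAbort = () => {
      if (state.status !== 'pending') return;
      state.status = 'canceled';
      reject(new Error('The task was canceled'));
    };
    abortEmitter.once('abort', onAbort);

    // Settle normally only if no abort arrived first
    pending.then(
      (value) => {
        abortEmitter.removeListener('abort', onAbort);
        if (state.status === 'pending') {
          state.status = 'done';
          resolve(value);
        }
      },
      (err) => {
        abortEmitter.removeListener('abort', onAbort);
        if (state.status === 'pending') {
          state.status = 'failed';
          reject(err);
        }
      }
    );
  });
  return { promise, state };
}

// Simulated slow task standing in for work done on a worker thread
const slowTask = new Promise((resolve) => setTimeout(() => resolve(10), 50));

const ee = new EventEmitter();
const task = makeCancelable(slowTask, ee);
task.promise.catch((err) => console.log(err.message));

// Listeners run synchronously, so the task is marked canceled right away
ee.emit('abort');
console.log(task.state.status); // Prints canceled
```

Note that this sketch only stops waiting for the result; the underlying work keeps running until it finishes, whereas a real pool also has to deal with the worker that was executing the task.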
Delaying Availability of Workers
A worker thread will not be made available to process tasks until Piscina
determines that it is "ready". By default, a worker is ready as soon as
Piscina loads it and acquires a reference to the exported handler function.
Sometimes a worker must finish initializing the resources it needs
before it can accept tasks, so its availability has to be delayed further.
To support this case, the worker module may export a Promise that resolves
to the handler function instead of exporting the function directly:
```js
async function initialize() {
  await someAsyncInitializationActivity();
  return ({ a, b }) => a + b;
}

module.exports = initialize();
```
Piscina will await the resolution of the exported Promise before marking
the worker thread available.
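The readiness rule described above can be sketched without any threads. In the example below, WorkerSlot is a hypothetical stand-in for a pool's bookkeeping of a single worker, not a Piscina class: a directly exported function makes the slot ready immediately, while an exported Promise keeps it unavailable until the Promise resolves to the handler.

```js
'use strict';

// Hypothetical bookkeeping object for one worker (illustration only)
class WorkerSlot {
  constructor(exported) {
    this.ready = false;
    this.handler = null;
    if (typeof exported === 'function') {
      // Default case: the handler is available as soon as the module loads
      this.markReady(exported);
      this.whenReady = Promise.resolve(this);
    } else {
      // Delayed case: wait for the exported Promise to resolve to the handler
      this.whenReady = Promise.resolve(exported).then((handler) => {
        this.markReady(handler);
        return this;
      });
    }
  }

  markReady(handler) {
    if (typeof handler !== 'function') {
      throw new TypeError('The worker must export a function');
    }
    this.handler = handler;
    this.ready = true;
  }
}

// A worker that exports its handler directly
const direct = new WorkerSlot(({ a, b }) => a + b);

// A worker that needs some (simulated) asynchronous setup first
async function initialize() {
  await new Promise((resolve) => setTimeout(resolve, 10));
  return ({ a, b }) => a + b;
}
const delayed = new WorkerSlot(initialize());

console.log(direct.ready, delayed.ready); // Prints true false

delayed.whenReady.then((slot) => {
  console.log(slot.ready, slot.handler({ a: 4, b: 6 })); // Prints true 10
});
```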
Backpressure
When the maxQueue option is set, once the Piscina queue is full, no
additional tasks may be submitted until the queue size falls below the
limit. The 'drain' event may be used to receive notification when the
queue is empty and all tasks have been submitted to workers for processing.
Example: Using a Node.js stream to feed a Piscina worker pool:
```js
'use strict';

const { resolve } = require('path');
const Pool = require('piscina');

const pool = new Pool({
  filename: resolve(__dirname, 'worker.js'),
  maxQueue: 'auto'
});

const stream = getStreamSomehow();
stream.setEncoding('utf8');

// Number of chunks submitted so far (used only for logging)
let counter = 0;

pool.on('drain', () => {
  if (stream.isPaused()) {
    console.log('resuming...', counter, pool.queueSize);
    stream.resume();
  }
});

stream
  .on('data', (data) => {
    counter++;
    pool.runTask(data);
    // Stop reading once the queue has reached its limit
    if (pool.queueSize === pool.options.maxQueue) {
      console.log('pausing...', counter, pool.queueSize);
      stream.pause();
    }
  })
  .on('error', console.error)
  .on('end', () => {
    console.log('done');
  });
```
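The interplay between a queue limit and the 'drain' event can also be shown in isolation. The sketch below uses a hypothetical BoundedQueue class (not part of Piscina, and with its own error message) that refuses new items once it is full and emits 'drain' when the last queued item has been handed off:

```js
'use strict';
const EventEmitter = require('events');

// Hypothetical bounded task queue (illustration only)
class BoundedQueue extends EventEmitter {
  constructor(maxQueue) {
    super();
    this.maxQueue = maxQueue;
    this.items = [];
  }

  get size() {
    return this.items.length;
  }

  get isFull() {
    return this.items.length >= this.maxQueue;
  }

  // Producers must not add items while the queue is full
  push(item) {
    if (this.isFull) {
      throw new Error('Queue limit reached');
    }
    this.items.push(item);
  }

  // Called when a worker becomes free; emits 'drain' once the queue is empty
  shift() {
    if (this.items.length === 0) return undefined;
    const item = this.items.shift();
    if (this.items.length === 0) {
      this.emit('drain');
    }
    return item;
  }
}

const log = [];
const queue = new BoundedQueue(2);
queue.on('drain', () => log.push('drain'));

queue.push('task 1');
queue.push('task 2');
if (queue.isFull) log.push('paused'); // the producer would pause here

queue.shift(); // one task still queued, no 'drain' yet
queue.shift(); // queue is now empty, 'drain' fires

console.log(log); // Prints [ 'paused', 'drain' ]
```

In the stream example, the pause and resume calls play the role of the 'paused' and 'drain' entries recorded here.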
Additional Examples
Additional examples can be found in the GitHub repo at
https://github.com/jasnell/piscina/tree/master/examples
Class: Piscina
Piscina works by creating a pool of Node.js Worker Threads to which
one or more tasks may be dispatched. Each worker thread executes a
single exported function defined in a separate file. Whenever a
task is dispatched to a worker, the worker invokes the exported
function and reports the return value back to Piscina when the
function completes.
This class extends EventEmitter from Node.js.
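The dispatch model described above can be illustrated with a very small pool built directly on the Node.js worker_threads module. This is a simplified sketch, not how Piscina is implemented: SketchPool is a hypothetical name, the worker code is inlined with eval: true so that the example fits in one file, and tasks are assigned round-robin instead of being queued for idle workers.

```js
'use strict';
const { Worker } = require('worker_threads');

// Worker source, inlined so the sketch is self-contained. The `handler`
// function stands in for the function a worker file would export.
const workerSource = `
  const { parentPort } = require('worker_threads');
  const handler = ({ a, b }) => a + b;
  parentPort.on('message', async ({ id, task }) => {
    try {
      parentPort.postMessage({ id, result: await handler(task) });
    } catch (err) {
      parentPort.postMessage({ id, error: err.message });
    }
  });
`;

class SketchPool {
  constructor(size) {
    this.workers = [];
    this.pending = new Map(); // task id -> { resolve, reject }
    this.nextId = 0;
    for (let i = 0; i < size; i++) {
      const worker = new Worker(workerSource, { eval: true });
      // Each reply settles the promise of the task it belongs to
      worker.on('message', ({ id, result, error }) => {
        const { resolve, reject } = this.pending.get(id);
        this.pending.delete(id);
        if (error === undefined) resolve(result);
        else reject(new Error(error));
      });
      this.workers.push(worker);
    }
  }

  // Dispatch a task to a worker and return a promise for its return value
  runTask(task) {
    const id = this.nextId++;
    const worker = this.workers[id % this.workers.length]; // round-robin
    return new Promise((resolve, reject) => {
      this.pending.set(id, { resolve, reject });
      worker.postMessage({ id, task });
    });
  }

  // Stop all worker threads so the process can exit
  destroy() {
    return Promise.all(this.workers.map((worker) => worker.terminate()));
  }
}

const pool = new SketchPool(2);
pool.runTask({ a: 4, b: 6 })
  .then((result) => console.log(result)) // Prints 10
  .finally(() => pool.destroy());
```

A production pool additionally has to handle worker crashes, queueing, cancellation, and resource limits, which is the work Piscina takes on.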