Framework to manage the worker/manager relationship to maximise threads and performance.

danm/thread-factory

WORK IN PROGRESS

thread-factory

Framework to manage the worker/manager relationship to maximise threads and performance.

Install

npm i -S thread-factory

What is it?

Thread factory is a module which uses all of the cores on your machine rather than just the one which Node uses by default. It is built around a manager/worker relationship: each worker advertises itself as available for work, and the manager holds the list of jobs which the workers need to carry out.

Wait, Node only uses 1 of my Cores? But I have 8

Modern processors have traded clock speed for core count. In simple terms, this swaps serial speed for parallelism: we can do many things at once, each a little slower, rather than one thing really fast. This is an oversimplified description, but the important thing to remember is that your Node programme will only make use of one core. If you have 8 cores, you are only making use of 12.5% of your CPU.
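To make the arithmetic concrete, the short sketch below reports how many logical cores Node can see and what share of them a single process can keep busy. `singleCoreShare` is a hypothetical helper name used for illustration; it is not part of thread-factory.

```javascript
const os = require('os');

// Percentage of the machine's logical cores that one single-threaded
// process can keep busy. Hypothetical helper, for illustration only.
const singleCoreShare = coreCount => (1 / coreCount) * 100;

// os.cpus() returns one entry per logical core
const cores = os.cpus().length;
console.log(`${cores} logical cores; one process uses at most ${singleCoreShare(cores).toFixed(1)}% of them`);
```

On the 8 core machine from the heading, `singleCoreShare(8)` gives the 12.5% quoted above.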

Why not just use pm2?

PM2 can automatically run your Node programme on each core, but each of these processes has no idea what the others are working on. Out of the box, they don't coordinate with each other. This might be what you want for a web server, but if you are working through a dataset, the workload needs to be centrally managed.

Why not use pm2 with RabbitMQ or SQS?

These are great production solutions for continuous and evergreen workloads, but they have a high barrier to entry and a lot of setup. Thread factory is ideal for workloads where you do not want to spin up multiple machines, deal with external queues or worry about managing your own processes and threads.

OK, so how does it work?

When you start your Node application, the manager starts first and looks at the jobs it has been asked to hand out. This list is an array of instructions whose state the manager tracks. The manager then creates one child process per core on the machine. Each worker lets the manager know that it is ready to work. The manager distributes jobs to the workers, and each worker tells the manager when it has completed a job or encountered a problem.

Workers cannot communicate with other workers, only with their manager. A worker can either write its data to a file or pass it back to the manager.
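The bookkeeping described above can be sketched as a small in-process model. This is only an illustration of the idea, not thread-factory's actual implementation: `createJobList`, its method names and the job states are all assumptions, and real workers would be child processes rather than plain function calls.

```javascript
// Hypothetical, in-process model of the manager's job list.
// It only tracks state; real workers would be child processes.
const createJobList = (instructions) => {
  const jobs = instructions.map((instruction, id) => ({
    id,
    instruction,
    state: 'pending', // pending -> running -> done | failed
  }));

  return {
    // Called when a worker reports that it is ready for work.
    // Returns the next pending job, or null when nothing is left to hand out.
    next() {
      const job = jobs.find(j => j.state === 'pending');
      if (!job) return null;
      job.state = 'running';
      return job;
    },
    // Called when a worker reports success or a problem.
    complete(id) { jobs[id].state = 'done'; },
    fail(id) { jobs[id].state = 'failed'; },
    // True once every job has been handed out and reported back.
    isFinished() {
      return jobs.every(j => j.state === 'done' || j.state === 'failed');
    },
  };
};

// Usage: two workers pulling from a list of three instructions.
const list = createJobList(['a.gz', 'b.gz', 'c.gz']);
const first = list.next(); // worker 1 takes 'a.gz'
const second = list.next(); // worker 2 takes 'b.gz'
list.complete(first.id);
const third = list.next(); // worker 1 is free again and takes 'c.gz'
list.fail(second.id);
list.complete(third.id);
console.log(list.isFinished());
```

The point of the pull model is that a fast worker simply asks for work more often, so no core sits idle while jobs remain.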

Why would I use something like this?

I created this because I needed to look through thousands of large files on AWS S3. The solution at the time was to use Hadoop or Athena, both of which have a level of setup that is overkill for some projects. These tools feel like they were created for data scientists or analysts who use CSV and Python. JSON and JS are second-class citizens in this world, and this module is here to help with that.

Example Scenario

I have a month of log data stored in S3. The data is formatted as ld-json (one JSON object per line, delimited by \n). Each file contains a million rows totalling 500 MB, which is GZIP'd down to 30 MB. I want to find all errors with error code 505 which happened over a given time period.
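For a feel of the data and the query, here is a minimal sketch that filters a few ld-json rows held in memory. The `error` field matches the worker example further down; the `time` field name, the sample rows and the `findErrors` helper are assumptions made for illustration. Real files of this size should be streamed line by line rather than loaded as one string.

```javascript
// Hypothetical sample: three ld-json rows. The `error` field matches the
// worker example; the `time` field name is an assumption.
const sample = [
  '{"time":"2017-11-03T10:00:00Z","error":505}',
  '{"time":"2017-11-04T12:30:00Z","error":404}',
  '{"time":"2017-11-20T08:15:00Z","error":505}',
].join('\n');

// Return the parsed rows with the given error code inside [from, to).
const findErrors = (text, code, from, to) => (
  text
    .split('\n')
    .filter(line => line.trim() !== '') // tolerate a trailing newline
    .map(line => JSON.parse(line))
    .filter((row) => {
      const at = Date.parse(row.time);
      return row.error === code && at >= Date.parse(from) && at < Date.parse(to);
    })
);

// Only the 3 November row is both a 505 and inside the first ten days.
const hits = findErrors(sample, 505, '2017-11-01T00:00:00Z', '2017-11-10T00:00:00Z');
console.log(hits.length); // 1
```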

Include the modules we are going to use in our example

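const ThreadFactory = require('thread-factory');
const AWS = require('aws-sdk');
const zlib = require('zlib');
const fs = require('fs');
const readline = require('readline');
// S3 client used by both the manager and the worker
const s3 = new AWS.S3();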

The manager asks AWS S3 for all keys that match the prefix, in this case all keys for November. It maps the returned objects down to the only data we care about, the keys, and passes them to the promise resolver.

const manager = () => (
  new Promise((resolve, reject) => {
    const params = {
      Bucket: 'name-of-log-bucket',
      Prefix: '2017-11-',
    };
    s3.listObjectsV2(params, (err, res) => {
      if (err) {
        reject(err);
        return;
      }
      const keys = res.Contents.map(row => row.Key);
      resolve(keys);
    });
  })
);
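One caveat worth knowing: a single `listObjectsV2` call returns at most 1,000 keys, and S3 sets `IsTruncated` and `NextContinuationToken` on the response when there are more. If a prefix can hold more than that, the listing has to be paged. The sketch below is one possible way to do it; `listAllKeys` is a hypothetical helper, and it takes the S3 client as a parameter so it can be tried against a stub.

```javascript
// Collect every key under a prefix by following continuation tokens.
// `s3` is any object exposing the callback-style listObjectsV2 used above;
// `listAllKeys` is a hypothetical helper name.
const listAllKeys = (s3, params) => (
  new Promise((resolve, reject) => {
    const keys = [];
    const fetchPage = (token) => {
      // Only add ContinuationToken once S3 has handed us one
      const pageParams = token ? { ...params, ContinuationToken: token } : params;
      s3.listObjectsV2(pageParams, (err, res) => {
        if (err) {
          reject(err);
          return;
        }
        res.Contents.forEach(row => keys.push(row.Key));
        if (res.IsTruncated) {
          fetchPage(res.NextContinuationToken);
        } else {
          resolve(keys);
        }
      });
    };
    fetchPage();
  })
);
```

With this in place, the manager could be reduced to a function that returns `listAllKeys(s3, params)` with the same `Bucket` and `Prefix` as above.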

Once the manager's promise resolves, the workers start up and notify the manager that they are ready for work. The manager passes an element from the keys array to each of the workers.

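const worker = (key, resume) => (
  new Promise((resolve, reject) => {
    const params = {
      Bucket: 'name-of-log-bucket',
      Key: key,
    };

    // stream the object from S3 and decompress it on the fly
    const source = s3.getObject(params).createReadStream();
    const gunzip = zlib.createGunzip();
    // readline may not forward errors from its input, so listen on the streams too
    source.on('error', reject);
    gunzip.on('error', reject);

    const rl = readline.createInterface({
      input: source.pipe(gunzip),
    });

    rl.on('line', (data) => {
      const json = JSON.parse(data);
      if (json.error === 505) {
        // append the matching row to this worker's own output file
        fs.appendFileSync(`output-${resume.id}.ldjson`, `${data}\n`);
      }
    });
    rl.on('close', () => resolve());
    rl.on('error', e => reject(e));
  })
);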

Each of these workers runs on its own core and writes its output to its own file. When it finishes a key, it notifies the manager that it is ready for the next key in the array.

// init Thread Factory with the manager and worker functions
const factory = new ThreadFactory({
  manager,
  worker,
});

factory.on('finished', () => {
  console.log('finished');
});

factory.on('error', (e) => {
  console.log(e);
});

factory.start();

API

ThreadFactory takes one argument: an object of functions.

const ThreadFactory = require('thread-factory');
const spec = {
  manager, // <Function> Runs when ThreadFactory starts up. Returns a Promise which resolves with an array of instructions. Required.
  managerFinish, // <Function> Runs once all items in the manager array have completed. Returns a Promise. Optional.
  workerStartup, // <Function> Runs once per worker for any one-off setup, such as opening a socket or connecting to a db. Returns a Promise. Optional.
  workerProcess, // <Function> Runs every time the manager passes the worker an instruction. Returns a Promise. Required.
  workerFinish, // <Function> Cleanup function, fired when the manager has no more work for the worker. Optional.
};

const factory = new ThreadFactory(spec);

// fired each time a worker finishes a job
factory.on('completed', (d) => {
  console.log(d);
});

// fired when all of the manager's tasks have been completed
factory.on('finished', (d) => {
  console.log('finished');
});

// fired on error passing the Error Object
factory.on('error', (e) => {
  console.log(e);
});

// starts the manager
factory.start();
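Because two of the functions are required and three are optional, it can be handy to check a spec before handing it over. The sketch below does that using only the names and the required/optional status stated in the comments above. `validateSpec` is a hypothetical helper and not part of thread-factory. Note that the example scenario earlier passes `worker` rather than `workerProcess`; this sketch follows the names listed in this section.

```javascript
// Names and required/optional status are taken from the spec comments above;
// `validateSpec` itself is a hypothetical helper, not part of thread-factory.
const REQUIRED = ['manager', 'workerProcess'];
const OPTIONAL = ['managerFinish', 'workerStartup', 'workerFinish'];

// Returns a list of human-readable problems; an empty list means the spec looks usable.
const validateSpec = (spec) => {
  const problems = [];
  REQUIRED.forEach((name) => {
    if (typeof spec[name] !== 'function') {
      problems.push(`${name} is required and must be a function`);
    }
  });
  OPTIONAL.forEach((name) => {
    if (spec[name] !== undefined && typeof spec[name] !== 'function') {
      problems.push(`${name} must be a function when provided`);
    }
  });
  return problems;
};

// A spec with only a manager: reports that workerProcess is missing
console.log(validateSpec({ manager: () => Promise.resolve([]) }));
```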
