Send progress/status information from worker to main process #51

Closed
dimitriylol opened this issue Feb 22, 2019 · 30 comments · Fixed by #227

Comments

@dimitriylol

As I understand it, there isn't a way to send or listen for messages between a worker and the executor. I'm referring to the message listeners and the worker.send sender method in the worker.js and WorkerHandler.js files. Could you suggest how I can establish message communication? Maybe you've already given it some thought.

P.S. Great package, thank you for the work.

@josdejong
Owner

Thanks. I indeed haven't thought about opening up two-way communication between the main process and the workers. workerpool creates a "remote procedure call" layer on top of the bare-bones message passing mechanism that is the basis of the underlying workers.

We could discuss what allowing two-way message passing could look like, though I'm not sure if this fits in the scope of workerpool (sometimes it's best to have a library do just one thing). Do you have any ideas?

@dimitriylol
Author

I gave some thought to my specific task and realized that I want to implement server-sent events on top of the message passing mechanism of the underlying workers instead of HTTP. So the master process is the client and the worker is the server, which may produce messages for the master.

It can be done in two ways:

  1. Quick and easy. Create a new API function (let's call it execWithMsg, which internally calls exec) and pass a callback for listening to messages from the worker. This callback is passed down to a callback in the WorkerHandler.js file.
this.worker.on('message', function (response) {
  // call it here via task.msgCallback
});

And pass a curried function for sending messages from the worker code:

method.apply(method, [ ...request.params, (msg) => worker.send({ id: request.id, msg }) ]);

I've done a rough implementation, so it's easy. This approach violates the "one function does just one thing" principle, which is pretty bad. But if it's okay with you, I can create a pull request in a couple of days.

  2. Create an API function which implements the server-sent events pattern in addition to RPC. There can be a function establishConnection:
function establishConnection (method, callbackForWorkerEvents) {
  // in the same manner as `exec` creates a worker
  // but returns undefined. All logic for processing messages from worker must be implemented in `callbackForWorkerEvents`
}

By the way, if someone would like to have a two-way connection, then establishConnection must return a send function instead of undefined. That function would send messages from the master to the worker. A channel for listening to messages from the master can be set up by writing the following code in a dedicated worker script:

process.on('message', (msg) => {})

It should be abstracted in some way, but I couldn't come up with anything except passing a stringified message listener to the worker script. IMO, that's not acceptable.

Please respond which way you like more (or none of it 😃 ).
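
The first option can be sketched roughly as follows. This is only an in-process illustration, not workerpool's code: `channel` is an EventEmitter standing in for the real worker transport, and `countTo` is a hypothetical worker method. The names execWithMsg and msgCallback come from the proposal above. The point is that every status message carries the request id, so the main side can route it to the callback of the task that produced it.

```javascript
const { EventEmitter } = require("node:events");

// Stand-in for the worker message transport (assumption: the real library
// would use process.send / worker.on('message') instead).
const channel = new EventEmitter();

// Worker side: every method receives an extra trailing `send` argument.
const methods = {
  // Hypothetical long-running method that reports its status.
  countTo(limit, send) {
    for (let i = 1; i <= limit; i++) send(`step ${i} of ${limit}`);
    return limit;
  },
};

channel.on("request", (request) => {
  const method = methods[request.method];
  // Curried sender: each status message is tagged with the request id.
  const send = (msg) => channel.emit("response", { id: request.id, msg });
  const result = method.apply(null, [...request.params, send]);
  channel.emit("response", { id: request.id, result });
});

// Main side: pending tasks, keyed by request id.
const tasks = new Map();
let lastId = 0;

channel.on("response", (response) => {
  const task = tasks.get(response.id);
  if (!task) return;
  if ("msg" in response) {
    task.msgCallback(response.msg); // status update for a running task
  } else {
    tasks.delete(response.id); // final result: the task is finished
    task.resolve(response.result);
  }
});

function execWithMsg(method, params, msgCallback) {
  return new Promise((resolve) => {
    const id = ++lastId;
    tasks.set(id, { resolve, msgCallback });
    channel.emit("request", { id, method, params });
  });
}
```

Calling `execWithMsg("countTo", [3], console.log)` would log three status lines and resolve with the method's return value. Error handling and real process boundaries are left out on purpose.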

@josdejong
Owner

So if I understand you correctly, you're looking to send an event from the worker to the main application? We could think about extending workerpool with an on('event', callback) API.

What may be confusing is that when a pool creates multiple workers, all these workers may start sending events. I guess that's not what you want. I'm not sure how this could work out nicely. Do you have a real-world use case of what you try to achieve?

@dimitriylol
Author

Sorry, my bad for writing in such an abstract manner.

Do you have a real-world use case of what you try to achieve?

I have long-running tasks and I want to get some status feedback from them. I've made a fork of your repo and pushed 2 commits to master:

  • f11f6cb - for adding execWithMsg function
  • 40c828a - for an example of usage

It's not a ready-to-go solution, so feel free to criticize it.

@josdejong
Owner

Ah, now I get it, that's really a great idea! Makes a lot of sense to be able to post status or progress information back to the main application.

Would be great to integrate this feature into workerpool. I would love to think a bit more about the API and the clearest naming for this feature. At first, the name execWithMsg confused me. On the other hand, calling it something like onProgress may be clear but maybe too limiting in what you can do with such a feature. Maybe we can call it onUpdate and pass it as an optional extra argument to exec or something. Just thinking aloud here, but I really like your idea :)

@josdejong changed the title from "expose messaging between worker and executor" to "Send progress/status information from worker to main process" on Mar 2, 2019

@x-077

x-077 commented Mar 31, 2019

Hello @josdejong, I came across your GitHub, great tool :)

Any progress, by any chance? I have a similar situation, and it could be really useful to integrate with Redis (pub/sub), as we could bind the worker directly to the appropriate channel based on the event name.

One last comment: @dimitriylol, it could be good to use an async generator for each task, which would avoid having to wait until everything is processed.

As an example, to make this clearer:

// ... some code

// This is the async generator that takes care of each promise. It could also be a custom function passed as an argument.

async function* gen(myPromise) {
    const {data} = ...
    // consume the promise here ...
    return anOtherPromise
}

// Instead of using Promise.all([...]) you could do this:

for await (const data of arrayOfPromises) {
   // `for await...of` awaits each promise in turn, so `data` is already the resolved value
   // do something else with the data consumed by the async generator
   // return a Promise.
}

The good use case that I see is when you don't want to wait for all the "promises" to be resolved. Let's say I have an array of promises fetching 100 API endpoints: I would not have to wait for all of them before starting whatever I have to do with that data.
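
A minimal runnable sketch of that idea, under some assumptions: `inCompletionOrder` and `delay` are hypothetical helper names introduced here for illustration, and the timers stand in for real API calls. The generator yields each value as soon as its promise settles, in completion order rather than input order, so processing can start before the slowest request finishes.

```javascript
// Hypothetical helper: yields results in the order the promises fulfill.
async function* inCompletionOrder(promises) {
  // Tag each promise with its index so the winner of each race can be removed.
  const pending = new Map(
    promises.map((p, i) => [i, Promise.resolve(p).then((value) => ({ i, value }))])
  );
  while (pending.size > 0) {
    const { i, value } = await Promise.race(pending.values());
    pending.delete(i);
    yield value;
  }
}

// Hypothetical stand-in for an API call that takes `ms` milliseconds.
const delay = (ms, value) =>
  new Promise((resolve) => setTimeout(resolve, ms, value));

async function demo() {
  const seen = [];
  const requests = [delay(30, "slow"), delay(10, "fast"), delay(20, "medium")];
  for await (const value of inCompletionOrder(requests)) {
    seen.push(value); // handle each result without waiting for the rest
  }
  return seen;
}
```

Note that a single rejected promise makes the loop throw, so real code would probably want per-request error handling.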

Sorry for the long post.

Thanks in advance.

@josdejong
Owner

Thanks for your inputs @matth-c3 👍

@x-077

x-077 commented Apr 7, 2019

Hello @josdejong ,

did you have a chance to think about it ? It's a really cool feature ;)

@josdejong
Owner

@matth-c3 what do you mean exactly? About the progress information in general: that sounds like a good idea to me. I do not have any time soon to implement it myself though, so help would be very welcome. About your redis use case: I don't have enough info to understand your case exactly but if the progress plans fit your use case it's only more reason to implement it.

@x-077

x-077 commented Apr 9, 2019

Hello @josdejong,

Yes, I'm talking about the possibility of naming the workers and talking to specific ones.

Regarding Redis, I will try to come up with an example.

thanks.

@josdejong
Owner

Regarding Redis, I will try to come up with an example.

Thanks, I would like to understand your Redis use case better, that can help shaping the right solution.

@x-077

x-077 commented Apr 9, 2019

Hello @josdejong ,

As Redis works using a pub/sub mechanism, the idea would be to have a Redis client listening on the messageChannel of the worker; each time the master receives a request to fetch data (over HTTP, for example), it "forwards" it to the worker using the messageChannel.

When the data comes back, it can be sent back to the requester (with a websocket, for example). It could be useful for very large data sets.
This could also work with Redis streams.

thanks.

@josdejong
Owner

@matth-c3 I'm not sure I get it. You would like to use Redis to communicate between main process and workers? I'm not sure why you would like to do that instead of the more direct, built-in message API of workers that we use right now. Or would you like to have remote workers "somewhere" in the cloud?

@fas3r

fas3r commented Jun 15, 2019

Hello @josdejong ,

Do you have an update regarding @dimitriylol's suggestion?

@josdejong
Owner

Had no feedback since 9 April

@fas3r

fas3r commented Jun 17, 2019

Hello @josdejong ,

I guess he was waiting on you to take a decision on this one, no :D ?

@josdejong
Owner

Ah, could be. I think we should look into introducing an onUpdate callback, or an even more generic event listener on(message, data), which can be used for progress and other types of messages.

When someone is working this feature out in a concrete PR, we can discuss the details I think.

@boneskull
Contributor

+1 for this.

If you agree it's beyond the scope of workerpool, it might make sense as a separate package which consumes or extends workerpool.

As I mentioned in another issue, Mocha intends to consume workerpool to control parallel testing (mochajs/mocha#4245), using forked processes (not worker_threads). It works fine for this purpose, insofar as the initial implementation uses a "buffering" scheme; each test file runs in a worker, and when the test file is completed, the worker returns the result to the main process as an array of event data. In this way, an RPC-like interface makes sense.

But we likely will want to consume the data in other ways; it may be desirable in some cases to "stream" the result of each test back to the main process, instead of buffering. I could likely make this work with workerpool as-is, but it'd require either a) digging around in private methods (e.g., Pool._getWorker()) to get a reference to the actual ChildProcess objects (ostensibly to call ChildProcess.on('message')), or b) implementing my own Pool and just using the worker abstraction.

Given I don't want to build anything on a private API, but I do want to use workerpool otherwise, I'm interested in a solution here--even if that just means making Pool.getWorker() a public, semver-respecting API.
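
To make the "streaming" alternative concrete, here is a rough sketch using only Node's built-in child_process module, without workerpool. Everything in it is an assumption for illustration: the `{ type, name }` message shape, the `runStreaming` helper, and the temp-file child script (used only so the example is self-contained). The child reports each result via process.send() as soon as it is ready, and the parent handles it in a 'message' listener instead of receiving one buffered array at the end.

```javascript
const { fork } = require("node:child_process");
const fs = require("node:fs");
const os = require("node:os");
const path = require("node:path");

// Hypothetical child script: streams one message per finished "test",
// then sends a completion marker.
const childSource = `
  for (const name of ["test-a", "test-b", "test-c"]) {
    process.send({ type: "result", name });
  }
  process.send({ type: "done" });
`;

// Forks the child and calls onResult for every streamed result.
// Resolves once the child has sent its "done" marker.
function runStreaming(onResult) {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), "stream-demo-"));
  const file = path.join(dir, "child.js");
  fs.writeFileSync(file, childSource);

  return new Promise((resolve, reject) => {
    const child = fork(file);
    child.on("message", (msg) => {
      if (msg.type === "result") onResult(msg.name); // handled immediately
      if (msg.type === "done") resolve();
    });
    child.on("error", reject);
    // Clean up the temp script once the child process has exited.
    child.on("exit", () => fs.rmSync(dir, { recursive: true, force: true }));
  });
}
```

With a pool in front of the child processes, the open question from this comment remains: how a consumer gets at the 'message' events without reaching into private pool internals.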

@boneskull
Contributor

Anyhow, I would be interested in sending a PR to at least make this more easily implemented by a third-party. That would probably make Pool.getWorker() public, and likely create a helper in Worker, e.g., onMessage(), which would handle adding an event listener to the underlying ChildProcess. I'm not sure I would know where to begin with doing this for worker threads or the browser though.

boneskull added a commit to mochajs/mocha that referenced this issue May 21, 2020
- moved all non-CLI Node.js-specific sources into `lib/nodejs/`
- renamed `WorkerPool` to `BufferedWorkerPool` and updated filename accordingly.  This is in anticipation of eventually adding a "streaming" worker pool that would communicate via IPC (e.g., `process.send()` and `process.on("message")`); see josdejong/workerpool#51

Signed-off-by: Christopher Hiller <[email protected]>
@josdejong
Owner

If you agree it's beyond the scope of workerpool, it might make sense as a separate package which consumes or extends workerpool.

This feature is not beyond scope.

Help implementing it would be very welcome. Like I said in #51 (comment), before implementing it we should think it through in detail.

boneskull added a commit to mochajs/mocha that referenced this issue May 27, 2020
- moved all non-CLI Node.js-specific sources into `lib/nodejs/`
- renamed `WorkerPool` to `BufferedWorkerPool` and updated filename accordingly.  This is in anticipation of eventually adding a "streaming" worker pool that would communicate via IPC (e.g., `process.send()` and `process.on("message")`); see josdejong/workerpool#51

Signed-off-by: Christopher Hiller <[email protected]>
@boneskull
Contributor

Yes, as you mentioned there, I think a draft PR is a fair place to start.

Having thought about it a bit... we have a pool, and that pool abstracts workers from the consumer. It seems a bit awkward to expose an API that would require the consumer to interface with a worker directly, or specific workers. An EventEmitter-like API would make sense to me:

// main.js

// pool is an EventEmitter
let pool = workerpool.pool({enableMessages: true});

// pool could emit 'onStart' or any "system"-level events using a convention, e.g., `$start`
// or `pool.messages` could be an EventEmitter instead
pool.on('<user-defined event name>', data => {
  // handle data
});

await pool.exec('myFunc');

// worker.js

// worker is an EventEmitter
let worker = workerpool.worker({
  myFunc: async () => {

    // emit() could attach some metadata about the worker to its payload; perhaps unique ID / pid
    worker.emit('<user-defined event name>', data);
  }
});

@josdejong
Owner

I think indeed that it should be an event emitter kind of solution.

I think that we somehow need to be able to bind progress events to a specific task that is running. Would it be possible to create an API that works like this:

// main.js

const result = await pool.exec('fibonacci', [10], {
  // pass an event listener along when starting a task
  on: function (event, data) {
    // event callback can be used for anything. 
    // For example event can be 'progress', and data can be a percentage like 0.25
    // maybe we should also pass the original method and arguments so you have all
    // context available
    console.log(event, data)
  }
})
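
One way such a task-bound listener could work under the hood is sketched below with Node's built-in worker_threads. This is an illustration under stated assumptions, not workerpool's implementation: the `exec` function here is a hypothetical stand-in that starts one thread per call (no pooling), and the `{ id, event, data }` / `{ id, result }` message shapes are made up for the example. Tagging every message with the task id is what ties a progress event to the task that emitted it.

```javascript
const { Worker } = require("node:worker_threads");

// Worker source, evaluated in a thread: computes fibonacci(n) iteratively
// and posts a 'progress' event after every step.
const workerSource = `
  const { parentPort } = require("node:worker_threads");
  parentPort.on("message", ({ id, n }) => {
    let a = 0, b = 1;
    for (let i = 0; i < n; i++) {
      [a, b] = [b, a + b];
      parentPort.postMessage({ id, event: "progress", data: (i + 1) / n });
    }
    parentPort.postMessage({ id, result: a });
  });
`;

let nextId = 0;

// Hypothetical exec(): runs one task and forwards its events to options.on.
function exec(n, options = {}) {
  const id = nextId++;
  const worker = new Worker(workerSource, { eval: true });
  return new Promise((resolve, reject) => {
    worker.on("message", (msg) => {
      if (msg.id !== id) return; // ignore messages from other tasks
      if ("result" in msg) {
        resolve(msg.result);
        worker.terminate(); // one thread per task in this sketch
      } else if (options.on) {
        options.on(msg.event, msg.data); // task-scoped event
      }
    });
    worker.on("error", reject);
    worker.postMessage({ id, n });
  });
}
```

Usage mirrors the proposal: `exec(10, { on: (event, data) => console.log(event, data) })` logs ten progress events before the promise resolves with the result.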

@dko-slapdash

dko-slapdash commented Oct 9, 2020

That's so weird that a project with 1M+ weekly downloads on NPM doesn't have such an essential function. This is how I monkey-patched the library as a work-around:

import workerpool, { WorkerPoolOptions } from "workerpool";
import { parentPort } from "worker_threads";

const TAG = "workerpool-x";

export type WorkerPool = workerpool.WorkerPool & {
  onMessageFromWorker: (cb: (message: any) => void) => void;
};

/**
 * The workerpool NPM module has 1M+ weekly downloads and still doesn't have any
 * mechanism for worker processes to report their status to the parent. There is
 * an open issue with code examples about this, but it doesn't move. There is
 * also no way to send a MessagePort object into the worker along with other
 * arguments.
 *
 * To compensate this issue, here is a helper module which monkey-patches
 * workerpool and adds an ability to send messages from workers to the parent.
 */
export function pool(options: WorkerPoolOptions): WorkerPool {
  const pool = workerpool.pool(require.main!.filename, {
    ...options,
    workerType: "thread",
  }) as WorkerPool & { _createWorkerHandler: (...args: any[]) => any };

  const messageCallbacks: Array<(message: any) => void> = [];
  pool.onMessageFromWorker = (cb) => {
    messageCallbacks.push(cb);
  };

  const oldCreateWorkerHandler = pool._createWorkerHandler.bind(pool);
  pool._createWorkerHandler = (...args: any) => {
    const handler = oldCreateWorkerHandler(...args);
    handler.worker.on("message", (message: any) => {
      if (message && typeof message === "object" && message._tag === TAG) {
        messageCallbacks.forEach((cb) => cb(message.message));
      }
    });
    return handler;
  };

  return pool;
}

export function worker(methods: { [k: string]: (...args: any[]) => void }) {
  return workerpool.worker(methods);
}

export function postMessageToParent(message: any) {
  parentPort!.postMessage({ _tag: TAG, message });
}

export const isMainThread = workerpool.isMainThread;
export const cpus = workerpool.cpus;

@josdejong
Owner

Thanks for sharing your solution Dimitri!

That's so weird that a project with 1M+ weekly downloads on NPM doesn't have such an essential function.

I hope you appreciate all the spare time me and others put in this library you're using for free 😄 . Any help improving workerpool further is very welcome.

@Akryum
Contributor

Akryum commented Jan 18, 2021

@josdejong I made a PR for a one-way (worker to pool) communication feature.
You can try it by running yarn add @akryum/workerpool.

josdejong pushed a commit that referenced this issue Jan 31, 2021
* feat: workerEmit

* refactor: remove eventType

* test: fix event emit test

* chore: revert generated file

* text: fix worker for emit test

* docs: workerEmit/exec on option

* docs: fix wrong function name

* docs: fix link

* docs: words are hard

* docs: missing semi-colons
@josdejong
Copy link
Owner

This feature is available now in v6.1.0. Thanks again @Akryum 👍

@gokaybiz

Is it possible to send pool -> worker messages too?

@josdejong
Owner

@gokaybiz I think you mean something like discussed in #185?

@gokaybiz

More like this PR: #372

@josdejong
Owner

Ahh, ok, yeah that PR is a nice improvement but it did stall. Help picking that up again and finishing the PR would be welcome.
