Sharing complex objects across worker threads? #323

Open
mayurk opened this issue Oct 7, 2021 · 11 comments

Comments

@mayurk

mayurk commented Oct 7, 2021

Is there a way to share a DB pool object across the worker threads? Any attempt to send such an object to a worker thread from the main thread via the pool.exec(...) call gives a DataCloneError, since the object holds a function reference. Also, even though the worker threads are part of the same process (I might be wrong here), every time we "require" a module in the worker it gets a fresh copy instead of the same copy from the cache. This means we cannot use this module where DB connectivity is required.
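
To make the DataCloneError concrete, here is a minimal sketch using the global structuredClone (available in recent Node.js versions), which applies the same kind of cloning rules as posting a message to a worker. The `fakePool` object is a hypothetical stand-in for a real DB pool, not anything from workerpool or a database driver.

```javascript
// Plain data survives structured cloning...
const config = { host: 'localhost', port: 5432 };
const copy = structuredClone(config);

// ...but an object that holds a function does not.
// `fakePool` is a hypothetical stand-in for a real DB pool object.
const fakePool = { query: () => 'rows' };

let errorName = null;
try {
  structuredClone(fakePool);
} catch (err) {
  errorName = err.name; // the clone is rejected because of the function
}

console.log(copy.port, copy !== config, errorName);
```

The clone is a separate object, so even when cloning succeeds the worker would be mutating its own copy rather than the original.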

@shlomiah

+1
I'm trying to use variables that were required in the main thread and don't know how to inject them into the worker pool.

@josdejong
Owner

@mayurk worker threads are really isolated from each other: each one has its own module cache and its own copies of objects. Search for more explanation about it on the internet if you want to know more.

@shlomiah I'm not sure if your remark is about the same topic. Can you explain your use case?
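
Given that isolation, one common workaround (a sketch under assumptions, not something the workerpool maintainers prescribe in this thread) is to let every worker build its own connection pool once and reuse it across tasks. `createDbPool` below is a hypothetical factory standing in for whatever a real driver provides.

```javascript
// Worker-side sketch: create the expensive resource once per thread.
// `createDbPool` is a hypothetical factory, not a real driver API.
let poolInstance = null;
let created = 0;

function createDbPool() {
  created += 1;
  return { id: created, query: (sql) => `ran: ${sql}` };
}

// Lazily create the pool on first use, then reuse the same instance.
function getPool() {
  if (poolInstance === null) {
    poolInstance = createDbPool();
  }
  return poolInstance;
}

// Task function: receives only cloneable arguments (a string here).
function runQuery(sql) {
  return getPool().query(sql);
}

console.log(runQuery('SELECT 1'), created);
```

With workerpool, a function like `runQuery` would typically live in a dedicated worker script and be registered through `workerpool.worker({ runQuery })`, so only the SQL string and the result cross the thread boundary.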

@jfoclpf

jfoclpf commented Sep 6, 2022

@josdejong there's no way to use some sort of SharedArrayBuffer?

I need to share (not clone, not transfer, just share) a big heavy JS object created at the parent thread, and share it amongst children threads.
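
For reference, this is roughly what sharing through a SharedArrayBuffer looks like with the native worker_threads API, outside of workerpool. Note the limitation: only raw bytes behind a typed array are shared, not an arbitrary JS object graph. The two-slot layout (counter and done flag) is an assumption made for this sketch, and blocking the main thread with Atomics.wait is done only to keep the demo short.

```javascript
const { Worker } = require('worker_threads');

// Two Int32 slots: [0] = counter, [1] = "done" flag (layout assumed for this demo).
const shared = new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT);
const view = new Int32Array(shared);
view[0] = 40;

// Inline worker source, run with `eval: true` to keep the sketch in one file.
const workerSource = `
  const { workerData } = require('worker_threads');
  const view = new Int32Array(workerData.shared);
  Atomics.add(view, 0, 2);   // mutate the shared counter in place
  Atomics.store(view, 1, 1); // raise the done flag
  Atomics.notify(view, 1);   // wake the waiting main thread
`;

// The SharedArrayBuffer is passed by reference, not copied.
new Worker(workerSource, { eval: true, workerData: { shared } });

// Wait (up to 5 s) until the worker raises the done flag.
Atomics.wait(view, 1, 0, 5000);
const counter = Atomics.load(view, 0);
console.log(counter); // 42 once the worker has run
```

A "big heavy JS object" would have to be encoded into such a buffer by hand (for example as numeric fields at fixed offsets) before it can be shared this way.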

@jfoclpf

jfoclpf commented Sep 6, 2022

@josdejong
I know this does not work, but something like this:

const workerpool = require('workerpool');
const pool = workerpool.pool();

const sharedObj = {a: 1, b: 2}

function add(a, b) {
  return a + b + sharedObj.a + sharedObj.b;
}

for (let i=0; i < 10; i++) {
  pool.exec(add, [i, i+1])
    .then(function (result) {
      console.log('result', result); // outputs 7
    })
    .catch(function (err) {
      console.error(err);
    })
    .then(function () {
      pool.terminate(); // terminate all workers when done
    });
}
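
As far as I understand, the snippet above fails because a function handed to pool.exec is sent to the worker as source text, so variables it closed over do not travel with it. The sketch below imitates that with plain `new Function` (an approximation, not workerpool's actual code) and shows the usual workaround of passing the object as an explicit argument. `addWithShared` is a hypothetical name.

```javascript
function demo() {
  const sharedObj = { a: 1, b: 2 };

  // Relies on a closure over sharedObj, like the snippet above.
  function add(a, b) {
    return a + b + sharedObj.a + sharedObj.b;
  }

  // Rebuilding the function from its source text drops the closure.
  const rebuilt = new Function('return (' + add.toString() + ')')();
  let failure = null;
  try {
    rebuilt(1, 2);
  } catch (err) {
    failure = err.name; // sharedObj is not defined in the rebuilt function
  }

  // Workaround: make the function self-contained and pass the data in.
  function addWithShared(a, b, shared) {
    return a + b + shared.a + shared.b;
  }
  const rebuiltFixed = new Function('return (' + addWithShared.toString() + ')')();
  const result = rebuiltFixed(1, 2, structuredClone(sharedObj)); // the worker gets a copy

  return { failure, result };
}

const outcome = demo();
console.log(outcome); // { failure: 'ReferenceError', result: 6 }
```

With workerpool this would correspond to something like `pool.exec(addWithShared, [i, i + 1, sharedObj])`. The object is still copied on every call, so this does not share anything, and it gets expensive for large objects.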

@josdejong
Owner

Sounds like you would like to use a SharedWorker. The workerpool library doesn't support that though.

And on a side note: options to transfer an object without copying are discussed in #3.

@jfoclpf

jfoclpf commented Sep 7, 2022

@josdejong can you recommend any npm library for Node.js?

@josdejong
Owner

I don't know, it depends on your use case. I'm sure you can find something, or you can just use the native APIs.

@normancapule

@josdejong there's no way to use some sort of SharedArrayBuffer?

I need to share (not clone, not transfer, just share) a big heavy JS object created at the parent thread, and share it amongst children threads.

In my case I had to pass an image buffer from main node process to the worker processes. I used base64 encoding to share the buffer object
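
As an alternative to base64 for binary data such as an image buffer, the native message ports can move an ArrayBuffer to the receiver without copying it, via the transfer list. The sketch below uses a MessageChannel within a single thread and receiveMessageOnPort so the result can be read synchronously. In real use, one of the ports (or the worker's own postMessage) would sit on the other side of the thread boundary. The four bytes are placeholder data.

```javascript
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');

const { port1, port2 } = new MessageChannel();

// Placeholder bytes standing in for real image data.
const imageBytes = new Uint8Array([137, 80, 78, 71]);

// The second argument is the transfer list: the underlying ArrayBuffer
// is moved to the receiver instead of being copied.
port1.postMessage(imageBytes, [imageBytes.buffer]);

// Read the pending message synchronously on the receiving port.
const received = receiveMessageOnPort(port2).message;

// After a transfer the sender's view is detached and reports length 0.
const senderLength = imageBytes.byteLength;

port1.close();
port2.close();

console.log(Array.from(received), senderLength);
```

Transfer moves ownership rather than sharing it, so the sender can no longer use the buffer afterwards. A freshly allocated typed array is used here on purpose, since small Buffers created with Buffer.from may be backed by a pooled ArrayBuffer that is not suitable for transfer.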

@Mae6e

Mae6e commented Jan 7, 2024

@josdejong there's no way to use some sort of SharedArrayBuffer?
I need to share (not clone, not transfer, just share) a big heavy JS object created at the parent thread, and share it amongst children threads.

In my case I had to pass an image buffer from main node process to the worker processes. I used base64 encoding to share the buffer object

Hi, can you write a sample for that here?

@pcace

pcace commented Jul 18, 2024

In my case I had to pass an image buffer from main node process to the worker processes. I used base64 encoding to share the buffer object

I'd be interested too! Any help on this would be great!

@Wakatem

Wakatem commented Aug 13, 2024

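This is an example of the approach @normancapule mentioned. Note that it copies the object's data to the worker and back (methods are rebuilt by redefining the class on both sides); nothing is shared.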

Main Thread

const { Worker } = require('worker_threads');

class TestClass {
    constructor(data) {
        this.data = data;
    }

    increment() {
        this.data.value += 1;
    }

    // Method to serialize the object to Base64
    toBase64() {
        const jsonString = JSON.stringify(this);
        return Buffer.from(jsonString).toString('base64');
    }

    // Static method to deserialize a Base64 string back to an object
    static fromBase64(base64String) {
        const jsonString = Buffer.from(base64String, 'base64').toString('utf-8');
        const jsonObject = JSON.parse(jsonString);
        return new TestClass(jsonObject.data);
    }
}

// Define the outer object containing an inner object
const outerObject = new TestClass({ value: 0 });

// Create a new worker
const worker = new Worker('./worker.js');

// Listen for messages from the worker
worker.on('message', (message) => {
    // Deserialize the outer object from Base64
    const updatedOuterObject = TestClass.fromBase64(message);
    console.log('Received message from worker:', updatedOuterObject);
    console.log('Updated inner object value:', updatedOuterObject.data.value); // Should be 1 after worker increments it
    // Stop the worker so the process can exit once the reply has arrived
    worker.terminate();
});

// Serialize the outer object to Base64 and send it to the worker
worker.postMessage(outerObject.toBase64());

Worker

const { parentPort } = require('worker_threads');

// Define the class in the worker
class TestClass {
    constructor(data) {
        this.data = data;
    }

    increment() {
        this.data.value += 1;
    }

    // Static method to deserialize a Base64 string back to an object
    static fromBase64(base64String) {
        const jsonString = Buffer.from(base64String, 'base64').toString('utf-8');
        const jsonObject = JSON.parse(jsonString);
        return new TestClass(jsonObject.data);
    }

    // Method to serialize the object to Base64
    toBase64() {
        const jsonString = JSON.stringify(this);
        return Buffer.from(jsonString).toString('base64');
    }
}

// Listen for messages from the main thread
parentPort.on('message', (message) => {
    // Deserialize the outer object from Base64
    const outerObject = TestClass.fromBase64(message);

    // Perform some work (e.g., incrementing the value inside the inner object)
    outerObject.increment();

    // Send the updated outer object back to the main thread
    parentPort.postMessage(outerObject.toBase64());
});


8 participants