Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How can I update parent data in a stream and retrieve the updated data #1204

Open
bekkazy-k opened this issue Jul 1, 2024 · 3 comments
Open
Labels

Comments

@bekkazy-k
Copy link

bekkazy-k commented Jul 1, 2024

Here is a method used nodejs-driver:

public async getReviewsInStream(importId: string, handleData: Handler) {
  const rxSession = driver.rxSession({ defaultAccessMode: Neo4jSessionMode.WRITE, database });
  try {
    await new Promise<void>((resolve, reject) => {
      rxSession
        .run(
          `MATCH (i:Import)-[PART_OF]-(p:Product)-[REVIEWS]-(r:Review)-[WROTE]-(c:Customer)
                        WHERE elementId(i) = $importId
                        RETURN r, p, c`,
          { importId },
        )
        .records()
        .pipe(
          map((record) => {
            const review = record.get("r");
            const product = record.get("p");
            const customer = record.get("c");
            return { review, product, customer };
          }),
          concatMap((data) => handleData(importId, data.review, data.product, data.customer).then(data)),
          concatWith(rxSession.close()),
        )
        .subscribe({
          next: (data) => {
            // console.log(data)
          },
          complete: () => {
            resolve();
          },
          error: (err) => {
            rxSession.close();
            reject(err);
          },
        });
    });
  } catch (error) {
    console.log("stream eror:", error);
  }
}

In the handleData method, I update the statuses of Product and Customer, but in the subsequent records, they are received as not updated. Could you please advise on what I can do?
Currently, even if the parent has already been updated in subsequent review nodes, I still get the old data.

@bigmontz
Copy link
Contributor

bigmontz commented Jul 1, 2024

When you call session.run(), a transaction is created in the database. So, the data in the stream are from the point where the transaction was created, no changes in future transactions will affect this stream.

A workaround you can use is keeping track of what did you changed in a local state, this way you avoid to update twice the same Node.

@bekkazy-k
Copy link
Author

@bigmontz I would prefer not to store states in memory. I'm new to Neo4j; is it possible that APOC can help me with streaming reads and updates?

@bigmontz
Copy link
Contributor

bigmontz commented Jul 1, 2024

So,

if the changes are based in the content which are being processed, then you can use cypher+apoc for doing it.

If it depends on an external input, than no, unless that can be expressed with parameters to the query.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants