Skip to content

Commit

Permalink
Merge pull request #111 from xmtp/rygine/fix-reaction-processing
Browse files Browse the repository at this point in the history
Fix reaction processing
  • Loading branch information
rygine authored Oct 10, 2023
2 parents 40fb962 + 457bbc6 commit 5f0884d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 27 deletions.
5 changes: 5 additions & 0 deletions .changeset/chatty-parrots-drive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@xmtp/react-sdk": patch
---

- Added a mutex to the reaction message processor so that messages are processed in order, which is important for determining the state of a message's reactions
60 changes: 33 additions & 27 deletions packages/react-sdk/src/helpers/caching/contentTypes/reaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from "@xmtp/content-type-reaction";
import { ContentTypeId } from "@xmtp/xmtp-js";
import type { Dexie, Table } from "dexie";
import { Mutex } from "async-mutex";
import { z } from "zod";
import type {
ContentTypeConfiguration,
Expand Down Expand Up @@ -162,6 +163,8 @@ const isValidReactionContent = (content: unknown) => {
return success;
};

const processReactionMutex = new Mutex();

/**
* Process a reaction message
*
Expand All @@ -172,34 +175,37 @@ export const processReaction: ContentTypeMessageProcessor = async ({
message,
db,
}) => {
const contentType = ContentTypeId.fromString(message.contentType);
if (
ContentTypeReaction.sameAs(contentType) &&
isValidReactionContent(message.content)
) {
const reaction = message.content as Reaction;
const cachedReaction = {
content: reaction.content,
referenceXmtpID: reaction.reference,
schema: reaction.schema,
senderAddress: message.senderAddress,
sentAt: message.sentAt,
xmtpID: message.xmtpID,
} satisfies CachedReaction;

switch (reaction.action) {
case "added":
await saveReaction(cachedReaction, db);
break;
case "removed":
await deleteReaction(cachedReaction, db);
break;
// no default
// ensure that only 1 reaction message is processed at a time to preserve order
await processReactionMutex.runExclusive(async () => {
const contentType = ContentTypeId.fromString(message.contentType);
if (
ContentTypeReaction.sameAs(contentType) &&
isValidReactionContent(message.content)
) {
const reaction = message.content as Reaction;
const cachedReaction = {
content: reaction.content,
referenceXmtpID: reaction.reference,
schema: reaction.schema,
senderAddress: message.senderAddress,
sentAt: message.sentAt,
xmtpID: message.xmtpID,
} satisfies CachedReaction;

switch (reaction.action) {
case "added":
await saveReaction(cachedReaction, db);
break;
case "removed":
await deleteReaction(cachedReaction, db);
break;
// no default
}

// update reactions metadata on the referenced message
await updateReactionsMetadata(reaction.reference, db);
}

// update reactions metadata on the referenced message
await updateReactionsMetadata(reaction.reference, db);
}
});
};

export const reactionContentTypeConfig: ContentTypeConfiguration = {
Expand Down

0 comments on commit 5f0884d

Please sign in to comment.