Skip to content

Commit

Permalink
Refactor read receipt processing
Browse files Browse the repository at this point in the history
  • Loading branch information
rygine committed Oct 20, 2023
1 parent ded27b1 commit 1a53e93
Showing 1 changed file with 65 additions and 27 deletions.
92 changes: 65 additions & 27 deletions packages/react-sdk/src/helpers/caching/contentTypes/readReceipt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,53 @@ import {
import { ContentTypeId } from "@xmtp/xmtp-js";
import { z } from "zod";
import { isAfter, parseISO } from "date-fns";
import { Mutex } from "async-mutex";
import type {
ContentTypeConfiguration,
ContentTypeMessageProcessor,
ContentTypeMetadataValues,
} from "../db";
import type { CachedConversation } from "../conversations";
import {
getCachedConversationByTopic,
type CachedConversation,
} from "../conversations";

const NAMESPACE = "readReceipt";

export type CachedReadReceiptMetadata = string | undefined;
export type CachedReadReceiptMetadata = {
incoming: string | undefined;
outgoing: string | undefined;
};

/**
* Retrieve the read receipt from a cached conversation
* Retrieve the read receipt from a cached conversation for the given type
*
* @param conversation Cached conversation
* @returns The read receipt date, or `undefined` if the conversation
* has no read receipt
* has no read receipt for the given type
*/
export const getReadReceipt = (conversation: CachedConversation) => {
const metadata = conversation?.metadata?.[
NAMESPACE
] as CachedReadReceiptMetadata;
return metadata ? parseISO(metadata) : undefined;
export const getReadReceipt = (
conversation: CachedConversation,
type: keyof CachedReadReceiptMetadata,
) => {
const metadata = conversation?.metadata?.[NAMESPACE] as
| CachedReadReceiptMetadata
| undefined;
const readReceiptType = metadata?.[type];
return readReceiptType ? parseISO(readReceiptType) : undefined;
};

/**
* Check if a cached conversation has a read receipt
* Check if a cached conversation has a read receipt for the given type
*
* @param conversation Cached conversation
* @returns `true` if the conversation has a read receipt, `false` otherwise
* @returns `true` if the conversation has a read receipt for the given type,
* `false` otherwise
*/
export const hasReadReceipt = (conversation: CachedConversation) =>
getReadReceipt(conversation) !== undefined;
export const hasReadReceipt = (
conversation: CachedConversation,
type: keyof CachedReadReceiptMetadata,
) => getReadReceipt(conversation, type) !== undefined;

const ReadReceiptContentSchema = z.object({}).strict();

Expand All @@ -51,6 +66,8 @@ const isValidReadReceiptContent = (content: unknown) => {
return success;
};

const processReadReceiptMutex = new Mutex();

/**
* Process a read receipt message
*
Expand All @@ -59,24 +76,45 @@ const isValidReadReceiptContent = (content: unknown) => {
*/
export const processReadReceipt: ContentTypeMessageProcessor = async ({
client,
db,
message,
conversation,
updateConversationMetadata,
}) => {
const contentType = ContentTypeId.fromString(message.contentType);
const readReceiptDate = getReadReceipt(conversation);
if (
ContentTypeReadReceipt.sameAs(contentType) &&
conversation &&
isValidReadReceiptContent(message.content) &&
// ignore read receipts sent by the client
message.senderAddress !== client.address &&
// ignore read receipts that are older than the current one
(!readReceiptDate || isAfter(message.sentAt, readReceiptDate))
) {
// update message's conversation with the message timestamp
await updateConversationMetadata(message.sentAt.toISOString());
}
// ensure that only 1 read receipt message is processed at a time to preserve order
await processReadReceiptMutex.runExclusive(async () => {
const contentType = ContentTypeId.fromString(message.contentType);
// always use the latest conversation from the cache
const updatedConversation = await getCachedConversationByTopic(
client.address,
conversation.topic,
db,
);
if (updatedConversation) {
const isIncoming = message.senderAddress !== client.address;
const readReceiptType = isIncoming ? "incoming" : "outgoing";
const readReceiptDate = getReadReceipt(
updatedConversation,
readReceiptType,
);
if (
ContentTypeReadReceipt.sameAs(contentType) &&
conversation &&
isValidReadReceiptContent(message.content) &&
// ignore read receipts that are older than the current one
(!readReceiptDate || isAfter(message.sentAt, readReceiptDate))
) {
const metadata = updatedConversation.metadata?.[NAMESPACE] as
| CachedReadReceiptMetadata
| undefined;
// update conversation metadata with the appropriate read receipt
await updateConversationMetadata({
...(metadata ?? {}),
[readReceiptType]: message.sentAt.toISOString(),
} as ContentTypeMetadataValues);
}
}
});
};

export const readReceiptContentTypeConfig: ContentTypeConfiguration = {
Expand Down

0 comments on commit 1a53e93

Please sign in to comment.