Skip to content

Commit

Permalink
DU Inboxing:)
Browse files Browse the repository at this point in the history
  • Loading branch information
mehotkhan committed Sep 2, 2024
1 parent a50dd5f commit 058b964
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 254 deletions.
3 changes: 2 additions & 1 deletion src/bot/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Context } from "grammy";
import { Conversation, User } from "../types";
import { createReplyKeyboard } from "../utils/constant";
import { KVModel } from "../utils/kv-storage";
import { incrementStat } from "../utils/logs";
import {
HuhMessage,
NoConversationFoundMessage,
Expand Down Expand Up @@ -73,7 +74,7 @@ export const handleReplyAction = async (
conversation
);

await incrementStat(statsModel, "newReply"); // Increment the reply stat
incrementStat(statsModel, "newConversation"); // Increment the reply stat
await ctx.reply(REPLAY_TO_MESSAGE);
} catch (error) {
await ctx.reply(HuhMessage);
Expand Down
23 changes: 12 additions & 11 deletions src/bot/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,37 +53,38 @@ export const createBot = (env: Environment) => {
);

/**
* Handles incoming messages.
* Handles the /inbox command.
*
* This handler processes any text or media message sent by users, manages
* ongoing conversations, and routes the message appropriately depending on
* the current context.
*/
bot.on("message", (ctx) =>
handleMessage(
bot.command("inbox", (ctx) =>
handleInboxCommand(
ctx,
userModel,
conversationModel,
INBOX_DO,
statsModel,
APP_SECURE_KEY
)
);

/**
* Handles the /inbox command.
* Handles incoming messages.
*
* This handler processes any text or media message sent by users, manages
* ongoing conversations, and routes the message appropriately depending on
* the current context.
*/
bot.command("inbox", (ctx) =>
handleInboxCommand(
bot.on("message", (ctx) =>
handleMessage(
ctx,
userModel,
conversationModel,
INBOX_DO,
statsModel,
APP_SECURE_KEY
)
);

/**
* Handles reply actions from inline keyboard buttons.
*
Expand Down
196 changes: 97 additions & 99 deletions src/bot/commands.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DurableObjectNamespace } from "@cloudflare/workers-types"; // Ensure your environment is set up correctly
import { Context } from "grammy";
import { WebUUID } from "web-uuid";
import { Conversation, User } from "../types";
import { Conversation, InboxMessage, User } from "../types";
import {
createReplyKeyboard,
handleMenuCommand,
Expand All @@ -18,6 +18,7 @@ import {
StartConversationMessage,
USER_IS_BLOCKED_MESSAGE,
WelcomeMessage,
YOUR_MESSAGE_SEEN_MESSAGE,
} from "../utils/messages";
import { sendDecryptedMessage } from "../utils/messageSender";
import {
Expand Down Expand Up @@ -107,6 +108,93 @@ export const handleStartCommand = async (
}
};

/**
* Handles the /inbox command, retrieving and displaying messages from the user's inbox.
*
* @param {Context} ctx - The context of the current Telegram update.
* @param {KVModel<User>} userModel - KVModel instance for managing user data.
* @param {KVModel<string>} conversationModel - KVModel instance for managing conversation data.
* @param {DurableObjectNamespace} inboxNamespace - Durable Object Namespace for inbox handling.
* @param {string} APP_SECURE_KEY - The application-specific secure key.
*/

export const handleInboxCommand = async (
ctx: Context,
userModel: KVModel<User>,
conversationModel: KVModel<string>,
inboxNamespace: DurableObjectNamespace,
APP_SECURE_KEY: string
): Promise<void> => {
const currentUserId = ctx.from?.id!;

try {
// Fetch the Durable Object associated with this user's inbox
const inboxId = inboxNamespace.idFromName(currentUserId.toString());
const inboxStub = inboxNamespace.get(inboxId);

const response = await inboxStub.fetch("https://inbox/retrieve");

const inbox: InboxMessage[] = await response.json();
if (inbox.length > 0) {
for (const { ticketId, timestamp } of inbox) {
try {
const conversationId = getConversationId(ticketId, APP_SECURE_KEY);
const conversationData = await conversationModel.get(conversationId);
const decryptedMessage: Conversation = JSON.parse(
await decryptPayload(ticketId, conversationData!, APP_SECURE_KEY)
);

const otherUser = await userModel.get(
decryptedMessage.connection.from.toString()
);
const isBlocked = !!otherUser?.blockList.some(
(item: number) => item === currentUserId
);

const replyOptions: any = {
reply_markup: createReplyKeyboard(ticketId, isBlocked),
};

if (decryptedMessage.connection.reply_to_message_id) {
replyOptions.reply_to_message_id =
decryptedMessage.connection.reply_to_message_id;
}

await sendDecryptedMessage(ctx, decryptedMessage, replyOptions);
await ctx.api.sendMessage(
decryptedMessage.connection.from,
YOUR_MESSAGE_SEEN_MESSAGE
);

// Clear payload data in the conversation to minimize storage
const clearedConversation = {
connection: decryptedMessage.connection,
payload: {}, // Clear the payload after sending the message
};
const clearedConversationData = await encryptedPayload(
ticketId,
JSON.stringify(clearedConversation),
APP_SECURE_KEY
);
await conversationModel.save(conversationId, clearedConversationData);
} catch (error) {
await ctx.reply(HuhMessage + JSON.stringify(error), {
reply_markup: mainMenu,
});
}
}
} else {
await ctx.reply(EMPTY_INBOX_MESSAGE, {
reply_markup: mainMenu,
});
}
} catch (error) {
await ctx.reply(HuhMessage + JSON.stringify(error), {
reply_markup: mainMenu,
});
}
};

/**
* Handles all incoming messages that are not menu commands, routing them based on the user's current conversation state.
*
Expand Down Expand Up @@ -190,6 +278,8 @@ export const handleMessage = async (
} else if (ctx.message?.audio) {
conversation.payload.message_type = "audio";
conversation.payload.audio_id = ctx.message.audio.file_id;
if (ctx.message.caption)
conversation.payload.caption = ctx.message.caption;
}

const conversationId = getConversationId(ticketId, APP_SECURE_KEY);
Expand All @@ -203,19 +293,15 @@ export const handleMessage = async (
await conversationModel.save(conversationId, conversationData);

// Interact with Durable Object for inbox management
const inboxId = inboxNamespace.idFromString(
const inboxId = inboxNamespace.idFromName(
currentUser.currentConversation.to.toString()
);
console.log("start", inboxId);
const inboxObject = inboxNamespace.get(inboxId);
console.log("inboxObject", inboxObject);
const inboxStub = inboxNamespace.get(inboxId);

await inboxObject.fetch(
new Request("https://inbox/add", {
method: "POST",
body: JSON.stringify({ timestamp: Date.now(), ticketId }),
})
);
await inboxStub.fetch("https://inbox/add", {
method: "POST",
body: JSON.stringify({ timestamp: Date.now(), ticketId }),
});
// Clear current conversation for the user
await userModel.updateField(
currentUserId.toString(),
Expand All @@ -237,91 +323,3 @@ export const handleMessage = async (
});
}
};

/**
* Handles the /inbox command, retrieving and displaying messages from the user's inbox.
*
* @param {Context} ctx - The context of the current Telegram update.
* @param {KVModel<User>} userModel - KVModel instance for managing user data.
* @param {KVModel<string>} conversationModel - KVModel instance for managing conversation data.
* @param {DurableObjectNamespace} inboxNamespace - Durable Object Namespace for inbox handling.
* @param {string} APP_SECURE_KEY - The application-specific secure key.
*/

export const handleInboxCommand = async (
ctx: Context,
userModel: KVModel<User>,
conversationModel: KVModel<string>,
inboxNamespace: DurableObjectNamespace,
APP_SECURE_KEY: string
): Promise<void> => {
const currentUserId = ctx.from?.id!;

try {
// Fetch the Durable Object associated with this user's inbox
const inboxId = inboxNamespace.idFromString(currentUserId.toString());
const inboxObject = inboxNamespace.get(inboxId);

const response = await inboxObject.fetch(new Request("https://inbox/get"));
const inbox = await response.json();

if (inbox.items.length > 0) {
for (const { ticketId, timestamp } of inbox.items) {
try {
const conversationId = getConversationId(ticketId, APP_SECURE_KEY);
const conversationData = await conversationModel.get(conversationId);
const decryptedMessage: Conversation = JSON.parse(
await decryptPayload(ticketId, conversationData!, APP_SECURE_KEY)
);

const otherUser = await userModel.get(
decryptedMessage.connection.from.toString()
);
const isBlocked = !!otherUser?.blockList.some(
(item: number) => item === currentUserId
);

const replyOptions: any = {
reply_markup: createReplyKeyboard(ticketId, isBlocked),
};

if (decryptedMessage.connection.reply_to_message_id) {
replyOptions.reply_to_message_id =
decryptedMessage.connection.reply_to_message_id;
}

await sendDecryptedMessage(ctx, decryptedMessage, replyOptions);

// Clear payload data in the conversation to minimize storage
const clearedConversation = {
connection: decryptedMessage.connection,
payload: {}, // Clear the payload after sending the message
};
const clearedConversationData = await encryptedPayload(
ticketId,
JSON.stringify(clearedConversation),
APP_SECURE_KEY
);
await conversationModel.save(conversationId, clearedConversationData);
} catch (error) {
await ctx.reply(HuhMessage + JSON.stringify(error), {
reply_markup: mainMenu,
});
}
}

// Clear the inbox in the Durable Object after processing
await inboxObject.fetch(
new Request("https://inbox.clear", { method: "POST" })
);
} else {
await ctx.reply(EMPTY_INBOX_MESSAGE, {
reply_markup: mainMenu,
});
}
} catch (error) {
await ctx.reply(HuhMessage + JSON.stringify(error), {
reply_markup: mainMenu,
});
}
};
34 changes: 34 additions & 0 deletions src/bot/inboxDU.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { DurableObject } from "cloudflare:workers";
import { InboxMessage } from "../types";

export class InboxDurableObject implements DurableObject {
state: DurableObjectState;

constructor(state: DurableObjectState, env: any) {
this.state = state;
}

async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
const { method } = request;

if (method === "POST" && url.pathname === "/add") {
const { timestamp, ticketId } = await request.json<InboxMessage>();
const inbox =
(await this.state.storage.get<InboxMessage[]>("inbox")) || [];
inbox.push({ timestamp, ticketId });
await this.state.storage.put("inbox", inbox);
return new Response("Message added to inbox", { status: 200 });
}

if (method === "GET" && url.pathname === "/retrieve") {
console.log("get data");
const inbox =
(await this.state.storage.get<InboxMessage[]>("inbox")) || [];
await this.state.storage.delete("inbox");
return new Response(JSON.stringify(inbox), { status: 200 });
}

return new Response("Not Found", { status: 404 });
}
}
Loading

0 comments on commit 058b964

Please sign in to comment.