Skip to content

Commit

Permalink
feat: Email run triggering (#446)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnjcsmith authored Jan 1, 2025
1 parent d8db48d commit 115b15e
Show file tree
Hide file tree
Showing 14 changed files with 612 additions and 84 deletions.
310 changes: 309 additions & 1 deletion control-plane/package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion control-plane/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"jwks-rsa": "^3.1.0",
"langfuse": "^3.31.0",
"lodash": "^4.17.21",
"mailparser": "^3.7.2",
"node-cache": "^5.1.2",
"pem-jwk": "^2.0.0",
"pg": "^8.11.2",
Expand All @@ -79,6 +80,7 @@
"@types/jsonpath": "^0.2.4",
"@types/jsonwebtoken": "^9.0.2",
"@types/lodash": "^4.17.6",
"@types/mailparser": "^3.4.5",
"@types/pg": "^8.10.2",
"@types/skmeans": "^0.11.7",
"@typescript-eslint/eslint-plugin": "^7.16.0",
Expand All @@ -90,4 +92,4 @@
"ts-node": "^10.9.2",
"tsx": "^4.7.0"
}
}
}
19 changes: 0 additions & 19 deletions control-plane/scripts/sendTestEmail.sh

This file was deleted.

30 changes: 30 additions & 0 deletions control-plane/scripts/sendTestEmail.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env tsx

import { exec } from "child_process";
import { buildMessageBody } from "../src/modules/email/test.util";

const messageBody = JSON.stringify(buildMessageBody({
body: "What tools are available",
from: "[email protected]",
subject: "Subject",
to: [ "[email protected]" ]
}));

// Define the AWS CLI command
const region = "us-west-2";
const endpoint = "http://localhost:9324";
const queueUrl = "http://localhost:9324/000000000000/email-ingestion";
const command = `aws --region=${region} --endpoint=${endpoint} sqs send-message --queue-url ${queueUrl} --message-body '${messageBody}'`;

// Execute the AWS CLI command
exec(command, (error, stdout, stderr) => {
if (error) {
console.error(`Error executing command: ${error.message}`);
return;
}
if (stderr) {
console.error(`stderr: ${stderr}`);
return;
}
console.log(`stdout: ${stdout}`);
});
8 changes: 4 additions & 4 deletions control-plane/src/modules/clerk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const getOrgIdForClusterId = async (cluserId: string) => {
organizationId: clusters.organization_id
}).from(clusters).where(eq(clusters.id, cluserId));

return cluster.organizationId
return cluster?.organizationId
}

const getUserForOrg = async (emailAddress: string, organizationId: string) => {
Expand All @@ -30,12 +30,12 @@ const getUserForOrg = async (emailAddress: string, organizationId: string) => {

export const getUserForCluster = async ({
emailAddress,
cluserId
clusterId
}: {
emailAddress: string;
cluserId: string;
clusterId: string;
}) => {
const organizationId = await getOrgIdForClusterId(cluserId);
const organizationId = await getOrgIdForClusterId(clusterId);

if (!organizationId) {
throw new Error("Can not lookup user without organizationId")
Expand Down
49 changes: 49 additions & 0 deletions control-plane/src/modules/email/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { ulid } from "ulid";
import { parseMessage } from ".";
import { createOwner } from "../test/util";
import { buildMessageBody } from "./test.util";


describe("parseMessage", () => {
const clusterId = ulid();
const organizationId = ulid();

beforeAll(async () => {
await createOwner({
clusterId,
organizationId,
});
})

it("should parse a message ingestion event", async () => {
const result = await parseMessage(buildMessageBody({
from: "[email protected]",
to: [ `${clusterId}@run.inferable.ai` ],
subject: "Subject",
body: "What tools are available"
}));

expect(result).toBeDefined();
expect(result.clusterId).toBe(clusterId);
expect(result.source).toBe("[email protected]");
});

it("should fail with multiple '@run.inferable.ai' addresses", async () => {
await expect(parseMessage(buildMessageBody({
from: "[email protected]",
to: [ "[email protected]", `${clusterId}@run.inferable.ai` ],
subject: "Subject",
body: "What tools are available"
}))).rejects.toThrow("Found multiple Inferable email addresses in destination");
})

it("should fail with no '@run.inferable.ai' addresses", async () => {
await expect(parseMessage(buildMessageBody({
from: "[email protected]",
to: [ "[email protected]" ],
subject: "Subject",
body: "What tools are available"
}))).rejects.toThrow("Could not extract clusterId from email address");
})

})
186 changes: 135 additions & 51 deletions control-plane/src/modules/email/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { Consumer } from "sqs-consumer";
import { env } from "../../utilities/env";
import { sqs } from "../sqs";
import { BaseMessage, sqs, withObservability } from "../sqs";
import { z } from "zod";
import { Message } from "@aws-sdk/client-sqs";
import { logger } from "../observability/logger";
import { safeParse } from "../../utilities/safe-parse";
import { ParsedMail, simpleParser } from "mailparser";
import { getUserForCluster } from "../clerk";
import { AuthenticationError } from "../../utilities/errors";
import { createRunWithMessage } from "../workflows/workflows";
import { flagsmith } from "../flagsmith";

const sesMessageSchema = z.object({
notificationType: z.string(),
mail: z.object({
timestamp: z.string().datetime(),
timestamp: z.string(),
source: z.string().email(),
messageId: z.string(),
destination: z.array(z.string().email()),
Expand All @@ -30,7 +34,7 @@ const sesMessageSchema = z.object({
}),
}),
receipt: z.object({
timestamp: z.string().datetime(),
timestamp: z.string(),
processingTimeMillis: z.number(),
recipients: z.array(z.string().email()),
spamVerdict: z.object({ status: z.string() }),
Expand All @@ -54,11 +58,11 @@ const snsNotificationSchema = z.object({
TopicArn: z.string(),
Subject: z.string(),
Message: z.string(),
Timestamp: z.string().datetime(),
Timestamp: z.string(),
SignatureVersion: z.string(),
Signature: z.string(),
SigningCertURL: z.string().url(),
UnsubscribeURL: z.string().url(),
SigningCertURL: z.string(),
UnsubscribeURL: z.string(),
});

const emailIngestionConsumer = env.SQS_EMAIL_INGESTION_QUEUE_URL
Expand All @@ -67,7 +71,7 @@ const emailIngestionConsumer = env.SQS_EMAIL_INGESTION_QUEUE_URL
batchSize: 5,
visibilityTimeout: 60,
heartbeatInterval: 30,
handleMessage: handleEmailIngestion,
handleMessage: withObservability(env.SQS_EMAIL_INGESTION_QUEUE_URL, handleEmailIngestion),
sqs,
})
: undefined;
Expand All @@ -80,52 +84,132 @@ export const stop = async () => {
emailIngestionConsumer?.stop();
};

async function handleEmailIngestion(message: Message) {
try {
const notificationJson = safeParse(message.Body);
if (!notificationJson.success) {
logger.error("SNS notification is not valid JSON", {
error: notificationJson.error,
});
return;
}

const notification = snsNotificationSchema.safeParse(notificationJson.data);
if (!notification.success) {
logger.error("Could not parse SNS notification", {
error: notification.error,
});
return;
}


const sesJson = safeParse(notification.data.Message);
if (!sesJson.success) {
logger.error("SES message is not valid JSON", {
error: sesJson.error,
});
return;
}

const sesMessage = sesMessageSchema.safeParse(sesJson.data);
if (!sesMessage.success) {
logger.error("Could not parse SES message", {
error: sesMessage.error,
});
return;
}

logger.info("Ingesting email event", {
messageId: sesMessage.data.mail.messageId,
source: sesMessage.data.mail.source,
destination: sesMessage.data.mail.destination,
subject: sesMessage.data.mail.commonHeaders.subject,
async function handleEmailIngestion(raw: unknown) {
const message = await parseMessage(raw);
if (!message.body) {
logger.info("Email had no body. Skipping", {
});
return;
}

const user = await authenticateUser(message.source, message.clusterId);

const flags = await flagsmith?.getIdentityFlags(message.clusterId, {
clusterId: message.clusterId,
});

const useEmail = flags?.isFeatureEnabled("experimental_email_trigger");

if (!useEmail) {
logger.info("Email trigger is disabled. Skipping", {
clusterId: message.clusterId,
});
return;
}

await handleNewChain({
userId: user.id,
body: message.body,
clusterId: message.clusterId
});
}

export async function parseMessage(message: unknown) {
const notification = snsNotificationSchema.safeParse(message);
if (!notification.success) {
throw new Error("Could not parse SNS notification message");
}

const sesJson = safeParse(notification.data.Message);
if (!sesJson.success) {
throw new Error("SES message is not valid JSON");
}

const sesMessage = sesMessageSchema.safeParse(sesJson.data);
if (!sesMessage.success) {
throw new Error("Could not parse SES message");
}

const ingestionAddresses = sesMessage.data.mail.destination.filter(
(email) => email.endsWith(env.INFERABLE_EMAIL_DOMAIN)
)

if (ingestionAddresses.length > 1) {
throw new Error("Found multiple Inferable email addresses in destination");
}

const clusterId = ingestionAddresses.pop()?.replace(env.INFERABLE_EMAIL_DOMAIN, "").replace("@", "");

if (!clusterId) {
throw new Error("Could not extract clusterId from email address");
}

const mail = await parseMailContent(sesMessage.data.content);
if (!mail) {
throw new Error("Could not parse email content");
}

let body = mail.text
if (!body && mail.html) {
body = mail.html
}

} catch (error) {
logger.error("Error while ingesting email event", {
error,
return {
body,
clusterId,
ingestionAddresses,
source: sesMessage.data.mail.source,
messageId: sesMessage.data.mail.messageId,
}
}

const parseMailContent = (message: string): Promise<ParsedMail> => {
return new Promise((resolve, reject) => {
simpleParser(message, (error, parsed) => {
if (error) {
reject(error);
} else {
resolve(parsed);
}
});
})
};


const authenticateUser = async (emailAddress: string, clusterId: string) => {
if (!env.CLERK_SECRET_KEY) {
throw new Error("CLERK_SECRET_KEY must be set for email authentication");
}

const clerkUser = await getUserForCluster({
emailAddress,
clusterId,
});

if (!clerkUser) {
logger.info("Could not find Email in Clerk.", {
emailAddress,
});
throw new AuthenticationError("Could not authenticate Email sender");
}

return clerkUser;
};

const handleNewChain = async ({
userId,
body,
clusterId,
}: {
userId: string;
body: string;
clusterId: string;
}) => {
await createRunWithMessage({
userId,
clusterId,
message: body,
type: "human",
})
}

Loading

0 comments on commit 115b15e

Please sign in to comment.