From 1f86d2e83aebbca8f63d622b60ce1136050cb53f Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 13 Dec 2024 15:02:28 +1030 Subject: [PATCH] feat: Initial thread management --- control-plane/src/index.ts | 2 +- control-plane/src/modules/slack/index.ts | 173 ++++++++++++++++-- control-plane/src/modules/slack/receiver.ts | 96 ++++------ .../src/modules/workflows/metadata.test.ts | 4 +- .../src/modules/workflows/metadata.ts | 2 +- control-plane/src/modules/workflows/router.ts | 4 +- control-plane/src/utilities/env.ts | 5 +- 7 files changed, 209 insertions(+), 77 deletions(-) diff --git a/control-plane/src/index.ts b/control-plane/src/index.ts index 955d9684..011e250a 100644 --- a/control-plane/src/index.ts +++ b/control-plane/src/index.ts @@ -23,7 +23,7 @@ import { flagsmith } from "./modules/flagsmith"; import { runMigrations } from "./utilities/migrate"; import { customerTelemetry } from "./modules/customer-telemetry"; -export const app = fastify({ +const app = fastify({ logger: env.ENABLE_FASTIFY_LOGGER, }); diff --git a/control-plane/src/modules/slack/index.ts b/control-plane/src/modules/slack/index.ts index 2aad784a..056328f2 100644 --- a/control-plane/src/modules/slack/index.ts +++ b/control-plane/src/modules/slack/index.ts @@ -1,40 +1,101 @@ -import { App } from '@slack/bolt'; +import { App, KnownEventFromType, webApi } from '@slack/bolt'; import { FastifySlackReceiver } from './receiver'; import { env } from '../../utilities/env'; import { FastifyInstance } from 'fastify'; import { logger } from '../observability/logger'; +import { getRunsByMetadata } from '../workflows/metadata'; +import { addMessageAndResume, createRunWithMessage, Run } from '../workflows/workflows'; +import { AuthenticationError } from '../../utilities/errors'; +import { ulid } from 'ulid'; let app: App | undefined; +const THREAD_META_KEY = "stripeThreadTs"; +const CHANNEL_META_KEY = "stripeChannel"; + +type MessageEvent = { + event: KnownEventFromType<'message'>, + client: webApi.WebClient + clusterId: string +} + export const start = async (fastify: FastifyInstance) => { + const SLACK_CLUSTER_ID = env.SLACK_CLUSTER_ID + const SLACK_BOT_TOKEN = env.SLACK_BOT_TOKEN + const SLACK_SIGNING_SECRET = env.SLACK_SIGNING_SECRET + + if ( + !SLACK_CLUSTER_ID || + !SLACK_BOT_TOKEN || + !SLACK_SIGNING_SECRET + ) { + logger.info("Missing Slack environment variables. Skipping Slack integration."); + return + } + app = new App({ token: env.SLACK_BOT_TOKEN, receiver: new FastifySlackReceiver({ - signingSecret: env.SLACK_SIGNING_SECRET, + signingSecret: SLACK_SIGNING_SECRET, path: '/triggers/slack', fastify, }) }); - app.event('app_mention', async ({ event }) => { - logger.info("Received app_mention event. Skipping.", event); - }); - // Event listener for direct messages app.event('message', async ({ event, client }) => { logger.info("Received message event. Responding.", event); + + if (isBotMessage(event)) { + logger.info("Received message from bot. Ignoring.", event); + return + } + + if (!isDirectMessage(event)) { + logger.info("Received message from channel. Ignoring.", event); + return + } + try { - if (event.channel_type === 'im' && event.subtype !== 'bot_message') { - // Respond to the direct message - await client.chat.postMessage({ - channel: event.channel, - text: `Thanks for tagging me! Let's discuss here.`, - thread_ts: event.ts, // Use the timestamp of the original message to create a thread + await authenticateUser(event, client); + + if (hasThread(event)) { + const [run] = await getRunsByMetadata({ + clusterId: SLACK_CLUSTER_ID, + key: THREAD_META_KEY, + value: event.thread_ts, + limit: 1, }); + if (run) { + await handleExistingThread({ + event, + client, + run, + clusterId: SLACK_CLUSTER_ID + }); + return + } } + + await handleNewThread({ + event, + client, + clusterId: SLACK_CLUSTER_ID + }); + } catch (error) { - logger.error('Error responding to DM', { error }); + + if (error instanceof AuthenticationError) { + client.chat.postMessage({ + thread_ts: event.ts, + channel: event.channel, + text: "Sorry, I couldn't authenticate you. Please ensure you have an Inferable account with the same email as your Slack account." + }) + return + } + + logger.error('Error responding to Direct Message', { error }); } }); @@ -42,3 +103,89 @@ export const start = async (fastify: FastifyInstance) => { } export const stop = async () => await app?.stop(); + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const hasThread = (e: any): e is { thread_ts: string } => { + return typeof e?.thread_ts === 'string'; +} + +const hasUser = (e: any): e is { user: string } => { + return typeof e?.user === 'string'; +} + +const isDirectMessage = (e: KnownEventFromType<'message'>): boolean => { + return e.channel_type === 'im'; +} + +const isBotMessage = (e: KnownEventFromType<'message'>): boolean => { + return e.subtype === 'bot_message'; +} + +const handleNewThread = async ({ + event, + client, + clusterId +}: MessageEvent) => { + + if ('text' in event && event.text) { + const run = await createRunWithMessage({ + clusterId, + message: event.text, + type: "human", + metadata: { + [THREAD_META_KEY]: event.ts, + [CHANNEL_META_KEY]: event.channel + }, + }); + + client.chat.postMessage({ + thread_ts: event.ts, + channel: event.channel, + mrkdwn: true, + text: `On it. I will get back to you soon.\nRun ID: <${env.APP_ORIGIN}/clusters/${clusterId}/runs/${run.id}|${run.id}>`, + }); + return; + } + + throw new Error("Event had no text"); +} + +const handleExistingThread = async ({ + event, + run, +} : MessageEvent & { run: Run }) => { + if ('text' in event && event.text) { + await addMessageAndResume({ + id: ulid(), + clusterId: run.clusterId, + runId: run.id, + message: event.text, + type: "human", + }) + return + } + + throw new Error("Event had no text") +} + +const authenticateUser = async (event: KnownEventFromType<'message'>, client: webApi.WebClient) => { + if (hasUser(event)) { + const user = await client.users.info({ + user: event.user, + token: env.SLACK_BOT_TOKEN + }) + + const confirmed = user.user?.is_email_confirmed + const email = user.user?.profile?.email + + if (!confirmed || !email) { + throw new AuthenticationError('Could not authenticate Slack user') + } + + // TODO: Verify user in Clerk + return + } + + throw new Error("Event had no user") +} + diff --git a/control-plane/src/modules/slack/receiver.ts b/control-plane/src/modules/slack/receiver.ts index bd32cb71..2ebe5f44 100644 --- a/control-plane/src/modules/slack/receiver.ts +++ b/control-plane/src/modules/slack/receiver.ts @@ -8,7 +8,7 @@ import { Logger, LogLevel, } from '@slack/bolt' -import { FastifyInstance, FastifyPluginCallback } from 'fastify'; +import { FastifyInstance, FastifyPluginCallback, FastifyReply, FastifyRequest } from 'fastify'; import { logger } from '../observability/logger'; const slackLogger: Logger = { @@ -21,21 +21,23 @@ const slackLogger: Logger = { setName: () => void 0, } -export class FastifySlackReceiver implements Receiver { - fastify: FastifyInstance; - app?: App; +type FastifySlackReceiverParams = { + fastify: FastifyInstance path: string signingSecret: string +} + +export class FastifySlackReceiver implements Receiver { + private fastify: FastifyInstance; + private app?: App; + private path: string + private signingSecret: string constructor({ - path = '/slack/events', + path, fastify, signingSecret, - }: { - path?: string - fastify: FastifyInstance - signingSecret: string - }) { + }: FastifySlackReceiverParams) { this.fastify = fastify; this.path = path this.signingSecret = signingSecret @@ -45,57 +47,47 @@ export class FastifySlackReceiver implements Receiver { this.app = app; } - start() { - logger.info("Starting Slack receiver") + async start() { + logger.info("Registering Slack receiver") - return new Promise((resolve, reject) => { - try { - // Register a seperate plugin and dissable the content type parsers - const slackPlugin: FastifyPluginCallback = async (instance) => { - const contentTypes = ['application/json', 'application/x-www-form-urlencoded']; + // Register a seperate plugin and disable the content type parsers for the route + const slackPlugin: FastifyPluginCallback = async (instance) => { + const contentTypes = ['application/json', 'application/x-www-form-urlencoded']; - instance.removeContentTypeParser(contentTypes); - instance.addContentTypeParser(contentTypes, { parseAs: 'string' }, instance.defaultTextParser); + instance.removeContentTypeParser(contentTypes); + instance.addContentTypeParser(contentTypes, { parseAs: 'string' }, instance.defaultTextParser); - instance.post('', (request, reply) => this.requestHandler(request, reply)); - }; + instance.post('', (request, reply) => this.requestHandler(request, reply)); + }; - this.fastify.register(slackPlugin, { prefix: this.path }); - resolve(void 0); - } catch (error) { - reject(error); - } - }); + this.fastify.register(slackPlugin, { prefix: this.path }); } - stop() { - logger.info("Stopping Slack receiver") - - return new Promise((resolve, reject) => { - this.fastify.server.close((err) => { - if (err) { - reject(err); - return; - } - resolve(void 0); - }) + async stop() { + this.fastify.server.close((err) => { + if (err) { + logger.error("Failed to stop Slack receiver gracefully", { + error: err, + }) + } }) } - async requestHandler(request: any, response: any) { + async requestHandler(request: FastifyRequest, reply: FastifyReply) { + const req = request.raw; - const res = response.raw; + const res = reply.raw; + try { // Verify authenticity let bufferedReq: BufferedIncomingMessage; try { - const bodyString = typeof request.body === "string" - ? request.body - : JSON.stringify(request.body); - + if (typeof request.body !== "string") { + throw new Error("Expected Slack request body to be a string"); + } // eslint-disable-next-line @typescript-eslint/no-explicit-any - (req as any).rawBody = Buffer.from(bodyString); + (req as any).rawBody = Buffer.from(request.body); bufferedReq = await boltHelpers.parseAndVerifyHTTPRequest( { @@ -151,18 +143,10 @@ export class FastifySlackReceiver implements Receiver { retryReason: boltHelpers.extractRetryReasonFromHTTPRequest(req), }; - try { - logger.info("Processing Slack request", { - event, - }); - await this.app?.processEvent(event); - } catch (error) { - logger.error("Failed to process Slack request", { - error, - }) - } + await this.app?.processEvent(event); + } catch (error) { - logger.error("Failed to process Slack request", { + logger.error("Failed to handle Slack request", { error, }) } diff --git a/control-plane/src/modules/workflows/metadata.test.ts b/control-plane/src/modules/workflows/metadata.test.ts index 44c45a54..01579389 100644 --- a/control-plane/src/modules/workflows/metadata.test.ts +++ b/control-plane/src/modules/workflows/metadata.test.ts @@ -1,5 +1,5 @@ import { createOwner } from "../test/util"; -import { getWorkflowsByMetadata } from "./metadata"; +import { getRunsByMetadata } from "./metadata"; import { createRun } from "./workflows"; describe("getWorkflowsByMetadata", () => { @@ -24,7 +24,7 @@ describe("getWorkflowsByMetadata", () => { }, }); - const result = await getWorkflowsByMetadata({ + const result = await getRunsByMetadata({ clusterId: owner.clusterId, key: "foo", value: "bar", diff --git a/control-plane/src/modules/workflows/metadata.ts b/control-plane/src/modules/workflows/metadata.ts index 4dd42025..7d4c6d38 100644 --- a/control-plane/src/modules/workflows/metadata.ts +++ b/control-plane/src/modules/workflows/metadata.ts @@ -1,7 +1,7 @@ import { and, eq, desc } from "drizzle-orm"; import { db, workflowMetadata, workflows } from "../data"; -export const getWorkflowsByMetadata = async ({ +export const getRunsByMetadata = async ({ clusterId, key, value, diff --git a/control-plane/src/modules/workflows/router.ts b/control-plane/src/modules/workflows/router.ts index 0682af47..93e9336c 100644 --- a/control-plane/src/modules/workflows/router.ts +++ b/control-plane/src/modules/workflows/router.ts @@ -3,7 +3,7 @@ import { JsonSchemaInput } from "inferable/bin/types"; import { contract } from "../contract"; import { getJobReferences, getJobsForWorkflow } from "../jobs/jobs"; import * as events from "../observability/events"; -import { getWorkflowsByMetadata } from "./metadata"; +import { getRunsByMetadata } from "./metadata"; import { getRunMessagesForDisplay } from "./workflow-messages"; import { createRetry, @@ -320,7 +320,7 @@ export const runsRouter = initServer().router( }; } - const result = await getWorkflowsByMetadata({ + const result = await getRunsByMetadata({ clusterId, key, value, diff --git a/control-plane/src/utilities/env.ts b/control-plane/src/utilities/env.ts index fb7c4b6d..6ba63f39 100644 --- a/control-plane/src/utilities/env.ts +++ b/control-plane/src/utilities/env.ts @@ -41,8 +41,9 @@ const envSchema = z ANTHROPIC_API_KEY: z.string().optional(), COHERE_API_KEY: z.string().optional(), - SLACK_BOT_TOKEN: z.string(), - SLACK_SIGNING_SECRET: z.string(), + SLACK_BOT_TOKEN: z.string().optional(), + SLACK_SIGNING_SECRET: z.string().optional(), + SLACK_CLUSTER_ID: z.string().optional(), SQS_RUN_PROCESS_QUEUE_URL: z.string(), SQS_RUN_GENERATE_NAME_QUEUE_URL: z.string(),