diff --git a/control-plane/src/modules/slack/index.ts b/control-plane/src/modules/slack/index.ts index 2aad784a..e1ef9c18 100644 --- a/control-plane/src/modules/slack/index.ts +++ b/control-plane/src/modules/slack/index.ts @@ -1,39 +1,77 @@ -import { App } from '@slack/bolt'; +import { App, KnownEventFromType, webApi } from '@slack/bolt'; import { FastifySlackReceiver } from './receiver'; import { env } from '../../utilities/env'; import { FastifyInstance } from 'fastify'; import { logger } from '../observability/logger'; +import { getRunsByMetadata } from '../workflows/metadata'; +import { addMessageAndResume, createRunWithMessage, Run } from '../workflows/workflows'; +import { AuthenticationError } from '../../utilities/errors'; +import { ulid } from 'ulid'; let app: App | undefined; +const THREAD_META_KEY = "stripeThreadTs"; +const CHANNEL_META_KEY = "stripeChannel"; + export const start = async (fastify: FastifyInstance) => { + const SLACK_CLUSTER_ID = env.SLACK_CLUSTER_ID + const SLACK_BOT_TOKEN = env.SLACK_BOT_TOKEN + const SLACK_SIGNING_SECRET = env.SLACK_SIGNING_SECRET + + if ( + !SLACK_CLUSTER_ID || + !SLACK_BOT_TOKEN || + !SLACK_SIGNING_SECRET + ) { + logger.info("Missing Slack environment variables. Skipping Slack integration."); + return + } + app = new App({ token: env.SLACK_BOT_TOKEN, receiver: new FastifySlackReceiver({ - signingSecret: env.SLACK_SIGNING_SECRET, + signingSecret: SLACK_SIGNING_SECRET, path: '/triggers/slack', fastify, }) }); - app.event('app_mention', async ({ event }) => { - logger.info("Received app_mention event. Skipping.", event); - }); - // Event listener for direct messages app.event('message', async ({ event, client }) => { logger.info("Received message event. Responding.", event); try { if (event.channel_type === 'im' && event.subtype !== 'bot_message') { - // Respond to the direct message - await client.chat.postMessage({ - channel: event.channel, - text: `Thanks for tagging me! Let's discuss here.`, - thread_ts: event.ts, // Use the timestamp of the original message to create a thread - }); + await authenticateUser(event, client); + + if ('thread_ts' in event && event.thread_ts) { + const [existingRun] = await getRunsByMetadata({ + clusterId: SLACK_CLUSTER_ID, + key: THREAD_META_KEY, + value: event.thread_ts, + limit: 1, + }); + if (existingRun) { + await handleExistingThread({ event, client, run: existingRun }); + return + } + } + + await handleNewThread({ + event, + client, + clusterId: SLACK_CLUSTER_ID + }); } } catch (error) { + if (error instanceof AuthenticationError) { + client.chat.postMessage({ + thread_ts: event.ts, + channel: event.channel, + text: "Sorry, I couldn't authenticate you. Please ensure you have an Inferable account with the same email as your Slack account." + }) + return + } logger.error('Error responding to DM', { error }); } }); @@ -41,4 +79,89 @@ export const start = async (fastify: FastifyInstance) => { await app.start(); } +const handleNewThread = async ({ + event, + client, + clusterId +}: { + event: KnownEventFromType<'message'>, + client: webApi.WebClient, + clusterId: string + }) => { + if ('text' in event && event.text) { + const run = await createRunWithMessage({ + clusterId, + message: event.text, + type: "human", + metadata: { + [THREAD_META_KEY]: event.ts, + [CHANNEL_META_KEY]: event.channel + }, + }); + + client.chat.postMessage({ + thread_ts: event.ts, + channel: event.channel, + mrkdwn: true, + text: `On it. I will get back to you soon.\nRun ID: <${env.APP_ORIGIN}/clusters/${clusterId}/runs/${run.id}|${run.id}>`, + }); + return; + } + + throw new Error("Event had no text"); +} + +const handleExistingThread = async ({ + event, + client, + run +}: { + event: KnownEventFromType<'message'>, + client: webApi.WebClient, + run: Run + }) => { + if ('text' in event && event.text) { + await addMessageAndResume({ + id: ulid(), + clusterId: run.clusterId, + runId: run.id, + message: event.text, + type: "human", + }) + return + } + + throw new Error("Event had no text") +} + +const authenticateUser = async (event: KnownEventFromType<'message'>, client: webApi.WebClient) => { + if ('user' in event && event.user) { + const user = await client.users.info({ + user: event.user, + token: env.SLACK_BOT_TOKEN + }) + + const confirmed = user.user?.is_email_confirmed + const email = user.user?.profile?.email + + if (!confirmed || !email) { + throw new AuthenticationError('Could not authenticate Slack user') + } + + // TODO: Check user in Clerk + } +} + +export const notifySlack = async (input: { + threadTs: string; + channel: string; + text: string; +}) => { + await app?.client.chat.postMessage({ + thread_ts: input.threadTs, + channel: input.channel, + text: input.text, + }); +} + export const stop = async () => await app?.stop(); diff --git a/control-plane/src/modules/workflows/metadata.test.ts b/control-plane/src/modules/workflows/metadata.test.ts index 44c45a54..01579389 100644 --- a/control-plane/src/modules/workflows/metadata.test.ts +++ b/control-plane/src/modules/workflows/metadata.test.ts @@ -1,5 +1,5 @@ import { createOwner } from "../test/util"; -import { getWorkflowsByMetadata } from "./metadata"; +import { getRunsByMetadata } from "./metadata"; import { createRun } from "./workflows"; describe("getWorkflowsByMetadata", () => { @@ -24,7 +24,7 @@ describe("getWorkflowsByMetadata", () => { }, }); - const result = await getWorkflowsByMetadata({ + const result = await getRunsByMetadata({ clusterId: owner.clusterId, key: "foo", value: "bar", diff --git a/control-plane/src/modules/workflows/metadata.ts b/control-plane/src/modules/workflows/metadata.ts index 4dd42025..7d4c6d38 100644 --- a/control-plane/src/modules/workflows/metadata.ts +++ b/control-plane/src/modules/workflows/metadata.ts @@ -1,7 +1,7 @@ import { and, eq, desc } from "drizzle-orm"; import { db, workflowMetadata, workflows } from "../data"; -export const getWorkflowsByMetadata = async ({ +export const getRunsByMetadata = async ({ clusterId, key, value, diff --git a/control-plane/src/modules/workflows/router.ts b/control-plane/src/modules/workflows/router.ts index 0682af47..93e9336c 100644 --- a/control-plane/src/modules/workflows/router.ts +++ b/control-plane/src/modules/workflows/router.ts @@ -3,7 +3,7 @@ import { JsonSchemaInput } from "inferable/bin/types"; import { contract } from "../contract"; import { getJobReferences, getJobsForWorkflow } from "../jobs/jobs"; import * as events from "../observability/events"; -import { getWorkflowsByMetadata } from "./metadata"; +import { getRunsByMetadata } from "./metadata"; import { getRunMessagesForDisplay } from "./workflow-messages"; import { createRetry, @@ -320,7 +320,7 @@ export const runsRouter = initServer().router( }; } - const result = await getWorkflowsByMetadata({ + const result = await getRunsByMetadata({ clusterId, key, value, diff --git a/control-plane/src/utilities/env.ts b/control-plane/src/utilities/env.ts index fb7c4b6d..6ba63f39 100644 --- a/control-plane/src/utilities/env.ts +++ b/control-plane/src/utilities/env.ts @@ -41,8 +41,9 @@ const envSchema = z ANTHROPIC_API_KEY: z.string().optional(), COHERE_API_KEY: z.string().optional(), - SLACK_BOT_TOKEN: z.string(), - SLACK_SIGNING_SECRET: z.string(), + SLACK_BOT_TOKEN: z.string().optional(), + SLACK_SIGNING_SECRET: z.string().optional(), + SLACK_CLUSTER_ID: z.string().optional(), SQS_RUN_PROCESS_QUEUE_URL: z.string(), SQS_RUN_GENERATE_NAME_QUEUE_URL: z.string(),