Skip to content

Commit

Permalink
feat: Initial thread management
Browse files Browse the repository at this point in the history
  • Loading branch information
johnjcsmith committed Dec 13, 2024
1 parent 8a18042 commit fdccc53
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 16 deletions.
114 changes: 103 additions & 11 deletions control-plane/src/modules/slack/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import { App } from '@slack/bolt';
import { App, KnownEventFromType, webApi } from '@slack/bolt';
import { FastifySlackReceiver } from './receiver';
import { env } from '../../utilities/env';
import { FastifyInstance } from 'fastify';
import { logger } from '../observability/logger';
import { getRunsByMetadata } from '../workflows/metadata';
import { addMessageAndResume, createRunWithMessage, Run } from '../workflows/workflows';
import { AuthenticationError } from '../../utilities/errors';
import { ulid } from 'ulid';

let app: App | undefined;

export const SLACK_CLUSTER_ID = env.SLACK_CLUSTER_ID;

const THREAD_META_KEY = "stripeThreadTs";
const CHANNEL_META_KEY = "stripeChannel";

export const start = async (fastify: FastifyInstance) => {
app = new App({
token: env.SLACK_BOT_TOKEN,
Expand All @@ -16,29 +25,112 @@ export const start = async (fastify: FastifyInstance) => {
})
});

app.event('app_mention', async ({ event }) => {
logger.info("Received app_mention event. Skipping.", event);
});

// Event listener for direct messages
app.event('message', async ({ event, client }) => {
logger.info("Received message event. Responding.", event);
try {
if (event.channel_type === 'im' && event.subtype !== 'bot_message') {
// Respond to the direct message
await client.chat.postMessage({
channel: event.channel,
text: `Thanks for tagging me! Let's discuss here.`,
thread_ts: event.ts, // Use the timestamp of the original message to create a thread
});
await authenticateUser(event, client);

if ('thread_ts' in event && event.thread_ts) {
const [existingRun] = await getRunsByMetadata({
clusterId: SLACK_CLUSTER_ID,
key: THREAD_META_KEY,
value: event.thread_ts,
limit: 1,
});

if (existingRun) {
await handleExistingThread(event, client, existingRun);
return
}
}

await handleNewThread(event, client);
}
} catch (error) {
if (error instanceof AuthenticationError) {
client.chat.postMessage({
thread_ts: event.ts,
channel: event.channel,
text: "Sorry, I couldn't authenticate you. Please ensure you have an Inferable account with the same email as your Slack account."
})
return
}
logger.error('Error responding to DM', { error });
}
});

await app.start();
}

const handleNewThread = async (event: KnownEventFromType<'message'>, client: webApi.WebClient) => {
if ('text' in event && event.text) {
const run = await createRunWithMessage({
clusterId: SLACK_CLUSTER_ID,
message: event.text,
type: "human",
metadata: {
[THREAD_META_KEY]: event.ts,
[CHANNEL_META_KEY]: event.channel
},
})

client.chat.postMessage({
thread_ts: event.ts,
channel: event.channel,
mrkdwn: true,
text: `On it. I will get back to you soon.\nRun ID: <${env.APP_ORIGIN}/clusters/${SLACK_CLUSTER_ID}/runs/${run.id}|${run.id}>`,
})
return
}

throw new Error("Event had no text")
}

const handleExistingThread = async (event: KnownEventFromType<'message'>, client: webApi.WebClient, run: Run) => {
if ('text' in event && event.text) {
await addMessageAndResume({
id: ulid(),
clusterId: run.clusterId,
runId: run.id,
message: event.text,
type: "human",
})
return
}

throw new Error("Event had no text")
}

const authenticateUser = async (event: KnownEventFromType<'message'>, client: webApi.WebClient) => {
if ('user' in event && event.user) {
const user = await client.users.info({
user: event.user,
token: env.SLACK_BOT_TOKEN
})

const confirmed = user.user?.is_email_confirmed
const email = user.user?.profile?.email

if (!confirmed || !email) {
throw new AuthenticationError('Could not authenticate Slack user')
}

// TODO: Check user in Clerk
}
}

export const notifySlack = async (input: {
threadTs: string;
channel: string;
text: string;
}) => {
await app?.client.chat.postMessage({
thread_ts: input.threadTs,
channel: input.channel,
text: input.text,
});
}

export const stop = async () => await app?.stop();
4 changes: 2 additions & 2 deletions control-plane/src/modules/workflows/metadata.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createOwner } from "../test/util";
import { getWorkflowsByMetadata } from "./metadata";
import { getRunsByMetadata } from "./metadata";
import { createRun } from "./workflows";

describe("getWorkflowsByMetadata", () => {
Expand All @@ -24,7 +24,7 @@ describe("getWorkflowsByMetadata", () => {
},
});

const result = await getWorkflowsByMetadata({
const result = await getRunsByMetadata({
clusterId: owner.clusterId,
key: "foo",
value: "bar",
Expand Down
2 changes: 1 addition & 1 deletion control-plane/src/modules/workflows/metadata.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { and, eq, desc } from "drizzle-orm";
import { db, workflowMetadata, workflows } from "../data";

export const getWorkflowsByMetadata = async ({
export const getRunsByMetadata = async ({
clusterId,
key,
value,
Expand Down
4 changes: 2 additions & 2 deletions control-plane/src/modules/workflows/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { JsonSchemaInput } from "inferable/bin/types";
import { contract } from "../contract";
import { getJobReferences, getJobsForWorkflow } from "../jobs/jobs";
import * as events from "../observability/events";
import { getWorkflowsByMetadata } from "./metadata";
import { getRunsByMetadata } from "./metadata";
import { getRunMessagesForDisplay } from "./workflow-messages";
import {
createRetry,
Expand Down Expand Up @@ -320,7 +320,7 @@ export const runsRouter = initServer().router(
};
}

const result = await getWorkflowsByMetadata({
const result = await getRunsByMetadata({
clusterId,
key,
value,
Expand Down
1 change: 1 addition & 0 deletions control-plane/src/utilities/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const envSchema = z

SLACK_BOT_TOKEN: z.string(),
SLACK_SIGNING_SECRET: z.string(),
SLACK_CLUSTER_ID: z.string(),

SQS_RUN_PROCESS_QUEUE_URL: z.string(),
SQS_RUN_GENERATE_NAME_QUEUE_URL: z.string(),
Expand Down

0 comments on commit fdccc53

Please sign in to comment.