Skip to content

Commit

Permalink
feat: Initial thread management
Browse files Browse the repository at this point in the history
  • Loading branch information
johnjcsmith committed Dec 13, 2024
1 parent 8a18042 commit 5eec348
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 20 deletions.
2 changes: 1 addition & 1 deletion control-plane/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { flagsmith } from "./modules/flagsmith";
import { runMigrations } from "./utilities/migrate";
import { customerTelemetry } from "./modules/customer-telemetry";

export const app = fastify({
const app = fastify({
logger: env.ENABLE_FASTIFY_LOGGER,
});

Expand Down
147 changes: 135 additions & 12 deletions control-plane/src/modules/slack/index.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,167 @@
import { App } from '@slack/bolt';
import { App, KnownEventFromType, webApi } from '@slack/bolt';
import { FastifySlackReceiver } from './receiver';
import { env } from '../../utilities/env';
import { FastifyInstance } from 'fastify';
import { logger } from '../observability/logger';
import { getRunsByMetadata } from '../workflows/metadata';
import { addMessageAndResume, createRunWithMessage, Run } from '../workflows/workflows';
import { AuthenticationError } from '../../utilities/errors';
import { ulid } from 'ulid';

let app: App | undefined;

const THREAD_META_KEY = "stripeThreadTs";
const CHANNEL_META_KEY = "stripeChannel";

export const start = async (fastify: FastifyInstance) => {
const SLACK_CLUSTER_ID = env.SLACK_CLUSTER_ID
const SLACK_BOT_TOKEN = env.SLACK_BOT_TOKEN
const SLACK_SIGNING_SECRET = env.SLACK_SIGNING_SECRET

if (
!SLACK_CLUSTER_ID ||
!SLACK_BOT_TOKEN ||
!SLACK_SIGNING_SECRET
) {
logger.info("Missing Slack environment variables. Skipping Slack integration.");
return
}

app = new App({
token: env.SLACK_BOT_TOKEN,
receiver: new FastifySlackReceiver({
signingSecret: env.SLACK_SIGNING_SECRET,
signingSecret: SLACK_SIGNING_SECRET,
path: '/triggers/slack',
fastify,
})
});

app.event('app_mention', async ({ event }) => {
logger.info("Received app_mention event. Skipping.", event);
});

// Event listener for direct messages
app.event('message', async ({ event, client }) => {
logger.info("Received message event. Responding.", event);
try {
if (event.channel_type === 'im' && event.subtype !== 'bot_message') {
// Respond to the direct message
await client.chat.postMessage({
channel: event.channel,
text: `Thanks for tagging me! Let's discuss here.`,
thread_ts: event.ts, // Use the timestamp of the original message to create a thread
});
await authenticateUser(event, client);

if ('thread_ts' in event && event.thread_ts) {
const [existingRun] = await getRunsByMetadata({
clusterId: SLACK_CLUSTER_ID,
key: THREAD_META_KEY,
value: event.thread_ts,
limit: 1,
});

if (existingRun) {
await handleExistingThread({ event, client, run: existingRun });
return
}
}

await handleNewThread({
event,
client,
clusterId: SLACK_CLUSTER_ID
});
}
} catch (error) {
if (error instanceof AuthenticationError) {
client.chat.postMessage({
thread_ts: event.ts,
channel: event.channel,
text: "Sorry, I couldn't authenticate you. Please ensure you have an Inferable account with the same email as your Slack account."
})
return
}
logger.error('Error responding to DM', { error });
}
});

await app.start();
}

const handleNewThread = async ({
event,
client,
clusterId
}: {
event: KnownEventFromType<'message'>,
client: webApi.WebClient,
clusterId: string
}) => {
if ('text' in event && event.text) {
const run = await createRunWithMessage({
clusterId,
message: event.text,
type: "human",
metadata: {
[THREAD_META_KEY]: event.ts,
[CHANNEL_META_KEY]: event.channel
},
});

client.chat.postMessage({
thread_ts: event.ts,
channel: event.channel,
mrkdwn: true,
text: `On it. I will get back to you soon.\nRun ID: <${env.APP_ORIGIN}/clusters/${clusterId}/runs/${run.id}|${run.id}>`,
});
return;
}

throw new Error("Event had no text");
}

const handleExistingThread = async ({
event,
client,
run
}: {
event: KnownEventFromType<'message'>,
client: webApi.WebClient,
run: Run
}) => {
if ('text' in event && event.text) {
await addMessageAndResume({
id: ulid(),
clusterId: run.clusterId,
runId: run.id,
message: event.text,
type: "human",
})
return
}

throw new Error("Event had no text")
}

const authenticateUser = async (event: KnownEventFromType<'message'>, client: webApi.WebClient) => {
if ('user' in event && event.user) {
const user = await client.users.info({
user: event.user,
token: env.SLACK_BOT_TOKEN
})

const confirmed = user.user?.is_email_confirmed
const email = user.user?.profile?.email

if (!confirmed || !email) {
throw new AuthenticationError('Could not authenticate Slack user')
}

// TODO: Check user in Clerk
}
}

export const notifySlack = async (input: {
threadTs: string;
channel: string;
text: string;
}) => {
await app?.client.chat.postMessage({
thread_ts: input.threadTs,
channel: input.channel,
text: input.text,
});
}

export const stop = async () => await app?.stop();
4 changes: 2 additions & 2 deletions control-plane/src/modules/workflows/metadata.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createOwner } from "../test/util";
import { getWorkflowsByMetadata } from "./metadata";
import { getRunsByMetadata } from "./metadata";
import { createRun } from "./workflows";

describe("getWorkflowsByMetadata", () => {
Expand All @@ -24,7 +24,7 @@ describe("getWorkflowsByMetadata", () => {
},
});

const result = await getWorkflowsByMetadata({
const result = await getRunsByMetadata({
clusterId: owner.clusterId,
key: "foo",
value: "bar",
Expand Down
2 changes: 1 addition & 1 deletion control-plane/src/modules/workflows/metadata.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { and, eq, desc } from "drizzle-orm";
import { db, workflowMetadata, workflows } from "../data";

export const getWorkflowsByMetadata = async ({
export const getRunsByMetadata = async ({
clusterId,
key,
value,
Expand Down
4 changes: 2 additions & 2 deletions control-plane/src/modules/workflows/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { JsonSchemaInput } from "inferable/bin/types";
import { contract } from "../contract";
import { getJobReferences, getJobsForWorkflow } from "../jobs/jobs";
import * as events from "../observability/events";
import { getWorkflowsByMetadata } from "./metadata";
import { getRunsByMetadata } from "./metadata";
import { getRunMessagesForDisplay } from "./workflow-messages";
import {
createRetry,
Expand Down Expand Up @@ -320,7 +320,7 @@ export const runsRouter = initServer().router(
};
}

const result = await getWorkflowsByMetadata({
const result = await getRunsByMetadata({
clusterId,
key,
value,
Expand Down
5 changes: 3 additions & 2 deletions control-plane/src/utilities/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ const envSchema = z
ANTHROPIC_API_KEY: z.string().optional(),
COHERE_API_KEY: z.string().optional(),

SLACK_BOT_TOKEN: z.string(),
SLACK_SIGNING_SECRET: z.string(),
SLACK_BOT_TOKEN: z.string().optional(),
SLACK_SIGNING_SECRET: z.string().optional(),
SLACK_CLUSTER_ID: z.string().optional(),

SQS_RUN_PROCESS_QUEUE_URL: z.string(),
SQS_RUN_GENERATE_NAME_QUEUE_URL: z.string(),
Expand Down

0 comments on commit 5eec348

Please sign in to comment.