Skip to content

Commit

Permalink
feat: Slack Run triggering (#297)
Browse files Browse the repository at this point in the history
* feat: Add intial slack listener

* chore: Register Slack as plugin

* feat: Initial thread management
  • Loading branch information
johnjcsmith authored and nadeesha committed Dec 14, 2024
1 parent 0171cf0 commit 53584c0
Show file tree
Hide file tree
Showing 9 changed files with 1,211 additions and 5 deletions.
853 changes: 853 additions & 0 deletions control-plane/package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions control-plane/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@langchain/cohere": "^0.3.1",
"@langchain/langgraph": "^0.1.9",
"@openapi-contrib/openapi-schema-to-json-schema": "^5.1.0",
"@slack/bolt": "^4.1.1",
"@toolhouseai/sdk": "^1.0.4",
"@ts-rest/core": "^3.27.0",
"@ts-rest/fastify": "3.45.2",
Expand Down
3 changes: 3 additions & 0 deletions control-plane/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import * as externalCalls from "./modules/jobs/external";
import * as models from "./modules/models/routing";
import { logContext, logger } from "./modules/observability/logger";
import * as workflows from "./modules/workflows/workflows";
import * as slack from "./modules/slack";
import { hdx } from "./modules/observability/hyperdx";
import { pg } from "./modules/data";
import { addAttributes } from "./modules/observability/tracer";
Expand Down Expand Up @@ -148,6 +149,7 @@ const startTime = Date.now();
customerTelemetry.start(),
toolhouse.start(),
externalCalls.start(),
slack.start(app),
...(env.EE_DEPLOYMENT
? [
flagsmith?.getEnvironmentFlags(),
Expand Down Expand Up @@ -192,6 +194,7 @@ process.on("SIGTERM", async () => {
redis.stop(),
customerTelemetry.stop(),
externalCalls.stop(),
slack.stop(),
]);

logger.info("Shutdown complete");
Expand Down
191 changes: 191 additions & 0 deletions control-plane/src/modules/slack/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import { App, KnownEventFromType, webApi } from '@slack/bolt';
import { FastifySlackReceiver } from './receiver';
import { env } from '../../utilities/env';
import { FastifyInstance } from 'fastify';
import { logger } from '../observability/logger';
import { getRunsByMetadata } from '../workflows/metadata';
import { addMessageAndResume, createRunWithMessage, Run } from '../workflows/workflows';
import { AuthenticationError } from '../../utilities/errors';
import { ulid } from 'ulid';

let app: App | undefined;

const THREAD_META_KEY = "stripeThreadTs";
const CHANNEL_META_KEY = "stripeChannel";

type MessageEvent = {
event: KnownEventFromType<'message'>,
client: webApi.WebClient
clusterId: string
}

export const start = async (fastify: FastifyInstance) => {
const SLACK_CLUSTER_ID = env.SLACK_CLUSTER_ID
const SLACK_BOT_TOKEN = env.SLACK_BOT_TOKEN
const SLACK_SIGNING_SECRET = env.SLACK_SIGNING_SECRET

if (
!SLACK_CLUSTER_ID ||
!SLACK_BOT_TOKEN ||
!SLACK_SIGNING_SECRET
) {
logger.info("Missing Slack environment variables. Skipping Slack integration.");
return
}

app = new App({
token: env.SLACK_BOT_TOKEN,
receiver: new FastifySlackReceiver({
signingSecret: SLACK_SIGNING_SECRET,
path: '/triggers/slack',
fastify,
})
});

// Event listener for direct messages
app.event('message', async ({ event, client }) => {
logger.info("Received message event. Responding.", event);

if (isBotMessage(event)) {
logger.info("Received message from bot. Ignoring.", event);
return
}

if (!isDirectMessage(event)) {
logger.info("Received message from channel. Ignoring.", event);
return
}

try {
await authenticateUser(event, client);

if (hasThread(event)) {
const [run] = await getRunsByMetadata({
clusterId: SLACK_CLUSTER_ID,
key: THREAD_META_KEY,
value: event.thread_ts,
limit: 1,
});

if (run) {
await handleExistingThread({
event,
client,
run,
clusterId: SLACK_CLUSTER_ID
});
return
}
}

await handleNewThread({
event,
client,
clusterId: SLACK_CLUSTER_ID
});

} catch (error) {

if (error instanceof AuthenticationError) {
client.chat.postMessage({
thread_ts: event.ts,
channel: event.channel,
text: "Sorry, I couldn't authenticate you. Please ensure you have an Inferable account with the same email as your Slack account."
})
return
}

logger.error('Error responding to Direct Message', { error });
}
});

await app.start();
}

export const stop = async () => await app?.stop();

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const hasThread = (e: any): e is { thread_ts: string } => {
return typeof e?.thread_ts === 'string';
}

const hasUser = (e: any): e is { user: string } => {
return typeof e?.user === 'string';
}

const isDirectMessage = (e: KnownEventFromType<'message'>): boolean => {
return e.channel_type === 'im';
}

const isBotMessage = (e: KnownEventFromType<'message'>): boolean => {
return e.subtype === 'bot_message';
}

const handleNewThread = async ({
event,
client,
clusterId
}: MessageEvent) => {

if ('text' in event && event.text) {
const run = await createRunWithMessage({
clusterId,
message: event.text,
type: "human",
metadata: {
[THREAD_META_KEY]: event.ts,
[CHANNEL_META_KEY]: event.channel
},
});

client.chat.postMessage({
thread_ts: event.ts,
channel: event.channel,
mrkdwn: true,
text: `On it. I will get back to you soon.\nRun ID: <${env.APP_ORIGIN}/clusters/${clusterId}/runs/${run.id}|${run.id}>`,
});
return;
}

throw new Error("Event had no text");
}

const handleExistingThread = async ({
event,
run,
} : MessageEvent & { run: Run }) => {
if ('text' in event && event.text) {
await addMessageAndResume({
id: ulid(),
clusterId: run.clusterId,
runId: run.id,
message: event.text,
type: "human",
})
return
}

throw new Error("Event had no text")
}

const authenticateUser = async (event: KnownEventFromType<'message'>, client: webApi.WebClient) => {
if (hasUser(event)) {
const user = await client.users.info({
user: event.user,
token: env.SLACK_BOT_TOKEN
})

const confirmed = user.user?.is_email_confirmed
const email = user.user?.profile?.email

if (!confirmed || !email) {
throw new AuthenticationError('Could not authenticate Slack user')
}

// TODO: Verify user in Clerk
return
}

throw new Error("Event had no user")
}

154 changes: 154 additions & 0 deletions control-plane/src/modules/slack/receiver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import {
App,
Receiver,
ReceiverEvent,
BufferedIncomingMessage,
HTTPModuleFunctions as boltHelpers,
HTTPResponseAck,
Logger,
LogLevel,
} from '@slack/bolt'
import { FastifyInstance, FastifyPluginCallback, FastifyReply, FastifyRequest } from 'fastify';
import { logger } from '../observability/logger';

const slackLogger: Logger = {
debug: (message: string) => logger.debug(message),
error: (message: string) => logger.error(message),
info: (message: string) => logger.info(message),
warn: (message: string) => logger.warn(message),
getLevel: () => LogLevel.INFO,
setLevel: () => void 0,
setName: () => void 0,
}

type FastifySlackReceiverParams = {
fastify: FastifyInstance
path: string
signingSecret: string
}

export class FastifySlackReceiver implements Receiver {
private fastify: FastifyInstance;
private app?: App;
private path: string
private signingSecret: string

constructor({
path,
fastify,
signingSecret,
}: FastifySlackReceiverParams) {
this.fastify = fastify;
this.path = path
this.signingSecret = signingSecret
}

init(app: App) {
this.app = app;
}

async start() {
logger.info("Registering Slack receiver")

// Register a seperate plugin and disable the content type parsers for the route
const slackPlugin: FastifyPluginCallback = async (instance) => {
const contentTypes = ['application/json', 'application/x-www-form-urlencoded'];

instance.removeContentTypeParser(contentTypes);
instance.addContentTypeParser(contentTypes, { parseAs: 'string' }, instance.defaultTextParser);

instance.post('', (request, reply) => this.requestHandler(request, reply));
};

this.fastify.register(slackPlugin, { prefix: this.path });
}

async stop() {
this.fastify.server.close((err) => {
if (err) {
logger.error("Failed to stop Slack receiver gracefully", {
error: err,
})
}
})
}

async requestHandler(request: FastifyRequest, reply: FastifyReply) {

const req = request.raw;
const res = reply.raw;

try {
// Verify authenticity
let bufferedReq: BufferedIncomingMessage;
try {
if (typeof request.body !== "string") {
throw new Error("Expected Slack request body to be a string");
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
(req as any).rawBody = Buffer.from(request.body);

bufferedReq = await boltHelpers.parseAndVerifyHTTPRequest(
{
enabled: true,
signingSecret: this.signingSecret,
},
req,
);
} catch (error) {
logger.warn("Failed to parse and verify Slack request", {
error,
});
boltHelpers.buildNoBodyResponse(res, 401);
return;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let body: any;
try {
body = boltHelpers.parseHTTPRequestBody(bufferedReq);
} catch (error) {
logger.warn("Malformed Slack request", {
error,
});
boltHelpers.buildNoBodyResponse(res, 400);
return;
}

if (body.ssl_check) {
boltHelpers.buildSSLCheckResponse(res);
return;
}

if (body.type === 'url_verification') {
boltHelpers.buildUrlVerificationResponse(res, body);
return;
}

const ack = new HTTPResponseAck({
logger: slackLogger,
processBeforeResponse: false,
unhandledRequestHandler: () => {
logger.warn("Unhandled Slack request");
},
httpRequest: bufferedReq,
httpResponse: res,
});

const event: ReceiverEvent = {
body,
ack: ack.bind(),
retryNum: boltHelpers.extractRetryNumFromHTTPRequest(req),
retryReason: boltHelpers.extractRetryReasonFromHTTPRequest(req),
};

await this.app?.processEvent(event);

} catch (error) {
logger.error("Failed to handle Slack request", {
error,
})
}
};
}
4 changes: 2 additions & 2 deletions control-plane/src/modules/workflows/metadata.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createOwner } from "../test/util";
import { getWorkflowsByMetadata } from "./metadata";
import { getRunsByMetadata } from "./metadata";
import { createRun } from "./workflows";

describe("getWorkflowsByMetadata", () => {
Expand All @@ -24,7 +24,7 @@ describe("getWorkflowsByMetadata", () => {
},
});

const result = await getWorkflowsByMetadata({
const result = await getRunsByMetadata({
clusterId: owner.clusterId,
key: "foo",
value: "bar",
Expand Down
Loading

0 comments on commit 53584c0

Please sign in to comment.