Skip to content

Commit

Permalink
feat: Attribute Slack messages with Clerk user
Browse files Browse the repository at this point in the history
  • Loading branch information
johnjcsmith committed Dec 29, 2024
1 parent fe45248 commit a9908e1
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 43 deletions.
4 changes: 4 additions & 0 deletions control-plane/.env.base
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ SQS_CUSTOMER_TELEMETRY_QUEUE_URL='http://localhost:9324/000000000000/customer-te
SQS_EXTERNAL_TOOL_CALL_QUEUE_URL='http://localhost:9324/000000000000/external-tool-call'

# JWKS_URL=
# CLERK_SECRET_KEY=
# MANAGEMENT_API_SECRET=

# ANTHROPIC_API_KEY=
# COHERE_API_KEY=
# BEDROCK_AVAILABLE=

# NANGO_SECRET_KEY=
# SLACK_SIGNING_SECRET
5 changes: 2 additions & 3 deletions control-plane/src/modules/auth/auth.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import * as clusterAuth from "./cluster";
import * as clerkAuth from "./clerk";
import {
Auth,
ClerkAuth,
extractAuthState,
extractCustomAuthState,
} from "./auth";
Expand Down Expand Up @@ -363,14 +362,14 @@ describe("extractAuthState", () => {
});

run1 = await createRunWithMessage({
user: owner1AuthState! as ClerkAuth,
userId: owner1AuthState!.entityId,
clusterId: owner1.clusterId,
message: "hello",
type: "human",
});

run2 = await createRunWithMessage({
user: owner2AuthState! as ClerkAuth,
userId: owner2AuthState!.entityId,
clusterId: owner2.clusterId,
message: "hello",
type: "human",
Expand Down
14 changes: 11 additions & 3 deletions control-plane/src/modules/integrations/slack/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type MessageEvent = {
event: KnownEventFromType<"message">;
client: webApi.WebClient;
clusterId: string;
userId?: string;
};

export const slack: InstallableIntegration = {
Expand Down Expand Up @@ -210,17 +211,19 @@ export const start = async (fastify: FastifyInstance) => {
return;
}

await authenticateUser(event, client, integration);
const user = await authenticateUser(event, client, integration);

try {
if (hasThread(event)) {
await handleExistingThread({
userId: user?.id,
event,
client,
clusterId: integration.cluster_id,
});
} else {
await handleNewThread({
userId: user?.id,
event,
client,
clusterId: integration.cluster_id,
Expand Down Expand Up @@ -355,7 +358,7 @@ const deleteNangoConnection = async (connectionId: string) => {
);
};

const handleNewThread = async ({ event, client, clusterId }: MessageEvent) => {
const handleNewThread = async ({ event, client, clusterId, userId }: MessageEvent) => {
let thread = event.ts;
// If this message is part of a thread, associate the run with the thread rather than the message
if (hasThread(event)) {
Expand All @@ -364,6 +367,7 @@ const handleNewThread = async ({ event, client, clusterId }: MessageEvent) => {

if ("text" in event && event.text) {
const run = await createRunWithMessage({
userId,
clusterId,
message: event.text,
type: "human",
Expand All @@ -386,7 +390,7 @@ const handleNewThread = async ({ event, client, clusterId }: MessageEvent) => {
throw new Error("Event had no text");
};

const handleExistingThread = async ({ event, client, clusterId }: MessageEvent) => {
const handleExistingThread = async ({ event, client, clusterId, userId }: MessageEvent) => {
if ("text" in event && event.text) {
if (!hasThread(event)) {
throw new Error("Event had no thread_ts");
Expand All @@ -402,6 +406,7 @@ const handleExistingThread = async ({ event, client, clusterId }: MessageEvent)
// Message is within a thread which already has a Run, continue
if (run) {
await addMessageAndResume({
userId,
id: ulid(),
clusterId: run.clusterId,
runId: run.id,
Expand All @@ -412,6 +417,7 @@ const handleExistingThread = async ({ event, client, clusterId }: MessageEvent)
// Message is in a thread, but does't have a Run, start a new one
// TODO: Inferable doesn't have context for the original message, we should include this
await handleNewThread({
userId,
event,
client,
clusterId,
Expand Down Expand Up @@ -460,4 +466,6 @@ const authenticateUser = async (event: KnownEventFromType<"message">, client: we
logger.info("Could not find Slack user in Clerk.");
throw new AuthenticationError("Could not authenticate Slack user");
}

return clerkUser;
};
4 changes: 2 additions & 2 deletions control-plane/src/modules/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ export const router = initServer().router(contract, {

await addMessageAndResume({
id: id ?? ulid(),
user: auth,
userId: auth?.entityId,
clusterId,
runId,
message,
Expand Down Expand Up @@ -300,7 +300,7 @@ export const router = initServer().router(contract, {

const messages = await editHumanMessage({
id: messageId,
user: auth,
userId: auth.entityId,
clusterId,
runId,
message,
Expand Down
8 changes: 3 additions & 5 deletions control-plane/src/modules/workflows/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { getRunsByMetadata } from "./metadata";
import { getRunMessagesForDisplay } from "./workflow-messages";
import {
createRetry,
createRunWithMessage,
deleteRun,
getClusterWorkflows,
getRunConfigMetrics,
Expand All @@ -21,7 +20,6 @@ import { posthog } from "../posthog";
import {
RunOptions,
getRunConfig,
listRunConfigs,
mergeRunConfigOptions,
validateSchema,
} from "../prompt-templates";
Expand Down Expand Up @@ -139,11 +137,11 @@ export const runsRouter = initServer().router(
runOptions.initialPrompt = `${runOptions.initialPrompt}\n\n<DATA>\n${JSON.stringify(runOptions.input, null, 2)}\n</DATA>`;
}

let customAuth = auth.type === "custom" ? auth.isCustomAuth() : undefined;
const customAuth = auth.type === "custom" ? auth.isCustomAuth() : undefined;

const workflow = await createRun({
runId: runOptions.runId,
user: auth,
userId: auth.entityId,
clusterId,

name: body.name,
Expand Down Expand Up @@ -175,7 +173,7 @@ export const runsRouter = initServer().router(
if (runOptions.initialPrompt) {
await addMessageAndResume({
id: ulid(),
user: auth,
userId: auth.entityId,
clusterId,
runId: workflow.id,
message: runOptions.initialPrompt,
Expand Down
24 changes: 11 additions & 13 deletions control-plane/src/modules/workflows/workflow-messages.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import assert from "assert";
import { InferSelectModel, and, asc, desc, eq, gt, ne } from "drizzle-orm";
import { Auth, ClerkAuth } from "../auth/auth";
import { InferSelectModel, and, desc, eq, gt, ne } from "drizzle-orm";
import { db, RunMessageMetadata, workflowMessages } from "../data";
import { deleteJobsAfter } from "../jobs/jobs";
import { resumeRun } from "./workflows";
Expand All @@ -13,7 +12,6 @@ import {
} from "../contract";
import { logger } from "../observability/logger";
import Anthropic from "@anthropic-ai/sdk";
import { get, reduce } from "lodash";

export type MessageData = z.infer<typeof messageDataSchema>;

Expand Down Expand Up @@ -55,14 +53,14 @@ export type RunMessage = {

export const insertRunMessage = async ({
clusterId,
user,
userId,
runId,
id,
type,
data,
}: {
id: string;
user?: ClerkAuth;
userId?: string;
clusterId: string;
runId: string;
type: InferSelectModel<typeof workflowMessages>["type"];
Expand All @@ -73,7 +71,7 @@ export const insertRunMessage = async ({
.insert(workflowMessages)
.values({
id,
user_id: user?.entityId ?? "SYSTEM",
user_id: userId ?? "SYSTEM",
cluster_id: clusterId,
workflow_id: runId,
type,
Expand All @@ -83,15 +81,15 @@ export const insertRunMessage = async ({
};

export const upsertRunMessage = async ({
user,
userId,
clusterId,
runId,
id,
type,
data,
metadata,
}: {
user?: Auth;
userId?: string;
id: string;
clusterId: string;
runId: string;
Expand All @@ -106,7 +104,7 @@ export const upsertRunMessage = async ({
.values([
{
id,
user_id: user?.entityId ?? "SYSTEM",
user_id: userId ?? "SYSTEM",
cluster_id: clusterId,
workflow_id: runId,
type,
Expand All @@ -126,7 +124,7 @@ export const upsertRunMessage = async ({
data,
updated_at: new Date(),
metadata,
user_id: user?.entityId ?? "SYSTEM",
user_id: userId ?? "SYSTEM",
},
})
.returning({
Expand All @@ -143,19 +141,19 @@ export const upsertRunMessage = async ({
export const editHumanMessage = async ({
clusterId,
runId,
user,
userId,
message,
id,
}: {
clusterId: string;
user?: Auth;
userId?: string;
runId: string;
message: string;
id: string;
}) => {
const [upserted] = await upsertRunMessage({
clusterId,
user,
userId,
runId,
data: {
message,
Expand Down
18 changes: 9 additions & 9 deletions control-plane/src/modules/workflows/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export type Run = {

export const createRun = async ({
runId,
user,
userId,
clusterId,
name,
test,
Expand All @@ -87,7 +87,7 @@ export const createRun = async ({
enableResultGrounding,
}: {
runId?: string;
user?: Auth;
userId?: string;
clusterId: string;
name?: string;
systemPrompt?: string;
Expand Down Expand Up @@ -131,7 +131,7 @@ export const createRun = async ({
id: runId ?? ulid(),
cluster_id: clusterId,
status: "pending",
user_id: user?.entityId ?? "SYSTEM",
user_id: userId ?? "SYSTEM",
...(name ? { name } : {}),
debug: debugQuery.debug,
system_prompt: systemPrompt,
Expand Down Expand Up @@ -361,7 +361,7 @@ export const getWorkflowDetail = async ({
};

export const addMessageAndResume = async ({
user,
userId,
id,
clusterId,
runId,
Expand All @@ -370,7 +370,7 @@ export const addMessageAndResume = async ({
metadata,
skipAssert,
}: {
user?: Auth;
userId?: string;
id: string;
clusterId: string;
runId: string;
Expand All @@ -384,7 +384,7 @@ export const addMessageAndResume = async ({
}

await upsertRunMessage({
user,
userId,
clusterId,
runId,
data: {
Expand Down Expand Up @@ -470,7 +470,7 @@ export type RunMessage = {

export const createRunWithMessage = async ({
runId,
user,
userId,
clusterId,
message,
systemPrompt,
Expand All @@ -496,7 +496,7 @@ export const createRunWithMessage = async ({
}: Parameters<typeof createRun>[0] & RunMessage) => {
const workflow = await createRun({
runId,
user,
userId,
clusterId,
name,
test,
Expand All @@ -520,7 +520,7 @@ export const createRunWithMessage = async ({

await addMessageAndResume({
id: ulid(),
user,
userId,
clusterId,
runId: workflow.id,
message,
Expand Down
8 changes: 0 additions & 8 deletions control-plane/src/utilities/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,6 @@ const envSchema = z
}
}

if (!!value.JWKS_URL !== !!value.CLERK_SECRET_KEY) {
return ctx.addIssue({
code: z.ZodIssueCode.custom,
message: "JWKS_URL and CLERK_SECRET_KEY must be provided together",
path: ["JWKS_URL", "CLERK_SECRET_KEY"],
});
}

if (!value.EE_DEPLOYMENT) {
return;
}
Expand Down

0 comments on commit a9908e1

Please sign in to comment.