Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
nadeesha committed Dec 13, 2024
1 parent 0f2310d commit 61f1ac4
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 235 deletions.
162 changes: 8 additions & 154 deletions control-plane/src/modules/customer-telemetry.ts
Original file line number Diff line number Diff line change
@@ -1,170 +1,26 @@
import {
Message,
SendMessageBatchCommand,
SendMessageCommand,
} from "@aws-sdk/client-sqs";
import { Message } from "@aws-sdk/client-sqs";
import { Consumer } from "sqs-consumer";
import { z } from "zod";
import { env } from "../utilities/env";
import { logger } from "./observability/logger";
import { withSpan } from "./observability/tracer";
import { sqs } from "./sqs";
import crypto from "crypto";
import {
modelCallEventSchema,
runFeedbackEventSchema,
toolCallEventSchema,
} from "./integrations/integration-events";
import {
flushCluster,
processModelCall,
processRunFeedback,
processToolCall,
} from "./integrations/langfuse";
import { getIntegrations } from "./integrations/integrations";
import { createCache } from "../utilities/cache";
import {
modelCallEventSchema,
runFeedbackEventSchema,
toolCallEventSchema,
} from "./integrations/integration-events";
import { logger } from "./observability/logger";
import { sqs } from "./sqs";

const eventSchema = z.discriminatedUnion("type", [
modelCallEventSchema,
runFeedbackEventSchema,
toolCallEventSchema,
]);

type ObservabilityEvent = z.infer<typeof eventSchema>;

const customerTelemetryEnabledCache = createCache(
Symbol("customerTelemetryEnabled"),
);

async function hasCustomerTelemetryEnabled(clusterId: string) {
const cached = await customerTelemetryEnabledCache.get(clusterId);

if (cached === true || cached === false) {
return cached;
}

const integrations = await getIntegrations({ clusterId });
const enabled = !!integrations.langfuse;
await customerTelemetryEnabledCache.set(clusterId, enabled, 60);

return enabled;
}

async function sendToSQS(event: ObservabilityEvent) {
if (!(await hasCustomerTelemetryEnabled(event.clusterId))) {
return;
}

const queueUrl = env.SQS_CUSTOMER_TELEMETRY_QUEUE_URL;

return await withSpan(
"sqs.send_model_call_event",
async () => {
try {
const command = new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify({
...event,
runId: event.runId,
}),
MessageAttributes: {
eventType: {
DataType: "String",
StringValue: event.type,
},
data: {
DataType: "String",
StringValue: JSON.stringify(event),
},
},
});

const response = await sqs.send(command);
return response;
} catch (error) {
logger.error("Error sending model call event to SQS", {
error,
event,
queueUrl,
});
throw error;
}
},
{
attributes: {
"cluster.id": event.clusterId,
"run.id": event.runId,
"sqs.queue.url": queueUrl,
},
},
);
}

async function sendBatchToSQS(events: ObservabilityEvent[]) {
if (
events.some(
async (event) => !(await hasCustomerTelemetryEnabled(event.clusterId)),
)
) {
return;
}

const queueUrl = env.SQS_CUSTOMER_TELEMETRY_QUEUE_URL;

return await withSpan(
"sqs.send_batch_events",
async () => {
try {
// SQS batch requests are limited to 10 messages
const batchSize = 10;
const batches = [];

for (let i = 0; i < events.length; i += batchSize) {
const batch = events.slice(i, i + batchSize);
const command = new SendMessageBatchCommand({
QueueUrl: queueUrl,
Entries: batch.map((event) => ({
Id: `${event.runId}-${event.type}-${crypto.randomUUID()}`,
MessageBody: JSON.stringify({
...event,
runId: event.runId,
}),
MessageAttributes: {
eventType: {
DataType: "String",
StringValue: event.type,
},
data: {
DataType: "String",
StringValue: JSON.stringify(event),
},
},
})),
});

batches.push(sqs.send(command));
}

const responses = await Promise.all(batches);
return responses;
} catch (error) {
logger.error("Error sending batch events to SQS", {
error,
eventCount: events.length,
queueUrl,
});
throw error;
}
},
{
attributes: {
"sqs.queue.url": queueUrl,
"batch.size": events.length,
},
},
);
}

let consumer: Consumer | undefined;

const start = async () => {
Expand Down Expand Up @@ -240,8 +96,6 @@ export const stop = async () => {
};

export const customerTelemetry = {
track: sendToSQS,
trackBatch: sendBatchToSQS,
start,
stop,
};
11 changes: 11 additions & 0 deletions control-plane/src/modules/integrations/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export const toolhouseIntegration = "toolhouse";
export const langfuseIntegration = "langfuse";
export const tavilyIntegration = "tavily";

export const allowedIntegrations = [
toolhouseIntegration,
langfuseIntegration,
tavilyIntegration,
] as const;

export const externalServices = [toolhouseIntegration, tavilyIntegration];
43 changes: 5 additions & 38 deletions control-plane/src/modules/integrations/integrations.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,17 @@
import { eq, sql } from "drizzle-orm";
import { db, integrations } from "../data";
import { z } from "zod";
import { toolhouse } from "./toolhouse";
import { db, integrations } from "../data";
import { integrationSchema } from "./schema";
import { tavilyIntegration } from "./constants";
import { tavily } from "./tavily";

const toolhouseIntegration = "toolhouse";
const langfuseIntegration = "langfuse";
const tavilyIntegration = "tavily";

export const allowedIntegrations = [
toolhouseIntegration,
langfuseIntegration,
tavilyIntegration,
] as const;

export const externalServices = [toolhouse, tavily];
import { toolhouseIntegration } from "./constants";
import { toolhouse } from "./toolhouse";

export const integrationsLibs = {
[toolhouseIntegration]: toolhouse,
[tavilyIntegration]: tavily,
};

export const integrationSchema = z.object({
[toolhouseIntegration]: z
.object({
apiKey: z.string(),
})
.optional()
.nullable(),
[langfuseIntegration]: z
.object({
publicKey: z.string(),
secretKey: z.string(),
baseUrl: z.string(),
sendMessagePayloads: z.boolean(),
})
.optional()
.nullable(),
[tavilyIntegration]: z
.object({
apiKey: z.string(),
})
.optional()
.nullable(),
});

export const getIntegrations = async ({
clusterId,
}: {
Expand Down
47 changes: 21 additions & 26 deletions control-plane/src/modules/integrations/langfuse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ import {
runFeedbackEventSchema,
toolCallEventSchema,
} from "./integration-events";
import { createCache } from "../../utilities/cache";
import { integrationSchema } from "./schema";

const langfuseCache = new NodeCache({
maxKeys: 100,
});

const integrationsCache = createCache<z.infer<typeof integrationSchema>>(
Symbol("langfuseIntegrations"),
);

export async function getLangfuseClient(clusterId: string) {
const cachedClient = langfuseCache.get<{
client: Langfuse;
Expand All @@ -22,12 +28,17 @@ export async function getLangfuseClient(clusterId: string) {
return cachedClient;
}

const integrations = await getIntegrations({
clusterId,
});
let integrations = await integrationsCache.get(clusterId);

if (!integrations) {
integrations = await getIntegrations({
clusterId,
});

integrationsCache.set(clusterId, integrations, 60);
}

if (!integrations.langfuse) {
logger.error("No Langfuse integration found", { clusterId });
return;
}

Expand Down Expand Up @@ -66,27 +77,22 @@ export async function processModelCall(
event: z.infer<typeof modelCallEventSchema>,
) {
const langfuse = await getLangfuseClient(event.clusterId);
if (!langfuse) {
logger.error("No Langfuse client found", { event });

return;
}

const trace = langfuse.client.trace({
const trace = langfuse?.client.trace({
id: event.runId,
name: `run:${event.runId}`,
});

trace.generation({
trace?.generation({
name: event.purpose ?? "model_call.generic",
startTime: new Date(event.startedAt),
endTime: new Date(event.completedAt),
model: event.model,
modelParameters: {
temperature: event.temperature,
},
input: langfuse.sendMessagePayloads ? event.input : undefined,
output: langfuse.sendMessagePayloads ? event.output : undefined,
input: langfuse?.sendMessagePayloads ? event.input : undefined,
output: langfuse?.sendMessagePayloads ? event.output : undefined,
usage: {
promptTokens: event.inputTokens,
completionTokens: event.outputTokens,
Expand All @@ -98,13 +104,8 @@ export async function processRunFeedback(
event: z.infer<typeof runFeedbackEventSchema>,
) {
const langfuse = await getLangfuseClient(event.clusterId);
if (!langfuse) {
logger.error("No Langfuse client found", { event });

return;
}

langfuse.client.score({
langfuse?.client.score({
traceId: event.runId,
name: "user_feedback",
value: event.score,
Expand All @@ -117,13 +118,7 @@ export async function processToolCall(
) {
const langfuse = await getLangfuseClient(event.clusterId);

if (!langfuse) {
logger.error("No Langfuse client found", { event });

return;
}

langfuse.client.span({
langfuse?.client.span({
name: `${event.toolName}()`,
startTime: new Date(event.startedAt),
endTime: new Date(event.completedAt),
Expand Down
30 changes: 30 additions & 0 deletions control-plane/src/modules/integrations/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { z } from "zod";
import {
toolhouseIntegration,
langfuseIntegration,
tavilyIntegration,
} from "./constants";

export const integrationSchema = z.object({
[toolhouseIntegration]: z
.object({
apiKey: z.string(),
})
.optional()
.nullable(),
[langfuseIntegration]: z
.object({
publicKey: z.string(),
secretKey: z.string(),
baseUrl: z.string(),
sendMessagePayloads: z.boolean(),
})
.optional()
.nullable(),
[tavilyIntegration]: z
.object({
apiKey: z.string(),
})
.optional()
.nullable(),
});
Loading

0 comments on commit 61f1ac4

Please sign in to comment.