Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
nadeesha committed Dec 14, 2024
1 parent e897bd0 commit 5a02798
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 51 deletions.
17 changes: 8 additions & 9 deletions control-plane/src/modules/machines.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { upsertMachine } from "./machines";
import {
createCluster,
getClusterMachines,
getClusterServices,
} from "./management";
import { createCluster, getClusterMachines } from "./management";
import * as redis from "./redis";
import { upsertServiceDefinition } from "./service-definitions";
import {
getServiceDefinitions,
upsertServiceDefinition,
} from "./service-definitions";

describe("machines", () => {
beforeAll(async () => {
Expand Down Expand Up @@ -60,10 +59,10 @@ describe("machines", () => {
);

expect(
await getClusterServices({
await getServiceDefinitions({
clusterId: id,
}),
).toHaveLength(2);
}).then((services) => services.map((s) => s.service)),
).toEqual(services);
});
});
});
23 changes: 0 additions & 23 deletions control-plane/src/modules/management.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,29 +187,6 @@ export const getClusterDetails = async ({
};
};

export const getClusterServices = async ({
clusterId,
}: {
clusterId: string;
}) => {
const services = await data.db
.select({
definition: data.services.definition,
timestamp: data.services.timestamp,
})
.from(data.services)
.where(eq(data.services.cluster_id, clusterId));

const serviceDefinitions = storedServiceDefinitionSchema.parse(
services.map((s) => ({
...(s.definition as object),
timestamp: s.timestamp,
})),
);

return serviceDefinitions;
};

export const getClusterMachines = async ({
clusterId,
}: {
Expand Down
2 changes: 1 addition & 1 deletion control-plane/src/modules/observability/hyperdx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as HyperDX from "@hyperdx/node-opentelemetry";

export const hdx = env.HYPERDX_API_KEY ? HyperDX : null;

if (!hdx) {
if (!hdx && env.NODE_ENV === "production") {
// eslint-disable-next-line no-console
console.log("HyperDX is not configured");
}
Expand Down
2 changes: 1 addition & 1 deletion control-plane/src/modules/prompt-templates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export async function upsertRunConfig({
name,
initial_prompt: initialPrompt,
system_prompt: systemPrompt,
attached_functions: attachedFunctions,
attached_functions: attachedFunctions ?? [],
result_schema: resultSchema,
input_schema: inputSchema,
})
Expand Down
21 changes: 18 additions & 3 deletions control-plane/src/modules/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ import {
} from "./knowledge/knowledgebase";
import { callsRouter } from "./calls/router";
import { buildModel } from "./models";
import { deserializeFunctionSchema } from "./service-definitions";
import {
deserializeFunctionSchema,
getServiceDefinitions,
} from "./service-definitions";
import { integrationsRouter } from "./integrations/router";
import { getServerStats } from "./server-stats";

Expand Down Expand Up @@ -439,13 +442,25 @@ export const router = initServer().router(contract, {
const user = request.request.getAuth();
await user.canAccess({ cluster: { clusterId } });

const services = await management.getClusterServices({
const services = await getServiceDefinitions({
clusterId,
});

const transformedServices = services.map((service) => ({
name: service.service,
timestamp: service.timestamp ?? new Date(),
description: service.definition.description,
functions: service.definition.functions?.map((fn) => ({
name: fn.name,
description: fn.description,
schema: fn.schema,
config: fn.config,
})),
}));

return {
status: 200,
body: services,
body: transformedServices,
};
},
getBlobData: async (request) => {
Expand Down
21 changes: 14 additions & 7 deletions control-plane/src/modules/service-definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ export const storedServiceDefinitionSchema = z.array(
z.object({
name: z.string(),
description: z.string().optional(),
timestamp: z.date(),
functions: z
.array(
z.object({
Expand Down Expand Up @@ -194,10 +193,18 @@ export const getServiceDefinition = async ({

export const getServiceDefinitions = async (owner: {
clusterId: string;
}): Promise<ServiceDefinition[]> => {
}): Promise<
{
service: string;
definition: ServiceDefinition;
timestamp: Date | null;
}[]
> => {
const serviceDefinitions = await data.db
.select({
service: data.services.service,
definition: data.services.definition,
timestamp: data.services.timestamp,
})
.from(data.services)
.where(eq(data.services.cluster_id, owner.clusterId));
Expand All @@ -210,11 +217,11 @@ export const getServiceDefinitions = async (owner: {
return [];
}

const retrieved = parseServiceDefinition(
serviceDefinitions.map((d) => d.definition),
);

return retrieved;
return serviceDefinitions.map((r) => ({
service: r.service,
definition: parseServiceDefinition([r.definition])[0],
timestamp: r.timestamp,
}));
};

export const parseServiceDefinition = (
Expand Down
17 changes: 10 additions & 7 deletions control-plane/src/modules/workflows/agent/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import { events } from "../../observability/events";
/**
* Run a workflow from the most recent saved state
**/
export const processRun = async (run: Run, metadata?: Record<string, string>) => {
export const processRun = async (
run: Run,
metadata?: Record<string, string>,
) => {
logger.info("Running workflow");

// Parallelize fetching additional context and service definitions
Expand Down Expand Up @@ -64,15 +67,15 @@ export const processRun = async (run: Run, metadata?: Record<string, string>) =>
allAvailableTools.push(...attachedFunctions);

serviceDefinitions.flatMap((service) =>
(service.functions ?? []).forEach((f) => {
(service.definition.functions ?? []).forEach((f) => {
// Do not attach additional tools if `attachedFunctions` is provided
if (attachedFunctions.length > 0 || f.config?.private) {
return;
}

allAvailableTools.push(
serviceFunctionEmbeddingId({
serviceName: service.name,
serviceName: service.definition.name,
functionName: f.name,
}),
);
Expand Down Expand Up @@ -167,8 +170,8 @@ export const processRun = async (run: Run, metadata?: Record<string, string>) =>
for (const message of state.messages.filter((m) => !m.persisted)) {
await Promise.all([
insertRunMessage(message),
notifyNewMessage({message, metadata}),
])
notifyNewMessage({ message, metadata }),
]);
message.persisted = true;
}
},
Expand Down Expand Up @@ -447,7 +450,7 @@ export const buildMockTools = async (workflow: Run) => {
}

const serviceDefinition = serviceDefinitions.find(
(sd) => sd.name === serviceName,
(sd) => sd.service === serviceName,
);

if (!serviceDefinition) {
Expand All @@ -461,7 +464,7 @@ export const buildMockTools = async (workflow: Run) => {
continue;
}

const functionDefinition = serviceDefinition.functions?.find(
const functionDefinition = serviceDefinition.definition.functions?.find(
(f) => f.name === functionName,
);

Expand Down

0 comments on commit 5a02798

Please sign in to comment.