Skip to content

Commit

Permalink
fix(langchain-core): Pick runnable config keys in asynclocalstorage (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bracesproul authored Dec 5, 2024
1 parent 560e451 commit 217d788
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 17 deletions.
12 changes: 8 additions & 4 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import {
type TraceableFunction,
isTraceableFunction,
} from "langsmith/singletons/traceable";
import type { RunnableInterface, RunnableBatchOptions } from "./types.js";
import type {
RunnableInterface,
RunnableBatchOptions,
RunnableConfig,
} from "./types.js";
import { CallbackManagerForChainRun } from "../callbacks/manager.js";
import {
LogStreamCallbackHandler,
Expand All @@ -33,11 +37,11 @@ import {
import { raceWithSignal } from "../utils/signal.js";
import {
DEFAULT_RECURSION_LIMIT,
RunnableConfig,
ensureConfig,
getCallbackManagerForConfig,
mergeConfigs,
patchConfig,
pickRunnableConfigKeys,
} from "./config.js";
import { AsyncCaller } from "../utils/async_caller.js";
import { Run } from "../tracers/base.js";
Expand Down Expand Up @@ -2529,7 +2533,7 @@ export class RunnableLambda<
recursionLimit: (config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1,
});
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
pickRunnableConfigKeys(childConfig),
async () => {
try {
let output = await this.func(input, {
Expand Down Expand Up @@ -2627,7 +2631,7 @@ export class RunnableLambda<
const output = await new Promise<RunOutput | Runnable>(
(resolve, reject) => {
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
pickRunnableConfigKeys(childConfig),
async () => {
try {
const res = await this.func(finalChunk as RunInput, {
Expand Down
18 changes: 18 additions & 0 deletions langchain-core/src/runnables/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,21 @@ export function patchConfig<CallOptions extends RunnableConfig>(
}
return newConfig;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function pickRunnableConfigKeys<CallOptions extends Record<string, any>>(
config?: CallOptions
): Partial<RunnableConfig> | undefined {
return config
? {
configurable: config.configurable,
recursionLimit: config.recursionLimit,
callbacks: config.callbacks,
tags: config.tags,
metadata: config.metadata,
maxConcurrency: config.maxConcurrency,
timeout: config.timeout,
signal: config.signal,
}
: undefined;
}
1 change: 1 addition & 0 deletions langchain-core/src/runnables/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export {
patchConfig,
ensureConfig,
mergeConfigs,
pickRunnableConfigKeys,
} from "./config.js";
export { RunnablePassthrough } from "./passthrough.js";
export { type RouterInput, RouterRunnable } from "./router.js";
Expand Down
7 changes: 4 additions & 3 deletions langchain-core/src/runnables/iter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { RunnableConfig } from "../runnables/types.js";
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
import { RunnableConfig } from "./config.js";
import { pickRunnableConfigKeys } from "./config.js";

export function isIterableIterator(
thing: unknown
Expand Down Expand Up @@ -36,7 +37,7 @@ export function* consumeIteratorInContext<T>(
): IterableIterator<T> {
while (true) {
const { value, done } = AsyncLocalStorageProviderSingleton.runWithConfig(
context,
pickRunnableConfigKeys(context),
iter.next.bind(iter),
true
);
Expand All @@ -56,7 +57,7 @@ export async function* consumeAsyncIterableInContext<T>(
while (true) {
const { value, done } =
await AsyncLocalStorageProviderSingleton.runWithConfig(
context,
pickRunnableConfigKeys(context),
iterator.next.bind(iter),
true
);
Expand Down
2 changes: 1 addition & 1 deletion langchain-core/src/runnables/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { z } from "zod";
import type { IterableReadableStreamInterface } from "../utils/stream.js";
import type { SerializableInterface } from "../load/serializable.js";
import type { BaseCallbackConfig } from "../callbacks/manager.js";
import type { IterableReadableStreamInterface } from "../types/stream.js";

export type RunnableBatchOptions = {
/** @deprecated Pass in via the standard runnable config object instead */
Expand Down
5 changes: 3 additions & 2 deletions langchain-core/src/tools/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import {
ensureConfig,
patchConfig,
pickRunnableConfigKeys,
type RunnableConfig,
} from "../runnables/config.js";
import type { RunnableFunc, RunnableInterface } from "../runnables/base.js";
Expand Down Expand Up @@ -594,7 +595,7 @@ export function tool<
callbacks: runManager?.getChild(),
});
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
pickRunnableConfigKeys(childConfig),
async () => {
try {
// TS doesn't restrict the type here based on the guard above
Expand Down Expand Up @@ -625,7 +626,7 @@ export function tool<
callbacks: runManager?.getChild(),
});
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
pickRunnableConfigKeys(childConfig),
async () => {
try {
// TS doesn't restrict the type here based on the guard above
Expand Down
5 changes: 5 additions & 0 deletions langchain-core/src/types/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Make this a type to override ReadableStream's async iterator type in case
// the popular web-streams-polyfill is imported - the supplied types
// in that case don't quite match.
export type IterableReadableStreamInterface<T> = ReadableStream<T> &
AsyncIterable<T>;
18 changes: 11 additions & 7 deletions langchain-core/src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { pickRunnableConfigKeys } from "../runnables/config.js";
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
import type { IterableReadableStreamInterface } from "../types/stream.js";
import { raceWithSignal } from "./signal.js";

// Make this a type to override ReadableStream's async iterator type in case
// the popular web-streams-polyfill is imported - the supplied types
// in that case don't quite match.
export type IterableReadableStreamInterface<T> = ReadableStream<T> &
AsyncIterable<T>;
// Re-exported for backwards compatibility
// Do NOT import this type from this file inside the project. Instead, always import from `types/stream.js`
export type { IterableReadableStreamInterface };

/*
* Support async iterator syntax for ReadableStreams in all environments.
Expand Down Expand Up @@ -215,7 +215,9 @@ export class AsyncGeneratorWithSetup<
// to each generator is available.
this.setup = new Promise((resolve, reject) => {
void AsyncLocalStorageProviderSingleton.runWithConfig(
params.config,
pickRunnableConfigKeys(
params.config as Record<string, unknown> | undefined
),
async () => {
this.firstResult = params.generator.next();
if (params.startSetup) {
Expand All @@ -238,7 +240,9 @@ export class AsyncGeneratorWithSetup<
}

return AsyncLocalStorageProviderSingleton.runWithConfig(
this.config,
pickRunnableConfigKeys(
this.config as Record<string, unknown> | undefined
),
this.signal
? async () => {
return raceWithSignal(this.generator.next(...args), this.signal);
Expand Down

0 comments on commit 217d788

Please sign in to comment.