Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subgraph Issues: Invalid Update For Channel query #474

Open
justinlevi opened this issue Sep 11, 2024 · 2 comments
Open

Subgraph Issues: Invalid Update For Channel query #474

justinlevi opened this issue Sep 11, 2024 · 2 comments

Comments

@justinlevi
Copy link
Contributor

I'm trying to build a RAG graph with a few extra nodes to experiment with a few ideas and I thought it would be interesting to try and create a subgraph for the rag pipeline. Unfortunately I can't figure out where this error is coming from and how to solve it. Any direction is greatly appreciated.

I've tried everything I can think of to return the state from the subgraph to match the parent graph state as well as even just return an empty state in the last node in the graph, but nothing seems to work. I feel like the issue is with the handoff back to the parent graph, but I'm not sure I fully understand how to return properly.

I'm seeing the following error:

[Nest] 51582  - 09/10/2024, 8:37:25 PM   ERROR [ExceptionsHandler] Invalid update for channel query. Values: How can I create a managed cluster?,What are the step-by-step instructions to create a managed cluster?,What files are needed to create a managed cluster?,What code is required to create a managed cluster?,How can I deploy a managed cluster to a specific bare metal host?,What are the step-by-step instructions to deploy to a specific bare metal host?,What files are needed to deploy to a specific bare metal host?,What code is required to deploy to a specific bare metal host?

Error:
InvalidUpdateError: Invalid update for channel query. Values: How can I create a managed cluster?,What are the step-by-step instructions to create a managed cluster?,What files are needed to create a managed cluster?,What code is required to create a managed cluster?,How can I deploy a managed cluster to a specific bare metal host?,What are the step-by-step instructions to deploy to a specific bare metal host?,What files are needed to deploy to a specific bare metal host?,What code is required to deploy to a specific bare metal host?

Error:
    at _applyWrites (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/langgraph/dist/pregel/index.cjs:652:27)
    at CompiledStateGraph._transform (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/langgraph/dist/pregel/index.cjs:475:17)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async CompiledStateGraph._transformStreamWithConfig (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/core/dist/runnables/base.cjs:308:30)
    at async CompiledStateGraph.transform (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/langgraph/dist/pregel/index.cjs:536:26)
    at async Object.pull (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/core/dist/utils/stream.cjs:98:41)

Here is my state and graph definitions

import { Injectable, Logger } from '@nestjs/common';
import { StateGraph, END, START, Send } from '@langchain/langgraph';
import { PrismaCheckpointer } from './checkpointer';
import { PrismaService } from '../prisma.service';
import {
  RetrieveDocumentsNode,
  curateComplexResponsesNode,
  generateRagResponseNode,
  gradeDocumentsNode,
  queryDestructuringNode,
} from './nodes';
import { queryKeywordsNode } from './nodes/queryKeywordsNode';
import { TavilySearchNode } from './nodes/tavilySearchNode';
import { querySimpleComplexRouterChain } from './chains/querySimpleComplexRouterChain';
import { BaseMessage } from '@langchain/core/messages';

import { messagesStateReducer, Annotation } from '@langchain/langgraph';
import { Document } from '@langchain/core/documents';

export type IMainState = typeof MainState.State;

export const MainState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: messagesStateReducer,
  }),
  query: Annotation<string>(),
  queryType: Annotation<'SIMPLE' | 'COMPLEX' | 'INTERACTION'>(),
  queries: Annotation<string[]>,
  responses: Annotation<string[]>({
    reducer: (prev: string[], curr: string[]) => [...prev, ...curr],
  }),
  finalResponse: Annotation<string>,
});

export type IRAGState = typeof RAGState.State;

export const RAGState = Annotation.Root({
  messages: Annotation<BaseMessage[]>,
  query: Annotation<string>,
  queryRewrite: Annotation<string>,
  keywords: Annotation<string[]>,
  documents: Annotation<Document[]>,
  response: Annotation<string>,
});

@Injectable()
export class AgentGraph {
  private readonly logger = new Logger(AgentGraph.name);

  constructor(
    private readonly prismaService: PrismaService,
    private readonly retrieverNode: RetrieveDocumentsNode,
    private readonly tavilySearchNode: TavilySearchNode,
  ) {}

  private checkpointer: PrismaCheckpointer;

  createGraph() {
    const ragSubgraph = new StateGraph(RAGState)
      // Add nodes
      .addNode('GENERATE_KEYWORDS', queryKeywordsNode.bind(this))
      .addNode('PARALLEL_FETCH', async (state, config: any) => {
        const [fetchResult, webResult] = await Promise.all([
          this.retrieverNode.retrieve(state, config),
          this.tavilySearchNode.search(state),
        ]);

        const combinedDocuments = [
          ...fetchResult.documents,
          ...webResult.documents,
        ];
        return { documents: combinedDocuments };
      })
      .addNode('GRADE_DOCUMENTS', gradeDocumentsNode.bind(this))
      .addNode('GENERATE_RESPONSE', generateRagResponseNode.bind(this))

      // Add edges
      .addEdge(START, 'GENERATE_KEYWORDS')
      .addEdge('GENERATE_KEYWORDS', 'PARALLEL_FETCH')
      .addEdge('PARALLEL_FETCH', 'GRADE_DOCUMENTS')
      .addEdge('GRADE_DOCUMENTS', 'GENERATE_RESPONSE')
      .addEdge('GENERATE_RESPONSE', END);

    const graph = new StateGraph(MainState)
      // Add nodes
      .addNode('ROUTE_QUERY', this.routeQueryNode.bind(this))
      .addNode('DESTRUCTURE_QUERY', queryDestructuringNode.bind(this))
      .addNode('RAG_PIPELINE', ragSubgraph.compile())
      .addNode('CURATE_RESPONSES', curateComplexResponsesNode.bind(this))

      // Add edges
      .addEdge(START, 'ROUTE_QUERY')
      .addConditionalEdges('ROUTE_QUERY', ({ queryType }) => queryType, {
        SIMPLE: 'RAG_PIPELINE',
        COMPLEX: 'DESTRUCTURE_QUERY',
        INTERACTION: END,
      })
      .addConditionalEdges(
        'DESTRUCTURE_QUERY',
        this.continueToRagPipeline.bind(this),
      )
      .addEdge('RAG_PIPELINE', 'CURATE_RESPONSES')
      .addEdge('CURATE_RESPONSES', END);

    this.checkpointer = new PrismaCheckpointer(this.prismaService);

    const compiled = graph.compile({ checkpointer: this.checkpointer });

    const representation = compiled.getGraph();
    console.log(representation.drawMermaid());

    return compiled;
  }

  private continueToRagPipeline = (state: typeof MainState.State) => {
    // We will return a list of `Send` objects
    // Each `Send` object consists of the name of a node in the graph
    // as well as the state to send to that node
    return state.queries.map(
      (query) => new Send('RAG_PIPELINE', { messages: state.messages, query }),
    );
  };

  private async routeQueryNode(
    state: IMainState,
  ): Promise<Partial<IMainState>> {
    this.logger.log('Routing question');
    const question = state.query;
    const questionRouter = await querySimpleComplexRouterChain();

    const source = await questionRouter.invoke({ question });
    this.logger.log(`Route question to ${source.queryType}`);
    return {
      queryType: source.queryType,
    };
  }

  getCheckpointer() {
    return this.checkpointer;
  }
}

@justinlevi
Copy link
Contributor Author

I'm also pretty sure I'm not doing the parallel branching correctly either but I'm not sure how to handle that either

@Masstronaut
Copy link
Contributor

We're currently working on support for subgraphs in langgraphjs, but it's not finished yet.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants