diff --git a/examples/conversation_with_RAG_agents/configs/lc_agent_config.json b/examples/conversation_with_RAG_agents/configs/lc_agent_config.json
new file mode 100644
index 000000000..a87d4fb5f
--- /dev/null
+++ b/examples/conversation_with_RAG_agents/configs/lc_agent_config.json
@@ -0,0 +1,62 @@
+[
+    {
+        "class": "LangChainAgent",
+        "args": {
+            "name": "Tutorial-Assistant",
+            "description": "Tutorial-Assistant is an agent that can provide answers based on the English tutorial material, mainly the markdown files. It can answer general questions about AgentScope.",
+            "sys_prompt": "You're an assistant helping new users to use AgentScope. The language style is helpful and cheerful. You generate answers based on the provided context. The answer is expected to be no longer than 100 words. If the key words of the question can be found in the provided context, the answer should contain the section name which contains the answer. For example, 'You may refer to SECTION_NAME for more details.'",
+            "model_config_name": "qwen_config",
+            "knowledge_id_list": ["agentscope_tutorial_rag"],
+            "similarity_top_k": 5,
+            "log_retrieval": false,
+            "recent_n_mem_for_retrieve": 1
+        }
+    },
+    {
+        "class": "LangChainAgent",
+        "args": {
+            "name": "Code-Search-Assistant",
+            "description": "Code-Search-Assistant is an agent that can provide answers based on the AgentScope code base. It can answer questions about specific modules in AgentScope.",
+            "sys_prompt": "You're a coding assistant of AgentScope. The answer starts with appreciation for the question, then provides details regarding the functionality and features of the modules mentioned in the question. The language should be in a professional and simple style. The answer should be less than 100 words.",
+            "model_config_name": "qwen_config",
+            "knowledge_id_list": ["agentscope_code_rag"],
+            "search_type": "similarity",
+            "log_retrieval": true,
+            "recent_n_mem_for_retrieve": 1
+        }
+    },
+    {
+        "class": "LangChainAgent",
+        "args": {
+            "name": "API-Assistant",
+            "description": "API-Assistant is an agent that can answer questions about APIs in AgentScope. It can answer general questions about AgentScope.",
+            "sys_prompt": "You're an assistant providing answers to the questions related to APIs (functions and classes) in AgentScope. The language style is helpful and cheerful. You generate answers based on the provided context. The answer is expected to be no longer than 200 words. If the key words of the question can be found in the provided context, the answer should contain the module of the API. For example, 'You may refer to MODULE_NAME for more details.'",
+            "model_config_name": "qwen_config",
+            "knowledge_id_list": ["agentscope_api_rag"],
+            "search_kwargs": {"k": 2},
+            "log_retrieval": false,
+            "recent_n_mem_for_retrieve": 1
+        }
+    },
+    {
+        "class": "LangChainAgent",
+        "args": {
+            "name": "Searching-Assistant",
+            "description": "Searching-Assistant is an agent that can provide answers based on the AgentScope code and tutorials. It can answer questions about everything in the AgentScope code and tutorials.",
+            "sys_prompt": "You're a helpful assistant of AgentScope. The answer starts with appreciation for the question, then points out the location of the code or section that is most relevant to the question. The answer should be less than 50 words.",
+            "model_config_name": "qwen_config",
+            "knowledge_id_list": ["agentscope_code_rag", "agentscope_tutorial_rag"],
+            "log_retrieval": false,
+            "recent_n_mem_for_retrieve": 1
+        }
+    },
+    {
+        "class": "DialogAgent",
+        "args": {
+            "name": "Agent-Guiding-Assistant",
+            "sys_prompt": "You're an assistant guiding the user to the specific agent for help. The answer is in a cheerful style. The output starts with appreciation for the question. Next, rephrase the question in a simple declarative sentence, for example, 'I think you are asking...'. Last, if the question is about detailed code or examples in the AgentScope framework, output '@ Code-Search-Assistant you might be suitable for answering the question'; if the question is about API or function calls (Example: 'Is there a function related...' or 'how can I initialize ...') in AgentScope, output '@ API-Assistant, I think you are more suitable for the question, please tell us more about it'; if the question is about where to find some context (Example: 'where can I find...'), output '@ Searching-Assistant, we need your help', otherwise, output '@ Tutorial-Assistant, I think you are more suitable for the question, can you tell us more about it?'. The answer is expected to be only one sentence.",
+            "model_config_name": "qwen_config",
+            "use_memory": false
+        }
+    }
+]
\ No newline at end of file
diff --git a/examples/conversation_with_RAG_agents/configs/lc_knowledge_config.json b/examples/conversation_with_RAG_agents/configs/lc_knowledge_config.json
new file mode 100644
index 000000000..feab3a296
--- /dev/null
+++ b/examples/conversation_with_RAG_agents/configs/lc_knowledge_config.json
@@ -0,0 +1,96 @@
+[
+    {
+        "knowledge_id": "agentscope_code_rag",
+        "emb_model_config_name": "qwen_emb_config",
+        "chunk_size": 2048,
+        "chunk_overlap": 40,
+        "data_processing": [
+            {
+                "load_data": {
+                    "loader": {
+                        "create_object": true,
+                        "module": "langchain_community.document_loaders",
+                        "class": "DirectoryLoader",
+                        "init_args": {
+                            "path": "../../src/agentscope",
+                            "recursive": true,
+                            "glob": ["**/*.py"]
+                        }
+                    }
+                },
+                "data_parse": {
+                    "splitter": {
+                        "create_object": true,
+                        "module": "langchain_text_splitters.python",
+                        "class": "PythonCodeTextSplitter",
+                        "init_args": {}
+                    }
+                }
+            }
+        ]
+    },
+    {
+        "knowledge_id": "agentscope_api_rag",
+        "emb_model_config_name": "qwen_emb_config",
+        "chunk_size": 1024,
+        "chunk_overlap": 40,
+        "data_processing": [
+            {
+                "load_data": {
+                    "loader": {
+                        "create_object": true,
+                        "module": "langchain_community.document_loaders",
+                        "class": "DirectoryLoader",
+                        "init_args": {
+                            "path": "../../docs/docstring_html/",
+                            "glob": ["*.html"]
+                        }
+                    }
+                }
+            }
+        ]
+    },
+    {
+        "knowledge_id": "agentscope_global_rag",
+        "emb_model_config_name": "qwen_emb_config",
+        "chunk_size": 2048,
+        "chunk_overlap": 40,
+        "data_processing": [
+            {
+                "load_data": {
+                    "loader": {
+                        "create_object": true,
+                        "module": "langchain_community.document_loaders",
+                        "class": "DirectoryLoader",
+                        "init_args": {
+                            "path": "../../docs/sphinx_doc/en/source/tutorial",
+                            "glob": ["*.md"]
+                        }
+                    }
+                }
+            },
+            {
+                "load_data": {
+                    "loader": {
+                        "create_object": true,
+                        "module": "langchain_community.document_loaders",
+                        "class": "DirectoryLoader",
+                        "init_args": {
+                            "path": "../../src/agentscope",
+                            "recursive": true,
+                            "glob": ["**/*.py"]
+                        }
+                    }
+                },
+                "data_parse": {
+                    "splitter": {
+                        "create_object": true,
+                        "module": "langchain_text_splitters.python",
+                        "class": "PythonCodeTextSplitter",
+                        "init_args": {}
+                    }
+                }
+            }
+        ]
+    }
+]
diff --git a/examples/conversation_with_RAG_agents/rag_example.py b/examples/conversation_with_RAG_agents/rag_example.py
index 9946cd888..f074584f2 100644
--- a/examples/conversation_with_RAG_agents/rag_example.py
+++ b/examples/conversation_with_RAG_agents/rag_example.py
@@ -78,6 +78,16 @@ def main() -> None:
         },
     )
 
+    # # if using langchain knowledge, we need to set backend_engine
+    # knowledge_bank.add_data_as_knowledge(
+    #     knowledge_id="agentscope_tutorial_rag",
+    #     emb_model_name="qwen_emb_config",
+    #     data_dirs_and_types={
+    #         "../../docs/sphinx_doc/en/source/tutorial": ["*.md"],
+    #     },
+    #     backend_engine="langchain"
+    # )
+
     # let knowledgebank to equip rag agent with a (set of) knowledge
     # corresponding to its knowledge_id_list
     for agent in rag_agent_list:
@@ -104,7 +114,7 @@ def main() -> None:
     rag_agent_descriptions = [
         "agent name: "
         + agent.name
-        + "\n agent description:"
+        + "\n agent description: "
         + agent.description
         + "\n"
         for agent in rag_agent_list
diff --git a/setup.py b/setup.py
index ecf690b6e..681f2f559 100644
--- a/setup.py
+++ b/setup.py
@@ -86,6 +86,7 @@
 extra_rag_requires = [
     "llama-index==0.10.30",
+    "langchain==0.3.1",
 ]
 
 # API requires
diff --git a/src/agentscope/agents/__init__.py b/src/agentscope/agents/__init__.py
index b2fd92d10..9dfc8b67d 100644
--- a/src/agentscope/agents/__init__.py
+++ b/src/agentscope/agents/__init__.py
@@ -6,7 +6,10 @@
 from .dict_dialog_agent import DictDialogAgent
 from .user_agent import UserAgent
 from .react_agent import ReActAgent
-from .rag_agent import LlamaIndexAgent
+from .rag_agent import (
+    LlamaIndexAgent,
+    LangChainAgent,
+)
 
 
 __all__ = [
@@ -17,4 +20,5 @@
     "UserAgent",
     "ReActAgent",
     "LlamaIndexAgent",
+    "LangChainAgent",
 ]
diff --git a/src/agentscope/agents/rag_agent.py b/src/agentscope/agents/rag_agent.py
index ec5a8dc94..4f08d358a 100644
--- a/src/agentscope/agents/rag_agent.py
+++ b/src/agentscope/agents/rag_agent.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """
 This example shows how to build an agent with RAG
-with LlamaIndex.
+with LlamaIndex and LangChain.
 
 Notice, this is a Beta version of RAG agent.
 """
@@ -12,6 +12,7 @@
 from agentscope.agents.agent import AgentBase
 from agentscope.message import Msg
 from agentscope.rag import Knowledge
+from agentscope.utils.common import _convert_to_str
 
 CHECKING_PROMPT = """
 Is the retrieved content relevant to the query?
@@ -192,3 +193,163 @@ def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg:
         self.memory.add(msg)
 
         return msg
+
+
+class LangChainAgent(AgentBase):
+    """
+    A RAG agent built on LangChain.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        sys_prompt: str,
+        model_config_name: str,
+        knowledge_list: list[Knowledge] = None,
+        knowledge_id_list: list[str] = None,
+        similarity_top_k: int = None,
+        search_type: str = "similarity",
+        search_kwargs: dict = None,
+        log_retrieval: bool = True,
+        recent_n_mem_for_retrieve: int = 1,
+        **kwargs: Any,
+    ) -> None:
+        """
+        Initialize the RAG LangChainAgent
+        Args:
+            name (str):
+                the name for the agent
+            sys_prompt (str):
+                system prompt for the RAG agent
+            model_config_name (str):
+                language model for the agent
+            knowledge_list (list[Knowledge]):
+                a list of knowledge.
+                User can choose to pass a list of knowledge objects
+                directly when initializing the RAG agent. Another
+                choice can be passing a list of knowledge ids and
+                obtaining the knowledge with the `equip` function of a
+                knowledge bank.
+            knowledge_id_list (list[str]):
+                a list of knowledge ids.
+                This is designed for easily setting up multiple RAG
+                agents with a config file. To obtain the knowledge
+                objects, users can pass this agent to the `equip`
+                function in a knowledge bank to add the corresponding
+                knowledge to the agent's self.knowledge_list.
+            similarity_top_k (int):
+                the number of most similar documents to retrieve
+                from each knowledge
+            search_type (str):
+                the type of search to be performed on the
+                LangChain knowledge
+            search_kwargs (dict):
+                additional keyword arguments for the
+                search operation on the LangChain knowledge
+            log_retrieval (bool):
+                whether to print the retrieved content
+            recent_n_mem_for_retrieve (int):
+                the number of pieces of memory used as part of
+                the retrieval query
+        """
+        super().__init__(
+            name=name,
+            sys_prompt=sys_prompt,
+            model_config_name=model_config_name,
+        )
+        self.knowledge_list = knowledge_list or []
+        self.knowledge_id_list = knowledge_id_list or []
+        self.similarity_top_k = similarity_top_k
+        self.search_type = search_type
+        self.search_kwargs = search_kwargs or {}
+        self.log_retrieval = log_retrieval
+        self.recent_n_mem_for_retrieve = recent_n_mem_for_retrieve
+        self.description = kwargs.get("description", "")
+
+    def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg:
+        """
+        Reply function of the LangChain agent.
+        Processes the input data,
+        1) uses the input data to retrieve with the RAG function;
+        2) generates a prompt using the current memory and system
+        prompt;
+        3) invokes the language model to produce a response. The
+        response is then formatted and added to the dialogue memory.
+
+        Args:
+            x (`Optional[Union[Msg, Sequence[Msg]]]`, defaults to `None`):
+                The input message(s) to the agent, which can also be omitted if
+                the agent doesn't need any input.
+
+        Returns:
+            `Msg`: The output message generated by the agent.
+        """
+        retrieved_docs_to_string = ""
+        # record the input if needed
+        if self.memory:
+            self.memory.add(x)
+            # in case no input is provided (e.g., in msghub),
+            # use the memory as query
+            history = self.memory.get_memory(
+                recent_n=self.recent_n_mem_for_retrieve,
+            )
+            query = (
+                "\n".join(
+                    [msg.content for msg in history],
+                )
+                if isinstance(history, list)
+                else str(history)
+            )
+        elif x is not None:
+            query = x.content
+        else:
+            query = ""
+
+        if len(query) > 0:
+            # when content has information, do retrieval
+            for knowledge in self.knowledge_list:
+                retrieved_nodes = knowledge.retrieve(
+                    str(query),
+                    self.similarity_top_k,
+                    search_type=self.search_type,
+                    search_kwargs=self.search_kwargs,
+                )
+
+                for document in retrieved_nodes:
+                    retrieved_docs_to_string += (
+                        "\n>>>> source:"
+                        + _convert_to_str(document.metadata)
+                        + "\n>>>> content:"
+                        + document.page_content
+                    )
+
+            if self.log_retrieval:
+                self.speak("[retrieved]:" + retrieved_docs_to_string)
+
+        # prepare prompt
+        prompt = self.model.format(
+            Msg(
+                name="system",
+                role="system",
+                content=self.sys_prompt,
+            ),
+            # {"role": "system", "content": retrieved_docs_to_string},
+            self.memory.get_memory(
+                recent_n=self.recent_n_mem_for_retrieve,
+            ),
+            Msg(
+                name="user",
+                role="user",
+                content="Context: " + retrieved_docs_to_string,
+            ),
+        )
+
+        # call llm and generate response
+        response = self.model(prompt).text
+        msg = Msg(self.name, response, "assistant")
+
+        # Print/speak the message in this agent's voice
+        self.speak(msg)
+
+        if self.memory:
+            # Record the message in memory
+            self.memory.add(msg)
+
+        return msg
diff --git a/src/agentscope/constants.py b/src/agentscope/constants.py
index 7fb31338f..618a42ad2 100644
--- a/src/agentscope/constants.py
+++ b/src/agentscope/constants.py
@@ -92,6 +92,7 @@ class ShrinkPolicy(IntEnum):
 DEFAULT_CHUNK_SIZE = 1024
 DEFAULT_CHUNK_OVERLAP = 20
 DEFAULT_TOP_K = 5
+DEFAULT_SCORE_THRESHOLD = 0.4
 
 # flask server
 EXPIRATION_SECONDS = 604800  # One week
diff --git a/src/agentscope/rag/knowledge_bank.py b/src/agentscope/rag/knowledge_bank.py
index ae4cc57ce..6dd3c44bb 100644
--- a/src/agentscope/rag/knowledge_bank.py
+++ b/src/agentscope/rag/knowledge_bank.py
@@ -10,6 +10,9 @@
 from ..manager import ModelManager
 from .knowledge import Knowledge
+from .llama_index_knowledge import LlamaIndexKnowledge
+from .langchain_knowledge import LangChainKnowledge
+
 
 DEFAULT_INDEX_CONFIG = {
     "knowledge_id": "",
     "data_processing": [],
@@ -24,11 +27,26 @@
         },
     },
 }
+LANGCHAIN_LOADER_CONFIG = {
+    "load_data": {
+        "loader": {
+            "create_object": True,
+            "module": "langchain_community.document_loaders",
+            "class": "DirectoryLoader",
+            "init_args": {},
+        },
+    },
+}
 DEFAULT_INIT_CONFIG = {
     "input_dir": "",
     "recursive": True,
     "required_exts": [],
 }
+LANGCHAIN_INIT_CONFIG = {
+    "path": "",
+    "recursive": True,
+    "glob": [],
+}
 
 
 class KnowledgeBank:
@@ -70,6 +88,7 @@ def add_data_as_knowledge(
         emb_model_name: str,
         data_dirs_and_types: dict[str, list[str]] = None,
         model_name: Optional[str] = None,
+        backend_engine: Optional[str] = None,
         knowledge_config: Optional[dict] = None,
     ) -> None:
         """
@@ -85,6 +104,10 @@
                 dictionary of data paths (keys) to the data types
                 (file extensions) for knowledgebase
                 (e.g., [".md", ".py", ".html"])
+                (if using the langchain backend: ["*.md", "**/*.py", "*.html"])
+            backend_engine (Optional[str]):
+                name of the backend engine
+                (either "llama_index" or "langchain")
             knowledge_config (optional[dict]):
                 complete indexing configuration, used for more
                 advanced applications. Users can customize
@@ -105,7 +128,6 @@
             )
             ''
         """
-        from .llama_index_knowledge import LlamaIndexKnowledge
 
         if knowledge_id in self.stored_knowledge:
             raise ValueError(f"knowledge_id {knowledge_id} already exists.")
@@ -113,18 +135,52 @@
         assert data_dirs_and_types is not None or knowledge_config is not None
 
         if knowledge_config is None:
-            knowledge_config = copy.deepcopy(DEFAULT_INDEX_CONFIG)
-            for data_dir, types in data_dirs_and_types.items():
-                loader_config = copy.deepcopy(DEFAULT_LOADER_CONFIG)
-                loader_init = copy.deepcopy(DEFAULT_INIT_CONFIG)
-                loader_init["input_dir"] = data_dir
-                loader_init["required_exts"] = types
-                loader_config["load_data"]["loader"]["init_args"] = loader_init
-                knowledge_config["data_processing"].append(loader_config)
+            if backend_engine is None or backend_engine == "llama_index":
+                backend_engine = "llama_index"
+                knowledge_config = copy.deepcopy(DEFAULT_INDEX_CONFIG)
+                for data_dir, types in data_dirs_and_types.items():
+                    loader_config = copy.deepcopy(DEFAULT_LOADER_CONFIG)
+                    loader_init = copy.deepcopy(DEFAULT_INIT_CONFIG)
+                    loader_init["input_dir"] = data_dir
+                    loader_init["required_exts"] = types
+                    loader_config["load_data"]["loader"][
+                        "init_args"
+                    ] = loader_init
+                    knowledge_config["data_processing"].append(loader_config)
+            elif backend_engine == "langchain":
+                knowledge_config = copy.deepcopy(DEFAULT_INDEX_CONFIG)
+                for data_dir, types in data_dirs_and_types.items():
+                    loader_config = copy.deepcopy(LANGCHAIN_LOADER_CONFIG)
+                    loader_init = copy.deepcopy(LANGCHAIN_INIT_CONFIG)
+                    loader_init["path"] = data_dir
+                    loader_init["glob"] = types
+                    loader_config["load_data"]["loader"][
+                        "init_args"
+                    ] = loader_init
+                    knowledge_config["data_processing"].append(loader_config)
+            else:
+                raise ValueError(
+                    f"Backend engine {backend_engine} not supported.",
+                )
+        else:
+            backend_engine = knowledge_config.pop("backend_engine", None)
+
+        if backend_engine is None:
+            # get the backend engine
+            backend_engine = self._get_backend_engine(knowledge_config)
+
+        if "llama_index" in backend_engine:
+            CustomKnowledge = LlamaIndexKnowledge
+            logger.info("Using llama_index backend engine")
+        elif "langchain" in backend_engine:
+            CustomKnowledge = LangChainKnowledge
+            logger.info("Using langchain backend engine")
+        else:
+            raise ValueError(f"Backend engine {backend_engine} not supported.")
 
         model_manager = ModelManager.get_instance()
-        self.stored_knowledge[knowledge_id] = LlamaIndexKnowledge(
+        self.stored_knowledge[knowledge_id] = CustomKnowledge(
             knowledge_id=knowledge_id,
             emb_model=model_manager.get_model_by_config_name(emb_model_name),
             knowledge_config=knowledge_config,
@@ -190,3 +246,59 @@ def equip(
                     duplicate=duplicate,
                 )
                 agent.knowledge_list.append(knowledge)
+
+    def _get_backend_engine(self, config: dict) -> str:
+        """Determines the backend engine based on the configuration.
+
+        Iterates through the `data_processing` section of the configuration.
+        It checks each process to find a dict that contains a 'module' key.
+        If found, it returns the top-level package name of that module
+        (e.g., 'langchain_community' or 'llama_index'), which is then used
+        to select the backend engine.
+
+        Args:
+            config (dict):
+                The configuration dictionary containing
+                data processing information.
+
+        Returns:
+            str: The name of the backend engine.
+        """
+        data_processing = config.get("data_processing", [])
+
+        for process in data_processing:
+            if isinstance(process, dict):
+                for value in process.values():
+                    if isinstance(value, dict):
+                        if "module" in value:
+                            module_value = value["module"]
+                        else:
+                            module_value = self._recursive_find_module(value)
+
+                        if module_value:
+                            return module_value.split(".")[0]
+
+        raise ValueError(
+            "No RAG backend engine module found, "
+            "please check your knowledge config",
+        )
+
+    def _recursive_find_module(self, d: dict) -> Optional[str]:
+        """Recursively searches for a 'module' key in a nested dict.
+
+        This method traverses a nested dict and returns
+        the value associated with the first 'module' key it finds.
+
+        Args:
+            d (dict): The dictionary to search.
+
+        Returns:
+            str: The value of the 'module' key, or None if not found.
+ """ + for v in d.values(): + if isinstance(v, dict): + if "module" in v: + return v["module"] + module_value = self._recursive_find_module(v) + if module_value: + return module_value + return None diff --git a/src/agentscope/rag/langchain_knowledge.py b/src/agentscope/rag/langchain_knowledge.py new file mode 100644 index 000000000..2528df30d --- /dev/null +++ b/src/agentscope/rag/langchain_knowledge.py @@ -0,0 +1,436 @@ +# -*- coding: utf-8 -*- +""" +This module is an integration of the Langchain RAG +into AgentScope package +""" +import os +import json +from typing import Any, Optional, List, Union +from loguru import logger +from pydantic import BaseModel + +try: + import langchain + from langchain_core.documents import Document + from langchain_core.retrievers import BaseRetriever + from langchain_core.vectorstores import ( + InMemoryVectorStore, + VectorStoreRetriever, + ) + from langchain_core.embeddings import Embeddings + from langchain_core.indexing import InMemoryRecordManager + from langchain.indexes import index + from langchain_text_splitters import CharacterTextSplitter + from langchain_text_splitters.base import TextSplitter + +except Exception: + langchain = None + Document = None + BaseRetriever = None + TextSplitter = None + VectorStoreRetriever = None + InMemoryVectorStore = None + InMemoryRecordManager = None + index = None + Embeddings = None + CharacterTextSplitter = None + +from agentscope.manager import FileManager +from agentscope.models import ModelWrapperBase +from agentscope.constants import ( + DEFAULT_TOP_K, + DEFAULT_SCORE_THRESHOLD, + DEFAULT_CHUNK_SIZE, + DEFAULT_CHUNK_OVERLAP, +) +from agentscope.rag.knowledge import Knowledge + +try: + + class _EmbeddingModel(BaseModel, Embeddings): + _emb_model_wrapper: ModelWrapperBase + + def __init__(self, emb_model: ModelWrapperBase, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._emb_model_wrapper = emb_model + + def embed_documents(self, texts: List[str]) -> List[List[float]]: + """ + embed a list of strings + Args: + texts (List[str]): texts to be embedded + """ + results = [ + list(self._emb_model_wrapper(t).embedding[0]) for t in texts + ] + return results + + def embed_query(self, text: str) -> List[float]: + """ + embeds a single query text into a vector representation + Args: + text (str): The query text to embed. + """ + return self.embed_documents([text])[0] + +except Exception: + + class _EmbeddingModel: # type: ignore[no-redef] + """ + A dummy embedding model for passing tests when + langchain is not installed + """ + + def __init__(self, emb_model: ModelWrapperBase): + self._emb_model_wrapper = emb_model + + +class LangChainKnowledge(Knowledge): + """ + This class is a wrapper with the langchain RAG. 
+ """ + + def __init__( + self, + knowledge_id: str, + emb_model: Union[ModelWrapperBase, Embeddings, None] = None, + knowledge_config: Optional[dict] = None, + model: Optional[ModelWrapperBase] = None, + persist_root: Optional[str] = None, + overwrite_index: Optional[bool] = False, + **kwargs: Any, + ) -> None: + super().__init__( + knowledge_id=knowledge_id, + emb_model=emb_model, + knowledge_config=knowledge_config, + model=model, + **kwargs, + ) + + if langchain is None: + raise ImportError( + "Please install langchain first.", + ) + + if persist_root is None: + persist_root = FileManager.get_instance().cache_dir or "./" + self.persist_dir = os.path.join(persist_root, knowledge_id) + self.persist_store_file = os.path.join( + self.persist_dir, + "vector_store.json", + ) + self.persist_index_file = os.path.join(self.persist_dir, "index.json") + self.emb_model = emb_model + self.overwrite_index = overwrite_index + self.vectorstore = None + self.record_manager = None + # ensure the emb_model is compatible with Langchian + if isinstance(emb_model, ModelWrapperBase): + self.emb_model = _EmbeddingModel(emb_model) + elif isinstance(self.emb_model, Embeddings): + pass + else: + raise TypeError( + f"Embedding model does not support {type(self.emb_model)}.", + ) + # then we can initialize the RAG + self._init_rag() + + def _init_rag(self, **kwargs: Any) -> None: + """ + Initialize the RAG. This includes: + * if the persist_dir exists, load the persisted index + * if not, convert the data to index + * if needed, update the index + * set the retriever to retrieve information from index + + Notes: + * the index is persisted in the self.persist_dir + * the refresh_index method is placed here for testing, it can be + called externally. For example, updated the index periodically + by calling rag.refresh_index() during the execution of the + agent. + """ + if os.path.exists(self.persist_dir): + self._load_store() + # self.refresh_index() + else: + self._data_to_store() + logger.info( + f"RAG with knowledge ids: {self.knowledge_id} " + f"initialization completed!\n", + ) + + def _load_store(self) -> None: + """ + Load the persisted index from persist_dir. 
+ """ + # set the storage + self.vectorstore = self._set_store( + self.knowledge_config.get("store_and_index", {}), + ) + if not self.vectorstore: + self.vectorstore = InMemoryVectorStore.load( + self.persist_store_file, + self.emb_model, + ) + # set the record manager + self.record_manager = InMemoryRecordManager(self.knowledge_id) + self.record_manager.create_schema() + self._load_memory_record(self.persist_index_file) + logger.info(f"vector store and index loaded from {self.persist_dir}") + + def _data_to_store(self) -> None: + # create the record manager + self.record_manager = InMemoryRecordManager(self.knowledge_id) + self.record_manager.create_schema() + + chunks = [] + for config in self.knowledge_config.get("data_processing"): + documents = self._data_to_docs(config=config) + splitter = self._set_splitter(config=config).get("splitter") + chunks_docs = self._docs_to_chunks( + documents=documents, + splitter=splitter, + ) + chunks = chunks + chunks_docs + + # convert chunks to vector store and index + self.vectorstore = self._set_store( + config=self.knowledge_config.get("store_and_index", {}), + ) + if not self.vectorstore: + self.vectorstore = InMemoryVectorStore( + self.emb_model, + ) + index( + chunks, + self.record_manager, + self.vectorstore, + cleanup=None, + source_id_key="source", + # upsert_kwargs={"embedding": self.emb_model} + # This feature is only supported in langchain 0.3.10 + ) + logger.info("vector store and index created successfully.") + + # persist + if isinstance(self.vectorstore, InMemoryVectorStore): + self.vectorstore.dump(self.persist_store_file) + logger.info("In-memory vector store are persisted.") + self._save_memory_record(self.persist_index_file) + logger.info("index are persisted.") + + def _save_memory_record(self, filename: str) -> None: + filedir = os.path.dirname(filename) + if not os.path.exists(filedir): + os.makedirs(filedir) + with open(filename, "w", encoding="utf-8") as f: + json.dump(self.record_manager.records, f, indent=4) + + def _load_memory_record(self, filename: str) -> None: + with open(filename, "r", encoding="utf-8") as f: + self.record_manager.records = json.load(f) + + def _data_to_docs( + self, + config: dict = None, + ) -> Any: + """ + This method set the loader as needed, or just use the default setting. + Then use the loader to load data from dir to documents. + + Notes: + We can use directory loader (DirectoryReader) + to load general documents, including Markdown, PDFs, + Word documents, PowerPoint decks, images, audio and video. + + Args: + config (dict): + optional, used when the loader config is in a config file. 
+        Returns:
+            Any: loaded documents
+        """
+        loader = self._set_loader(config=config).get("loader")
+        documents = loader.load()
+        logger.info(f"loaded {len(documents)} documents")
+        return documents
+
+    def _docs_to_chunks(
+        self,
+        documents: List[Document],
+        splitter: Optional[TextSplitter],
+    ) -> Any:
+        return splitter.split_documents(documents)
+
+    def _set_store(self, config: dict) -> Any:
+        if "stores" in config:
+            init_config = (
+                config.get("stores", {})
+                .get("vector_store", {})
+                .get("init_args", {})
+            )
+            # we prepare the embedding_key from the configs
+            embedding_key = init_config.pop(
+                "embedding_key",
+                "embedding",
+            )
+            init_config[embedding_key] = self.emb_model
+            temp = self._prepare_args_from_config(
+                config=config.get("stores", {}),
+            )
+            vector_store = temp.get("vector_store")
+        else:
+            vector_store = None
+        return vector_store
+
+    def _set_loader(self, config: dict) -> Any:
+        """
+        Set the loader as needed, or just use the default setting.
+
+        Args:
+            config (dict): a dictionary containing configurations
+        """
+        if "load_data" in config:
+            # we prepare the loader from the configs
+            loader = self._prepare_args_from_config(
+                config=config.get("load_data", {}),
+            )
+        else:
+            # we prepare the loader by default
+            try:
+                from langchain_community.document_loaders import (
+                    DirectoryLoader,
+                )
+            except ImportError as exc_inner:
+                raise ImportError(
+                    "LangChainKnowledge requires langchain to be installed. "
+                    "Please run `pip install langchain`",
+                ) from exc_inner
+            loader = {
+                "loader": DirectoryLoader(
+                    path="set_default_data_path",
+                ),
+            }
+        logger.info("loader is ready.")
+        return loader
+
+    def _set_splitter(self, config: dict) -> Any:
+        """
+        Set the splitter as needed, or just use the default setting.
+
+        Args:
+            config (dict): a dictionary containing configurations.
+ """ + if "data_parse" in config: + temp = self._prepare_args_from_config( + config=config.get("data_parse", {}), + ) + splitter = temp.get("splitter") + elif "store_and_index" in config: + logger.warning( + "The old configuration structure is deprecated, " + "please use data_parse instead of store_and_index.", + ) + temp = self._prepare_args_from_config( + config=config.get("store_and_index", {}), + ) + splitter = temp.get("splitter") + else: + splitter = CharacterTextSplitter( + chunk_size=self.knowledge_config.get( + "chunk_size", + DEFAULT_CHUNK_SIZE, + ), + chunk_overlap=self.knowledge_config.get( + "chunk_overlap", + DEFAULT_CHUNK_OVERLAP, + ), + ) + logger.info("splitter are ready.") + splitter = {"splitter": splitter} + return splitter + + def _get_retriever( + self, + search_type: str, + search_kwargs: dict, + ) -> BaseRetriever: + # set the retriever + default_kwargs = { + "k": DEFAULT_TOP_K, + "score_threshold": ( + DEFAULT_SCORE_THRESHOLD + if search_type == "similarity_score_threshold" + else None + ), + } + default_kwargs = { + key: value + for key, value in default_kwargs.items() + if value is not None + } + search_kwargs = {**default_kwargs, **search_kwargs} + logger.info( + f"search_type: {search_type}; search_kwargs: {search_kwargs}", + ) + retriever = VectorStoreRetriever( + vectorstore=self.vectorstore, + search_type=search_type, + search_kwargs=search_kwargs, + ) + logger.info("retriever is ready.") + return retriever + + def retrieve( + self, + query: str, + similarity_top_k: int = None, + to_list_strs: bool = False, + search_type: str = "similarity", + search_kwargs: dict = None, + retriever: Optional[BaseRetriever] = None, + **kwargs: Any, + ) -> list[Any]: + search_kwargs = search_kwargs or {} + if similarity_top_k: + search_kwargs.update({"k": similarity_top_k}) + if retriever is None: + retriever = self._get_retriever(search_type, search_kwargs) + retrieved = retriever.invoke(str(query)) + if to_list_strs: + results = [] + for node in retrieved: + results.append(node.page_content) + return results + return retrieved + + def refresh_index(self) -> None: + """ + Refresh the index when needed. + """ + clean_results = [] + for config in self.knowledge_config.get("data_processing"): + documents = self._data_to_docs(config=config) + splitter = self._set_splitter(config=config).get( + "splitter", + ) + chunks_docs = self._docs_to_chunks( + documents=documents, + splitter=splitter, + ) + if self.overwrite_index: + clean_method = "incremental" + else: + clean_method = None + clean_result = index( + chunks_docs, + self.record_manager, + self.vectorstore, + cleanup=clean_method, + source_id_key="source", + ) + clean_results.append(clean_result) + + logger.info(f"Refresh result: {clean_results}")