diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/llama_index/vector_stores/opensearch/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/llama_index/vector_stores/opensearch/base.py
index 034197cb484c1d..c25373a38e954b 100644
--- a/llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/llama_index/vector_stores/opensearch/base.py
+++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/llama_index/vector_stores/opensearch/base.py
@@ -32,6 +32,8 @@
 )
 
 MATCH_ALL_QUERY = {"match_all": {}}  # type: Dict
 
+REMOVED_AODOCS_METADATA_MARKER = "aodocs.metadata-removed-from-source-node"
+
 class OpensearchVectorClient:
     """
@@ -104,19 +106,6 @@ def __init__(
         http_auth = kwargs.get("http_auth")
         self.space_type = space_type
         self.is_aoss = self._is_aoss_enabled(http_auth=http_auth)
-        # initialize mapping
-        idx_conf = {
-            "settings": settings,
-            "mappings": {
-                "properties": {
-                    embedding_field: {
-                        "type": "knn_vector",
-                        "dimension": dim,
-                        "method": method,
-                    },
-                }
-            },
-        }
         self._os_client = os_client or self._get_opensearch_client(
             self._endpoint, **kwargs
         )
@@ -127,13 +116,6 @@ def __init__(
         self._efficient_filtering_enabled = self._is_efficient_filtering_enabled(
             self._os_version
         )
-        not_found_error = self._import_not_found_error()
-
-        try:
-            self._os_client.indices.get(index=self._index)
-        except not_found_error:
-            self._os_client.indices.create(index=self._index, body=idx_conf)
-            self._os_client.indices.refresh(index=self._index)
 
     def _import_opensearch(self) -> Any:
         """Import OpenSearch if available, otherwise raise error."""
@@ -223,14 +205,18 @@ def _bulk_ingest_embeddings(
             mapping = {}
 
         bulk = self._import_bulk()
-        not_found_error = self._import_not_found_error()
         requests = []
         return_ids = []
 
-        try:
-            client.indices.get(index=index_name)
-        except not_found_error:
-            client.indices.create(index=index_name, body=mapping)
+        index_exists = client.indices.exists(index=index_name)
+        if not index_exists:
+            template_exists = client.indices.exists_index_template(
+                f"{index_name}-template"
+            )
+            if not template_exists:
+                raise ValueError(
+                    f"Invalid index name {index_name}, does not exist and there is no template for it."
+                )
 
         for i, text in enumerate(texts):
             metadata = metadatas[i] if metadatas else {}
@@ -250,8 +236,9 @@ def _bulk_ingest_embeddings(
             return_ids.append(_id)
 
         bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
-        if not is_aoss:
-            client.indices.refresh(index=index_name)
+        # we don't use it on Amazon, but we still do not want to trigger a refresh here
+        # if not is_aoss:
+        #     client.indices.refresh(index=index_name)
         return return_ids
 
@@ -274,14 +261,18 @@ async def _abulk_ingest_embeddings(
             mapping = {}
 
         async_bulk = self._import_async_bulk()
-        not_found_error = self._import_not_found_error()
         requests = []
         return_ids = []
 
-        try:
-            await client.indices.get(index=index_name)
-        except not_found_error:
-            await client.indices.create(index=index_name, body=mapping)
+        index_exists = await client.indices.exists(index=index_name)
+        if not index_exists:
+            template_exists = await client.indices.exists_index_template(
+                f"{index_name}-template"
+            )
+            if not template_exists:
+                raise ValueError(
+                    f"Invalid index name {index_name}, does not exist and there is no template for it."
+                )
 
         for i, text in enumerate(texts):
             metadata = metadatas[i] if metadatas else {}
@@ -301,8 +292,9 @@ async def _abulk_ingest_embeddings(
             return_ids.append(_id)
 
         await async_bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
-        if not is_aoss:
-            await client.indices.refresh(index=index_name)
+        # we don't use it on Amazon, but we still do not want to trigger a refresh here
+        # if not is_aoss:
+        #     await client.indices.refresh(index=index_name)
 
         return return_ids
 
@@ -632,7 +624,39 @@ def index_results(self, nodes: List[BaseNode], **kwargs: Any) -> List[str]:
             ids.append(node.node_id)
             embeddings.append(node.get_embedding())
             texts.append(node.get_content(metadata_mode=MetadataMode.NONE))
-            metadatas.append(node_to_metadata_dict(node, remove_text=True))
+
+            # all "aodocs.*" metadata are also stored in separate index fields,
+            # no need to store duplicates of them inside _node_content,
+            # we'll just add them all back to the node after we load it from _node_content during query
+            metadata = node_to_metadata_dict(node, remove_text=True)
+            node_content_dict = json.loads(metadata["_node_content"])
+            node_content_dict["metadata"] = {
+                k: v
+                for k, v in node_content_dict["metadata"].items()
+                if not k.startswith("aodocs.")
+            }
+
+            # _node_content also stores relationships, like prev/next, parent/child, and source.
+            # We have source relationships in our index, probably created by MarkdownElementNodeParser,
+            # which by default creates a source reference that points back to the same node.
+            # The relationship info also stores the metadata of the target node, which in the case of a self-referencing
+            # source relationship, is another duplication of all the metadata.
+            # In that case we can also remove and later restore the aodocs entries.
+            source_node_info = node_content_dict["relationships"].get("1")
+            if source_node_info and source_node_info["metadata"] == node.metadata:
+                source_node_info["metadata"] = {
+                    k: v
+                    for k, v in source_node_info["metadata"].items()
+                    if not k.startswith("aodocs.")
+                }
+                # We checked "source meta == node meta" but then changed the source, which complicates checking
+                # after a query if it was true before the removals or not.
+                # So flagging it to make identification for aodocs entry restoration easy and reliable.
+                source_node_info["metadata"][REMOVED_AODOCS_METADATA_MARKER] = True
+
+            metadata["_node_content"] = json.dumps(node_content_dict)
+
+            metadatas.append(metadata)
 
         return self._bulk_ingest_embeddings(
             self._os_client,
@@ -855,6 +879,19 @@ def _to_query_result(self, res) -> VectorStoreQueryResult:
             try:
                 node = metadata_dict_to_node(metadata)
                 node.text = text
+
+                node.metadata.update(
+                    {k: v for k, v in metadata.items() if k.startswith("aodocs.")}
+                )
+
+                source_node_info = node.source_node
+                if source_node_info and source_node_info.metadata.get(
+                    REMOVED_AODOCS_METADATA_MARKER
+                ):
+                    del source_node_info.metadata[REMOVED_AODOCS_METADATA_MARKER]
+                    source_node_info.metadata.update(
+                        {k: v for k, v in metadata.items() if k.startswith("aodocs.")}
+                    )
             except Exception:
                 # TODO: Legacy support for old nodes
                 node_info = source.get("node_info")
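Since this change removes index auto-creation, ingestion now fails unless the target index already exists or an index template named `{index_name}-template` is present. The sketch below (not part of this diff) shows how such a composable index template could be registered with opensearch-py before ingesting; the index name, embedding field, dimension, and mapping are illustrative assumptions, not values taken from this PR.

```python
# Minimal sketch (assumptions noted): register a composable index template so
# that the first bulk ingest against a matching index name auto-creates the
# index. Index name, field name, and dimension below are illustrative only.
from opensearchpy import OpenSearch

client = OpenSearch(hosts=["http://localhost:9200"])

index_name = "my-vector-index"
client.indices.put_index_template(
    name=f"{index_name}-template",  # the name _bulk_ingest_embeddings checks for
    body={
        "index_patterns": [index_name],
        "template": {
            "settings": {"index": {"knn": True}},
            "mappings": {
                "properties": {
                    "embedding": {
                        "type": "knn_vector",
                        "dimension": 1536,  # must match the embedding model's output size
                    },
                }
            },
        },
    },
)
```

Note that the new guard only checks that a template with that name exists; for the index to actually be created on first write, the template's `index_patterns` must match `index_name`. Also, with the explicit refresh calls commented out, freshly ingested documents only become searchable after OpenSearch's periodic refresh interval (1 s by default), so callers that query immediately after ingestion, such as tests, may need to call `client.indices.refresh(index=index_name)` themselves.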