modify opensearch behavior
ttozser committed Dec 5, 2024
1 parent cbf958f commit 0a83e55
Showing 1 changed file with 73 additions and 35 deletions.
@@ -22,6 +22,7 @@
node_to_metadata_dict,
)
from opensearchpy.client import Client as OSClient
import json

IMPORT_OPENSEARCH_PY_ERROR = (
"Could not import OpenSearch. Please install it with `pip install opensearch-py`."
@@ -32,6 +33,8 @@
)
MATCH_ALL_QUERY = {"match_all": {}} # type: Dict

REMOVED_AODOCS_METADATA_MARKER = "aodocs.metadata-removed-from-source-node"


class OpensearchVectorClient:
"""
@@ -104,19 +107,6 @@ def __init__(
http_auth = kwargs.get("http_auth")
self.space_type = space_type
self.is_aoss = self._is_aoss_enabled(http_auth=http_auth)
# initialize mapping
idx_conf = {
"settings": settings,
"mappings": {
"properties": {
embedding_field: {
"type": "knn_vector",
"dimension": dim,
"method": method,
},
}
},
}
self._os_client = os_client or self._get_opensearch_client(
self._endpoint, **kwargs
)
@@ -127,13 +117,6 @@
self._efficient_filtering_enabled = self._is_efficient_filtering_enabled(
self._os_version
)
not_found_error = self._import_not_found_error()

try:
self._os_client.indices.get(index=self._index)
except not_found_error:
self._os_client.indices.create(index=self._index, body=idx_conf)
self._os_client.indices.refresh(index=self._index)

def _import_opensearch(self) -> Any:
"""Import OpenSearch if available, otherwise raise error."""
@@ -223,14 +206,18 @@ def _bulk_ingest_embeddings(
mapping = {}

bulk = self._import_bulk()
not_found_error = self._import_not_found_error()
requests = []
return_ids = []

try:
client.indices.get(index=index_name)
except not_found_error:
client.indices.create(index=index_name, body=mapping)
index_exists = client.indices.exists(index=index_name)
if not index_exists:
template_exists = client.indices.exists_index_template(
f"{index_name}-template"
)
if not template_exists:
raise ValueError(
f"Invalid index name {index_name}, does not exist and there is no template for it."
)

for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
Expand All @@ -250,8 +237,9 @@ def _bulk_ingest_embeddings(
return_ids.append(_id)

bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
if not is_aoss:
client.indices.refresh(index=index_name)
# We do not use this on Amazon (where is_aoss would apply), but even so we do
# not want to trigger a refresh after every bulk ingest here.
# if not is_aoss:
#     client.indices.refresh(index=index_name)

return return_ids
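Because `_bulk_ingest_embeddings` no longer creates a missing index from an inline mapping, the index (or an index template named `{index_name}-template`) must exist before ingestion. A minimal sketch of registering such a template with opensearch-py; the field name, dimension, and knn method are assumptions that mirror the `idx_conf` mapping removed from `__init__`:

```python
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])
index_name = "my-vector-index"  # hypothetical index name

client.indices.put_index_template(
    name=f"{index_name}-template",
    body={
        # the template applies to indices whose names match these patterns
        "index_patterns": [index_name],
        "template": {
            "settings": {"index": {"knn": True}},
            "mappings": {
                "properties": {
                    # illustrative values mirroring the removed idx_conf
                    "embedding": {
                        "type": "knn_vector",
                        "dimension": 1536,
                        "method": {"name": "hnsw", "engine": "nmslib"},
                    }
                }
            },
        },
    },
)
```

With the template in place, the first bulk ingest against `index_name` auto-creates the index with the knn mapping instead of raising the `ValueError` above.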

@@ -274,14 +262,18 @@ async def _abulk_ingest_embeddings(
mapping = {}

async_bulk = self._import_async_bulk()
not_found_error = self._import_not_found_error()
requests = []
return_ids = []

try:
await client.indices.get(index=index_name)
except not_found_error:
await client.indices.create(index=index_name, body=mapping)
index_exists = await client.indices.exists(index=index_name)
if not index_exists:
template_exists = await client.indices.exists_index_template(
f"{index_name}-template"
)
if not template_exists:
raise ValueError(
f"Invalid index name {index_name}, does not exist and there is no template for it."
)

for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
Expand All @@ -301,8 +293,9 @@ async def _abulk_ingest_embeddings(
return_ids.append(_id)

await async_bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
if not is_aoss:
await client.indices.refresh(index=index_name)
# We do not use this on Amazon (where is_aoss would apply), but even so we do
# not want to trigger a refresh after every bulk ingest here.
# if not is_aoss:
#     await client.indices.refresh(index=index_name)

return return_ids
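With the automatic refresh now commented out in both ingestion paths, newly ingested documents become searchable only after the index's refresh interval elapses. A caller that needs immediate read-after-write visibility can still trigger a refresh explicitly (not supported on Amazon OpenSearch Serverless); a minimal sketch:

```python
from opensearchpy import OpenSearch

os_client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])
# make documents from a preceding bulk ingest visible to search immediately;
# skip this on AOSS, which does not support the refresh API
os_client.indices.refresh(index="my-vector-index")
```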

@@ -632,7 +625,39 @@ def index_results(self, nodes: List[BaseNode], **kwargs: Any) -> List[str]:
ids.append(node.node_id)
embeddings.append(node.get_embedding())
texts.append(node.get_content(metadata_mode=MetadataMode.NONE))
metadatas.append(node_to_metadata_dict(node, remove_text=True))

# all "aodocs.*" metadata are also stored in separate index fields,
# no need to store duplicates of them inside _node_content,
# we'll just add them all back to the node after we load it from _node_content during query
metadata = node_to_metadata_dict(node, remove_text=True)
node_content_dict = json.loads(metadata["_node_content"])
node_content_dict["metadata"] = {
k: v
for k, v in node_content_dict["metadata"].items()
if not k.startswith("aodocs.")
}

# _node_content also stores relationships such as prev/next, parent/child, and source.
# Our index contains source relationships, probably created by MarkdownElementNodeParser,
# which by default adds a source reference pointing back to the node itself.
# The relationship info also stores the metadata of the target node, so a
# self-referencing source relationship duplicates all of the metadata yet again.
# In that case we can likewise remove the aodocs entries and restore them later.
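# "1" is the serialized value of NodeRelationship.SOURCE in llama_index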
source_node_info = node_content_dict["relationships"].get("1")
if source_node_info and source_node_info["metadata"] == node.metadata:
source_node_info["metadata"] = {
k: v
for k, v in source_node_info["metadata"].items()
if not k.startswith("aodocs.")
}
# We checked "source meta == node meta" but then changed the source, which complicates checking
# after a query if it was true before the removals or not.
# So flagging it to make identification for aodocs entry restoration easy and reliable.
source_node_info["metadata"][REMOVED_AODOCS_METADATA_MARKER] = True

metadata["_node_content"] = json.dumps(node_content_dict)

metadatas.append(metadata)

return self._bulk_ingest_embeddings(
self._os_client,
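The stripping above is plain dictionary surgery on the serialized `_node_content`. A standalone sketch of the round trip with hypothetical metadata, showing that `aodocs.*` entries removed at ingest are recoverable from the top-level index fields at query time:

```python
import json

# hypothetical metadata as produced by node_to_metadata_dict:
# aodocs.* entries exist both at top level (as index fields) and in _node_content
metadata = {
    "aodocs.document_id": "doc-123",
    "_node_content": json.dumps(
        {
            "metadata": {"aodocs.document_id": "doc-123", "page": 1},
            "relationships": {},
        }
    ),
}

# at ingest: drop the aodocs.* duplicates from _node_content
content = json.loads(metadata["_node_content"])
content["metadata"] = {
    k: v for k, v in content["metadata"].items() if not k.startswith("aodocs.")
}
metadata["_node_content"] = json.dumps(content)

# at query time: restore them from the top-level index fields
restored = json.loads(metadata["_node_content"])["metadata"]
restored.update({k: v for k, v in metadata.items() if k.startswith("aodocs.")})
assert restored == {"page": 1, "aodocs.document_id": "doc-123"}
```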
Expand Down Expand Up @@ -855,6 +880,19 @@ def _to_query_result(self, res) -> VectorStoreQueryResult:
try:
node = metadata_dict_to_node(metadata)
node.text = text

node.metadata.update(
{k: v for k, v in metadata.items() if k.startswith("aodocs.")}
)

source_node_info = node.source_node
if source_node_info and source_node_info.metadata.get(
REMOVED_AODOCS_METADATA_MARKER
):
del source_node_info.metadata[REMOVED_AODOCS_METADATA_MARKER]
source_node_info.metadata.update(
{k: v for k, v in metadata.items() if k.startswith("aodocs.")}
)
except Exception:
# TODO: Legacy support for old nodes
node_info = source.get("node_info")
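The marker makes the source-relationship restoration above unambiguous: `aodocs.*` values are copied back only onto relationship entries that were actually stripped at ingest. A sketch of that gate in isolation, with hypothetical values:

```python
REMOVED_AODOCS_METADATA_MARKER = "aodocs.metadata-removed-from-source-node"

# hypothetical source-relationship metadata as deserialized from _node_content
source_metadata = {"page": 1, REMOVED_AODOCS_METADATA_MARKER: True}
# hypothetical top-level index fields returned with the search hit
index_fields = {"aodocs.document_id": "doc-123", "page": 1}

# restore only when the marker proves the entries were stripped at ingest
if source_metadata.pop(REMOVED_AODOCS_METADATA_MARKER, False):
    source_metadata.update(
        {k: v for k, v in index_fields.items() if k.startswith("aodocs.")}
    )
assert source_metadata == {"page": 1, "aodocs.document_id": "doc-123"}
```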
