Skip to content

Commit

Permalink
Request/Response classes for DocumentIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
NickyHavoc committed Oct 31, 2023
1 parent 4b33901 commit d2dfac0
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ repos:
hooks:
- id: codespell
args: ["-L newyorker"]
exclude: '^(poetry\.lock|log-viewer/.*|tests/retrievers/test_document_index\.py)$'
exclude: '^(poetry\.lock|log-viewer/.*|tests/connectors/retrievers/test_document_index_retriever\.py)$'
182 changes: 131 additions & 51 deletions src/intelligence_layer/connectors/document_index/document_index.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,97 @@
from datetime import datetime
import json
from typing import Any
from typing import Any, Mapping, Sequence

from pydantic import BaseModel, Field
import requests


class DocumentContents(BaseModel):
    """Text contents of a document, held as a sequence of strings.

    Attributes:
        contents: The text items that make up the document.
    """

    contents: Sequence[str]

    @classmethod
    def from_text(cls, text: str) -> "DocumentContents":
        """Create contents that consist of exactly one text item."""
        return cls(contents=[text])

    @classmethod
    def _from_modalities_json(
        cls, modalities_json: Mapping[str, Any]
    ) -> "DocumentContents":
        """Build contents from a JSON mapping with a ``"contents"`` list.

        Only entries whose ``"modality"`` is ``"text"`` are kept; a missing
        ``"contents"`` key results in empty contents.
        """
        texts = [
            entry["text"]
            for entry in modalities_json.get("contents", [])
            if entry["modality"] == "text"
        ]
        return cls(contents=texts)

    def _to_modalities_json(self) -> Sequence[Mapping[str, str]]:
        """Convert the contents into a list of text-modality JSON entries.

        Raises:
            TypeError: If an item of ``contents`` is not a ``str``.
        """

        def as_text_entry(item: Any) -> Mapping[str, str]:
            if isinstance(item, str):
                return {"modality": "text", "text": item}
            raise TypeError("Currently, only str modality is supported.")

        return [as_text_entry(item) for item in self.contents]


class CollectionPath(BaseModel):
    """Identifies a collection by its namespace and collection name.

    Attributes:
        namespace: Name of the namespace the collection belongs to.
        collection: Name of the collection inside that namespace.
    """

    namespace: str
    collection: str


class DocumentPath(BaseModel):
    """Identifies a document by its collection and its name.

    Attributes:
        collection_path: Path of the collection that holds the document.
        document_name: Name of the document within the collection.
    """

    collection_path: CollectionPath
    document_name: str

    @classmethod
    def _from_json(cls, document_path_json: Mapping[str, str]) -> "DocumentPath":
        """Build a path from a mapping with ``namespace``, ``collection`` and ``name`` keys."""
        parent = CollectionPath(
            namespace=document_path_json["namespace"],
            collection=document_path_json["collection"],
        )
        return cls(collection_path=parent, document_name=document_path_json["name"])


class DocumentInfo(BaseModel):
    """Metadata of a document as reported by a list-documents response.

    Attributes:
        document_path: Path identifying the document.
        created: Parsed value of the response's ``created_timestamp``.
        version: Value of the response's ``version`` field.
    """

    document_path: DocumentPath
    created: datetime
    version: int

    @classmethod
    def _from_list_documents_response(
        cls, list_documents_response: Mapping[str, Any]
    ) -> "DocumentInfo":
        """Build the info from one entry of a list-documents response.

        Reads the ``path``, ``created_timestamp`` and ``version`` keys.
        """
        path = DocumentPath._from_json(list_documents_response["path"])
        # The trailing "Z" is matched literally, so the parsed datetime is naive.
        timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
        created_at = datetime.strptime(
            list_documents_response["created_timestamp"], timestamp_format
        )
        return cls(
            document_path=path,
            created=created_at,
            version=list_documents_response["version"],
        )


class SearchQuery(BaseModel):
    """Parameters of a search request.

    Attributes:
        query: The text to search for.
        max_results: Maximum number of results; validated to be >= 0.
        min_score: Minimum score of a result; validated to lie in [0.0, 1.0].
    """

    query: str
    max_results: int = Field(..., ge=0)
    min_score: float = Field(..., ge=0.0, le=1.0)


class DocumentSearchResult(BaseModel):
    """A single hit of a search.

    Attributes:
        document_path: Path of the document the hit belongs to.
        section: Text of the first entry of the hit's ``section`` list.
        score: Score reported for the hit.
    """

    document_path: DocumentPath
    section: str
    score: float

    @classmethod
    def _from_search_response(
        cls, search_response: Mapping[str, Any]
    ) -> "DocumentSearchResult":
        """Build a result from one entry of a search response.

        Reads the ``document_path``, ``section`` and ``score`` keys; only the
        first element of ``section`` is used.
        """
        path = DocumentPath._from_json(search_response["document_path"])
        first_section = search_response["section"][0]
        return cls(
            document_path=path,
            section=first_section["text"],
            score=search_response["score"],
        )


class DocumentIndex:
"""Client for the Document Index allowing handling documents and search.
Expand All @@ -18,10 +106,12 @@ class DocumentIndex:
>>> document_index = DocumentIndex(os.getenv("AA_TOKEN"))
>>> document_index.create_collection(CollectionPath(namespace="my_namespace", collection="germany_facts_collection"))
>>> document_index.add_document(
>>> namespace="my_namespace",
>>> collection="germany_facts_collection",
>>> name="Fun facts about Germany",
>>> content="Germany is a country located in ..."
>>> document_path=DocumentPath(
>>> collection_path=CollectionPath(
>>> namespace="my_namespace",
>>> collection="germany_facts_collection",
>>> ),
>>> document_name="Fun facts about Germany",
>>> ),
>>> contents=DocumentContents.from_text("Germany is a country located in ...")
>>> )
>>> documents = document_index.search(
>>> namespace="my_namespace",
Expand All @@ -44,83 +134,73 @@ def __init__(
"Authorization": f"Bearer {token}",
}

def create_collection(self, namespace: str, collection: str) -> None:
url = f"{self._base_document_index_url}/collections/{namespace}/{collection}"
def create_collection(self, collection_path: CollectionPath) -> None:
url = f"{self._base_document_index_url}/collections/{collection_path.namespace}/{collection_path.collection}"
response = requests.put(url, headers=self.headers)
response.raise_for_status()

def delete_collection(self, namespace: str, collection: str) -> None:
url = f"{self._base_document_index_url}/collections/{namespace}/{collection}"
def delete_collection(self, collection_path: CollectionPath) -> None:
url = f"{self._base_document_index_url}/collections/{collection_path.namespace}/{collection_path.collection}"
response = requests.delete(url, headers=self.headers)
response.raise_for_status()

def list_collections(self, namespace: str) -> Sequence[str]:
    """Return the JSON body of a GET on the namespace's collections endpoint.

    Raises whatever ``raise_for_status`` raises for an error response.
    """
    endpoint = f"{self._base_document_index_url}/collections/{namespace}"
    reply = requests.get(endpoint, headers=self.headers)
    reply.raise_for_status()
    names: Sequence[str] = reply.json()
    return names

def add_document(
self,
namespace: str,
collection: str,
name: str,
content: str,
document_path: DocumentPath,
contents: DocumentContents,
) -> None:
url = f"{self._base_document_index_url}/collections/{namespace}/{collection}/docs/{name}"
url = f"{self._base_document_index_url}/collections/{document_path.collection_path.namespace}/{document_path.collection_path.collection}/docs/{document_path.document_name}"
data = {
"schema_version": "V1",
"contents": [{"modality": "text", "text": content}],
"contents": contents._to_modalities_json(),
}
response = requests.put(url, data=json.dumps(data), headers=self.headers)
response.raise_for_status()

def delete_document(self, namespace: str, collection: str, name: str) -> None:
url = f"{self._base_document_index_url}/collections/{namespace}/{collection}/docs/{name}"
def delete_document(self, document_path: DocumentPath) -> None:
url = f"{self._base_document_index_url}/collections/{document_path.collection_path.namespace}/{document_path.collection_path.collection}/docs/{document_path.document_name}"
response = requests.delete(url, headers=self.headers)
response.raise_for_status()

def get_document(
self, namespace: str, collection: str, name: str, get_chunks: bool = False
) -> Any:
if not get_chunks:
url = f"{self._base_document_index_url}/collections/{namespace}/{collection}/docs/{name}"
else:
url = f"{self._base_document_index_url}/collections/{namespace}/{collection}/docs/{name}/chunks"
def document(self, document_path: DocumentPath) -> DocumentContents:
url = f"{self._base_document_index_url}/collections/{document_path.collection_path.namespace}/{document_path.collection_path.collection}/docs/{document_path.document_name}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
return DocumentContents._from_modalities_json(response.json())

def list_documents(self, namespace: str, collection: str) -> Any:
url = (
f"{self._base_document_index_url}/collections/{namespace}/{collection}/docs"
)
def list_documents(self, collection_path: CollectionPath) -> Sequence[DocumentInfo]:
url = f"{self._base_document_index_url}/collections/{collection_path.namespace}/{collection_path.collection}/docs"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
return [DocumentInfo._from_list_documents_response(r) for r in response.json()]

def index_search(
def search(
self,
namespace: str,
collection: str,
collection_path: CollectionPath,
index: str,
query: str,
max_results: int,
min_score: float,
) -> Any:
url = f"{self._base_document_index_url}/collections/{namespace}/{collection}/indexes/{index}/search"
search_query: SearchQuery,
) -> Sequence[DocumentSearchResult]:
url = f"{self._base_document_index_url}/collections/{collection_path.namespace}/{collection_path.collection}/indexes/{index}/search"
data = {
"query": [{"modality": "text", "text": query}],
"max_results": max_results,
"min_score": min_score,
"query": [{"modality": "text", "text": search_query.query}],
"max_results": search_query.max_results,
"min_score": search_query.min_score,
"filter": [{"with": [{"modality": "text"}]}],
}
response = requests.post(url, data=json.dumps(data), headers=self.headers)
response.raise_for_status()
return response.json()
return [DocumentSearchResult._from_search_response(r) for r in response.json()]

def asymmetric_search(
self,
namespace: str,
collection: str,
query: str,
max_results: int,
min_score: float,
) -> Any:
return self.index_search(
namespace, collection, "asymmetric", query, max_results, min_score
)
collection_path: CollectionPath,
search_query: SearchQuery,
) -> Sequence[DocumentSearchResult]:
return self.search(collection_path, "asymmetric", search_query)
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Sequence

from intelligence_layer.connectors.document_index.document_index import DocumentIndex
from intelligence_layer.connectors.document_index.document_index import (
CollectionPath,
DocumentIndex,
SearchQuery,
)
from intelligence_layer.connectors.retrievers.base_retriever import (
BaseRetriever,
Document,
Expand Down Expand Up @@ -37,19 +41,23 @@ def __init__(
threshold: float = 0.5,
) -> None:
self._document_index = document_index
self._namespace = namespace
self._collection = collection
self._collection_path = CollectionPath(
namespace=namespace, collection=collection
)
self._k = k
self._threshold = threshold

def get_relevant_documents_with_scores(self, query: str) -> Sequence[SearchResult]:
search_query = SearchQuery(
query=query, max_results=self._k, min_score=self._threshold
)
response = self._document_index.asymmetric_search(
self._namespace, self._collection, query, self._k, self._threshold
self._collection_path, search_query
)
relevant_chunks = [
SearchResult(
score=result["score"],
document=Document(text=result["section"][0]["text"], metadata=None),
score=result.score,
document=Document(text=result.section),
)
for result in response
]
Expand Down
107 changes: 107 additions & 0 deletions tests/connectors/document_index/test_document_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from pytest import fixture
import pytest
from intelligence_layer.connectors.document_index.document_index import (
CollectionPath,
DocumentContents,
DocumentIndex,
DocumentPath,
SearchQuery,
)


@fixture
def collection_path() -> CollectionPath:
    """Path of the collection that all tests in this module operate on."""
    path = CollectionPath(namespace="aleph-alpha", collection="ci-collection")
    return path


@fixture
def document_path(
    document_index: DocumentIndex, collection_path: CollectionPath
) -> DocumentPath:
    """Path of the example document; creates the collection before returning it."""
    document_index.create_collection(collection_path)
    example_path = DocumentPath(
        collection_path=collection_path, document_name="Example Document"
    )
    return example_path


# Fixture providing the contents of the example document used by the tests:
# a single text item (built via DocumentContents.from_text) holding a
# multi-paragraph biography of John Stith Pemberton.
@fixture
def document_contents() -> DocumentContents:
text = """John Stith Pemberton, the inventor of the world-renowned beverage Coca-Cola, was a figure whose life was marked by creativity, entrepreneurial spirit, and the turbulent backdrop of 19th-century America. Born on January 8, 1831, in Knoxville, Georgia, Pemberton grew up in an era of profound transformation and change.
Pemberton began his professional journey by studying medicine and pharmacy. After earning a degree in pharmacy, he started his career as a druggist in Columbus, Georgia. He was known for his keen interest in creating medicinal concoctions and was well-respected in his community. His early creations included various medicines and tonics, which were typical of the times when pharmacists often concocted their own remedies.
Pemberton's life took a significant turn during the American Civil War. He served as a lieutenant colonel in the Confederate Army, and it was during this period that he sustained a wound that led him to become dependent on morphine. This personal struggle with addiction likely influenced his later work in seeking out alternatives and remedies for pain relief.
In the post-war years, Pemberton relocated to Atlanta, Georgia, where he continued to experiment with various medicinal syrups and tonics. It was during this time, in the late 19th century, that he developed a beverage he initially called "Pemberton's French Wine Coca." This concoction was inspired by Vin Mariani, a popular French tonic wine that contained coca leaves. Pemberton's beverage was intended to serve not just as a refreshing drink but also as a remedy for various ailments, including morphine addiction, indigestion, and headaches.
However, in 1886, when Atlanta introduced prohibition legislation, Pemberton was compelled to create a non-alcoholic version of his beverage. He experimented with a combination of carbonated water, coca leaf extract, kola nut, and other ingredients, eventually perfecting the formula for what would soon become Coca-Cola. The name was suggested by his bookkeeper, Frank Robinson, who also created the distinctive cursive logo that is still in use today.
Pemberton advertised his new creation as a "brain tonic" and "temperance drink," asserting that it could alleviate headaches and fatigue. However, due to his declining health and financial difficulties, Pemberton was eventually compelled to sell portions of his business to various partners. Shortly before his death in 1888, he sold his remaining stake in Coca-Cola to Asa G. Candler, a fellow pharmacist and businessman.
Under Candler's leadership, Coca-Cola transformed from a pharmacist's concoction into a mass-produced and marketed beverage that became a staple of American culture and a global icon. Despite the changes and the immense growth of the brand, the legacy of John Stith Pemberton as the inventor of Coca-Cola remains an integral part of the beverage's history.
Pemberton's life story is a testament to the spirit of innovation and resilience. His creation, borne out of personal struggles and the context of his times, went on to transcend its origins and become a symbol recognized across the globe. Today, when we think of Coca-Cola, we are reminded of Pemberton's journey from a small-town pharmacist to the creator of one of the world's most enduring and beloved brands."""
# Wrap the biography as a single-item DocumentContents.
return DocumentContents.from_text(text)


@pytest.mark.internal
def test_document_index_creates_collection(
    document_index: DocumentIndex, collection_path: CollectionPath
) -> None:
    """A created collection is listed among its namespace's collections."""
    document_index.create_collection(collection_path)

    listed = document_index.list_collections(collection_path.namespace)
    assert collection_path.collection in listed


@pytest.mark.internal
def test_document_index_adds_document(
    document_index: DocumentIndex,
    document_path: DocumentPath,
    document_contents: DocumentContents,
) -> None:
    """An added document shows up when listing its collection."""
    document_index.add_document(document_path, document_contents)

    listed_paths = [
        info.document_path
        for info in document_index.list_documents(document_path.collection_path)
    ]
    assert document_path in listed_paths


@pytest.mark.internal
def test_document_index_searches_asymmetrically(
    document_index: DocumentIndex, collection_path: CollectionPath
) -> None:
    """An asymmetric search over the CI collection finds the expected section.

    Relies on a document named "test_document_index_searches_asymmetrically"
    which, per the original author's note, is always present in the collection.
    """
    search_query = SearchQuery(query="Who likes pizza?", max_results=1, min_score=0.0)
    # Search the collection directly; no DocumentPath is needed for this call.
    search_result = document_index.asymmetric_search(collection_path, search_query)

    # Fail with a readable message instead of an IndexError on an empty result.
    assert search_result, "asymmetric search returned no results"
    assert "Mark" in search_result[0].section


@pytest.mark.internal
def test_document_index_gets_document(
    document_index: DocumentIndex,
    document_path: DocumentPath,
    document_contents: DocumentContents,
) -> None:
    """A stored document can be fetched and contains the uploaded text."""
    # Upload first so this test does not depend on another test (or an
    # earlier run) having already stored the example document.
    document_index.add_document(document_path, document_contents)

    document = document_index.document(document_path)

    assert any("John Stith Pemberton" in c for c in document.contents)


@pytest.mark.internal
def test_document_index_deletes_document(
    document_index: DocumentIndex, collection_path: CollectionPath
) -> None:
    """A document that was added and then deleted is no longer listed."""
    doomed_path = DocumentPath(
        collection_path=collection_path, document_name="Document to be deleted"
    )
    throwaway_contents = DocumentContents.from_text("Some text...")

    document_index.add_document(doomed_path, throwaway_contents)
    document_index.delete_document(doomed_path)

    remaining_paths = [
        info.document_path
        for info in document_index.list_documents(doomed_path.collection_path)
    ]
    assert doomed_path not in remaining_paths
Loading

0 comments on commit d2dfac0

Please sign in to comment.