diff --git a/semantic_chunkers/__init__.py b/semantic_chunkers/__init__.py index 8b1bf5b..b3c06d4 100644 --- a/semantic_chunkers/__init__.py +++ b/semantic_chunkers/__init__.py @@ -1,7 +1 @@ -from semantic_chunkers.hybrid_layer import HybridRouteLayer -from semantic_chunkers.layer import LayerConfig, RouteLayer -from semantic_chunkers.route import Route - -__all__ = ["RouteLayer", "HybridRouteLayer", "Route", "LayerConfig"] - -__version__ = "0.0.1" +__version__ = "0.0.1" \ No newline at end of file diff --git a/semantic_chunkers/chunkers/base.py b/semantic_chunkers/chunkers/base.py index cbd2fe7..ae0b305 100644 --- a/semantic_chunkers/chunkers/base.py +++ b/semantic_chunkers/chunkers/base.py @@ -3,21 +3,21 @@ from colorama import Fore, Style from pydantic.v1 import BaseModel, Extra -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.schema import DocumentSplit +from semantic_router.encoders.base import BaseEncoder +from semantic_chunkers.schema import ChunkSet -class BaseSplitter(BaseModel): +class BaseChunker(BaseModel): name: str encoder: BaseEncoder class Config: extra = Extra.allow - def __call__(self, docs: List[str]) -> List[DocumentSplit]: + def __call__(self, docs: List[str]) -> List[ChunkSet]: raise NotImplementedError("Subclasses must implement this method") - def print(self, document_splits: List[DocumentSplit]) -> None: + def print(self, document_splits: List[ChunkSet]) -> None: colors = [Fore.RED, Fore.GREEN, Fore.BLUE, Fore.MAGENTA] for i, split in enumerate(document_splits): color = colors[i % len(colors)] diff --git a/semantic_chunkers/chunkers/consecutive_sim.py b/semantic_chunkers/chunkers/consecutive_sim.py index e2c0702..bff4a0f 100644 --- a/semantic_chunkers/chunkers/consecutive_sim.py +++ b/semantic_chunkers/chunkers/consecutive_sim.py @@ -2,33 +2,33 @@ import numpy as np -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.schema import DocumentSplit -from semantic_chunkers.chunkers.base import BaseSplitter +from semantic_router.encoders.base import BaseEncoder +from semantic_chunkers.schema import ChunkSet +from semantic_chunkers.chunkers.base import BaseChunker -class ConsecutiveSimSplitter(BaseSplitter): +class ConsecutiveChunker(BaseChunker): """ - Called "consecutive sim splitter" because we check the similarities of consecutive document embeddings (compare ith to i+1th document embedding). + Called "consecutive sim chunker" because we check the similarities of consecutive document embeddings (compare ith to i+1th document embedding). """ def __init__( self, encoder: BaseEncoder, - name: str = "consecutive_similarity_splitter", + name: str = "consecutive_chunker", score_threshold: float = 0.45, ): super().__init__(name=name, encoder=encoder) encoder.score_threshold = score_threshold self.score_threshold = score_threshold - def __call__(self, docs: List[Any]) -> List[DocumentSplit]: + def __call__(self, docs: List[Any]) -> List[ChunkSet]: """Split documents into smaller chunks based on semantic similarity. :param docs: list of text documents to be split, if only wanted to split a single document, pass it as a list with a single element. - :return: list of DocumentSplit objects containing the split documents. + :return: list of ChunkSet objects containing the chunks. 
""" # Check if there's only a single document if len(docs) == 1: @@ -48,7 +48,7 @@ def __call__(self, docs: List[Any]) -> List[DocumentSplit]: curr_sim_score = sim_matrix[idx - 1][idx] if idx < len(sim_matrix) and curr_sim_score < self.score_threshold: splits.append( - DocumentSplit( + ChunkSet( docs=list(docs[curr_split_start_idx:idx]), is_triggered=True, triggered_score=curr_sim_score, @@ -56,5 +56,5 @@ def __call__(self, docs: List[Any]) -> List[DocumentSplit]: ) curr_split_start_idx = idx curr_split_num += 1 - splits.append(DocumentSplit(docs=list(docs[curr_split_start_idx:]))) + splits.append(ChunkSet(docs=list(docs[curr_split_start_idx:]))) return splits diff --git a/semantic_chunkers/chunkers/cumulative_sim.py b/semantic_chunkers/chunkers/cumulative_sim.py index 5335523..1dc2167 100644 --- a/semantic_chunkers/chunkers/cumulative_sim.py +++ b/semantic_chunkers/chunkers/cumulative_sim.py @@ -2,12 +2,12 @@ import numpy as np -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.schema import DocumentSplit -from semantic_chunkers.chunkers.base import BaseSplitter +from semantic_router.router.base import BaseEncoder +from semantic_chunkers.schema import ChunkSet +from semantic_chunkers.chunkers.base import BaseChunker -class CumulativeSimSplitter(BaseSplitter): +class CumulativeChunker(BaseChunker): """ Called "cumulative sim" because we check the similarities of the embeddings of cumulative concatenated documents with the next document. @@ -16,20 +16,20 @@ class CumulativeSimSplitter(BaseSplitter): def __init__( self, encoder: BaseEncoder, - name: str = "cumulative_similarity_splitter", + name: str = "cumulative_chunker", score_threshold: float = 0.45, ): super().__init__(name=name, encoder=encoder) encoder.score_threshold = score_threshold self.score_threshold = score_threshold - def __call__(self, docs: List[str]) -> List[DocumentSplit]: + def __call__(self, docs: List[str]) -> List[ChunkSet]: """Split documents into smaller chunks based on semantic similarity. - :param docs: list of text documents to be split, if only wanted to - split a single document, pass it as a list with a single element. + :param docs: list of text documents to be chunk, if only wanted to + chunk a single document, pass it as a list with a single element. - :return: list of DocumentSplit objects containing the split documents. + :return: list of ChunkSet objects containing the chunks. """ total_docs = len(docs) # Check if there's only a single document @@ -38,43 +38,43 @@ def __call__(self, docs: List[str]) -> List[DocumentSplit]: "There is only one document provided; at least two are required " "to determine topics based on similarity." ) - splits = [] - curr_split_start_idx = 0 + chunks = [] + curr_chunk_start_idx = 0 for idx in range(0, total_docs): if idx + 1 < total_docs: # Ensure there is a next document to compare with. if idx == 0: # On the first iteration, compare the # first document directly to the second. - curr_split_docs = docs[idx] + curr_chunk_docs = docs[idx] else: # For subsequent iterations, compare cumulative # documents up to the current one with the next. - curr_split_docs = "\n".join(docs[curr_split_start_idx : idx + 1]) + curr_chunk_docs = "\n".join(docs[curr_chunk_start_idx : idx + 1]) next_doc = docs[idx + 1] # Embedding and similarity calculation remains the same. 
- curr_split_docs_embed = self.encoder([curr_split_docs])[0] + curr_chunk_docs_embed = self.encoder([curr_chunk_docs])[0] next_doc_embed = self.encoder([next_doc])[0] - curr_sim_score = np.dot(curr_split_docs_embed, next_doc_embed) / ( - np.linalg.norm(curr_split_docs_embed) + curr_sim_score = np.dot(curr_chunk_docs_embed, next_doc_embed) / ( + np.linalg.norm(curr_chunk_docs_embed) * np.linalg.norm(next_doc_embed) ) - # Decision to split based on similarity score. + # Decision to chunk based on similarity score. if curr_sim_score < self.score_threshold: - splits.append( - DocumentSplit( - docs=list(docs[curr_split_start_idx : idx + 1]), + chunks.append( + ChunkSet( + docs=list(docs[curr_chunk_start_idx : idx + 1]), is_triggered=True, triggered_score=curr_sim_score, ) ) - curr_split_start_idx = ( + curr_chunk_start_idx = ( idx + 1 ) # Update the start index for the next segment. # Add the last segment after the loop. - if curr_split_start_idx < total_docs: - splits.append(DocumentSplit(docs=list(docs[curr_split_start_idx:]))) + if curr_chunk_start_idx < total_docs: + chunks.append(ChunkSet(docs=list(docs[curr_chunk_start_idx:]))) - return splits + return chunks diff --git a/semantic_chunkers/chunkers/rolling_window.py b/semantic_chunkers/chunkers/rolling_window.py index a94dfd2..6ad1c11 100644 --- a/semantic_chunkers/chunkers/rolling_window.py +++ b/semantic_chunkers/chunkers/rolling_window.py @@ -3,50 +3,50 @@ import numpy as np -from semantic_chunkers.encoders.base import BaseEncoder -from semantic_chunkers.schema import DocumentSplit -from semantic_chunkers.chunkers.base import BaseSplitter +from semantic_router.encoders.base import BaseEncoder +from semantic_chunkers.schema import ChunkSet +from semantic_chunkers.chunkers.base import BaseChunker from semantic_chunkers.chunkers.utils import split_to_sentences, tiktoken_length from semantic_chunkers.utils.logger import logger @dataclass -class SplitStatistics: +class ChunkStatistics: total_documents: int - total_splits: int - splits_by_threshold: int - splits_by_max_chunk_size: int - splits_by_last_split: int + total_chunks: int + chunks_by_threshold: int + chunks_by_max_chunk_size: int + chunks_by_last_split: int min_token_size: int max_token_size: int - splits_by_similarity_ratio: float + chunks_by_similarity_ratio: float def __str__(self): return ( - f"Splitting Statistics:\n" + f"Chunking Statistics:\n" f" - Total Documents: {self.total_documents}\n" - f" - Total Splits: {self.total_splits}\n" - f" - Splits by Threshold: {self.splits_by_threshold}\n" - f" - Splits by Max Chunk Size: {self.splits_by_max_chunk_size}\n" - f" - Last Split: {self.splits_by_last_split}\n" - f" - Minimum Token Size of Split: {self.min_token_size}\n" - f" - Maximum Token Size of Split: {self.max_token_size}\n" - f" - Similarity Split Ratio: {self.splits_by_similarity_ratio:.2f}" + f" - Total Chunks: {self.total_chunks}\n" + f" - Chunks by Threshold: {self.chunks_by_threshold}\n" + f" - Chunks by Max Chunk Size: {self.chunks_by_max_chunk_size}\n" + f" - Last Chunk: {self.chunks_by_last_split}\n" + f" - Minimum Token Size of Chunk: {self.min_token_size}\n" + f" - Maximum Token Size of Chunk: {self.max_token_size}\n" + f" - Similarity Chunk Ratio: {self.chunks_by_similarity_ratio:.2f}" ) -class RollingWindowSplitter(BaseSplitter): +class StatisticalChunker(BaseChunker): def __init__( self, encoder: BaseEncoder, - name="rolling_window_splitter", + name="statistical_chunker", threshold_adjustment=0.01, dynamic_threshold: bool = True, window_size=5, 
min_split_tokens=100, max_split_tokens=300, split_tokens_tolerance=10, - plot_splits=False, + plot_chunks=False, enable_statistics=False, ): super().__init__(name=name, encoder=encoder) @@ -55,20 +55,20 @@ def __init__( self.threshold_adjustment = threshold_adjustment self.dynamic_threshold = dynamic_threshold self.window_size = window_size - self.plot_splits = plot_splits + self.plot_chunks = plot_chunks self.min_split_tokens = min_split_tokens self.max_split_tokens = max_split_tokens self.split_tokens_tolerance = split_tokens_tolerance self.enable_statistics = enable_statistics - self.statistics: SplitStatistics + self.statistics: ChunkStatistics - def __call__(self, docs: List[str]) -> List[DocumentSplit]: - """Split documents into smaller chunks based on semantic similarity. + def __call__(self, docs: List[str]) -> List[ChunkSet]: + """Chunk documents into smaller chunks based on semantic similarity. :param docs: list of text documents to be split, if only wanted to split a single document, pass it as a list with a single element. - :return: list of DocumentSplit objects containing the split documents. + :return: list of ChunkSet objects containing the chunks. """ if not docs: raise ValueError("At least one document is required for splitting.") @@ -79,7 +79,7 @@ def __call__(self, docs: List[str]) -> List[DocumentSplit]: logger.info( f"Single document exceeds the maximum token limit " f"of {self.max_split_tokens}. " - "Splitting to sentences before semantically splitting." + "Splitting to sentences before semantically merging." ) docs = split_to_sentences(docs[0]) encoded_docs = self._encode_documents(docs) @@ -89,15 +89,15 @@ def __call__(self, docs: List[str]) -> List[DocumentSplit]: else: self.calculated_threshold = self.encoder.score_threshold split_indices = self._find_split_indices(similarities=similarities) - splits = self._split_documents(docs, split_indices, similarities) + chunks = self._split_documents(docs, split_indices, similarities) - if self.plot_splits: - self.plot_similarity_scores(similarities, split_indices, splits) + if self.plot_chunks: + self.plot_similarity_scores(similarities, split_indices, chunks) if self.enable_statistics: print(self.statistics) - return splits + return chunks def _encode_documents(self, docs: List[str]) -> np.ndarray: """ @@ -143,7 +143,7 @@ def _find_split_indices(self, similarities: List[float]) -> List[int]: f"Adding to split_indices due to score < threshold: " f"{score} < {self.calculated_threshold}" ) - # Split after the document at idx + # Chunk after the document at idx split_indices.append(idx + 1) return split_indices @@ -176,7 +176,7 @@ def _find_optimal_threshold(self, docs: List[str], similarity_scores: List[float ) ] - # Calculate the median token count for the splits + # Calculate the median token count for the chunks median_tokens = np.median(split_token_counts) logger.debug( f"Iteration {iteration}: Median tokens per split: {median_tokens}" ) @@ -206,23 +206,23 @@ def _find_optimal_threshold(self, docs: List[str], similarity_scores: List[float def _split_documents( self, docs: List[str], split_indices: List[int], similarities: List[float] - ) -> List[DocumentSplit]: + ) -> List[ChunkSet]: """ This method iterates through each document, appending it to the current split until it either reaches a split point (determined by split_indices) or exceeds the maximum token limit for a split (self.max_split_tokens).
When a document causes the current token count to exceed this limit, or when a split point is reached and the minimum token requirement is met, - the current split is finalized and added to the List of splits. + the current split is finalized and added to the List of chunks. """ token_counts = [tiktoken_length(doc) for doc in docs] - splits, current_split = [], [] + chunks, current_split = [], [] current_tokens_count = 0 # Statistics - splits_by_threshold = 0 - splits_by_max_chunk_size = 0 - splits_by_last_split = 0 + chunks_by_threshold = 0 + chunks_by_max_chunk_size = 0 + chunks_by_last_split = 0 for doc_idx, doc in enumerate(docs): doc_token_count = token_counts[doc_idx] @@ -243,8 +243,8 @@ def _split_documents( triggered_score = ( similarities[doc_idx] if doc_idx < len(similarities) else None ) - splits.append( - DocumentSplit( + chunks.append( + ChunkSet( docs=current_split.copy(), is_triggered=True, triggered_score=triggered_score, @@ -252,27 +252,27 @@ def _split_documents( ) ) logger.debug( - f"Split finalized with {current_tokens_count} tokens due to " + f"Chunk finalized with {current_tokens_count} tokens due to " f"threshold {self.calculated_threshold}." ) current_split, current_tokens_count = [], 0 - splits_by_threshold += 1 + chunks_by_threshold += 1 continue # Move to the next document after splitting # Check if adding the current document exceeds the max token limit if current_tokens_count + doc_token_count > self.max_split_tokens: if current_tokens_count >= self.min_split_tokens: - splits.append( - DocumentSplit( + chunks.append( + ChunkSet( docs=current_split.copy(), is_triggered=False, triggered_score=None, token_count=current_tokens_count, ) ) - splits_by_max_chunk_size += 1 + chunks_by_max_chunk_size += 1 logger.debug( - f"Split finalized with {current_tokens_count} tokens due to " + f"Chunk finalized with {current_tokens_count} tokens due to " f"exceeding token limit of {self.max_split_tokens}." ) current_split, current_tokens_count = [], 0 @@ -282,15 +282,15 @@ def _split_documents( # Handle the last split if current_split: - splits.append( - DocumentSplit( + chunks.append( + ChunkSet( docs=current_split.copy(), is_triggered=False, triggered_score=None, token_count=current_tokens_count, ) ) - splits_by_last_split += 1 + chunks_by_last_split += 1 logger.debug( f"Final split added with {current_tokens_count} " "tokens due to remaining documents."
@@ -299,7 +299,7 @@ def _split_documents( # Validation to ensure no tokens are lost during the split original_token_count = sum(token_counts) split_token_count = sum( - [tiktoken_length(doc) for split in splits for doc in split.docs] + [tiktoken_length(doc) for split in chunks for doc in split.docs] ) if original_token_count != split_token_count: logger.error( @@ -310,37 +310,37 @@ def _split_documents( ) # Statistics - total_splits = len(splits) - splits_by_similarity_ratio = ( - splits_by_threshold / total_splits if total_splits else 0 + total_chunks = len(chunks) + chunks_by_similarity_ratio = ( + chunks_by_threshold / total_chunks if total_chunks else 0 ) min_token_size = max_token_size = 0 - if splits: + if chunks: token_counts = [ - split.token_count for split in splits if split.token_count is not None + split.token_count for split in chunks if split.token_count is not None ] min_token_size, max_token_size = min(token_counts, default=0), max( token_counts, default=0 ) - self.statistics = SplitStatistics( + self.statistics = ChunkStatistics( total_documents=len(docs), - total_splits=total_splits, - splits_by_threshold=splits_by_threshold, - splits_by_max_chunk_size=splits_by_max_chunk_size, - splits_by_last_split=splits_by_last_split, + total_chunks=total_chunks, + chunks_by_threshold=chunks_by_threshold, + chunks_by_max_chunk_size=chunks_by_max_chunk_size, + chunks_by_last_split=chunks_by_last_split, min_token_size=min_token_size, max_token_size=max_token_size, - splits_by_similarity_ratio=splits_by_similarity_ratio, + chunks_by_similarity_ratio=chunks_by_similarity_ratio, ) - return splits + return chunks def plot_similarity_scores( self, similarities: List[float], split_indices: List[int], - splits: list[DocumentSplit], + chunks: list[ChunkSet], ): try: from matplotlib import pyplot as plt @@ -360,7 +360,7 @@ def plot_similarity_scores( x=split_index - 1, color="r", linestyle="--", - label="Split" if split_index == split_indices[0] else "", + label="Chunk" if split_index == split_indices[0] else "", ) axs[0].axhline( y=self.calculated_threshold, @@ -389,11 +389,11 @@ def plot_similarity_scores( ) axs[0].legend() - # Plot 2: Split Token Size Distribution - token_counts = [split.token_count for split in splits] + # Plot 2: Chunk Token Size Distribution + token_counts = [split.token_count for split in chunks] axs[1].bar(range(len(token_counts)), token_counts, color="lightblue") - axs[1].set_title("Split Token Sizes") - axs[1].set_xlabel("Split Index") + axs[1].set_title("Chunk Token Sizes") + axs[1].set_xlabel("Chunk Index") axs[1].set_ylabel("Token Count") axs[1].set_xticks(range(len(token_counts))) axs[1].set_xticklabels([str(i) for i in range(len(token_counts))]) diff --git a/semantic_chunkers/encoders/__init__.py b/semantic_chunkers/encoders/__init__.py deleted file mode 100644 index b66f3e7..0000000 --- a/semantic_chunkers/encoders/__init__.py +++ /dev/null @@ -1,74 +0,0 @@ -from typing import List, Optional - -from semantic_chunkers.encoders.base import BaseEncoder -from semantic_chunkers.encoders.bm25 import BM25Encoder -from semantic_chunkers.encoders.clip import CLIPEncoder -from semantic_chunkers.encoders.cohere import CohereEncoder -from semantic_chunkers.encoders.fastembed import FastEmbedEncoder -from semantic_chunkers.encoders.google import GoogleEncoder -from semantic_chunkers.encoders.huggingface import HuggingFaceEncoder -from semantic_chunkers.encoders.huggingface import HFEndpointEncoder -from semantic_chunkers.encoders.mistral import MistralEncoder -from 
semantic_chunkers.encoders.openai import OpenAIEncoder -from semantic_chunkers.encoders.tfidf import TfidfEncoder -from semantic_chunkers.encoders.vit import VitEncoder -from semantic_chunkers.encoders.zure import AzureOpenAIEncoder -from semantic_chunkers.schema import EncoderType - -__all__ = [ - "BaseEncoder", - "AzureOpenAIEncoder", - "CohereEncoder", - "OpenAIEncoder", - "BM25Encoder", - "TfidfEncoder", - "FastEmbedEncoder", - "HuggingFaceEncoder", - "HFEndpointEncoder", - "MistralEncoder", - "VitEncoder", - "CLIPEncoder", - "GoogleEncoder", -] - - -class AutoEncoder: - type: EncoderType - name: Optional[str] - model: BaseEncoder - - def __init__(self, type: str, name: Optional[str]): - self.type = EncoderType(type) - self.name = name - if self.type == EncoderType.AZURE: - # TODO should change `model` to `name` JB - self.model = AzureOpenAIEncoder(model=name) - elif self.type == EncoderType.COHERE: - self.model = CohereEncoder(name=name) - elif self.type == EncoderType.OPENAI: - self.model = OpenAIEncoder(name=name) - elif self.type == EncoderType.BM25: - if name is None: - name = "bm25" - self.model = BM25Encoder(name=name) - elif self.type == EncoderType.TFIDF: - if name is None: - name = "tfidf" - self.model = TfidfEncoder(name=name) - elif self.type == EncoderType.FASTEMBED: - self.model = FastEmbedEncoder(name=name) - elif self.type == EncoderType.HUGGINGFACE: - self.model = HuggingFaceEncoder(name=name) - elif self.type == EncoderType.MISTRAL: - self.model = MistralEncoder(name=name) - elif self.type == EncoderType.VIT: - self.model = VitEncoder(name=name) - elif self.type == EncoderType.CLIP: - self.model = CLIPEncoder(name=name) - elif self.type == EncoderType.GOOGLE: - self.model = GoogleEncoder(name=name) - else: - raise ValueError(f"Encoder type '{type}' not supported") - - def __call__(self, texts: List[str]) -> List[List[float]]: - return self.model(texts) diff --git a/semantic_chunkers/encoders/base.py b/semantic_chunkers/encoders/base.py deleted file mode 100644 index b1e1311..0000000 --- a/semantic_chunkers/encoders/base.py +++ /dev/null @@ -1,15 +0,0 @@ -from typing import Any, List - -from pydantic.v1 import BaseModel, Field - - -class BaseEncoder(BaseModel): - name: str - score_threshold: float - type: str = Field(default="base") - - class Config: - arbitrary_types_allowed = True - - def __call__(self, docs: List[Any]) -> List[List[float]]: - raise NotImplementedError("Subclasses must implement this method") diff --git a/semantic_chunkers/encoders/bm25.py b/semantic_chunkers/encoders/bm25.py deleted file mode 100644 index e34f092..0000000 --- a/semantic_chunkers/encoders/bm25.py +++ /dev/null @@ -1,67 +0,0 @@ -from typing import Any, Dict, List, Optional - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.utils.logger import logger - - -class BM25Encoder(BaseEncoder): - model: Optional[Any] = None - idx_mapping: Optional[Dict[int, int]] = None - type: str = "sparse" - - def __init__( - self, - name: str = "bm25", - score_threshold: float = 0.82, - use_default_params: bool = True, - ): - super().__init__(name=name, score_threshold=score_threshold) - try: - from pinecone_text.sparse import BM25Encoder as encoder - except ImportError: - raise ImportError( - "Please install pinecone-text to use BM25Encoder. 
" - "You can install it with: `pip install 'semantic-router[hybrid]'`" - ) - - self.model = encoder() - - if use_default_params: - logger.info("Downloading and initializing default sBM25 model parameters.") - self.model = encoder.default() - self._set_idx_mapping() - - def _set_idx_mapping(self): - params = self.model.get_params() - doc_freq = params["doc_freq"] - if isinstance(doc_freq, dict): - indices = doc_freq["indices"] - self.idx_mapping = {int(idx): i for i, idx in enumerate(indices)} - else: - raise TypeError("Expected a dictionary for 'doc_freq'") - - def __call__(self, docs: List[str]) -> List[List[float]]: - if self.model is None or self.idx_mapping is None: - raise ValueError("Model or index mapping is not initialized.") - if len(docs) == 1: - sparse_dicts = self.model.encode_queries(docs) - elif len(docs) > 1: - sparse_dicts = self.model.encode_documents(docs) - else: - raise ValueError("No documents to encode.") - - embeds = [[0.0] * len(self.idx_mapping)] * len(docs) - for i, output in enumerate(sparse_dicts): - indices = output["indices"] - values = output["values"] - for idx, val in zip(indices, values): - if idx in self.idx_mapping: - position = self.idx_mapping[idx] - embeds[i][position] = val - return embeds - - def fit(self, docs: List[str]): - if self.model is None: - raise ValueError("Model is not initialized.") - self.model.fit(docs) - self._set_idx_mapping() diff --git a/semantic_chunkers/encoders/clip.py b/semantic_chunkers/encoders/clip.py deleted file mode 100644 index bbe3364..0000000 --- a/semantic_chunkers/encoders/clip.py +++ /dev/null @@ -1,126 +0,0 @@ -from typing import Any, List, Optional - -import numpy as np -from pydantic.v1 import PrivateAttr - -from semantic_chunkers.encoders import BaseEncoder - - -class CLIPEncoder(BaseEncoder): - name: str = "openai/clip-vit-base-patch16" - type: str = "huggingface" - score_threshold: float = 0.2 - tokenizer_kwargs: dict = {} - processor_kwargs: dict = {} - model_kwargs: dict = {} - device: Optional[str] = None - _tokenizer: Any = PrivateAttr() - _processor: Any = PrivateAttr() - _model: Any = PrivateAttr() - _torch: Any = PrivateAttr() - _Image: Any = PrivateAttr() - - def __init__(self, **data): - super().__init__(**data) - self._tokenizer, self._processor, self._model = self._initialize_hf_model() - - def __call__( - self, - docs: List[Any], - batch_size: int = 32, - normalize_embeddings: bool = True, - ) -> List[List[float]]: - all_embeddings = [] - if isinstance(docs[0], str): - text = True - else: - text = False - for i in range(0, len(docs), batch_size): - batch_docs = docs[i : i + batch_size] - if text: - embeddings = self._encode_text(docs=batch_docs) - else: - embeddings = self._encode_image(images=batch_docs) - if normalize_embeddings: - embeddings = embeddings / np.linalg.norm(embeddings, axis=0) - - embeddings = embeddings.tolist() - all_embeddings.extend(embeddings) - return all_embeddings - - def _initialize_hf_model(self): - try: - from transformers import CLIPModel, CLIPProcessor, CLIPTokenizerFast - except ImportError: - raise ImportError( - "Please install transformers to use CLIPEncoder. " - "You can install it with: " - "`pip install semantic-router[vision]`" - ) - - try: - import torch - except ImportError: - raise ImportError( - "Please install Pytorch to use CLIPEncoder. " - "You can install it with: " - "`pip install semantic-router[vision]`" - ) - - try: - from PIL import Image - except ImportError: - raise ImportError( - "Please install PIL to use HuggingFaceEncoder. 
" - "You can install it with: " - "`pip install semantic-router[vision]`" - ) - - self._torch = torch - self._Image = Image - - tokenizer = CLIPTokenizerFast.from_pretrained( - self.name, - **self.tokenizer_kwargs, - ) - processor = CLIPProcessor.from_pretrained(self.name) - model = CLIPModel.from_pretrained(self.name, **self.model_kwargs) - - self.device = self._get_device() - model.to(self.device) - return tokenizer, processor, model - - def _get_device(self) -> str: - if self.device: - device = self.device - elif self._torch.cuda.is_available(): - device = "cuda" - elif self._torch.backends.mps.is_available(): - device = "mps" - else: - device = "cpu" - return device - - def _encode_text(self, docs: List[str]) -> Any: - inputs = self._tokenizer( - docs, return_tensors="pt", padding=True, truncation=True - ).to(self.device) - with self._torch.no_grad(): - embeds = self._model.get_text_features(**inputs) - embeds = embeds.squeeze(0).cpu().detach().numpy() - return embeds - - def _encode_image(self, images: List[Any]) -> Any: - rgb_images = [self._ensure_rgb(img) for img in images] - inputs = self._processor(text=None, images=rgb_images, return_tensors="pt")[ - "pixel_values" - ].to(self.device) - with self._torch.no_grad(): - embeds = self._model.get_image_features(pixel_values=inputs) - embeds = embeds.squeeze(0).cpu().detach().numpy() - return embeds - - def _ensure_rgb(self, img: Any): - rgbimg = self._Image.new("RGB", img.size) - rgbimg.paste(img) - return rgbimg diff --git a/semantic_chunkers/encoders/cohere.py b/semantic_chunkers/encoders/cohere.py deleted file mode 100644 index e219702..0000000 --- a/semantic_chunkers/encoders/cohere.py +++ /dev/null @@ -1,49 +0,0 @@ -import os -from typing import List, Optional - -import cohere - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.utils.defaults import EncoderDefault - - -class CohereEncoder(BaseEncoder): - client: Optional[cohere.Client] = None - type: str = "cohere" - input_type: Optional[str] = "search_query" - - def __init__( - self, - name: Optional[str] = None, - cohere_api_key: Optional[str] = None, - score_threshold: float = 0.3, - input_type: Optional[str] = "search_query", - ): - if name is None: - name = EncoderDefault.COHERE.value["embedding_model"] - super().__init__( - name=name, - score_threshold=score_threshold, - input_type=input_type, # type: ignore - ) - self.input_type = input_type - cohere_api_key = cohere_api_key or os.getenv("COHERE_API_KEY") - if cohere_api_key is None: - raise ValueError("Cohere API key cannot be 'None'.") - try: - self.client = cohere.Client(cohere_api_key) - except Exception as e: - raise ValueError( - f"Cohere API client failed to initialize. Error: {e}" - ) from e - - def __call__(self, docs: List[str]) -> List[List[float]]: - if self.client is None: - raise ValueError("Cohere client is not initialized.") - try: - embeds = self.client.embed( - docs, input_type=self.input_type, model=self.name - ) - return embeds.embeddings - except Exception as e: - raise ValueError(f"Cohere API call failed. 
Error: {e}") from e diff --git a/semantic_chunkers/encoders/fastembed.py b/semantic_chunkers/encoders/fastembed.py deleted file mode 100644 index 6612bb5..0000000 --- a/semantic_chunkers/encoders/fastembed.py +++ /dev/null @@ -1,51 +0,0 @@ -from typing import Any, List, Optional - -import numpy as np -from pydantic.v1 import PrivateAttr - -from semantic_chunkers.encoders import BaseEncoder - - -class FastEmbedEncoder(BaseEncoder): - type: str = "fastembed" - name: str = "BAAI/bge-small-en-v1.5" - max_length: int = 512 - cache_dir: Optional[str] = None - threads: Optional[int] = None - _client: Any = PrivateAttr() - - def __init__( - self, score_threshold: float = 0.5, **data - ): # TODO default score_threshold not thoroughly tested, should optimize - super().__init__(score_threshold=score_threshold, **data) - self._client = self._initialize_client() - - def _initialize_client(self): - try: - from fastembed import TextEmbedding - except ImportError: - raise ImportError( - "Please install fastembed to use FastEmbedEncoder. " - "You can install it with: " - "`pip install 'semantic-router[fastembed]'`" - ) - - embedding_args = { - "model_name": self.name, - "max_length": self.max_length, - "cache_dir": self.cache_dir, - "threads": self.threads, - } - - embedding_args = {k: v for k, v in embedding_args.items() if v is not None} - - embedding = TextEmbedding(**embedding_args) - return embedding - - def __call__(self, docs: List[str]) -> List[List[float]]: - try: - embeds: List[np.ndarray] = list(self._client.embed(docs)) - embeddings: List[List[float]] = [e.tolist() for e in embeds] - return embeddings - except Exception as e: - raise ValueError(f"FastEmbed embed failed. Error: {e}") from e diff --git a/semantic_chunkers/encoders/google.py b/semantic_chunkers/encoders/google.py deleted file mode 100644 index fcf4bad..0000000 --- a/semantic_chunkers/encoders/google.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -This module provides the GoogleEncoder class for generating embeddings using Google's AI Platform. - -The GoogleEncoder class is a subclass of BaseEncoder and utilizes the TextEmbeddingModel from the -Google AI Platform to generate embeddings for given documents. It requires a Google Cloud project ID -and supports customization of the pre-trained model, score threshold, location, and API endpoint. - -Example usage: - - from semantic_router.encoders.google_encoder import GoogleEncoder - - encoder = GoogleEncoder(project_id="your-project-id") - embeddings = encoder(["document1", "document2"]) - -Classes: - GoogleEncoder: A class for generating embeddings using Google's AI Platform. -""" - -import os -from typing import Any, List, Optional - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.utils.defaults import EncoderDefault - - -class GoogleEncoder(BaseEncoder): - """GoogleEncoder class for generating embeddings using Google's AI Platform. - - Attributes: - client: An instance of the TextEmbeddingModel client. - type: The type of the encoder, which is "google". - """ - - client: Optional[Any] = None - type: str = "google" - - def __init__( - self, - name: Optional[str] = None, - score_threshold: float = 0.75, - project_id: Optional[str] = None, - location: Optional[str] = None, - api_endpoint: Optional[str] = None, - ): - """Initializes the GoogleEncoder. - - Args: - model_name: The name of the pre-trained model to use for embedding. - If not provided, the default model specified in EncoderDefault will - be used. - score_threshold: The threshold for similarity scores. 
- project_id: The Google Cloud project ID. - If not provided, it will be retrieved from the GOOGLE_PROJECT_ID - environment variable. - location: The location of the AI Platform resources. - If not provided, it will be retrieved from the GOOGLE_LOCATION - environment variable, defaulting to "us-central1". - api_endpoint: The API endpoint for the AI Platform. - If not provided, it will be retrieved from the GOOGLE_API_ENDPOINT - environment variable. - - Raises: - ValueError: If the Google Project ID is not provided or if the AI Platform - client fails to initialize. - """ - if name is None: - name = EncoderDefault.GOOGLE.value["embedding_model"] - - super().__init__(name=name, score_threshold=score_threshold) - - self.client = self._initialize_client(project_id, location, api_endpoint) - - def _initialize_client(self, project_id, location, api_endpoint): - """Initializes the Google AI Platform client. - - Args: - project_id: The Google Cloud project ID. - location: The location of the AI Platform resources. - api_endpoint: The API endpoint for the AI Platform. - - Returns: - An instance of the TextEmbeddingModel client. - - Raises: - ImportError: If the required Google Cloud or Vertex AI libraries are not - installed. - ValueError: If the Google Project ID is not provided or if the AI Platform - client fails to initialize. - """ - try: - from google.cloud import aiplatform - from vertexai.language_models import TextEmbeddingModel - except ImportError: - raise ImportError( - "Please install Google Cloud and Vertex AI libraries to use GoogleEncoder. " - "You can install them with: " - "`pip install google-cloud-aiplatform vertexai-language-models`" - ) - - project_id = project_id or os.getenv("GOOGLE_PROJECT_ID") - location = location or os.getenv("GOOGLE_LOCATION", "us-central1") - api_endpoint = api_endpoint or os.getenv("GOOGLE_API_ENDPOINT") - - if project_id is None: - raise ValueError("Google Project ID cannot be 'None'.") - - try: - aiplatform.init( - project=project_id, location=location, api_endpoint=api_endpoint - ) - client = TextEmbeddingModel.from_pretrained(self.name) - except Exception as err: - raise ValueError( - f"Google AI Platform client failed to initialize. Error: {err}" - ) from err - - return client - - def __call__(self, docs: List[str]) -> List[List[float]]: - """Generates embeddings for the given documents. - - Args: - docs: A list of strings representing the documents to embed. - - Returns: - A list of lists, where each inner list contains the embedding values for a - document. - - Raises: - ValueError: If the Google AI Platform client is not initialized or if the - API call fails. - """ - if self.client is None: - raise ValueError("Google AI Platform client is not initialized.") - try: - embeddings = self.client.get_embeddings(docs) - return [embedding.values for embedding in embeddings] - except Exception as e: - raise ValueError(f"Google AI Platform API call failed. Error: {e}") from e diff --git a/semantic_chunkers/encoders/huggingface.py b/semantic_chunkers/encoders/huggingface.py deleted file mode 100644 index 72982f7..0000000 --- a/semantic_chunkers/encoders/huggingface.py +++ /dev/null @@ -1,275 +0,0 @@ -""" -This module provides the HFEndpointEncoder class to embeddings models using Huggingface's endpoint. - -The HFEndpointEncoder class is a subclass of BaseEncoder and utilizes a specified Huggingface -endpoint to generate embeddings for given documents. It requires the URL of the Huggingface -API endpoint and an API key for authentication. 
The class supports customization of the score -threshold for filtering or processing the embeddings. - -Example usage: - - from semantic_router.encoders.hfendpointencoder import HFEndpointEncoder - - encoder = HFEndpointEncoder( - huggingface_url="https://api-inference.huggingface.co/models/BAAI/bge-large-en-v1.5", - huggingface_api_key="your-hugging-face-api-key" - ) - embeddings = encoder(["document1", "document2"]) - -Classes: - HFEndpointEncoder: A class for generating embeddings using a Huggingface endpoint. -""" - -import requests -import time -import os -from typing import Any, List, Optional - -from pydantic.v1 import PrivateAttr - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.utils.logger import logger - - -class HuggingFaceEncoder(BaseEncoder): - name: str = "sentence-transformers/all-MiniLM-L6-v2" - type: str = "huggingface" - score_threshold: float = 0.5 - tokenizer_kwargs: dict = {} - model_kwargs: dict = {} - device: Optional[str] = None - _tokenizer: Any = PrivateAttr() - _model: Any = PrivateAttr() - _torch: Any = PrivateAttr() - - def __init__(self, **data): - super().__init__(**data) - self._tokenizer, self._model = self._initialize_hf_model() - - def _initialize_hf_model(self): - try: - from transformers import AutoModel, AutoTokenizer - except ImportError: - raise ImportError( - "Please install transformers to use HuggingFaceEncoder. " - "You can install it with: " - "`pip install semantic-router[local]`" - ) - - try: - import torch - except ImportError: - raise ImportError( - "Please install Pytorch to use HuggingFaceEncoder. " - "You can install it with: " - "`pip install semantic-router[local]`" - ) - - self._torch = torch - - tokenizer = AutoTokenizer.from_pretrained( - self.name, - **self.tokenizer_kwargs, - ) - - model = AutoModel.from_pretrained(self.name, **self.model_kwargs) - - if self.device: - model.to(self.device) - - else: - device = "cuda" if self._torch.cuda.is_available() else "cpu" - model.to(device) - self.device = device - - return tokenizer, model - - def __call__( - self, - docs: List[str], - batch_size: int = 32, - normalize_embeddings: bool = True, - pooling_strategy: str = "mean", - ) -> List[List[float]]: - all_embeddings = [] - for i in range(0, len(docs), batch_size): - batch_docs = docs[i : i + batch_size] - - encoded_input = self._tokenizer( - batch_docs, padding=True, truncation=True, return_tensors="pt" - ).to(self.device) - - with self._torch.no_grad(): - model_output = self._model(**encoded_input) - - if pooling_strategy == "mean": - embeddings = self._mean_pooling( - model_output, encoded_input["attention_mask"] - ) - elif pooling_strategy == "max": - embeddings = self._max_pooling( - model_output, encoded_input["attention_mask"] - ) - else: - raise ValueError( - "Invalid pooling_strategy. Please use 'mean' or 'max'." 
- ) - - if normalize_embeddings: - embeddings = self._torch.nn.functional.normalize(embeddings, p=2, dim=1) - - embeddings = embeddings.tolist() - all_embeddings.extend(embeddings) - return all_embeddings - - def _mean_pooling(self, model_output, attention_mask): - token_embeddings = model_output[0] - input_mask_expanded = ( - attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() - ) - return self._torch.sum( - token_embeddings * input_mask_expanded, 1 - ) / self._torch.clamp(input_mask_expanded.sum(1), min=1e-9) - - def _max_pooling(self, model_output, attention_mask): - token_embeddings = model_output[0] - input_mask_expanded = ( - attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() - ) - token_embeddings[input_mask_expanded == 0] = -1e9 - return self._torch.max(token_embeddings, 1)[0] - - -class HFEndpointEncoder(BaseEncoder): - """ - A class to encode documents using a Hugging Face transformer model endpoint. - - Attributes: - huggingface_url (str): The URL of the Hugging Face API endpoint. - huggingface_api_key (str): The API key for authenticating with the Hugging Face API. - score_threshold (float): A threshold value used for filtering or processing the embeddings. - """ - - name: str = "hugging_face_custom_endpoint" - huggingface_url: Optional[str] = None - huggingface_api_key: Optional[str] = None - score_threshold: float = 0.8 - - def __init__( - self, - name: Optional[str] = "hugging_face_custom_endpoint", - huggingface_url: Optional[str] = None, - huggingface_api_key: Optional[str] = None, - score_threshold: float = 0.8, - ): - """ - Initializes the HFEndpointEncoder with the specified parameters. - - Args: - name (str, optional): The name of the encoder. Defaults to - "hugging_face_custom_endpoint". - huggingface_url (str, optional): The URL of the Hugging Face API endpoint. - Cannot be None. - huggingface_api_key (str, optional): The API key for the Hugging Face API. - Cannot be None. - score_threshold (float, optional): A threshold for processing the embeddings. - Defaults to 0.8. - - Raises: - ValueError: If either `huggingface_url` or `huggingface_api_key` is None. - """ - huggingface_url = huggingface_url or os.getenv("HF_API_URL") - huggingface_api_key = huggingface_api_key or os.getenv("HF_API_KEY") - - super().__init__(name=name, score_threshold=score_threshold) # type: ignore - - if huggingface_url is None: - raise ValueError("HuggingFace endpoint url cannot be 'None'.") - if huggingface_api_key is None: - raise ValueError("HuggingFace API key cannot be 'None'.") - - self.huggingface_url = huggingface_url or os.getenv("HF_API_URL") - self.huggingface_api_key = huggingface_api_key or os.getenv("HF_API_KEY") - - try: - self.query({"inputs": "Hello World!", "parameters": {}}) - except Exception as e: - raise ValueError( - f"HuggingFace endpoint client failed to initialize. Error: {e}" - ) from e - - def __call__(self, docs: List[str]) -> List[List[float]]: - """ - Encodes a list of documents into embeddings using the Hugging Face API. - - Args: - docs (List[str]): A list of documents to encode. - - Returns: - List[List[float]]: A list of embeddings for the given documents. - - Raises: - ValueError: If no embeddings are returned for a document. 
- """ - embeddings = [] - for d in docs: - try: - output = self.query({"inputs": d, "parameters": {}}) - if not output or len(output) == 0: - raise ValueError("No embeddings returned from the query.") - embeddings.append(output) - - except Exception as e: - raise ValueError( - f"No embeddings returned for document. Error: {e}" - ) from e - return embeddings - - def query(self, payload, max_retries=3, retry_interval=5): - """ - Sends a query to the Hugging Face API and returns the response. - - Args: - payload (dict): The payload to send in the request. - - Returns: - dict: The response from the Hugging Face API. - - Raises: - ValueError: If the query fails or the response status is not 200. - """ - headers = { - "Accept": "application/json", - "Authorization": f"Bearer {self.huggingface_api_key}", - "Content-Type": "application/json", - } - for attempt in range(1, max_retries + 1): - try: - response = requests.post( - self.huggingface_url, - headers=headers, - json=payload, - # timeout=timeout_seconds, - ) - if response.status_code == 503: - estimated_time = response.json().get("estimated_time", "") - if estimated_time: - logger.info( - f"Model Initializing wait for - {estimated_time:.2f}s " - ) - time.sleep(estimated_time) - continue - else: - response.raise_for_status() - - except requests.exceptions.RequestException: - if attempt < max_retries - 1: - logger.info(f"Retrying attempt: {attempt} for payload: {payload} ") - time.sleep(retry_interval) - retry_interval += attempt - else: - raise ValueError( - f"Query failed with status {response.status_code}: {response.text}" - ) - - return response.json() diff --git a/semantic_chunkers/encoders/mistral.py b/semantic_chunkers/encoders/mistral.py deleted file mode 100644 index 56b310b..0000000 --- a/semantic_chunkers/encoders/mistral.py +++ /dev/null @@ -1,78 +0,0 @@ -"""This file contains the MistralEncoder class which is used to encode text using MistralAI""" - -import os -from time import sleep -from typing import Any, List, Optional - -from pydantic.v1 import PrivateAttr - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.utils.defaults import EncoderDefault - - -class MistralEncoder(BaseEncoder): - """Class to encode text using MistralAI""" - - _client: Any = PrivateAttr() - _mistralai: Any = PrivateAttr() - type: str = "mistral" - - def __init__( - self, - name: Optional[str] = None, - mistralai_api_key: Optional[str] = None, - score_threshold: float = 0.82, - ): - if name is None: - name = EncoderDefault.MISTRAL.value["embedding_model"] - super().__init__(name=name, score_threshold=score_threshold) - self._client, self._mistralai = self._initialize_client(mistralai_api_key) - - def _initialize_client(self, api_key): - try: - import mistralai - from mistralai.client import MistralClient - except ImportError: - raise ImportError( - "Please install MistralAI to use MistralEncoder. 
" - "You can install it with: " - "`pip install 'semantic-router[mistralai]'`" - ) - - api_key = api_key or os.getenv("MISTRALAI_API_KEY") - if api_key is None: - raise ValueError("Mistral API key not provided") - try: - client = MistralClient(api_key=api_key) - except Exception as e: - raise ValueError(f"Unable to connect to MistralAI {e.args}: {e}") from e - return client, mistralai - - def __call__(self, docs: List[str]) -> List[List[float]]: - if self._client is None: - raise ValueError("Mistral client not initialized") - embeds = None - error_message = "" - - # Exponential backoff - for _ in range(3): - try: - embeds = self._client.embeddings(model=self.name, input=docs) - if embeds.data: - break - except self._mistralai.exceptions.MistralException as e: - sleep(2**_) - error_message = str(e) - except Exception as e: - raise ValueError(f"Unable to connect to MistralAI {e.args}: {e}") from e - - if ( - not embeds - or not isinstance( - embeds, self._mistralai.models.embeddings.EmbeddingResponse - ) - or not embeds.data - ): - raise ValueError(f"No embeddings returned from MistralAI: {error_message}") - embeddings = [embeds_obj.embedding for embeds_obj in embeds.data] - return embeddings diff --git a/semantic_chunkers/encoders/openai.py b/semantic_chunkers/encoders/openai.py deleted file mode 100644 index 4231b7e..0000000 --- a/semantic_chunkers/encoders/openai.py +++ /dev/null @@ -1,127 +0,0 @@ -import os -from time import sleep -from typing import Any, List, Optional, Union -from pydantic.v1 import PrivateAttr - -import openai -from openai import OpenAIError -from openai._types import NotGiven -from openai.types import CreateEmbeddingResponse -import tiktoken - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.schema import EncoderInfo -from semantic_chunkers.utils.defaults import EncoderDefault -from semantic_chunkers.utils.logger import logger - - -model_configs = { - "text-embedding-ada-002": EncoderInfo( - name="text-embedding-ada-002", token_limit=8192 - ), - "text-embed-3-small": EncoderInfo(name="text-embed-3-small", token_limit=8192), - "text-embed-3-large": EncoderInfo(name="text-embed-3-large", token_limit=8192), -} - - -class OpenAIEncoder(BaseEncoder): - client: Optional[openai.Client] - dimensions: Union[int, NotGiven] = NotGiven() - token_limit: int = 8192 # default value, should be replaced by config - _token_encoder: Any = PrivateAttr() - type: str = "openai" - - def __init__( - self, - name: Optional[str] = None, - openai_base_url: Optional[str] = None, - openai_api_key: Optional[str] = None, - openai_org_id: Optional[str] = None, - score_threshold: float = 0.82, - dimensions: Union[int, NotGiven] = NotGiven(), - ): - if name is None: - name = EncoderDefault.OPENAI.value["embedding_model"] - super().__init__(name=name, score_threshold=score_threshold) - api_key = openai_api_key or os.getenv("OPENAI_API_KEY") - base_url = openai_base_url or os.getenv("OPENAI_BASE_URL") - openai_org_id = openai_org_id or os.getenv("OPENAI_ORG_ID") - if api_key is None: - raise ValueError("OpenAI API key cannot be 'None'.") - try: - self.client = openai.Client( - base_url=base_url, api_key=api_key, organization=openai_org_id - ) - except Exception as e: - raise ValueError( - f"OpenAI API client failed to initialize. 
Error: {e}" - ) from e - # set dimensions to support openai embed 3 dimensions param - self.dimensions = dimensions - # if model name is known, set token limit - if name in model_configs: - self.token_limit = model_configs[name].token_limit - # get token encoder - self._token_encoder = tiktoken.encoding_for_model(name) - - def __call__(self, docs: List[str], truncate: bool = True) -> List[List[float]]: - """Encode a list of text documents into embeddings using OpenAI API. - - :param docs: List of text documents to encode. - :param truncate: Whether to truncate the documents to token limit. If - False and a document exceeds the token limit, an error will be - raised. - :return: List of embeddings for each document.""" - if self.client is None: - raise ValueError("OpenAI client is not initialized.") - embeds = None - error_message = "" - - if truncate: - # check if any document exceeds token limit and truncate if so - for i in range(len(docs)): - logger.info(f"Document {i+1} length: {len(docs[i])}") - docs[i] = self._truncate(docs[i]) - logger.info(f"Document {i+1} trunc length: {len(docs[i])}") - - # Exponential backoff - for j in range(1, 7): - try: - embeds = self.client.embeddings.create( - input=docs, - model=self.name, - dimensions=self.dimensions, - ) - if embeds.data: - break - except OpenAIError as e: - sleep(2**j) - error_message = str(e) - logger.warning(f"Retrying in {2**j} seconds...") - except Exception as e: - logger.error(f"OpenAI API call failed. Error: {error_message}") - raise ValueError(f"OpenAI API call failed. Error: {e}") from e - - if ( - not embeds - or not isinstance(embeds, CreateEmbeddingResponse) - or not embeds.data - ): - logger.info(f"Returned embeddings: {embeds}") - raise ValueError(f"No embeddings returned. Error: {error_message}") - - embeddings = [embeds_obj.embedding for embeds_obj in embeds.data] - return embeddings - - def _truncate(self, text: str) -> str: - # we use encode_ordinary as faster equivalent to encode(text, disallowed_special=()) - tokens = self._token_encoder.encode_ordinary(text) - if len(tokens) > self.token_limit: - logger.warning( - f"Document exceeds token limit: {len(tokens)} > {self.token_limit}" - "\nTruncating document..." 
- ) - text = self._token_encoder.decode(tokens[: self.token_limit - 1]) - logger.info(f"Trunc length: {len(self._token_encoder.encode(text))}") - return text - return text diff --git a/semantic_chunkers/encoders/tfidf.py b/semantic_chunkers/encoders/tfidf.py deleted file mode 100644 index 9c8c176..0000000 --- a/semantic_chunkers/encoders/tfidf.py +++ /dev/null @@ -1,80 +0,0 @@ -import string -from collections import Counter -from typing import Dict - -import numpy as np -from numpy import ndarray -from numpy.linalg import norm - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.route import Route - - -class TfidfEncoder(BaseEncoder): - idf: ndarray = np.array([]) - word_index: Dict = {} - - def __init__(self, name: str = "tfidf", score_threshold: float = 0.82): - # TODO default score_threshold not thoroughly tested, should optimize - super().__init__(name=name, score_threshold=score_threshold) - self.word_index = {} - self.idf = np.array([]) - - def __call__(self, docs: list[str]) -> list[list[float]]: - if len(self.word_index) == 0 or self.idf.size == 0: - raise ValueError("Vectorizer is not initialized.") - if len(docs) == 0: - raise ValueError("No documents to encode.") - - docs = [self._preprocess(doc) for doc in docs] - tf = self._compute_tf(docs) - tfidf = tf * self.idf - return tfidf.tolist() - - def fit(self, routes: list[Route]): - docs = [] - for route in routes: - for doc in route.utterances: - docs.append(self._preprocess(doc)) # type: ignore - self.word_index = self._build_word_index(docs) - self.idf = self._compute_idf(docs) - - def _build_word_index(self, docs: list[str]) -> dict: - words = set() - for doc in docs: - for word in doc.split(): - words.add(word) - word_index = {word: i for i, word in enumerate(words)} - return word_index - - def _compute_tf(self, docs: list[str]) -> np.ndarray: - if len(self.word_index) == 0: - raise ValueError("Word index is not initialized.") - tf = np.zeros((len(docs), len(self.word_index))) - for i, doc in enumerate(docs): - word_counts = Counter(doc.split()) - for word, count in word_counts.items(): - if word in self.word_index: - tf[i, self.word_index[word]] = count - # L2 normalization - tf = tf / norm(tf, axis=1, keepdims=True) - return tf - - def _compute_idf(self, docs: list[str]) -> np.ndarray: - if len(self.word_index) == 0: - raise ValueError("Word index is not initialized.") - idf = np.zeros(len(self.word_index)) - for doc in docs: - words = set(doc.split()) - for word in words: - if word in self.word_index: - idf[self.word_index[word]] += 1 - idf = np.log(len(docs) / (idf + 1)) - return idf - - def _preprocess(self, doc: str) -> str: - lowercased_doc = doc.lower() - no_punctuation_doc = lowercased_doc.translate( - str.maketrans("", "", string.punctuation) - ) - return no_punctuation_doc diff --git a/semantic_chunkers/encoders/vit.py b/semantic_chunkers/encoders/vit.py deleted file mode 100644 index f5ee2e2..0000000 --- a/semantic_chunkers/encoders/vit.py +++ /dev/null @@ -1,108 +0,0 @@ -from typing import Any, List, Optional - -from pydantic.v1 import PrivateAttr - -from semantic_chunkers.encoders import BaseEncoder - - -class VitEncoder(BaseEncoder): - name: str = "google/vit-base-patch16-224" - type: str = "huggingface" - score_threshold: float = 0.5 - processor_kwargs: dict = {} - model_kwargs: dict = {} - device: Optional[str] = None - _processor: Any = PrivateAttr() - _model: Any = PrivateAttr() - _torch: Any = PrivateAttr() - _T: Any = PrivateAttr() - _Image: Any = PrivateAttr() - - def 
__init__(self, **data): - super().__init__(**data) - self._processor, self._model = self._initialize_hf_model() - - def _initialize_hf_model(self): - try: - from transformers import ViTImageProcessor, ViTModel - except ImportError: - raise ImportError( - "Please install transformers to use HuggingFaceEncoder. " - "You can install it with: " - "`pip install semantic-router[vision]`" - ) - - try: - import torch - import torchvision.transforms as T - except ImportError: - raise ImportError( - "Please install Pytorch to use HuggingFaceEncoder. " - "You can install it with: " - "`pip install semantic-router[vision]`" - ) - - try: - from PIL import Image - except ImportError: - raise ImportError( - "Please install PIL to use HuggingFaceEncoder. " - "You can install it with: " - "`pip install semantic-router[vision]`" - ) - - self._torch = torch - self._Image = Image - self._T = T - - processor = ViTImageProcessor.from_pretrained( - self.name, **self.processor_kwargs - ) - - model = ViTModel.from_pretrained(self.name, **self.model_kwargs) - - self.device = self._get_device() - model.to(self.device) - - return processor, model - - def _get_device(self) -> str: - if self.device: - device = self.device - elif self._torch.cuda.is_available(): - device = "cuda" - elif self._torch.backends.mps.is_available(): - device = "mps" - else: - device = "cpu" - return device - - def _process_images(self, images: List[Any]): - rgb_images = [self._ensure_rgb(img) for img in images] - processed_images = self._processor(images=rgb_images, return_tensors="pt") - processed_images = processed_images.to(self.device) - return processed_images - - def _ensure_rgb(self, img: Any): - rgbimg = self._Image.new("RGB", img.size) - rgbimg.paste(img) - return rgbimg - - def __call__( - self, - imgs: List[Any], - batch_size: int = 32, - ) -> List[List[float]]: - all_embeddings = [] - for i in range(0, len(imgs), batch_size): - batch_imgs = imgs[i : i + batch_size] - batch_imgs_transform = self._process_images(batch_imgs) - with self._torch.no_grad(): - embeddings = ( - self._model(**batch_imgs_transform) - .last_hidden_state[:, 0] - .cpu() - .tolist() - ) - all_embeddings.extend(embeddings) - return all_embeddings diff --git a/semantic_chunkers/encoders/zure.py b/semantic_chunkers/encoders/zure.py deleted file mode 100644 index 8bc1af4..0000000 --- a/semantic_chunkers/encoders/zure.py +++ /dev/null @@ -1,116 +0,0 @@ -import os -from time import sleep -from typing import List, Optional - -import openai -from openai import OpenAIError -from openai.types import CreateEmbeddingResponse - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.utils.defaults import EncoderDefault -from semantic_chunkers.utils.logger import logger - - -class AzureOpenAIEncoder(BaseEncoder): - client: Optional[openai.AzureOpenAI] = None - type: str = "azure" - api_key: Optional[str] = None - deployment_name: Optional[str] = None - azure_endpoint: Optional[str] = None - api_version: Optional[str] = None - model: Optional[str] = None - - def __init__( - self, - api_key: Optional[str] = None, - deployment_name: Optional[str] = None, - azure_endpoint: Optional[str] = None, - api_version: Optional[str] = None, - model: Optional[str] = None, # TODO we should change to `name` JB - score_threshold: float = 0.82, - ): - name = deployment_name - if name is None: - name = EncoderDefault.AZURE.value["embedding_model"] - super().__init__(name=name, score_threshold=score_threshold) - self.api_key = api_key - self.deployment_name = deployment_name 
- self.azure_endpoint = azure_endpoint - self.api_version = api_version - self.model = model - if self.api_key is None: - self.api_key = os.getenv("AZURE_OPENAI_API_KEY") - if self.api_key is None: - raise ValueError("No Azure OpenAI API key provided.") - if self.deployment_name is None: - self.deployment_name = EncoderDefault.AZURE.value["deployment_name"] - # deployment_name may still be None, but it is optional in the API - if self.azure_endpoint is None: - self.azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") - if self.azure_endpoint is None: - raise ValueError("No Azure OpenAI endpoint provided.") - if self.api_version is None: - self.api_version = os.getenv("AZURE_OPENAI_API_VERSION") - if self.api_version is None: - raise ValueError("No Azure OpenAI API version provided.") - if self.model is None: - self.model = os.getenv("AZURE_OPENAI_MODEL") - if self.model is None: - raise ValueError("No Azure OpenAI model provided.") - assert ( - self.api_key is not None - and self.azure_endpoint is not None - and self.api_version is not None - and self.model is not None - ) - - try: - self.client = openai.AzureOpenAI( - azure_deployment=( - str(self.deployment_name) if self.deployment_name else None - ), - api_key=str(self.api_key), - azure_endpoint=str(self.azure_endpoint), - api_version=str(self.api_version), - # _strict_response_validation=True, - ) - except Exception as e: - raise ValueError( - f"OpenAI API client failed to initialize. Error: {e}" - ) from e - - def __call__(self, docs: List[str]) -> List[List[float]]: - if self.client is None: - raise ValueError("OpenAI client is not initialized.") - embeds = None - error_message = "" - - # Exponential backoff - for j in range(3): - try: - embeds = self.client.embeddings.create( - input=docs, model=str(self.model) - ) - if embeds.data: - break - except OpenAIError as e: - # print full traceback - import traceback - - traceback.print_exc() - sleep(2**j) - error_message = str(e) - logger.warning(f"Retrying in {2**j} seconds...") - except Exception as e: - logger.error(f"Azure OpenAI API call failed. Error: {error_message}") - raise ValueError(f"Azure OpenAI API call failed. Error: {e}") from e - - if ( - not embeds - or not isinstance(embeds, CreateEmbeddingResponse) - or not embeds.data - ): - raise ValueError(f"No embeddings returned. Error: {error_message}") - - embeddings = [embeds_obj.embedding for embeds_obj in embeds.data] - return embeddings diff --git a/semantic_chunkers/hybrid_layer.py b/semantic_chunkers/hybrid_layer.py deleted file mode 100644 index 92e9c0b..0000000 --- a/semantic_chunkers/hybrid_layer.py +++ /dev/null @@ -1,215 +0,0 @@ -from typing import Dict, List, Optional, Tuple - -import numpy as np -from numpy.linalg import norm - -from semantic_chunkers.encoders import ( - BaseEncoder, - BM25Encoder, - TfidfEncoder, -) -from semantic_chunkers.route import Route -from semantic_chunkers.utils.logger import logger - - -class HybridRouteLayer: - index = None - sparse_index = None - categories = None - score_threshold: float - - def __init__( - self, - encoder: BaseEncoder, - sparse_encoder: Optional[BM25Encoder] = None, - routes: List[Route] = [], - alpha: float = 0.3, - top_k: int = 5, - aggregation: str = "sum", - ): - self.encoder = encoder - self.score_threshold = self.encoder.score_threshold - - if sparse_encoder is None: - logger.warning("No sparse_encoder provided. 
Using default BM25Encoder.") - self.sparse_encoder = BM25Encoder() - else: - self.sparse_encoder = sparse_encoder - - self.alpha = alpha - self.top_k = top_k - if self.top_k < 1: - raise ValueError(f"top_k needs to be >= 1, but was: {self.top_k}.") - self.aggregation = aggregation - if self.aggregation not in ["sum", "mean", "max"]: - raise ValueError( - f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." - ) - self.aggregation_method = self._set_aggregation_method(self.aggregation) - self.routes = routes - if isinstance(self.sparse_encoder, TfidfEncoder) and hasattr( - self.sparse_encoder, "fit" - ): - self.sparse_encoder.fit(routes) - # if routes list has been passed, we initialize index now - if routes: - # initialize index now - # for route in tqdm(routes): - # self._add_route(route=route) - self._add_routes(routes) - - def __call__(self, text: str) -> Optional[str]: - results = self._query(text, self.top_k) - top_class, top_class_scores = self._semantic_classify(results) - passed = self._pass_threshold(top_class_scores, self.score_threshold) - if passed: - return top_class - else: - return None - - def add(self, route: Route): - self._add_route(route=route) - - def _add_route(self, route: Route): - self.routes += [route] - - self.update_dense_embeddings_index(route.utterances) - - if isinstance(self.sparse_encoder, TfidfEncoder) and hasattr( - self.sparse_encoder, "fit" - ): - self.sparse_encoder.fit(self.routes) - # re-build index - self.sparse_index = None - all_utterances = [ - utterance for route in self.routes for utterance in route.utterances - ] - self.update_sparse_embeddings_index(all_utterances) - else: - self.update_sparse_embeddings_index(route.utterances) - - # create route array - if self.categories is None: - self.categories = np.array([route.name] * len(route.utterances)) - else: - str_arr = np.array([route.name] * len(route.utterances)) - self.categories = np.concatenate([self.categories, str_arr]) - self.routes.append(route) - - def _add_routes(self, routes: List[Route]): - # create embeddings for all routes - logger.info("Creating embeddings for all routes...") - all_utterances = [ - utterance for route in routes for utterance in route.utterances - ] - self.update_dense_embeddings_index(all_utterances) - self.update_sparse_embeddings_index(all_utterances) - - # create route array - route_names = [route.name for route in routes for _ in route.utterances] - route_array = np.array(route_names) - self.categories = ( - np.concatenate([self.categories, route_array]) - if self.categories is not None - else route_array - ) - - def update_dense_embeddings_index(self, utterances: list): - dense_embeds = np.array(self.encoder(utterances)) - # create utterance array (the dense index) - self.index = ( - np.concatenate([self.index, dense_embeds]) - if self.index is not None - else dense_embeds - ) - - def update_sparse_embeddings_index(self, utterances: list): - sparse_embeds = np.array(self.sparse_encoder(utterances)) - # create sparse utterance array - self.sparse_index = ( - np.concatenate([self.sparse_index, sparse_embeds]) - if self.sparse_index is not None - else sparse_embeds - ) - - def _query(self, text: str, top_k: int = 5): - """Given some text, encodes and searches the index vector space to - retrieve the top_k most similar records. - """ - # create dense query vector - xq_d = np.array(self.encoder([text])) - xq_d = np.squeeze(xq_d) # Reduce to 1d array. 
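A minimal sketch of the retry-with-exponential-backoff pattern that the removed `AzureOpenAIEncoder.__call__` relies on, assuming a hypothetical `embed_batch` callable in place of the real OpenAI client call (in the deleted code the caught exception is `OpenAIError`):

```python
import logging
import time
from typing import Any, Callable, List

logger = logging.getLogger(__name__)


def embed_with_backoff(
    embed_batch: Callable[[List[str]], Any], docs: List[str], retries: int = 3
) -> Any:
    """Call `embed_batch(docs)`, retrying with exponential backoff on transient failures."""
    last_error: Exception | None = None
    for attempt in range(retries):
        try:
            return embed_batch(docs)
        except Exception as e:  # hypothetical stand-in for OpenAIError
            last_error = e
            wait = 2**attempt
            logger.warning(f"Embedding call failed ({e}), retrying in {wait}s...")
            time.sleep(wait)
    raise ValueError(f"No embeddings returned. Error: {last_error}")
```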
- # create sparse query vector - xq_s = np.array(self.sparse_encoder([text])) - xq_s = np.squeeze(xq_s) - # convex scaling - xq_d, xq_s = self._convex_scaling(xq_d, xq_s) - - if self.index is not None and self.sparse_index is not None: - # calculate dense vec similarity - index_norm = norm(self.index, axis=1) - xq_d_norm = norm(xq_d.T) - sim_d = np.dot(self.index, xq_d.T) / (index_norm * xq_d_norm) - # calculate sparse vec similarity - sparse_norm = norm(self.sparse_index, axis=1) - xq_s_norm = norm(xq_s.T) - sim_s = np.dot(self.sparse_index, xq_s.T) / (sparse_norm * xq_s_norm) - total_sim = sim_d + sim_s - # get indices of top_k records - top_k = min(top_k, total_sim.shape[0]) - idx = np.argpartition(total_sim, -top_k)[-top_k:] - scores = total_sim[idx] - # get the utterance categories (route names) - routes = self.categories[idx] if self.categories is not None else [] - return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] - else: - logger.warning("No index found. Please add routes to the layer.") - return [] - - def _convex_scaling(self, dense: np.ndarray, sparse: np.ndarray): - # scale sparse and dense vecs - dense = np.array(dense) * self.alpha - sparse = np.array(sparse) * (1 - self.alpha) - return dense, sparse - - def _set_aggregation_method(self, aggregation: str = "sum"): - if aggregation == "sum": - return lambda x: sum(x) - elif aggregation == "mean": - return lambda x: np.mean(x) - elif aggregation == "max": - return lambda x: max(x) - else: - raise ValueError( - f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." - ) - - def _semantic_classify(self, query_results: List[Dict]) -> Tuple[str, List[float]]: - scores_by_class: Dict[str, List[float]] = {} - for result in query_results: - score = result["score"] - route = result["route"] - if route in scores_by_class: - scores_by_class[route].append(score) - else: - scores_by_class[route] = [score] - - # Calculate total score for each class - total_scores = { - route: self.aggregation_method(scores) - for route, scores in scores_by_class.items() - } - top_class = max(total_scores, key=lambda x: total_scores[x], default=None) - - # Return the top class and its associated scores - if top_class is not None: - return str(top_class), scores_by_class.get(top_class, []) - else: - logger.warning("No classification found for semantic classifier.") - return "", [] - - def _pass_threshold(self, scores: List[float], threshold: float) -> bool: - if scores: - return max(scores) > threshold - else: - return False diff --git a/semantic_chunkers/index/__init__.py b/semantic_chunkers/index/__init__.py deleted file mode 100644 index a6742a2..0000000 --- a/semantic_chunkers/index/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from semantic_chunkers.index.base import BaseIndex -from semantic_chunkers.index.local import LocalIndex -from semantic_chunkers.index.pinecone import PineconeIndex -from semantic_chunkers.index.qdrant import QdrantIndex - -__all__ = [ - "BaseIndex", - "LocalIndex", - "QdrantIndex", - "PineconeIndex", -] diff --git a/semantic_chunkers/index/base.py b/semantic_chunkers/index/base.py deleted file mode 100644 index d95a62a..0000000 --- a/semantic_chunkers/index/base.py +++ /dev/null @@ -1,66 +0,0 @@ -from typing import Any, List, Optional, Tuple, Union - -import numpy as np -from pydantic.v1 import BaseModel - - -class BaseIndex(BaseModel): - """ - Base class for indices using Pydantic's BaseModel. - This class outlines the expected interface for index classes. 
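A small self-contained sketch of the convex dense/sparse scoring used by the removed `HybridRouteLayer._query` and `_convex_scaling`: the dense query vector is weighted by `alpha`, the sparse one by `1 - alpha`, each is compared to its index by cosine similarity, and the two similarity vectors are summed before taking the top-k. The arrays below are toy data, not anything from the repository.

```python
import numpy as np
from numpy.linalg import norm


def hybrid_scores(dense_index, sparse_index, xq_d, xq_s, alpha: float = 0.3):
    # convex scaling: weight the dense and sparse query vectors
    xq_d = np.asarray(xq_d) * alpha
    xq_s = np.asarray(xq_s) * (1 - alpha)
    # cosine similarity of each query vector against its index
    sim_d = dense_index @ xq_d / (norm(dense_index, axis=1) * norm(xq_d))
    sim_s = sparse_index @ xq_s / (norm(sparse_index, axis=1) * norm(xq_s))
    return sim_d + sim_s


# toy example: 4 records with 3-d dense and 5-d sparse vectors
dense_index = np.random.rand(4, 3)
sparse_index = np.random.rand(4, 5)
scores = hybrid_scores(dense_index, sparse_index, np.random.rand(3), np.random.rand(5))
top_k = 2
idx = np.argpartition(scores, -top_k)[-top_k:]  # indices of the two best records
print(idx, scores[idx])
```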
- Actual method implementations should be provided in subclasses. - """ - - # You can define common attributes here if there are any. - # For example, a placeholder for the index attribute: - index: Optional[Any] = None - routes: Optional[np.ndarray] = None - utterances: Optional[np.ndarray] = None - dimensions: Union[int, None] = None - type: str = "base" - - def add( - self, embeddings: List[List[float]], routes: List[str], utterances: List[Any] - ): - """ - Add embeddings to the index. - This method should be implemented by subclasses. - """ - raise NotImplementedError("This method should be implemented by subclasses.") - - def delete(self, route_name: str): - """ - Deletes route by route name. - This method should be implemented by subclasses. - """ - raise NotImplementedError("This method should be implemented by subclasses.") - - def describe(self) -> dict: - """ - Returns a dictionary with index details such as type, dimensions, and total - vector count. - This method should be implemented by subclasses. - """ - raise NotImplementedError("This method should be implemented by subclasses.") - - def query( - self, - vector: np.ndarray, - top_k: int = 5, - route_filter: Optional[List[str]] = None, - ) -> Tuple[np.ndarray, List[str]]: - """ - Search the index for the query_vector and return top_k results. - This method should be implemented by subclasses. - """ - raise NotImplementedError("This method should be implemented by subclasses.") - - def delete_index(self): - """ - Deletes or resets the index. - This method should be implemented by subclasses. - """ - raise NotImplementedError("This method should be implemented by subclasses.") - - class Config: - arbitrary_types_allowed = True diff --git a/semantic_chunkers/index/local.py b/semantic_chunkers/index/local.py deleted file mode 100644 index 97e629e..0000000 --- a/semantic_chunkers/index/local.py +++ /dev/null @@ -1,124 +0,0 @@ -from typing import List, Optional, Tuple - -import numpy as np - -from semantic_chunkers.index.base import BaseIndex -from semantic_chunkers.linear import similarity_matrix, top_scores - - -class LocalIndex(BaseIndex): - def __init__( - self, - index: Optional[np.ndarray] = None, - routes: Optional[np.ndarray] = None, - utterances: Optional[np.ndarray] = None, - ): - super().__init__(index=index, routes=routes, utterances=utterances) - self.type = "local" - - class Config: - # Stop pydantic from complaining about Optional[np.ndarray]type hints. - arbitrary_types_allowed = True - - def add( - self, embeddings: List[List[float]], routes: List[str], utterances: List[str] - ): - embeds = np.array(embeddings) # type: ignore - routes_arr = np.array(routes) - if isinstance(utterances[0], str): - utterances_arr = np.array(utterances) - else: - utterances_arr = np.array(utterances, dtype=object) - if self.index is None: - self.index = embeds # type: ignore - self.routes = routes_arr - self.utterances = utterances_arr - else: - self.index = np.concatenate([self.index, embeds]) - self.routes = np.concatenate([self.routes, routes_arr]) - self.utterances = np.concatenate([self.utterances, utterances_arr]) - - def get_routes(self) -> List[Tuple]: - """ - Gets a list of route and utterance objects currently stored in the index. - - Returns: - List[Tuple]: A list of (route_name, utterance) objects. 
- """ - if self.routes is None or self.utterances is None: - raise ValueError("No routes have been added to the index.") - return list(zip(self.routes, self.utterances)) - - def describe(self) -> dict: - return { - "type": self.type, - "dimensions": self.index.shape[1] if self.index is not None else 0, - "vectors": self.index.shape[0] if self.index is not None else 0, - } - - def query( - self, - vector: np.ndarray, - top_k: int = 5, - route_filter: Optional[List[str]] = None, - ) -> Tuple[np.ndarray, List[str]]: - """ - Search the index for the query and return top_k results. - """ - if self.index is None or self.routes is None: - raise ValueError("Index or routes are not populated.") - if route_filter is not None: - filtered_index = [] - filtered_routes = [] - for route, vec in zip(self.routes, self.index): - if route in route_filter: - filtered_index.append(vec) - filtered_routes.append(route) - if not filtered_routes: - raise ValueError("No routes found matching the filter criteria.") - sim = similarity_matrix(vector, np.array(filtered_index)) - scores, idx = top_scores(sim, top_k) - route_names = [filtered_routes[i] for i in idx] - else: - sim = similarity_matrix(vector, self.index) - scores, idx = top_scores(sim, top_k) - route_names = [self.routes[i] for i in idx] - return scores, route_names - - def delete(self, route_name: str): - """ - Delete all records of a specific route from the index. - """ - if ( - self.index is not None - and self.routes is not None - and self.utterances is not None - ): - delete_idx = self._get_indices_for_route(route_name=route_name) - self.index = np.delete(self.index, delete_idx, axis=0) - self.routes = np.delete(self.routes, delete_idx, axis=0) - self.utterances = np.delete(self.utterances, delete_idx, axis=0) - else: - raise ValueError( - "Attempted to delete route records but either index, routes or " - "utterances is None." - ) - - def delete_index(self): - """ - Deletes the index, effectively clearing it and setting it to None. 
- """ - self.index = None - - def _get_indices_for_route(self, route_name: str): - """Gets an array of indices for a specific route.""" - if self.routes is None: - raise ValueError("Routes are not populated.") - idx = [i for i, route in enumerate(self.routes) if route == route_name] - return idx - - def __len__(self): - if self.index is not None: - return self.index.shape[0] - else: - return 0 diff --git a/semantic_chunkers/index/pinecone.py b/semantic_chunkers/index/pinecone.py deleted file mode 100644 index 0ae29f5..0000000 --- a/semantic_chunkers/index/pinecone.py +++ /dev/null @@ -1,283 +0,0 @@ -import hashlib -import os -import time -from typing import Any, Dict, List, Optional, Tuple, Union - -import numpy as np -import requests -from pydantic.v1 import BaseModel, Field - -from semantic_chunkers.index.base import BaseIndex -from semantic_chunkers.utils.logger import logger - - -def clean_route_name(route_name: str) -> str: - return route_name.strip().replace(" ", "-") - - -class PineconeRecord(BaseModel): - id: str = "" - values: List[float] - route: str - utterance: str - - def __init__(self, **data): - super().__init__(**data) - clean_route = clean_route_name(self.route) - # Use SHA-256 for a more secure hash - utterance_id = hashlib.sha256(self.utterance.encode()).hexdigest() - self.id = f"{clean_route}#{utterance_id}" - - def to_dict(self): - return { - "id": self.id, - "values": self.values, - "metadata": {"sr_route": self.route, "sr_utterance": self.utterance}, - } - - -class PineconeIndex(BaseIndex): - index_prefix: str = "semantic-router--" - index_name: str = "index" - dimensions: Union[int, None] = None - metric: str = "cosine" - cloud: str = "aws" - region: str = "us-west-2" - host: str = "" - client: Any = Field(default=None, exclude=True) - index: Optional[Any] = Field(default=None, exclude=True) - ServerlessSpec: Any = Field(default=None, exclude=True) - namespace: Optional[str] = "" - - def __init__( - self, - api_key: Optional[str] = None, - index_name: str = "index", - dimensions: Optional[int] = None, - metric: str = "cosine", - cloud: str = "aws", - region: str = "us-west-2", - host: str = "", - namespace: Optional[str] = "", - ): - super().__init__() - self.index_name = index_name - self.dimensions = dimensions - self.metric = metric - self.cloud = cloud - self.region = region - self.host = host - self.namespace = namespace - self.type = "pinecone" - self.client = self._initialize_client(api_key=api_key) - - def _initialize_client(self, api_key: Optional[str] = None): - try: - from pinecone import Pinecone, ServerlessSpec - - self.ServerlessSpec = ServerlessSpec - except ImportError: - raise ImportError( - "Please install pinecone-client to use PineconeIndex. " - "You can install it with: " - "`pip install 'semantic-router[pinecone]'`" - ) - api_key = api_key or os.getenv("PINECONE_API_KEY") - if api_key is None: - raise ValueError("Pinecone API key is required.") - pinecone_args = {"api_key": api_key, "source_tag": "semantic-router"} - if self.namespace: - pinecone_args["namespace"] = self.namespace - - return Pinecone(**pinecone_args) - - def _init_index(self, force_create: bool = False) -> Union[Any, None]: - """Initializing the index can be done after the object has been created - to allow for the user to set the dimensions and other parameters. - - If the index doesn't exist and the dimensions are given, the index will - be created. If the index exists, it will be returned. 
If the index doesn't - exist and the dimensions are not given, the index will not be created and - None will be returned. - - :param force_create: If True, the index will be created even if the - dimensions are not given (which will raise an error). - :type force_create: bool, optional - """ - index_exists = self.index_name in self.client.list_indexes().names() - dimensions_given = self.dimensions is not None - if dimensions_given and not index_exists: - # if the index doesn't exist and we have dimension value - # we create the index - self.client.create_index( - name=self.index_name, - dimension=self.dimensions, - metric=self.metric, - spec=self.ServerlessSpec(cloud=self.cloud, region=self.region), - ) - # wait for index to be created - while not self.client.describe_index(self.index_name).status["ready"]: - time.sleep(1) - index = self.client.Index(self.index_name) - time.sleep(0.5) - elif index_exists: - # if the index exists we just return it - index = self.client.Index(self.index_name) - # grab the dimensions from the index - self.dimensions = index.describe_index_stats()["dimension"] - elif force_create and not dimensions_given: - raise ValueError( - "Cannot create an index without specifying the dimensions." - ) - else: - # if the index doesn't exist and we don't have the dimensions - # we return None - logger.warning("Index could not be initialized.") - index = None - if index is not None: - self.host = self.client.describe_index(self.index_name)["host"] - return index - - def _batch_upsert(self, batch: List[dict]): - """Helper method for upserting a single batch of records.""" - if self.index is not None: - self.index.upsert(vectors=batch, namespace=self.namespace) - else: - raise ValueError("Index is None, could not upsert.") - - def add( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[str], - batch_size: int = 100, - ): - """Add vectors to Pinecone in batches.""" - if self.index is None: - self.dimensions = self.dimensions or len(embeddings[0]) - self.index = self._init_index(force_create=True) - - vectors_to_upsert = [ - PineconeRecord(values=vector, route=route, utterance=utterance).to_dict() - for vector, route, utterance in zip(embeddings, routes, utterances) - ] - - for i in range(0, len(vectors_to_upsert), batch_size): - batch = vectors_to_upsert[i : i + batch_size] - self._batch_upsert(batch) - - def _get_route_ids(self, route_name: str): - clean_route = clean_route_name(route_name) - ids, _ = self._get_all(prefix=f"{clean_route}#") - return ids - - def _get_all(self, prefix: Optional[str] = None, include_metadata: bool = False): - """ - Retrieves all vector IDs from the Pinecone index using pagination. - """ - if self.index is None: - raise ValueError("Index is None, could not retrieve vector IDs.") - all_vector_ids = [] - next_page_token = None - - if prefix: - prefix_str = f"?prefix={prefix}" - else: - prefix_str = "" - - # Construct the request URL for listing vectors. Adjust parameters as needed. - list_url = f"https://{self.host}/vectors/list{prefix_str}" - params: Dict = {} - headers = {"Api-Key": os.environ["PINECONE_API_KEY"]} - metadata = [] - - while True: - if next_page_token: - params["paginationToken"] = next_page_token - - # Make the request to list vectors. Adjust headers and parameters as needed. 
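A generic sketch of the token-based pagination loop that the removed `PineconeIndex._get_all` implements: keep requesting pages and accumulating vector IDs until the response carries no pagination token. `fetch_page` is a hypothetical callable standing in for the HTTP request, with a response shape assumed to mirror the list-vectors endpoint used in the deleted code.

```python
from typing import Callable, Dict, List, Optional


def collect_all_ids(fetch_page: Callable[[Optional[str]], Dict]) -> List[str]:
    """Accumulate vector IDs across pages until no pagination token remains.

    `fetch_page(token)` is assumed to return a dict shaped like
    {"vectors": [{"id": ...}, ...], "pagination": {"next": ...}}.
    """
    all_ids: List[str] = []
    token: Optional[str] = None
    while True:
        page = fetch_page(token)
        ids = [vec["id"] for vec in page.get("vectors", [])]
        if not ids:  # empty page: nothing left to collect
            break
        all_ids.extend(ids)
        token = page.get("pagination", {}).get("next")
        if not token:  # no next-page token: we are done
            break
    return all_ids
```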
- response = requests.get(list_url, params=params, headers=headers) - response_data = response.json() - - # Extract vector IDs from the response and add them to the list - vector_ids = [vec["id"] for vec in response_data.get("vectors", [])] - # check that there are vector IDs, otherwise break the loop - if not vector_ids: - break - all_vector_ids.extend(vector_ids) - - # if we need metadata, we fetch it - if include_metadata: - res_meta = self.index.fetch(ids=vector_ids, namespace=self.namespace) - # extract metadata only - metadata.extend([x["metadata"] for x in res_meta["vectors"].values()]) - - # Check if there's a next page token; if not, break the loop - next_page_token = response_data.get("pagination", {}).get("next") - if not next_page_token: - break - - return all_vector_ids, metadata - - def get_routes(self) -> List[Tuple]: - """ - Gets a list of route and utterance objects currently stored in the index. - - Returns: - List[Tuple]: A list of (route_name, utterance) objects. - """ - # Get all records - _, metadata = self._get_all(include_metadata=True) - route_tuples = [(x["sr_route"], x["sr_utterance"]) for x in metadata] - return route_tuples - - def delete(self, route_name: str): - route_vec_ids = self._get_route_ids(route_name=route_name) - if self.index is not None: - self.index.delete(ids=route_vec_ids, namespace=self.namespace) - else: - raise ValueError("Index is None, could not delete.") - - def delete_all(self): - self.index.delete(delete_all=True, namespace=self.namespace) - - def describe(self) -> dict: - if self.index is not None: - stats = self.index.describe_index_stats() - return { - "type": self.type, - "dimensions": stats["dimension"], - "vectors": stats["total_vector_count"], - } - else: - raise ValueError("Index is None, cannot describe index stats.") - - def query( - self, - vector: np.ndarray, - top_k: int = 5, - route_filter: Optional[List[str]] = None, - ) -> Tuple[np.ndarray, List[str]]: - if self.index is None: - raise ValueError("Index is not populated.") - query_vector_list = vector.tolist() - if route_filter is not None: - filter_query = {"sr_route": {"$in": route_filter}} - else: - filter_query = None - results = self.index.query( - vector=[query_vector_list], - top_k=top_k, - filter=filter_query, - include_metadata=True, - namespace=self.namespace, - ) - scores = [result["score"] for result in results["matches"]] - route_names = [result["metadata"]["sr_route"] for result in results["matches"]] - return np.array(scores), route_names - - def delete_index(self): - self.client.delete_index(self.index_name) - - def __len__(self): - return self.index.describe_index_stats()["total_vector_count"] diff --git a/semantic_chunkers/index/qdrant.py b/semantic_chunkers/index/qdrant.py deleted file mode 100644 index fdbd42b..0000000 --- a/semantic_chunkers/index/qdrant.py +++ /dev/null @@ -1,272 +0,0 @@ -from typing import Any, Dict, List, Optional, Tuple, Union - -import numpy as np -from pydantic.v1 import Field - -from semantic_chunkers.index.base import BaseIndex -from semantic_chunkers.schema import Metric - -DEFAULT_COLLECTION_NAME = "semantic-router-index" -DEFAULT_UPLOAD_BATCH_SIZE = 100 -SCROLL_SIZE = 1000 -SR_UTTERANCE_PAYLOAD_KEY = "sr_utterance" -SR_ROUTE_PAYLOAD_KEY = "sr_route" - - -class QdrantIndex(BaseIndex): - "The name of the collection to use" - - index_name: str = Field( - default=DEFAULT_COLLECTION_NAME, - description="Name of the Qdrant collection." 
- f"Default: '{DEFAULT_COLLECTION_NAME}'", - ) - location: Optional[str] = Field( - default=":memory:", - description="If ':memory:' - use an in-memory Qdrant instance." - "Used as 'url' value otherwise", - ) - url: Optional[str] = Field( - default=None, - description="Qualified URL of the Qdrant instance." - "Optional[scheme], host, Optional[port], Optional[prefix]", - ) - port: Optional[int] = Field( - default=6333, - description="Port of the REST API interface.", - ) - grpc_port: int = Field( - default=6334, - description="Port of the gRPC interface.", - ) - prefer_grpc: bool = Field( - default=None, - description="Whether to use gPRC interface whenever possible in methods", - ) - https: Optional[bool] = Field( - default=None, - description="Whether to use HTTPS(SSL) protocol.", - ) - api_key: Optional[str] = Field( - default=None, - description="API key for authentication in Qdrant Cloud.", - ) - prefix: Optional[str] = Field( - default=None, - description="Prefix to the REST URL path. Example: `http://localhost:6333/some/prefix/{qdrant-endpoint}`.", - ) - timeout: Optional[int] = Field( - default=None, - description="Timeout for REST and gRPC API requests.", - ) - host: Optional[str] = Field( - default=None, - description="Host name of Qdrant service." - "If url and host are None, set to 'localhost'.", - ) - path: Optional[str] = Field( - default=None, - description="Persistence path for Qdrant local", - ) - grpc_options: Optional[Dict[str, Any]] = Field( - default=None, - description="Options to be passed to the low-level GRPC client, if used.", - ) - dimensions: Union[int, None] = Field( - default=None, - description="Embedding dimensions." - "Defaults to the embedding length of the configured encoder.", - ) - metric: Metric = Field( - default=Metric.COSINE, - description="Distance metric to use for similarity search.", - ) - config: Optional[Dict[str, Any]] = Field( - default={}, - description="Collection options passed to `QdrantClient#create_collection`.", - ) - client: Any = Field(default=None, exclude=True) - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.type = "qdrant" - self.client = self._initialize_client() - - def _initialize_client(self): - try: - from qdrant_client import QdrantClient - - return QdrantClient( - location=self.location, - url=self.url, - port=self.port, - grpc_port=self.grpc_port, - prefer_grpc=self.prefer_grpc, - https=self.https, - api_key=self.api_key, - prefix=self.prefix, - timeout=self.timeout, - host=self.host, - path=self.path, - grpc_options=self.grpc_options, - ) - - except ImportError as e: - raise ImportError( - "Please install 'qdrant-client' to use QdrantIndex." - "You can install it with: " - "`pip install 'semantic-router[qdrant]'`" - ) from e - - def _init_collection(self) -> None: - from qdrant_client import QdrantClient, models - - self.client: QdrantClient - if not self.client.collection_exists(self.index_name): - if not self.dimensions: - raise ValueError( - "Cannot create a collection without specifying the dimensions." 
- ) - - self.client.create_collection( - collection_name=self.index_name, - vectors_config=models.VectorParams( - size=self.dimensions, distance=self.convert_metric(self.metric) - ), - **self.config, - ) - - def add( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[str], - batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, - ): - self.dimensions = self.dimensions or len(embeddings[0]) - self._init_collection() - - payloads = [ - {SR_ROUTE_PAYLOAD_KEY: route, SR_UTTERANCE_PAYLOAD_KEY: utterance} - for route, utterance in zip(routes, utterances) - ] - - # UUIDs are autogenerated by qdrant-client if not provided explicitly - self.client.upload_collection( - self.index_name, - vectors=embeddings, - payload=payloads, - batch_size=batch_size, - ) - - def get_routes(self) -> List[Tuple]: - """ - Gets a list of route and utterance objects currently stored in the index. - - Returns: - List[Tuple]: A list of (route_name, utterance) objects. - """ - - from qdrant_client import grpc - - results = [] - next_offset = None - stop_scrolling = False - while not stop_scrolling: - records, next_offset = self.client.scroll( - self.index_name, - limit=SCROLL_SIZE, - offset=next_offset, - with_payload=True, - ) - stop_scrolling = next_offset is None or ( - isinstance(next_offset, grpc.PointId) - and next_offset.num == 0 - and next_offset.uuid == "" - ) - - results.extend(records) - - route_tuples = [ - (x.payload[SR_ROUTE_PAYLOAD_KEY], x.payload[SR_UTTERANCE_PAYLOAD_KEY]) - for x in results - ] - return route_tuples - - def delete(self, route_name: str): - from qdrant_client import models - - self.client.delete( - self.index_name, - points_selector=models.Filter( - must=[ - models.FieldCondition( - key=SR_ROUTE_PAYLOAD_KEY, - match=models.MatchText(text=route_name), - ) - ] - ), - ) - - def describe(self) -> dict: - collection_info = self.client.get_collection(self.index_name) - - return { - "type": self.type, - "dimensions": collection_info.config.params.vectors.size, - "vectors": collection_info.points_count, - } - - def query( - self, - vector: np.ndarray, - top_k: int = 5, - route_filter: Optional[List[str]] = None, - ) -> Tuple[np.ndarray, List[str]]: - from qdrant_client import models - - results = self.client.search( - self.index_name, query_vector=vector, limit=top_k, with_payload=True - ) - filter = None - if route_filter is not None: - filter = models.Filter( - must=[ - models.FieldCondition( - key=SR_ROUTE_PAYLOAD_KEY, - match=models.MatchAny(any=route_filter), - ) - ] - ) - - results = self.client.search( - self.index_name, - query_vector=vector, - limit=top_k, - with_payload=True, - query_filter=filter, - ) - scores = [result.score for result in results] - route_names = [result.payload[SR_ROUTE_PAYLOAD_KEY] for result in results] - return np.array(scores), route_names - - def delete_index(self): - self.client.delete_collection(self.index_name) - - def convert_metric(self, metric: Metric): - from qdrant_client.models import Distance - - mapping = { - Metric.COSINE: Distance.COSINE, - Metric.EUCLIDEAN: Distance.EUCLID, - Metric.DOTPRODUCT: Distance.DOT, - Metric.MANHATTAN: Distance.MANHATTAN, - } - - if metric not in mapping: - raise ValueError(f"Unsupported Qdrant similarity metric: {metric}") - - return mapping[metric] - - def __len__(self): - return self.client.get_collection(self.index_name).points_count diff --git a/semantic_chunkers/layer.py b/semantic_chunkers/layer.py deleted file mode 100644 index c02410b..0000000 --- a/semantic_chunkers/layer.py +++ /dev/null @@ 
-1,637 +0,0 @@ -import importlib -import json -import os -import random -from typing import Any, Dict, List, Optional, Tuple, Union - -import numpy as np -import yaml # type: ignore -from tqdm.auto import tqdm - -from semantic_chunkers.encoders import AutoEncoder, BaseEncoder, OpenAIEncoder -from semantic_chunkers.index.base import BaseIndex -from semantic_chunkers.index.local import LocalIndex -from semantic_chunkers.llms import BaseLLM, OpenAILLM -from semantic_chunkers.route import Route -from semantic_chunkers.schema import EncoderType, RouteChoice -from semantic_chunkers.utils.defaults import EncoderDefault -from semantic_chunkers.utils.logger import logger - - -def is_valid(layer_config: str) -> bool: - """Make sure the given string is json format and contains the 3 keys: - ["encoder_name", "encoder_type", "routes"]""" - try: - output_json = json.loads(layer_config) - required_keys = ["encoder_name", "encoder_type", "routes"] - - if isinstance(output_json, list): - for item in output_json: - missing_keys = [key for key in required_keys if key not in item] - if missing_keys: - logger.warning( - f"Missing keys in layer config: {', '.join(missing_keys)}" - ) - return False - return True - else: - missing_keys = [key for key in required_keys if key not in output_json] - if missing_keys: - logger.warning( - f"Missing keys in layer config: {', '.join(missing_keys)}" - ) - return False - else: - return True - except json.JSONDecodeError as e: - logger.error(e) - return False - - -class LayerConfig: - """ - Generates a LayerConfig object that can be used for initializing a - RouteLayer. - """ - - routes: List[Route] = [] - - def __init__( - self, - routes: List[Route] = [], - encoder_type: str = "openai", - encoder_name: Optional[str] = None, - ): - self.encoder_type = encoder_type - if encoder_name is None: - for encode_type in EncoderType: - if encode_type.value == self.encoder_type: - if self.encoder_type == EncoderType.HUGGINGFACE.value: - raise NotImplementedError( - "HuggingFace encoder not supported by LayerConfig yet." - ) - encoder_name = EncoderDefault[encode_type.name].value[ - "embedding_model" - ] - break - logger.info(f"Using default {encoder_type} encoder: {encoder_name}") - self.encoder_name = encoder_name - self.routes = routes - - @classmethod - def from_file(cls, path: str) -> "LayerConfig": - logger.info(f"Loading route config from {path}") - _, ext = os.path.splitext(path) - with open(path, "r") as f: - if ext == ".json": - layer = json.load(f) - elif ext in [".yaml", ".yml"]: - layer = yaml.safe_load(f) - else: - raise ValueError( - "Unsupported file type. 
Only .json and .yaml are supported" - ) - - if not is_valid(json.dumps(layer)): - raise Exception("Invalid config JSON or YAML") - - encoder_type = layer["encoder_type"] - encoder_name = layer["encoder_name"] - routes = [] - for route_data in layer["routes"]: - # Handle the 'llm' field specially if it exists - if "llm" in route_data and route_data["llm"] is not None: - llm_data = route_data.pop( - "llm" - ) # Remove 'llm' from route_data and handle it separately - # Use the module path directly from llm_data without modification - llm_module_path = llm_data["module"] - # Dynamically import the module and then the class from that module - llm_module = importlib.import_module(llm_module_path) - llm_class = getattr(llm_module, llm_data["class"]) - # Instantiate the LLM class with the provided model name - llm = llm_class(name=llm_data["model"]) - # Reassign the instantiated llm object back to route_data - route_data["llm"] = llm - - # Dynamically create the Route object using the remaining route_data - route = Route(**route_data) - routes.append(route) - - return cls( - encoder_type=encoder_type, encoder_name=encoder_name, routes=routes - ) - - def to_dict(self) -> Dict[str, Any]: - return { - "encoder_type": self.encoder_type, - "encoder_name": self.encoder_name, - "routes": [route.to_dict() for route in self.routes], - } - - def to_file(self, path: str): - """Save the routes to a file in JSON or YAML format""" - logger.info(f"Saving route config to {path}") - _, ext = os.path.splitext(path) - - # Check file extension before creating directories or files - if ext not in [".json", ".yaml", ".yml"]: - raise ValueError( - "Unsupported file type. Only .json and .yaml are supported" - ) - - dir_name = os.path.dirname(path) - - # Create the directory if it doesn't exist and dir_name is not an empty string - if dir_name and not os.path.exists(dir_name): - os.makedirs(dir_name) - - with open(path, "w") as f: - if ext == ".json": - json.dump(self.to_dict(), f, indent=4) - elif ext in [".yaml", ".yml"]: - yaml.safe_dump(self.to_dict(), f) - - def add(self, route: Route): - self.routes.append(route) - logger.info(f"Added route `{route.name}`") - - def get(self, name: str) -> Optional[Route]: - for route in self.routes: - if route.name == name: - return route - logger.error(f"Route `{name}` not found") - return None - - def remove(self, name: str): - if name not in [route.name for route in self.routes]: - logger.error(f"Route `{name}` not found") - else: - self.routes = [route for route in self.routes if route.name != name] - logger.info(f"Removed route `{name}`") - - -class RouteLayer: - score_threshold: float - encoder: BaseEncoder - index: BaseIndex - - def __init__( - self, - encoder: Optional[BaseEncoder] = None, - llm: Optional[BaseLLM] = None, - routes: Optional[List[Route]] = None, - index: Optional[BaseIndex] = None, # type: ignore - top_k: int = 5, - aggregation: str = "sum", - ): - logger.info("local") - self.index: BaseIndex = index if index is not None else LocalIndex() - if encoder is None: - logger.warning( - "No encoder provided. Using default OpenAIEncoder. Ensure " - "that you have set OPENAI_API_KEY in your environment." 
- ) - self.encoder = OpenAIEncoder() - else: - self.encoder = encoder - self.llm = llm - self.routes: list[Route] = routes if routes is not None else [] - self.score_threshold = self.encoder.score_threshold - self.top_k = top_k - if self.top_k < 1: - raise ValueError(f"top_k needs to be >= 1, but was: {self.top_k}.") - self.aggregation = aggregation - if self.aggregation not in ["sum", "mean", "max"]: - raise ValueError( - f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." - ) - self.aggregation_method = self._set_aggregation_method(self.aggregation) - - # set route score thresholds if not already set - for route in self.routes: - if route.score_threshold is None: - route.score_threshold = self.score_threshold - # if routes list has been passed, we initialize index now - if len(self.routes) > 0: - # initialize index now - self._add_routes(routes=self.routes) - - def check_for_matching_routes(self, top_class: str) -> Optional[Route]: - matching_routes = [route for route in self.routes if route.name == top_class] - if not matching_routes: - logger.error( - f"No route found with name {top_class}. Check to see if any Routes " - "have been defined." - ) - return None - return matching_routes[0] - - def __call__( - self, - text: Optional[str] = None, - vector: Optional[List[float]] = None, - simulate_static: bool = False, - route_filter: Optional[List[str]] = None, - ) -> RouteChoice: - # if no vector provided, encode text to get vector - if vector is None: - if text is None: - raise ValueError("Either text or vector must be provided") - vector = self._encode(text=text) - - route, top_class_scores = self._retrieve_top_route(vector, route_filter) - passed = self._check_threshold(top_class_scores, route) - - if passed and route is not None and not simulate_static: - if route.function_schema and text is None: - raise ValueError( - "Route has a function schema, but no text was provided." - ) - if route.function_schema and not isinstance(route.llm, BaseLLM): - if not self.llm: - logger.warning( - "No LLM provided for dynamic route, will use OpenAI LLM " - "default. Ensure API key is set in OPENAI_API_KEY environment " - "variable." - ) - - self.llm = OpenAILLM() - route.llm = self.llm - else: - route.llm = self.llm - return route(text) - elif passed and route is not None and simulate_static: - return RouteChoice( - name=route.name, - function_call=None, - similarity_score=None, - ) - else: - # if no route passes threshold, return empty route choice - return RouteChoice() - - def retrieve_multiple_routes( - self, - text: Optional[str] = None, - vector: Optional[List[float]] = None, - ) -> List[RouteChoice]: - if vector is None: - if text is None: - raise ValueError("Either text or vector must be provided") - vector_arr = self._encode(text=text) - else: - vector_arr = np.array(vector) - # get relevant utterances - results = self._retrieve(xq=vector_arr) - - # decide most relevant routes - categories_with_scores = self._semantic_classify_multiple_routes(results) - - route_choices = [] - for category, score in categories_with_scores: - route = self.check_for_matching_routes(category) - if route: - route_choice = RouteChoice(name=route.name, similarity_score=score) - route_choices.append(route_choice) - - return route_choices - - def _retrieve_top_route( - self, vector: List[float], route_filter: Optional[List[str]] = None - ) -> Tuple[Optional[Route], List[float]]: - """ - Retrieve the top matching route based on the given vector. 
- Returns a tuple of the route (if any) and the scores of the top class. - """ - # get relevant results (scores and routes) - results = self._retrieve( - xq=np.array(vector), top_k=self.top_k, route_filter=route_filter - ) - # decide most relevant routes - top_class, top_class_scores = self._semantic_classify(results) - # TODO do we need this check? - route = self.check_for_matching_routes(top_class) - return route, top_class_scores - - def _check_threshold(self, scores: List[float], route: Optional[Route]) -> bool: - """ - Check if the route's score passes the specified threshold. - """ - if route is None: - return False - threshold = ( - route.score_threshold - if route.score_threshold is not None - else self.score_threshold - ) - return self._pass_threshold(scores, threshold) - - def __str__(self): - return ( - f"RouteLayer(encoder={self.encoder}, " - f"score_threshold={self.score_threshold}, " - f"routes={self.routes})" - ) - - @classmethod - def from_json(cls, file_path: str): - config = LayerConfig.from_file(file_path) - encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model - return cls(encoder=encoder, routes=config.routes) - - @classmethod - def from_yaml(cls, file_path: str): - config = LayerConfig.from_file(file_path) - encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model - return cls(encoder=encoder, routes=config.routes) - - @classmethod - def from_config(cls, config: LayerConfig, index: Optional[BaseIndex] = None): - encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model - return cls(encoder=encoder, routes=config.routes, index=index) - - def add(self, route: Route): - logger.info(f"Adding `{route.name}` route") - # create embeddings - embeds = self.encoder(route.utterances) - # if route has no score_threshold, use default - if route.score_threshold is None: - route.score_threshold = self.score_threshold - - # add routes to the index - self.index.add( - embeddings=embeds, - routes=[route.name] * len(route.utterances), - utterances=route.utterances, - ) - self.routes.append(route) - - def list_route_names(self) -> List[str]: - return [route.name for route in self.routes] - - def update(self, route_name: str, utterances: List[str]): - raise NotImplementedError("This method has not yet been implemented.") - - def delete(self, route_name: str): - """Deletes a route given a specific route name. 
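The removed `_check_threshold` / `_pass_threshold` pair implements a per-route override of the layer-wide score threshold; a condensed sketch, with invented scores and thresholds for illustration:

```python
from typing import List, Optional


def passes_threshold(
    scores: List[float], route_threshold: Optional[float], default_threshold: float
) -> bool:
    """True if the best score beats the route's own threshold, falling back to the layer default."""
    if not scores:
        return False
    threshold = route_threshold if route_threshold is not None else default_threshold
    return max(scores) > threshold


print(passes_threshold([0.71, 0.64], None, 0.82))  # False: falls back to 0.82
print(passes_threshold([0.71, 0.64], 0.5, 0.82))   # True: route override of 0.5 applies
```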
- - :param route_name: the name of the route to be deleted - :type str: - """ - if route_name not in [route.name for route in self.routes]: - err_msg = f"Route `{route_name}` not found" - logger.error(err_msg) - raise ValueError(err_msg) - else: - self.routes = [route for route in self.routes if route.name != route_name] - self.index.delete(route_name=route_name) - - def _refresh_routes(self): - """Pulls out the latest routes from the index.""" - raise NotImplementedError("This method has not yet been implemented.") - route_mapping = {route.name: route for route in self.routes} - index_routes = self.index.get_routes() - new_routes_names = [] - new_routes = [] - for route_name, utterance in index_routes: - if route_name in route_mapping: - if route_name not in new_routes_names: - existing_route = route_mapping[route_name] - new_routes.append(existing_route) - - new_routes.append(Route(name=route_name, utterances=[utterance])) - route = route_mapping[route_name] - self.routes.append(route) - - def _add_routes(self, routes: List[Route]): - # create embeddings for all routes - all_utterances = [ - utterance for route in routes for utterance in route.utterances - ] - embedded_utterances = self.encoder(all_utterances) - # create route array - route_names = [route.name for route in routes for _ in route.utterances] - # add everything to the index - self.index.add( - embeddings=embedded_utterances, - routes=route_names, - utterances=all_utterances, - ) - - def _encode(self, text: str) -> Any: - """Given some text, encode it.""" - # create query vector - xq = np.array(self.encoder([text])) - xq = np.squeeze(xq) # Reduce to 1d array. - return xq - - def _retrieve( - self, xq: Any, top_k: int = 5, route_filter: Optional[List[str]] = None - ) -> List[dict]: - """Given a query vector, retrieve the top_k most similar records.""" - # get scores and routes - scores, routes = self.index.query( - vector=xq, top_k=top_k, route_filter=route_filter - ) - return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] - - def _set_aggregation_method(self, aggregation: str = "sum"): - if aggregation == "sum": - return lambda x: sum(x) - elif aggregation == "mean": - return lambda x: np.mean(x) - elif aggregation == "max": - return lambda x: max(x) - else: - raise ValueError( - f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." 
- ) - - def _semantic_classify(self, query_results: List[dict]) -> Tuple[str, List[float]]: - scores_by_class = self.group_scores_by_class(query_results) - - # Calculate total score for each class - total_scores = { - route: self.aggregation_method(scores) - for route, scores in scores_by_class.items() - } - top_class = max(total_scores, key=lambda x: total_scores[x], default=None) - - # Return the top class and its associated scores - if top_class is not None: - return str(top_class), scores_by_class.get(top_class, []) - else: - logger.warning("No classification found for semantic classifier.") - return "", [] - - def get(self, name: str) -> Optional[Route]: - for route in self.routes: - if route.name == name: - return route - logger.error(f"Route `{name}` not found") - return None - - def _semantic_classify_multiple_routes( - self, query_results: List[dict] - ) -> List[Tuple[str, float]]: - scores_by_class = self.group_scores_by_class(query_results) - - # Filter classes based on threshold and find max score for each - classes_above_threshold = [] - for route_name, scores in scores_by_class.items(): - # Use the get method to find the Route object by its name - route_obj = self.get(route_name) - if route_obj is not None: - # Use the Route object's threshold if it exists, otherwise use the provided threshold - _threshold = ( - route_obj.score_threshold - if route_obj.score_threshold is not None - else self.score_threshold - ) - if self._pass_threshold(scores, _threshold): - max_score = max(scores) - classes_above_threshold.append((route_name, max_score)) - - return classes_above_threshold - - def group_scores_by_class( - self, query_results: List[dict] - ) -> Dict[str, List[float]]: - scores_by_class: Dict[str, List[float]] = {} - for result in query_results: - score = result["score"] - route = result["route"] - if route in scores_by_class: - scores_by_class[route].append(score) - else: - scores_by_class[route] = [score] - return scores_by_class - - def _pass_threshold(self, scores: List[float], threshold: float) -> bool: - if scores: - return max(scores) > threshold - else: - return False - - def _update_thresholds(self, score_thresholds: Optional[Dict[str, float]] = None): - """ - Update the score thresholds for each route. - """ - if score_thresholds: - for route in self.routes: - route.score_threshold = score_thresholds.get( - route.name, self.score_threshold - ) - - def to_config(self) -> LayerConfig: - return LayerConfig( - encoder_type=self.encoder.type, - encoder_name=self.encoder.name, - routes=self.routes, - ) - - def to_json(self, file_path: str): - config = self.to_config() - config.to_file(file_path) - - def to_yaml(self, file_path: str): - config = self.to_config() - config.to_file(file_path) - - def get_thresholds(self) -> Dict[str, float]: - # TODO: float() below is hacky fix for lint, fix this with new type? 
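To make the classification step concrete, here is a compact sketch of the group-then-aggregate logic from the removed `_semantic_classify` / `group_scores_by_class` pair: scores are bucketed per route, an aggregation (sum by default, mirroring the deleted code) is applied per bucket, and the route with the highest total wins. The query results below are invented toy data.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple


def semantic_classify(
    query_results: List[Dict], aggregate: Callable[[List[float]], float] = sum
) -> Tuple[str, List[float]]:
    """Return (top_route, scores_for_top_route) from [{'route': ..., 'score': ...}, ...]."""
    scores_by_route: Dict[str, List[float]] = defaultdict(list)
    for result in query_results:
        scores_by_route[result["route"]].append(result["score"])
    totals = {route: aggregate(scores) for route, scores in scores_by_route.items()}
    top = max(totals, key=totals.get, default=None)
    return (top, scores_by_route[top]) if top is not None else ("", [])


results = [
    {"route": "chitchat", "score": 0.41},
    {"route": "politics", "score": 0.52},
    {"route": "politics", "score": 0.47},
]
print(semantic_classify(results))  # ('politics', [0.52, 0.47])
```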
- thresholds = { - route.name: float(route.score_threshold or self.score_threshold) - for route in self.routes - } - return thresholds - - def fit( - self, - X: List[str], - y: List[str], - batch_size: int = 500, - max_iter: int = 500, - ): - # convert inputs into array - Xq: List[List[float]] = [] - for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): - emb = np.array(self.encoder(X[i : i + batch_size])) - Xq.extend(emb) - # initial eval (we will iterate from here) - best_acc = self._vec_evaluate(Xq=np.array(Xq), y=y) - best_thresholds = self.get_thresholds() - # begin fit - for _ in (pbar := tqdm(range(max_iter), desc="Training")): - pbar.set_postfix({"acc": round(best_acc, 2)}) - # Find the best score threshold for each route - thresholds = threshold_random_search( - route_layer=self, - search_range=0.8, - ) - # update current route layer - self._update_thresholds(score_thresholds=thresholds) - # evaluate - acc = self._vec_evaluate(Xq=Xq, y=y) - # update best - if acc > best_acc: - best_acc = acc - best_thresholds = thresholds - # update route layer to best thresholds - self._update_thresholds(score_thresholds=best_thresholds) - - def evaluate(self, X: List[str], y: List[str], batch_size: int = 500) -> float: - """ - Evaluate the accuracy of the route selection. - """ - Xq: List[List[float]] = [] - for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): - emb = np.array(self.encoder(X[i : i + batch_size])) - Xq.extend(emb) - - accuracy = self._vec_evaluate(Xq=np.array(Xq), y=y) - return accuracy - - def _vec_evaluate(self, Xq: Union[List[float], Any], y: List[str]) -> float: - """ - Evaluate the accuracy of the route selection. - """ - correct = 0 - for xq, target_route in zip(Xq, y): - # We treate dynamic routes as static here, because when evaluating we use only vectors, and dynamic routes expect strings by default. - route_choice = self(vector=xq, simulate_static=True) - if route_choice.name == target_route: - correct += 1 - accuracy = correct / len(Xq) - return accuracy - - def _get_route_names(self) -> List[str]: - return [route.name for route in self.routes] - - -def threshold_random_search( - route_layer: RouteLayer, - search_range: Union[int, float], -) -> Dict[str, float]: - """Performs a random search iteration given a route layer and a search range.""" - # extract the route names - routes = route_layer.get_thresholds() - route_names = list(routes.keys()) - route_thresholds = list(routes.values()) - # generate search range for each - score_threshold_values = [] - for threshold in route_thresholds: - score_threshold_values.append( - np.linspace( - start=max(threshold - search_range, 0.0), - stop=min(threshold + search_range, 1.0), - num=100, - ) - ) - # Generate a random threshold for each route - score_thresholds = { - route: random.choice(score_threshold_values[i]) - for i, route in enumerate(route_names) - } - return score_thresholds diff --git a/semantic_chunkers/linear.py b/semantic_chunkers/linear.py deleted file mode 100644 index 1c13262..0000000 --- a/semantic_chunkers/linear.py +++ /dev/null @@ -1,30 +0,0 @@ -from typing import Tuple - -import numpy as np -from numpy.linalg import norm - - -def similarity_matrix(xq: np.ndarray, index: np.ndarray) -> np.ndarray: - """Compute the similarity scores between a query vector and a set of vectors. - - Args: - xq: A query vector (1d ndarray) - index: A set of vectors. - - Returns: - The similarity between the query vector and the set of vectors. 
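The removed `fit` routine pairs an accuracy evaluation with `threshold_random_search`; a minimal sketch of that search step is below. For each route it samples a candidate threshold from a `linspace` window of width `search_range` around the current value, clipped to [0, 1]. The current thresholds here are made up for illustration.

```python
import random
from typing import Dict

import numpy as np


def threshold_random_search(
    thresholds: Dict[str, float], search_range: float = 0.8, num: int = 100
) -> Dict[str, float]:
    """Pick one random candidate threshold per route from a window around its current value."""
    candidates: Dict[str, float] = {}
    for route, t in thresholds.items():
        grid = np.linspace(max(t - search_range, 0.0), min(t + search_range, 1.0), num=num)
        candidates[route] = float(random.choice(grid))
    return candidates


print(threshold_random_search({"chitchat": 0.82, "politics": 0.82}, search_range=0.1))
```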
- """ - - index_norm = norm(index, axis=1) - xq_norm = norm(xq.T) - sim = np.dot(index, xq.T) / (index_norm * xq_norm) - return sim - - -def top_scores(sim: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, np.ndarray]: - # get indices of top_k records - top_k = min(top_k, sim.shape[0]) - idx = np.argpartition(sim, -top_k)[-top_k:] - scores = sim[idx] - - return scores, idx diff --git a/semantic_chunkers/llms/__init__.py b/semantic_chunkers/llms/__init__.py deleted file mode 100644 index 1d9583c..0000000 --- a/semantic_chunkers/llms/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -from semantic_chunkers.llms.base import BaseLLM -from semantic_chunkers.llms.cohere import CohereLLM -from semantic_chunkers.llms.llamacpp import LlamaCppLLM -from semantic_chunkers.llms.mistral import MistralAILLM -from semantic_chunkers.llms.openai import OpenAILLM -from semantic_chunkers.llms.openrouter import OpenRouterLLM -from semantic_chunkers.llms.zure import AzureOpenAILLM - -__all__ = [ - "BaseLLM", - "OpenAILLM", - "LlamaCppLLM", - "OpenRouterLLM", - "CohereLLM", - "AzureOpenAILLM", - "MistralAILLM", -] diff --git a/semantic_chunkers/llms/base.py b/semantic_chunkers/llms/base.py deleted file mode 100644 index c4f86a3..0000000 --- a/semantic_chunkers/llms/base.py +++ /dev/null @@ -1,110 +0,0 @@ -import json -from typing import Any, List, Optional - -from pydantic.v1 import BaseModel - -from semantic_chunkers.schema import Message -from semantic_chunkers.utils.logger import logger - - -class BaseLLM(BaseModel): - name: str - - class Config: - arbitrary_types_allowed = True - - def __init__(self, name: str, **kwargs): - super().__init__(name=name, **kwargs) - - def __call__(self, messages: List[Message]) -> Optional[str]: - raise NotImplementedError("Subclasses must implement this method") - - def _is_valid_inputs( - self, inputs: dict[str, Any], function_schema: dict[str, Any] - ) -> bool: - """Validate the extracted inputs against the function schema""" - try: - # Extract parameter names and types from the signature string - signature = function_schema["signature"] - param_info = [param.strip() for param in signature[1:-1].split(",")] - param_names = [info.split(":")[0].strip() for info in param_info] - param_types = [ - info.split(":")[1].strip().split("=")[0].strip() for info in param_info - ] - for name, type_str in zip(param_names, param_types): - if name not in inputs: - logger.error(f"Input {name} missing from query") - return False - return True - except Exception as e: - logger.error(f"Input validation error: {str(e)}") - return False - - def extract_function_inputs( - self, query: str, function_schema: dict[str, Any] - ) -> dict: - logger.info("Extracting function input...") - - prompt = f""" -You are an accurate and reliable computer program that only outputs valid JSON. -Your task is to output JSON representing the input arguments of a Python function. - -This is the Python function's schema: - -### FUNCTION_SCHEMA Start ### - {function_schema} -### FUNCTION_SCHEMA End ### - -This is the input query. - -### QUERY Start ### - {query} -### QUERY End ### - -The arguments that you need to provide values for, together with their datatypes, are stated in "signature" in the FUNCTION_SCHEMA. -The values these arguments must take are made clear by the QUERY. -Use the FUNCTION_SCHEMA "description" too, as this might provide helpful clues about the arguments and their values. -Return only JSON, stating the argument names and their corresponding values. 
- -### FORMATTING_INSTRUCTIONS Start ### - Return a respones in valid JSON format. Do not return any other explanation or text, just the JSON. - The JSON-Keys are the names of the arguments, and JSON-values are the values those arguments should take. -### FORMATTING_INSTRUCTIONS End ### - -### EXAMPLE Start ### - === EXAMPLE_INPUT_QUERY Start === - "How is the weather in Hawaii right now in International units?" - === EXAMPLE_INPUT_QUERY End === - === EXAMPLE_INPUT_SCHEMA Start === - {{ - "name": "get_weather", - "description": "Useful to get the weather in a specific location", - "signature": "(location: str, degree: str) -> str", - "output": "", - }} - === EXAMPLE_INPUT_QUERY End === - === EXAMPLE_OUTPUT Start === - {{ - "location": "Hawaii", - "degree": "Celsius", - }} - === EXAMPLE_OUTPUT End === -### EXAMPLE End ### - -Note: I will tip $500 for and accurate JSON output. You will be penalized for an inaccurate JSON output. - -Provide JSON output now: -""" - llm_input = [Message(role="user", content=prompt)] - output = self(llm_input) - - if not output: - raise Exception("No output generated for extract function input") - - output = output.replace("'", '"').strip().rstrip(",") - logger.info(f"LLM output: {output}") - function_inputs = json.loads(output) - logger.info(f"Function inputs: {function_inputs}") - if not self._is_valid_inputs(function_inputs, function_schema): - raise ValueError("Invalid inputs") - return function_inputs diff --git a/semantic_chunkers/llms/cohere.py b/semantic_chunkers/llms/cohere.py deleted file mode 100644 index c6ddb8b..0000000 --- a/semantic_chunkers/llms/cohere.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -from typing import List, Optional - -import cohere - -from semantic_chunkers.llms import BaseLLM -from semantic_chunkers.schema import Message - - -class CohereLLM(BaseLLM): - client: Optional[cohere.Client] = None - - def __init__( - self, - name: Optional[str] = None, - cohere_api_key: Optional[str] = None, - ): - if name is None: - name = os.getenv("COHERE_CHAT_MODEL_NAME", "command") - super().__init__(name=name) - cohere_api_key = cohere_api_key or os.getenv("COHERE_API_KEY") - if cohere_api_key is None: - raise ValueError("Cohere API key cannot be 'None'.") - try: - self.client = cohere.Client(cohere_api_key) - except Exception as e: - raise ValueError( - f"Cohere API client failed to initialize. Error: {e}" - ) from e - - def __call__(self, messages: List[Message]) -> str: - if self.client is None: - raise ValueError("Cohere client is not initialized.") - try: - completion = self.client.chat( - model=self.name, - chat_history=[m.to_cohere() for m in messages[:-1]], - message=messages[-1].content, - ) - - output = completion.text - - if not output: - raise Exception("No output generated") - return output - - except Exception as e: - raise ValueError(f"Cohere API call failed. Error: {e}") from e diff --git a/semantic_chunkers/llms/grammars/json.gbnf b/semantic_chunkers/llms/grammars/json.gbnf deleted file mode 100644 index a9537cd..0000000 --- a/semantic_chunkers/llms/grammars/json.gbnf +++ /dev/null @@ -1,25 +0,0 @@ -root ::= object -value ::= object | array | string | number | ("true" | "false" | "null") ws - -object ::= - "{" ws ( - string ":" ws value - ("," ws string ":" ws value)* - )? "}" ws - -array ::= - "[" ws ( - value - ("," ws value)* - )? "]" ws - -string ::= - "\"" ( - [^"\\] | - "\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]) # escapes - )* "\"" ws - -number ::= ("-"? ([0-9] | [1-9] [0-9]*)) ("." [0-9]+)? 
([eE] [-+]? [0-9]+)? ws - -# Optional space: by convention, applied in this grammar after literal chars when allowed -ws ::= ([ \t\n] ws)? diff --git a/semantic_chunkers/llms/llamacpp.py b/semantic_chunkers/llms/llamacpp.py deleted file mode 100644 index f2cb6ff..0000000 --- a/semantic_chunkers/llms/llamacpp.py +++ /dev/null @@ -1,87 +0,0 @@ -from contextlib import contextmanager -from pathlib import Path -from typing import Any, Optional - -from pydantic.v1 import PrivateAttr - -from semantic_chunkers.llms.base import BaseLLM -from semantic_chunkers.schema import Message -from semantic_chunkers.utils.logger import logger - - -class LlamaCppLLM(BaseLLM): - llm: Any - temperature: float - max_tokens: Optional[int] = 200 - grammar: Optional[Any] = None - _llama_cpp: Any = PrivateAttr() - - def __init__( - self, - llm: Any, - name: str = "llama.cpp", - temperature: float = 0.2, - max_tokens: Optional[int] = 200, - grammar: Optional[Any] = None, - ): - super().__init__( - name=name, - llm=llm, - temperature=temperature, - max_tokens=max_tokens, - grammar=grammar, - ) - - try: - import llama_cpp - except ImportError: - raise ImportError( - "Please install LlamaCPP to use Llama CPP llm. " - "You can install it with: " - "`pip install 'semantic-router[local]'`" - ) - self._llama_cpp = llama_cpp - self.llm = llm - self.temperature = temperature - self.max_tokens = max_tokens - self.grammar = grammar - - def __call__( - self, - messages: list[Message], - ) -> str: - try: - completion = self.llm.create_chat_completion( - messages=[m.to_llamacpp() for m in messages], - temperature=self.temperature, - max_tokens=self.max_tokens, - grammar=self.grammar, - stream=False, - ) - assert isinstance(completion, dict) # keep mypy happy - output = completion["choices"][0]["message"]["content"] - - if not output: - raise Exception("No output generated") - return output - except Exception as e: - logger.error(f"LLM error: {e}") - raise - - @contextmanager - def _grammar(self): - grammar_path = Path(__file__).parent.joinpath("grammars", "json.gbnf") - assert grammar_path.exists(), f"{grammar_path}\ndoes not exist" - try: - self.grammar = self._llama_cpp.LlamaGrammar.from_file(grammar_path) - yield - finally: - self.grammar = None - - def extract_function_inputs( - self, query: str, function_schema: dict[str, Any] - ) -> dict: - with self._grammar(): - return super().extract_function_inputs( - query=query, function_schema=function_schema - ) diff --git a/semantic_chunkers/llms/mistral.py b/semantic_chunkers/llms/mistral.py deleted file mode 100644 index 82cfb75..0000000 --- a/semantic_chunkers/llms/mistral.py +++ /dev/null @@ -1,78 +0,0 @@ -import os -from typing import Any, List, Optional - -from pydantic.v1 import PrivateAttr - -from semantic_chunkers.llms import BaseLLM -from semantic_chunkers.schema import Message -from semantic_chunkers.utils.defaults import EncoderDefault -from semantic_chunkers.utils.logger import logger - - -class MistralAILLM(BaseLLM): - _client: Any = PrivateAttr() - temperature: Optional[float] - max_tokens: Optional[int] - _mistralai: Any = PrivateAttr() - - def __init__( - self, - name: Optional[str] = None, - mistralai_api_key: Optional[str] = None, - temperature: float = 0.01, - max_tokens: int = 200, - ): - if name is None: - name = EncoderDefault.MISTRAL.value["language_model"] - super().__init__(name=name) - self._client, self._mistralai = self._initialize_client(mistralai_api_key) - self.temperature = temperature - self.max_tokens = max_tokens - - def _initialize_client(self, 
api_key): - try: - import mistralai - from mistralai.client import MistralClient - except ImportError: - raise ImportError( - "Please install MistralAI to use MistralAI LLM. " - "You can install it with: " - "`pip install 'semantic-router[mistralai]'`" - ) - api_key = api_key or os.getenv("MISTRALAI_API_KEY") - if api_key is None: - raise ValueError("MistralAI API key cannot be 'None'.") - try: - client = MistralClient(api_key=api_key) - except Exception as e: - raise ValueError( - f"MistralAI API client failed to initialize. Error: {e}" - ) from e - return client, mistralai - - def __call__(self, messages: List[Message]) -> str: - if self._client is None: - raise ValueError("MistralAI client is not initialized.") - - chat_messages = [ - self._mistralai.models.chat_completion.ChatMessage( - role=m.role, content=m.content - ) - for m in messages - ] - try: - completion = self._client.chat( - model=self.name, - messages=chat_messages, - temperature=self.temperature, - max_tokens=self.max_tokens, - ) - - output = completion.choices[0].message.content - - if not output: - raise Exception("No output generated") - return output - except Exception as e: - logger.error(f"LLM error: {e}") - raise Exception(f"LLM error: {e}") from e diff --git a/semantic_chunkers/llms/ollama.py b/semantic_chunkers/llms/ollama.py deleted file mode 100644 index d9c3c6f..0000000 --- a/semantic_chunkers/llms/ollama.py +++ /dev/null @@ -1,58 +0,0 @@ -from typing import List, Optional - -import requests - -from semantic_chunkers.llms import BaseLLM -from semantic_chunkers.schema import Message -from semantic_chunkers.utils.logger import logger - - -class OllamaLLM(BaseLLM): - temperature: Optional[float] - llm_name: Optional[str] - max_tokens: Optional[int] - stream: Optional[bool] - - def __init__( - self, - name: str = "ollama", - temperature: float = 0.2, - llm_name: str = "openhermes", - max_tokens: Optional[int] = 200, - stream: bool = False, - ): - super().__init__(name=name) - self.temperature = temperature - self.llm_name = llm_name - self.max_tokens = max_tokens - self.stream = stream - - def __call__( - self, - messages: List[Message], - temperature: Optional[float] = None, - llm_name: Optional[str] = None, - max_tokens: Optional[int] = None, - stream: Optional[bool] = None, - ) -> str: - # Use instance defaults if not overridden - temperature = temperature if temperature is not None else self.temperature - llm_name = llm_name if llm_name is not None else self.llm_name - max_tokens = max_tokens if max_tokens is not None else self.max_tokens - stream = stream if stream is not None else self.stream - - try: - payload = { - "model": llm_name, - "messages": [m.to_openai() for m in messages], - "options": {"temperature": temperature, "num_predict": max_tokens}, - "format": "json", - "stream": stream, - } - response = requests.post("http://localhost:11434/api/chat", json=payload) - output = response.json()["message"]["content"] - - return output - except Exception as e: - logger.error(f"LLM error: {e}") - raise Exception(f"LLM error: {e}") from e diff --git a/semantic_chunkers/llms/openai.py b/semantic_chunkers/llms/openai.py deleted file mode 100644 index 64f545a..0000000 --- a/semantic_chunkers/llms/openai.py +++ /dev/null @@ -1,57 +0,0 @@ -import os -from typing import List, Optional - -import openai - -from semantic_chunkers.llms import BaseLLM -from semantic_chunkers.schema import Message -from semantic_chunkers.utils.defaults import EncoderDefault -from semantic_chunkers.utils.logger import logger - - -class 
OpenAILLM(BaseLLM): - client: Optional[openai.OpenAI] - temperature: Optional[float] - max_tokens: Optional[int] - - def __init__( - self, - name: Optional[str] = None, - openai_api_key: Optional[str] = None, - temperature: float = 0.01, - max_tokens: int = 200, - ): - if name is None: - name = EncoderDefault.OPENAI.value["language_model"] - super().__init__(name=name) - api_key = openai_api_key or os.getenv("OPENAI_API_KEY") - if api_key is None: - raise ValueError("OpenAI API key cannot be 'None'.") - try: - self.client = openai.OpenAI(api_key=api_key) - except Exception as e: - raise ValueError( - f"OpenAI API client failed to initialize. Error: {e}" - ) from e - self.temperature = temperature - self.max_tokens = max_tokens - - def __call__(self, messages: List[Message]) -> str: - if self.client is None: - raise ValueError("OpenAI client is not initialized.") - try: - completion = self.client.chat.completions.create( - model=self.name, - messages=[m.to_openai() for m in messages], - temperature=self.temperature, - max_tokens=self.max_tokens, - ) - - output = completion.choices[0].message.content - - if not output: - raise Exception("No output generated") - return output - except Exception as e: - logger.error(f"LLM error: {e}") - raise Exception(f"LLM error: {e}") from e diff --git a/semantic_chunkers/llms/openrouter.py b/semantic_chunkers/llms/openrouter.py deleted file mode 100644 index b1c20c7..0000000 --- a/semantic_chunkers/llms/openrouter.py +++ /dev/null @@ -1,61 +0,0 @@ -import os -from typing import List, Optional - -import openai - -from semantic_chunkers.llms import BaseLLM -from semantic_chunkers.schema import Message -from semantic_chunkers.utils.logger import logger - - -class OpenRouterLLM(BaseLLM): - client: Optional[openai.OpenAI] - base_url: Optional[str] - temperature: Optional[float] - max_tokens: Optional[int] - - def __init__( - self, - name: Optional[str] = None, - openrouter_api_key: Optional[str] = None, - base_url: str = "https://openrouter.ai/api/v1", - temperature: float = 0.01, - max_tokens: int = 200, - ): - if name is None: - name = os.getenv( - "OPENROUTER_CHAT_MODEL_NAME", "mistralai/mistral-7b-instruct" - ) - super().__init__(name=name) - self.base_url = base_url - api_key = openrouter_api_key or os.getenv("OPENROUTER_API_KEY") - if api_key is None: - raise ValueError("OpenRouter API key cannot be 'None'.") - try: - self.client = openai.OpenAI(api_key=api_key, base_url=self.base_url) - except Exception as e: - raise ValueError( - f"OpenRouter API client failed to initialize. 
Error: {e}" - ) from e - self.temperature = temperature - self.max_tokens = max_tokens - - def __call__(self, messages: List[Message]) -> str: - if self.client is None: - raise ValueError("OpenRouter client is not initialized.") - try: - completion = self.client.chat.completions.create( - model=self.name, - messages=[m.to_openai() for m in messages], - temperature=self.temperature, - max_tokens=self.max_tokens, - ) - - output = completion.choices[0].message.content - - if not output: - raise Exception("No output generated") - return output - except Exception as e: - logger.error(f"LLM error: {e}") - raise Exception(f"LLM error: {e}") from e diff --git a/semantic_chunkers/llms/zure.py b/semantic_chunkers/llms/zure.py deleted file mode 100644 index 44cac9f..0000000 --- a/semantic_chunkers/llms/zure.py +++ /dev/null @@ -1,62 +0,0 @@ -import os -from typing import List, Optional - -import openai - -from semantic_chunkers.llms import BaseLLM -from semantic_chunkers.schema import Message -from semantic_chunkers.utils.defaults import EncoderDefault -from semantic_chunkers.utils.logger import logger - - -class AzureOpenAILLM(BaseLLM): - client: Optional[openai.AzureOpenAI] - temperature: Optional[float] - max_tokens: Optional[int] - - def __init__( - self, - name: Optional[str] = None, - openai_api_key: Optional[str] = None, - azure_endpoint: Optional[str] = None, - temperature: float = 0.01, - max_tokens: int = 200, - api_version="2023-07-01-preview", - ): - if name is None: - name = EncoderDefault.AZURE.value["language_model"] - super().__init__(name=name) - api_key = openai_api_key or os.getenv("AZURE_OPENAI_API_KEY") - if api_key is None: - raise ValueError("AzureOpenAI API key cannot be 'None'.") - azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") - if azure_endpoint is None: - raise ValueError("Azure endpoint API key cannot be 'None'.") - try: - self.client = openai.AzureOpenAI( - api_key=api_key, azure_endpoint=azure_endpoint, api_version=api_version - ) - except Exception as e: - raise ValueError(f"AzureOpenAI API client failed to initialize. 
Error: {e}") - self.temperature = temperature - self.max_tokens = max_tokens - - def __call__(self, messages: List[Message]) -> str: - if self.client is None: - raise ValueError("AzureOpenAI client is not initialized.") - try: - completion = self.client.chat.completions.create( - model=self.name, - messages=[m.to_openai() for m in messages], - temperature=self.temperature, - max_tokens=self.max_tokens, - ) - - output = completion.choices[0].message.content - - if not output: - raise Exception("No output generated") - return output - except Exception as e: - logger.error(f"LLM error: {e}") - raise Exception(f"LLM error: {e}") from e diff --git a/semantic_chunkers/route.py b/semantic_chunkers/route.py deleted file mode 100644 index 84f2ecd..0000000 --- a/semantic_chunkers/route.py +++ /dev/null @@ -1,160 +0,0 @@ -import json -import re -from typing import Any, Callable, Dict, List, Optional, Union - -from pydantic.v1 import BaseModel - -from semantic_chunkers.llms import BaseLLM -from semantic_chunkers.schema import Message, RouteChoice -from semantic_chunkers.utils import function_call -from semantic_chunkers.utils.logger import logger - -try: - from PIL.Image import Image -except ImportError: - pass - - -def is_valid(route_config: str) -> bool: - try: - output_json = json.loads(route_config) - required_keys = ["name", "utterances"] - - if isinstance(output_json, list): - for item in output_json: - missing_keys = [key for key in required_keys if key not in item] - if missing_keys: - logger.warning( - f"Missing keys in route config: {', '.join(missing_keys)}" - ) - return False - return True - else: - missing_keys = [key for key in required_keys if key not in output_json] - if missing_keys: - logger.warning( - f"Missing keys in route config: {', '.join(missing_keys)}" - ) - return False - else: - return True - except json.JSONDecodeError as e: - logger.error(e) - return False - - -class Route(BaseModel): - name: str - utterances: Union[List[str], List[Union[Any, "Image"]]] - description: Optional[str] = None - function_schema: Optional[Dict[str, Any]] = None - llm: Optional[BaseLLM] = None - score_threshold: Optional[float] = None - - class Config: - arbitrary_types_allowed = True - - def __call__(self, query: Optional[str] = None) -> RouteChoice: - if self.function_schema: - if not self.llm: - raise ValueError( - "LLM is required for dynamic routes. Please ensure the `llm` " - "attribute is set." - ) - elif query is None: - raise ValueError( - "Query is required for dynamic routes. Please ensure the `query` " - "argument is passed." 
- ) - # if a function schema is provided we generate the inputs - extracted_inputs = self.llm.extract_function_inputs( - query=query, function_schema=self.function_schema - ) - func_call = extracted_inputs - else: - # otherwise we just pass None for the call - func_call = None - return RouteChoice(name=self.name, function_call=func_call) - - # def to_dict(self) -> Dict[str, Any]: - # return self.dict() - - def to_dict(self) -> Dict[str, Any]: - data = self.dict() - if self.llm is not None: - data["llm"] = { - "module": self.llm.__module__, - "class": self.llm.__class__.__name__, - "model": self.llm.name, - } - return data - - @classmethod - def from_dict(cls, data: Dict[str, Any]): - return cls(**data) - - @classmethod - def from_dynamic_route(cls, llm: BaseLLM, entity: Union[BaseModel, Callable]): - """ - Generate a dynamic Route object from a function or Pydantic model using LLM - """ - schema = function_call.get_schema(item=entity) - dynamic_route = cls._generate_dynamic_route(llm=llm, function_schema=schema) - dynamic_route.function_schema = schema - return dynamic_route - - @classmethod - def _parse_route_config(cls, config: str) -> str: - # Regular expression to match content inside - config_pattern = r"(.*?)" - match = re.search(config_pattern, config, re.DOTALL) - - if match: - config_content = match.group(1).strip() # Get the matched content - return config_content - else: - raise ValueError("No tags found in the output.") - - @classmethod - def _generate_dynamic_route(cls, llm: BaseLLM, function_schema: Dict[str, Any]): - logger.info("Generating dynamic route...") - - prompt = f""" - You are tasked to generate a JSON configuration based on the provided - function schema. Please follow the template below, no other tokens allowed: - - - {{ - "name": "", - "utterances": [ - "", - "", - "", - "", - ""] - }} - - - Only include the "name" and "utterances" keys in your answer. - The "name" should match the function name and the "utterances" - should comprise a list of 5 example phrases that could be used to invoke - the function. Use real values instead of placeholders. 
- - Input schema: - {function_schema} - """ - - llm_input = [Message(role="user", content=prompt)] - output = llm(llm_input) - if not output: - raise Exception("No output generated for dynamic route") - - route_config = cls._parse_route_config(config=output) - - logger.info(f"Generated route config:\n{route_config}") - - if is_valid(route_config): - route_config_dict = json.loads(route_config) - route_config_dict["llm"] = llm - return Route.from_dict(route_config_dict) - raise Exception("No config generated") diff --git a/semantic_chunkers/schema.py b/semantic_chunkers/schema.py index 60f6153..4977415 100644 --- a/semantic_chunkers/schema.py +++ b/semantic_chunkers/schema.py @@ -1,57 +1,9 @@ -from enum import Enum from typing import List, Optional from pydantic.v1 import BaseModel -class EncoderType(Enum): - AZURE = "azure" - COHERE = "cohere" - OPENAI = "openai" - BM25 = "bm25" - TFIDF = "tfidf" - FASTEMBED = "fastembed" - HUGGINGFACE = "huggingface" - MISTRAL = "mistral" - VIT = "vit" - CLIP = "clip" - GOOGLE = "google" - - -class EncoderInfo(BaseModel): - name: str - token_limit: int - - -class RouteChoice(BaseModel): - name: Optional[str] = None - function_call: Optional[dict] = None - similarity_score: Optional[float] = None - - -class Message(BaseModel): - role: str - content: str - - def to_openai(self): - if self.role.lower() not in ["user", "assistant", "system"]: - raise ValueError("Role must be either 'user', 'assistant' or 'system'") - return {"role": self.role, "content": self.content} - - def to_cohere(self): - return {"role": self.role, "message": self.content} - - def to_llamacpp(self): - return {"role": self.role, "content": self.content} - - def to_mistral(self): - return {"role": self.role, "content": self.content} - - def __str__(self): - return f"{self.role}: {self.content}" - - -class DocumentSplit(BaseModel): +class ChunkSet(BaseModel): docs: List[str] is_triggered: bool = False triggered_score: Optional[float] = None @@ -61,10 +13,3 @@ class DocumentSplit(BaseModel): @property def content(self) -> str: return " ".join(self.docs) - - -class Metric(Enum): - COSINE = "cosine" - DOTPRODUCT = "dotproduct" - EUCLIDEAN = "euclidean" - MANHATTAN = "manhattan" diff --git a/semantic_chunkers/text.py b/semantic_chunkers/text.py index cd4275b..e0225e1 100644 --- a/semantic_chunkers/text.py +++ b/semantic_chunkers/text.py @@ -1,10 +1,6 @@ -from typing import List, Literal, Tuple, Union +from typing import Union -from colorama import Fore, Style -from pydantic.v1 import BaseModel, Field - -from semantic_chunkers.encoders import BaseEncoder -from semantic_chunkers.schema import DocumentSplit, Message +from colorama import Fore from semantic_chunkers.chunkers.consecutive_sim import ConsecutiveSimSplitter from semantic_chunkers.chunkers.cumulative_sim import CumulativeSimSplitter @@ -19,187 +15,4 @@ Fore.BLUE, Fore.MAGENTA, Fore.CYAN, -] - - -class Conversation(BaseModel): - messages: List[Message] = Field( - default_factory=list - ) # Ensure this is initialized as an empty list - topics: List[Tuple[int, str]] = [] - splitter: SplitterType = None - - def __str__(self): - if not self.messages: - return "" - if not self.topics: - return "\n".join([str(message) for message in self.messages]) - else: - # we print each topic a different color - return_str_list = [] - current_topic_id = None - color_idx = 0 - for topic_id, message in self.topics: - if topic_id != current_topic_id: - # change color - color_idx = (color_idx + 1) % len(colors) - current_topic_id = topic_id - 
return_str_list.append(f"{colors[color_idx]}{message}{Style.RESET_ALL}") - return "\n".join(return_str_list) - - def add_new_messages(self, new_messages: List[Message]): - """Adds new messages to the conversation. - - :param messages: The new messages to be added to the conversation. - :type messages: List[Message] - """ - self.messages.extend(new_messages) - - def remove_topics(self): - self.topics = [] - - def configure_splitter( - self, - encoder: BaseEncoder, - threshold: float = 0.5, - split_method: Literal[ - "consecutive_similarity", "cumulative_similarity" - ] = "consecutive_similarity", - ): - """ - Configures the splitter for the conversation based on the specified method. - - This method sets the splitter attribute of the Conversation class to an instance of the appropriate splitter class, based on the `split_method` parameter. It uses the provided encoder and similarity threshold to initialize the splitter. - - :param encoder: The encoder to be used by the splitter for encoding messages. - :type encoder: BaseEncoder - :param threshold: The similarity threshold to be used by the splitter. Defaults to 0.5. - :type threshold: float - :param split_method: The method to be used for splitting the conversation into topics. Can be one of "consecutive_similarity" or "cumulative_similarity". Defaults to "consecutive_similarity". - :type split_method: Literal["consecutive_similarity", "cumulative_similarity"] - :raises ValueError: If an invalid split method is provided. - """ - - if split_method == "consecutive_similarity": - self.splitter = ConsecutiveSimSplitter( - encoder=encoder, score_threshold=threshold - ) - elif split_method == "cumulative_similarity": - self.splitter = CumulativeSimSplitter( - encoder=encoder, score_threshold=threshold - ) - else: - raise ValueError(f"Invalid split method: {split_method}") - - def get_last_message_and_topic_id(self): - """ - Retrieves the last message and its corresponding topic ID from the list of topics. - - This method scans the list of topics, if any, and returns the topic ID and message of the last entry. If there are no topics, it returns None for both the topic ID and message. - - The last message from a previous spiltting is useful because it can be passed to the splitter along with new messages, and if the first new message is assigned the same topic as the last message, then we know that the new message should continue with the same topic ID as the last message. - - :return: A tuple containing the topic ID (int) and message (str) of the last topic, or (None, None) if there are no topics. - :rtype: tuple[int | None, str | None] - """ - - if self.topics: - return self.topics[-1] - else: - return None, None - - def determine_topic_start_index(self, new_topics, last_topic_id, last_message): - """ - Determines the starting index for new topics based on existing topics and the last message. - - :param new_topics: The list of new topics generated by the splitter. - :type new_topics: List[DocumentSplit] - :param last_topic_id: The topic ID of the last message from the previous splitting. - :type last_topic_id: int, optional - :param last_message: The last message from the previous splitting. - :type last_message: str, optional - :return: The starting index for new topics. 
- :rtype: int - """ - if not self.topics or not new_topics: - return 1 - if ( - last_topic_id is not None - and last_message - and last_message in new_topics[0].docs - ): - return last_topic_id - return self.topics[-1][0] + 1 - - def append_new_topics(self, new_topics, start) -> None: - """ - Appends new topics to the list of topics with unique IDs. - - This method takes a list of new topics generated by the splitter and appends them to the existing list of topics, ensuring each topic is assigned a unique ID starting from the specified starting index. - - :param new_topics: The list of new topics generated by the splitter. - :type new_topics: List[DocumentSplit] - :param start: The starting index for new topics. - :type start: int - """ - for i, topic in enumerate(new_topics, start=start): - for message in topic.docs: - self.topics.append((i, message)) - - def split_by_topic( - self, force: bool = False - ) -> Tuple[List[Tuple[int, str]], List[DocumentSplit]]: - """ - Splits the messages into topics based on their semantic similarity. - - This method processes unclustered messages, splits them into topics using the configured splitter, and appends the new topics to the existing list of topics with unique IDs. It ensures that messages belonging to the same topic are grouped together, even if they were not processed in the same batch. - - :raises ValueError: If the splitter is not configured before calling this method. - - :return: A tuple containing the updated list of topics and the list of new topics generated in this call. - :rtype: tuple[list[tuple[int, str]], list[DocumentSplit]] - """ - - if self.splitter is None: - raise ValueError( - "Splitter is not configured. Please call configure_splitter first." - ) - new_topics: List[DocumentSplit] = [] - - if self.topics: - # reset self.topics - self.topics = [] - - # Get unclusteed messages. - unclustered_messages = self.messages[len(self.topics) :] - if not unclustered_messages: - print("No unclustered messages to process.") - return self.topics, new_topics - - # Extract the last topic ID and message from the previous splitting, if they exist. - last_topic_id, last_message = self.get_last_message_and_topic_id() - - # Initialize docs with the last message from the last topic if it exists, and with unclustered messages. - # TODO: Currenlty only getting last message from last topic in previous splitting. Should we get more for more reliable continuation of topic ids? - docs = [last_message] if last_message else [] - docs.extend([f"{m.role}: {m.content}" for m in unclustered_messages]) - - new_topics = self.splitter(docs) - - # Ensure there are new topics before proceeding - if not new_topics: - return self.topics, [] - - # If last_message and the first new message are assigned the same topic ID, then we know the new message should take last_message's place original topic id. - start = self.determine_topic_start_index( - new_topics, last_topic_id, last_message - ) - - # If the last message from the previous splitting is found in the first new topic, remove it - if self.topics and new_topics[0].docs[0] == self.topics[-1][1]: - new_topics[0].docs.pop(0) - - self.append_new_topics(new_topics, start) - - # TODO: Instead of self.topics as list of tuples should it also be a list of DocumentSplit objects? 
- return self.topics, new_topics +] \ No newline at end of file diff --git a/semantic_chunkers/utils/defaults.py b/semantic_chunkers/utils/defaults.py deleted file mode 100644 index 3c9cbb2..0000000 --- a/semantic_chunkers/utils/defaults.py +++ /dev/null @@ -1,33 +0,0 @@ -import os -from enum import Enum - - -class EncoderDefault(Enum): - FASTEMBED = { - "embedding_model": "BAAI/bge-small-en-v1.5", - "language_model": "BAAI/bge-small-en-v1.5", - } - OPENAI = { - "embedding_model": os.getenv("OPENAI_MODEL_NAME", "text-embedding-ada-002"), - "language_model": os.getenv("OPENAI_CHAT_MODEL_NAME", "gpt-3.5-turbo"), - } - COHERE = { - "embedding_model": os.getenv("COHERE_MODEL_NAME", "embed-english-v3.0"), - "language_model": os.getenv("COHERE_CHAT_MODEL_NAME", "command"), - } - MISTRAL = { - "embedding_model": os.getenv("MISTRAL_MODEL_NAME", "mistral-embed"), - "language_model": os.getenv("MISTRALAI_CHAT_MODEL_NAME", "mistral-tiny"), - } - AZURE = { - "embedding_model": os.getenv("AZURE_OPENAI_MODEL", "text-embedding-ada-002"), - "language_model": os.getenv("OPENAI_CHAT_MODEL_NAME", "gpt-3.5-turbo"), - "deployment_name": os.getenv( - "AZURE_OPENAI_DEPLOYMENT_NAME", "text-embedding-ada-002" - ), - } - GOOGLE = { - "embedding_model": os.getenv( - "GOOGLE_EMBEDDING_MODEL", "textembedding-gecko@003" - ), - } diff --git a/semantic_chunkers/utils/function_call.py b/semantic_chunkers/utils/function_call.py deleted file mode 100644 index fcf2967..0000000 --- a/semantic_chunkers/utils/function_call.py +++ /dev/null @@ -1,56 +0,0 @@ -import inspect -from typing import Any, Callable, Dict, List, Union - -from pydantic.v1 import BaseModel - -from semantic_chunkers.llms import BaseLLM -from semantic_chunkers.schema import Message, RouteChoice -from semantic_chunkers.utils.logger import logger - - -def get_schema(item: Union[BaseModel, Callable]) -> Dict[str, Any]: - if isinstance(item, BaseModel): - signature_parts = [] - for field_name, field_model in item.__annotations__.items(): - field_info = item.__fields__[field_name] - default_value = field_info.default - - if default_value: - default_repr = repr(default_value) - signature_part = ( - f"{field_name}: {field_model.__name__} = {default_repr}" - ) - else: - signature_part = f"{field_name}: {field_model.__name__}" - - signature_parts.append(signature_part) - signature = f"({', '.join(signature_parts)}) -> str" - schema = { - "name": item.__class__.__name__, - "description": item.__doc__, - "signature": signature, - } - else: - schema = { - "name": item.__name__, - "description": str(inspect.getdoc(item)), - "signature": str(inspect.signature(item)), - "output": str(inspect.signature(item).return_annotation), - } - return schema - - -# TODO: Add route layer object to the input, solve circular import issue -async def route_and_execute( - query: str, llm: BaseLLM, functions: List[Callable], layer -) -> Any: - route_choice: RouteChoice = layer(query) - - for function in functions: - if function.__name__ == route_choice.name: - if route_choice.function_call: - return function(**route_choice.function_call) - - logger.warning("No function found, calling LLM.") - llm_input = [Message(role="user", content=query)] - return llm(llm_input) diff --git a/semantic_chunkers/utils/llm.py b/semantic_chunkers/utils/llm.py deleted file mode 100644 index 7106c2b..0000000 --- a/semantic_chunkers/utils/llm.py +++ /dev/null @@ -1,65 +0,0 @@ -import os -from typing import Optional - -import openai - -from semantic_chunkers.utils.logger import logger - - -def llm(prompt: str) -> 
Optional[str]: - try: - client = openai.OpenAI( - base_url="https://openrouter.ai/api/v1", - api_key=os.getenv("OPENROUTER_API_KEY"), - ) - - completion = client.chat.completions.create( - model="mistralai/mistral-7b-instruct", - messages=[ - { - "role": "user", - "content": prompt, - }, - ], - temperature=0.01, - max_tokens=200, - ) - - output = completion.choices[0].message.content - - if not output: - raise Exception("No output generated") - return output - except Exception as e: - logger.error(f"LLM error: {e}") - raise Exception(f"LLM error: {e}") from e - - -# TODO integrate async LLM function -# async def allm(prompt: str) -> Optional[str]: -# try: -# client = openai.AsyncOpenAI( -# base_url="https://openrouter.ai/api/v1", -# api_key=os.getenv("OPENROUTER_API_KEY"), -# ) - -# completion = await client.chat.completions.create( -# model="mistralai/mistral-7b-instruct", -# messages=[ -# { -# "role": "user", -# "content": prompt, -# }, -# ], -# temperature=0.01, -# max_tokens=200, -# ) - -# output = completion.choices[0].message.content - -# if not output: -# raise Exception("No output generated") -# return output -# except Exception as e: -# logger.error(f"LLM error: {e}") -# raise Exception(f"LLM error: {e}") from e diff --git a/tests/unit/test_text.py b/tests/unit/test_text.py index 675497e..88eb9b3 100644 --- a/tests/unit/test_text.py +++ b/tests/unit/test_text.py @@ -2,12 +2,12 @@ import pytest -from semantic_chunkers.encoders.cohere import ( +from semantic_router.encoders.cohere import ( CohereEncoder, ) # Adjust this import based on your project structure -from semantic_chunkers.schema import DocumentSplit +from semantic_chunkers.schema import ChunkSet from semantic_chunkers.chunkers.consecutive_sim import ConsecutiveSimSplitter from semantic_chunkers.chunkers.cumulative_sim import CumulativeSimSplitter from semantic_chunkers.text import Conversation, Message @@ -105,7 +105,7 @@ def test_get_last_message_and_topic_id_with_topics(conversation_instance): def test_determine_topic_start_index_no_existing_topics(conversation_instance): # Scenario where there are no existing topics new_topics = [ - DocumentSplit(docs=["User: Hello!"], is_triggered=True, triggered_score=0.4) + ChunkSet(docs=["User: Hello!"], is_triggered=True, triggered_score=0.4) ] start_index = conversation_instance.determine_topic_start_index( new_topics, None, None @@ -121,7 +121,7 @@ def test_determine_topic_start_index_with_existing_topics_not_including_last_mes # Scenario where existing topics do not include the last message conversation_instance.topics.append((0, "First message")) new_topics = [ - DocumentSplit(docs=["User: Hello!"], is_triggered=True, triggered_score=0.4) + ChunkSet(docs=["User: Hello!"], is_triggered=True, triggered_score=0.4) ] start_index = conversation_instance.determine_topic_start_index( new_topics, 0, "Non-existent last message" @@ -137,7 +137,7 @@ def test_determine_topic_start_index_with_existing_topics_including_last_message # Scenario where the first new topic includes the last message conversation_instance.topics.append((0, "First message")) new_topics = [ - DocumentSplit( + ChunkSet( docs=["First message", "Another message"], is_triggered=True, triggered_score=0.4, @@ -158,7 +158,7 @@ def test_determine_topic_start_index_increment_from_last_topic_id( conversation_instance.topics.append((1, "First message")) conversation_instance.topics.append((2, "Second message")) new_topics = [ - DocumentSplit(docs=["User: Hello!"], is_triggered=True, triggered_score=0.4) + ChunkSet(docs=["User: 
Hello!"], is_triggered=True, triggered_score=0.4) ] start_index = conversation_instance.determine_topic_start_index( new_topics, 2, "Non-existent last message"