Skip to content

Commit

Permalink
restructure and testing different decision layer structures
Browse files Browse the repository at this point in the history
  • Loading branch information
jamescalam committed Nov 12, 2023
1 parent ad200b0 commit 30a50b4
Show file tree
Hide file tree
Showing 17 changed files with 420 additions and 50 deletions.
5 changes: 0 additions & 5 deletions semantic_router/encoders/__init__.py

This file was deleted.

9 changes: 0 additions & 9 deletions semantic_router/encoders/huggingface.py

This file was deleted.

246 changes: 237 additions & 9 deletions semantic_router/layer.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,124 @@
import numpy as np
from numpy.linalg import norm

from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder
from semantic_router.retrievers import (
BaseRetriever,
CohereRetriever,
OpenAIRetriever,
BM25Retriever
)
from semantic_router.rankers import BaseRanker
from semantic_router.matchers import BaseMatcher
from semantic_router.schema import Decision


class MatcherDecisionLayer:
index: None
decision_arr: None
score_threshold: float

def __init__(self, matcher: BaseMatcher, decisions: list[Decision] = []):
self.matcher = matcher
# if decisions list has been passed and we have retriever
# we initialize index now
if matcher.retriever and decisions:
# initialize index now
for decision in decisions:
self._add_decision(decision=decision)

def __call__(self, text: str) -> str | None:
raise NotImplementedError

class RankDecisionLayer:
def __init__(self, ranker: BaseRanker, decisions: list[Decision] = []):
self.ranker = ranker
# if decisions list has been passed, we initialize decision array
if decisions:
for decision in decisions:
self._add_decision(decision=decision)

def __call__(self, text: str) -> str | None:
results = self._query(text)
top_class, top_class_scores = self._semantic_classify(results)
passed = self._pass_threshold(top_class_scores, self.score_threshold)
if passed:
return top_class
else:
return None

def add(self, decision: Decision):
self._add_decision(decision.utterances)

def _add_decision(self, decision: Decision):
# create decision categories array
if self.categories is None:
self.categories = np.array([decision.name] * len(decision.utterances))
self.utterances = np.array(decision.utterances)
else:
str_arr = np.array([decision.name] * len(decision.utterances))
self.categories = np.concatenate([self.categories, str_arr])
self.utterances = np.concatenate([
self.utterances,
np.array(decision.utterances)
])

def _query(self, text: str, top_k: int = 5):
"""Given some text, encodes and searches the index vector space to
retrieve the top_k most similar records.
"""
if self.categories:
self.rerank.top_n = top_k
idx, docs = self.ranker(query=text, docs=self.utterances)
# create scores based on rank
scores = [1/(i+1) for i in range(len(docs))]
# get the utterance categories (decision names)
decisions = self.categories[idx] if self.categories is not None else []
return [
{"decision": d, "score": s.item()} for d, s in zip(decisions, scores)
]
else:
return []

def _semantic_classify(self, query_results: list[dict]) -> tuple[str, list[float]]:
scores_by_class = {}
for result in query_results:
score = result["score"]
decision = result["decision"]
if decision in scores_by_class:
scores_by_class[decision].append(score)
else:
scores_by_class[decision] = [score]

# Calculate total score for each class
total_scores = {
decision: sum(scores) for decision, scores in scores_by_class.items()
}
top_class = max(total_scores, key=lambda x: total_scores[x], default=None)

# Return the top class and its associated scores
return str(top_class), scores_by_class.get(top_class, [])

def _pass_threshold(self, scores: list[float], threshold: float) -> bool:
if scores:
return max(scores) > threshold
else:
return False


class DecisionLayer:
index = None
categories = None
similarity_threshold = 0.82
score_threshold = 0.82

def __init__(self, encoder: BaseEncoder, decisions: list[Decision] = []):
def __init__(self, encoder: BaseRetriever, decisions: list[Decision] = []):
self.encoder = encoder
# decide on default threshold based on encoder
if isinstance(encoder, OpenAIEncoder):
self.similarity_threshold = 0.82
elif isinstance(encoder, CohereEncoder):
self.similarity_threshold = 0.3
if isinstance(encoder, OpenAIRetriever):
self.score_threshold = 0.82
elif isinstance(encoder, CohereRetriever):
self.score_threshold = 0.3
else:
self.similarity_threshold = 0.82
self.score_threshold = 0.82
# if decisions list has been passed, we initialize index now
if decisions:
# initialize index now
Expand All @@ -28,7 +128,7 @@ def __init__(self, encoder: BaseEncoder, decisions: list[Decision] = []):
def __call__(self, text: str) -> str | None:
results = self._query(text)
top_class, top_class_scores = self._semantic_classify(results)
passed = self._pass_threshold(top_class_scores, self.similarity_threshold)
passed = self._pass_threshold(top_class_scores, self.score_threshold)
if passed:
return top_class
else:
Expand Down Expand Up @@ -102,3 +202,131 @@ def _pass_threshold(self, scores: list[float], threshold: float) -> bool:
return max(scores) > threshold
else:
return False


class HybridDecisionLayer:
index = None
categories = None
score_threshold = 0.82

def __init__(
self,
encoder: BaseRetriever,
decisions: list[Decision] = [],
alpha: float = 0.3
):
self.encoder = encoder
self.sparse_encoder = BM25Retriever()
# decide on default threshold based on encoder
if isinstance(encoder, OpenAIRetriever):
self.score_threshold = 0.82
elif isinstance(encoder, CohereRetriever):
self.score_threshold = 0.3
else:
self.score_threshold = 0.82
# if decisions list has been passed, we initialize index now
if decisions:
# initialize index now
for decision in decisions:
self._add_decision(decision=decision)

def __call__(self, text: str) -> str | None:
results = self._query(text)
top_class, top_class_scores = self._semantic_classify(results)
passed = self._pass_threshold(top_class_scores, self.score_threshold)
if passed:
return top_class
else:
return None

def add(self, decision: Decision):
self._add_decision(decision=decision)

def _add_decision(self, decision: Decision):
# create embeddings
dense_embeds = self.encoder(decision.utterances)
sparse_embeds = self.sparse_encoder(decision.utterances)
# concatenate vectors to create hybrid vecs
embeds = np.concatenate([
dense_embeds, sparse_embeds
], axis=1)

# create decision array
if self.categories is None:
self.categories = np.array([decision.name] * len(embeds))
self.utterances = np.array(decision.utterances)
else:
str_arr = np.array([decision.name] * len(embeds))
self.categories = np.concatenate([self.categories, str_arr])
self.utterances = np.concatenate([
self.utterances,
np.array(decision.utterances)
])
# create utterance array (the index)
if self.index is None:
self.index = np.array(embeds)
else:
embed_arr = np.array(embeds)
self.index = np.concatenate([self.index, embed_arr])

def _query(self, text: str, top_k: int = 5):
"""Given some text, encodes and searches the index vector space to
retrieve the top_k most similar records.
"""
# create dense query vector
xq_d = np.array(self.encoder([text]))
xq_d = np.squeeze(xq_d) # Reduce to 1d array.
# create sparse query vector
xq_s = np.array(self.sparse_encoder([text]))
xq_s = np.squeeze(xq_s)
# convex scaling
xq_d, xq_s = self._convex_scaling(xq_d, xq_s)
# concatenate to create single hybrid vec
xq = np.concatenate([xq_d, xq_s], axis=1)

if self.index is not None:
index_norm = norm(self.index, axis=1)
xq_norm = norm(xq.T)
sim = np.dot(self.index, xq.T) / (index_norm * xq_norm)
# get indices of top_k records
top_k = min(top_k, sim.shape[0])
idx = np.argpartition(sim, -top_k)[-top_k:]
scores = sim[idx]
# get the utterance categories (decision names)
decisions = self.categories[idx] if self.categories is not None else []
return [
{"decision": d, "score": s.item()} for d, s in zip(decisions, scores)
]
else:
return []

def _convex_scaling(self, dense: list[float], sparse: list[float]):
# scale sparse and dense vecs
dense = dense * self.alpha
sparse = sparse * (1 - self.alpha)
return dense, sparse

def _semantic_classify(self, query_results: list[dict]) -> tuple[str, list[float]]:
scores_by_class = {}
for result in query_results:
score = result["score"]
decision = result["decision"]
if decision in scores_by_class:
scores_by_class[decision].append(score)
else:
scores_by_class[decision] = [score]

# Calculate total score for each class
total_scores = {
decision: sum(scores) for decision, scores in scores_by_class.items()
}
top_class = max(total_scores, key=lambda x: total_scores[x], default=None)

# Return the top class and its associated scores
return str(top_class), scores_by_class.get(top_class, [])

def _pass_threshold(self, scores: list[float], threshold: float) -> bool:
if scores:
return max(scores) > threshold
else:
return False
Empty file.
18 changes: 18 additions & 0 deletions semantic_router/matchers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pydantic import BaseModel

from semantic_router.retrievers import BaseRetriever
from semantic_router.rankers import BaseRanker
from semantic_router.schema import Decision


class BaseMatcher(BaseModel):
retriever: BaseRetriever | None
ranker: BaseRanker | None
top_k: int | None
top_n: int | None

class Config:
arbitrary_types_allowed = True

def __call__(self, query: str, decisions: list[Decision]) -> str:
raise NotImplementedError("Subclasses must implement this method")
1 change: 1 addition & 0 deletions semantic_router/matchers/ranker_only.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from semantic_router import rankers
59 changes: 59 additions & 0 deletions semantic_router/matchers/two_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import numpy as np

from semantic_router.rankers import (
BaseRanker,
CohereRanker
)
from semantic_router.retrievers import (
BaseRetriever,
CohereRetriever
)
from semantic_router.matchers import BaseMatcher
from semantic_router.schema import Decision


class TwoStageMatcher(BaseMatcher):
def __init__(
self,
retriever: BaseRetriever | None,
ranker: BaseRanker | None,
top_k: int = 25,
top_n: int = 5
):
super().__init__(
retriever=retriever, ranker=ranker, top_k=top_k, top_n=top_n
)
if retriever is None:
retriever = CohereRetriever(
name="embed-english-v3.0",
top_k=top_k
)
if ranker is None:
ranker = CohereRanker(
name="rerank-english-v2.0",
top_n=top_n
)

def __call__(self, query: str, decisions: list[Decision]) -> str:
pass

def add(self, decision: Decision):
self._add_decision(decision=decision)

def _add_decision(self, decision: Decision):
# create embeddings for first stage
embeds = self.retriever(decision.utterances)
# create a decision array for decision categories
if self.categories is None:
self.categories = np.array([decision.name] * len(embeds))
else:
str_arr = np.array([decision.name] * len(embeds))
self.categories = np.concatenate([self.categories, str_arr])
# create utterance array (the index)
if self.index is None:
self.index = np.array(embeds)
else:
embed_arr = np.array(embeds)
self.index = np.concatenate([self.index, embed_arr])


Empty file.
12 changes: 12 additions & 0 deletions semantic_router/rankers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pydantic import BaseModel


class BaseRanker(BaseModel):
name: str
top_n: int = 5

class Config:
arbitrary_types_allowed = True

def __call__(self, query: str, docs: list[str]) -> list[str]:
raise NotImplementedError("Subclasses must implement this method")
Loading

0 comments on commit 30a50b4

Please sign in to comment.