Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added euclidean metric to basic backend #29

Merged
merged 5 commits into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 138 additions & 66 deletions vicinity/backends/basic.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Any, Literal

import numpy as np
from numpy import typing as npt
Expand All @@ -13,17 +14,17 @@


@dataclass
class BasicArgs(BaseArgs): ...
class BasicArgs(BaseArgs):
metric: Literal["cosine", "euclidean"] = "cosine"


class BasicBackend(AbstractBackend[BasicArgs]):
class BasicBackend(AbstractBackend[BasicArgs], ABC):
argument_class = BasicArgs
_vectors: npt.NDArray

def __init__(self, vectors: npt.NDArray, arguments: BasicArgs) -> None:
"""Initialize the backend using vectors."""
def __init__(self, arguments: BasicArgs) -> None:
"""Initialize the backend."""
super().__init__(arguments)
self._vectors = vectors
self._norm_vectors: npt.NDArray | None = None

def __len__(self) -> int:
"""Get the number of vectors."""
Expand All @@ -34,26 +35,6 @@ def backend_type(self) -> Backend:
"""The type of the backend."""
return Backend.BASIC

@classmethod
def from_vectors(cls: type[BasicBackend], vectors: npt.NDArray, **kwargs: Any) -> BasicBackend:
"""Create a new instance from vectors."""
return cls(vectors, BasicArgs())

@classmethod
def load(cls: type[BasicBackend], folder: Path) -> BasicBackend:
"""Load the vectors from a path."""
path = folder / "vectors.npy"
arguments = BasicArgs.load(folder / "arguments.json")
with open(path, "rb") as f:
return cls(np.load(f), arguments)

def save(self, folder: Path) -> None:
"""Save the vectors to a path."""
path = Path(folder) / "vectors.npy"
self.arguments.dump(folder / "arguments.json")
with open(path, "wb") as f:
np.save(f, self._vectors)

@property
def dim(self) -> int:
"""The size of the space."""
Expand All @@ -66,36 +47,72 @@ def vectors(self) -> npt.NDArray:

@vectors.setter
def vectors(self, x: Matrix) -> None:
"""Set the vectors."""
matrix = np.asarray(x)
if not np.ndim(matrix) == 2:
if np.ndim(matrix) != 2:
raise ValueError(f"Your array does not have 2 dimensions: {np.ndim(matrix)}")
self._vectors = matrix
# Make sure norm vectors is updated.
if self._norm_vectors is not None:
self._norm_vectors = normalize_or_copy(matrix)
self._update_precomputed_data()

@property
def norm_vectors(self) -> npt.NDArray:
"""
Vectors, but normalized to unit length.
@abstractmethod
def _update_precomputed_data(self) -> None:
"""Update precomputed data based on the metric."""
raise NotImplementedError()

NOTE: when all vectors are unit length, this attribute _is_ vectors.
"""
if self._norm_vectors is None:
self._norm_vectors = normalize_or_copy(self.vectors)
return self._norm_vectors
@abstractmethod
def _dist(self, x: npt.NDArray) -> npt.NDArray:
"""Compute distances between x and self._vectors based on the metric."""
raise NotImplementedError()

@classmethod
def from_vectors(cls, vectors: npt.NDArray, **kwargs: Any) -> BasicBackend:
"""Create a new instance from vectors."""
arguments = BasicArgs(**kwargs)
if arguments.metric == "cosine":
return CosineBasicBackend(vectors, arguments)
elif arguments.metric == "euclidean":
return EuclideanBasicBackend(vectors, arguments)
else:
raise ValueError(f"Unsupported metric: {arguments.metric}")

@classmethod
def load(cls, folder: Path) -> BasicBackend:
"""Load the vectors from a path."""
path = folder / "vectors.npy"
arguments = BasicArgs.load(folder / "arguments.json")
with open(path, "rb") as f:
vectors = np.load(f)
if arguments.metric == "cosine":
return CosineBasicBackend(vectors, arguments)
elif arguments.metric == "euclidean":
return EuclideanBasicBackend(vectors, arguments)
else:
raise ValueError(f"Unsupported metric: {arguments.metric}")

def save(self, folder: Path) -> None:
"""Save the vectors to a path."""
path = folder / "vectors.npy"
self.arguments.dump(folder / "arguments.json")
with open(path, "wb") as f:
np.save(f, self._vectors)

def threshold(
self,
vectors: npt.NDArray,
threshold: float,
) -> list[npt.NDArray]:
"""Batched cosine similarity."""
"""
Batched distance thresholding.

:param vectors: The vectors to threshold.
:param threshold: The threshold to use.
:return: A list of lists of indices of vectors that are below the threshold
"""
out: list[npt.NDArray] = []
for i in range(0, len(vectors), 1024):
batch = vectors[i : i + 1024]
distances = self._dist(batch, self.norm_vectors)
for _, sims in enumerate(distances):
distances = self._dist(batch)
for sims in distances:
Pringled marked this conversation as resolved.
Show resolved Hide resolved
indices = np.flatnonzero(sims <= threshold)
sorted_indices = indices[np.argsort(sims[indices])]
out.append(sorted_indices)
Expand All @@ -107,43 +124,98 @@ def query(
vectors: npt.NDArray,
k: int,
) -> QueryResult:
"""Batched cosine distance."""
"""
Batched distance query.

:param vectors: The vectors to query.
:param k: The number of nearest neighbors to return.
:return: A list of tuples with the indices and distances.
:raises ValueError: If k is less than 1.
"""
if k < 1:
raise ValueError("num should be >= 1, is now {num}")
raise ValueError(f"k should be >= 1, is now {k}")

out: QueryResult = []
num_vectors = len(self.vectors)
effective_k = min(k, num_vectors)

# Batch the queries
for index in range(0, len(vectors), 1024):
batch = vectors[index : index + 1024]
distances = self._dist(batch, self.norm_vectors)
if k == 1:
sorted_indices = np.argmin(distances, 1, keepdims=True)
elif k >= len(self.vectors):
# If we want more than we have, just sort everything.
sorted_indices = np.stack([np.arange(len(self.vectors))] * len(vectors))
else:
sorted_indices = np.argpartition(distances, kth=k, axis=1)
sorted_indices = sorted_indices[:, :k]
for lidx, indices in enumerate(sorted_indices):
dists_for_word = distances[lidx, indices]
word_index = np.argsort(dists_for_word)
i = indices[word_index]
d = dists_for_word[word_index]
out.append((i, d))
distances = self._dist(batch)

return out
# Efficiently get the k smallest distances
indices = np.argpartition(distances, kth=effective_k - 1, axis=1)[:, :effective_k]
sorted_indices = np.take_along_axis(
indices, np.argsort(np.take_along_axis(distances, indices, axis=1)), axis=1
)
sorted_distances = np.take_along_axis(distances, sorted_indices, axis=1)

@classmethod
def _dist(cls, x: npt.NDArray, y: npt.NDArray) -> npt.NDArray:
"""Cosine distance function. This assumes y is normalized."""
sim = normalize(x).dot(y.T)
# Extend the output with tuples of (indices, distances)
out.extend(zip(sorted_indices, sorted_distances))

return 1 - sim
return out

def insert(self, vectors: npt.NDArray) -> None:
"""Insert vectors into the vector space."""
self._vectors = np.vstack([self._vectors, vectors])
self._update_precomputed_data()

def delete(self, indices: list[int]) -> None:
"""Deletes specific indices from the vector space."""
self._vectors = np.delete(self._vectors, indices, axis=0)
self._update_precomputed_data()


class CosineBasicBackend(BasicBackend):
def __init__(self, vectors: npt.NDArray, arguments: BasicArgs) -> None:
"""Initialize the cosine basic backend."""
super().__init__(arguments)
self._vectors = vectors
Pringled marked this conversation as resolved.
Show resolved Hide resolved
self._norm_vectors: npt.NDArray | None = None
self._update_precomputed_data()

def _update_precomputed_data(self) -> None:
"""Update precomputed data for cosine similarity."""
self._norm_vectors = normalize_or_copy(self._vectors)

@property
def norm_vectors(self) -> npt.NDArray:
"""Return normalized vectors."""
if self._norm_vectors is None:
self._norm_vectors = normalize_or_copy(self._vectors)
return self._norm_vectors

def _dist(self, x: npt.NDArray) -> npt.NDArray:
"""Compute cosine distance."""
x_norm = normalize(x)
sim = x_norm.dot(self.norm_vectors.T)
return 1 - sim


class EuclideanBasicBackend(BasicBackend):
def __init__(self, vectors: npt.NDArray, arguments: BasicArgs) -> None:
"""Initialize the Euclidean basic backend."""
super().__init__(arguments)
self._vectors = vectors
self._squared_norm_vectors: npt.NDArray | None = None
self._update_precomputed_data()

def _update_precomputed_data(self) -> None:
"""Update precomputed data for Euclidean distance."""
self._squared_norm_vectors = (self._vectors**2).sum(1)

@property
def squared_norm_vectors(self) -> npt.NDArray:
"""Return squared norms of vectors."""
if self._squared_norm_vectors is None:
self._squared_norm_vectors = (self._vectors**2).sum(1)
return self._squared_norm_vectors

def _dist(self, x: npt.NDArray) -> npt.NDArray:
"""Compute Euclidean distance."""
x_norm = (x**2).sum(1)
dists_squared = (x_norm[:, None] + self.squared_norm_vectors[None, :]) - 2 * (x @ self._vectors.T)
# Ensure non-negative distances
dists_squared = np.clip(dists_squared, 0, None)
return np.sqrt(dists_squared)
1 change: 1 addition & 0 deletions vicinity/vicinity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import logging
import time
from io import open
from pathlib import Path
from typing import Any, Sequence, Union
Expand Down