Skip to content

Commit

Permalink
Merge branch 'main' into trayan-12-13-fix_delete_collection_resource_…
Browse files Browse the repository at this point in the history
…leak
  • Loading branch information
tazarov authored Jan 9, 2025
2 parents 9fd2b3c + d50a942 commit 2b21771
Show file tree
Hide file tree
Showing 20 changed files with 241 additions and 80 deletions.
4 changes: 2 additions & 2 deletions chromadb/api/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class HNSWConfigurationInternal(ConfigurationInternal):
name="ef_search",
validator=lambda value: isinstance(value, int) and value >= 1,
is_static=False,
default_value=10,
default_value=100,
),
"num_threads": ConfigurationDefinition(
name="num_threads",
Expand Down Expand Up @@ -328,7 +328,7 @@ def __init__(
self,
space: str = "l2",
ef_construction: int = 100,
ef_search: int = 10,
ef_search: int = 100,
num_threads: int = cpu_count(),
M: int = 16,
resize_factor: float = 1.2,
Expand Down
3 changes: 3 additions & 0 deletions chromadb/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,7 @@ def name(cls) -> str:
"VersionMismatchError": VersionMismatchError,
"RateLimitError": RateLimitError,
"AuthError": ChromaAuthError,
"UniqueConstraintError": UniqueConstraintError,
"QuotaError": QuotaError,
"InternalError": InternalError,
}
6 changes: 1 addition & 5 deletions chromadb/execution/executor/distributed.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
from typing import Dict, Optional

import grpc
from overrides import overrides

from chromadb.api.types import GetResult, Metadata, QueryResult
from chromadb.config import System
from chromadb.errors import VersionMismatchError
from chromadb.execution.executor.abstract import Executor
from chromadb.execution.expression.operator import Scan
from chromadb.execution.expression.plan import CountPlan, GetPlan, KNNPlan
from chromadb.proto import convert

from chromadb.proto.query_executor_pb2_grpc import QueryExecutorStub
from chromadb.proto.utils import RetryOnRpcErrorClientInterceptor
from chromadb.segment.impl.manager.distributed import DistributedSegmentManager
Expand Down Expand Up @@ -170,6 +166,6 @@ def _grpc_executuor_stub(self, scan: Scan) -> QueryExecutorStub:
channel = grpc.insecure_channel(grpc_url)
interceptors = [OtelInterceptor(), RetryOnRpcErrorClientInterceptor()]
channel = grpc.intercept_channel(channel, *interceptors)
self._grpc_stub_pool[grpc_url] = QueryExecutorStub(channel) # type: ignore[no-untyped-call]
self._grpc_stub_pool[grpc_url] = QueryExecutorStub(channel)

return self._grpc_stub_pool[grpc_url]
9 changes: 8 additions & 1 deletion chromadb/segment/distributed/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, List

from overrides import EnforceOverrides, overrides
Expand All @@ -22,7 +23,13 @@ def register_updated_segment_callback(
pass


Memberlist = List[str]
@dataclass
class Member:
id: str
ip: str


Memberlist = List[Member]


class MemberlistProvider(Component, EnforceOverrides):
Expand Down
36 changes: 30 additions & 6 deletions chromadb/segment/impl/distributed/segment_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from chromadb.config import System
from chromadb.segment.distributed import (
Member,
Memberlist,
MemberlistProvider,
SegmentDirectory,
Expand Down Expand Up @@ -35,7 +36,11 @@ class MockMemberlistProvider(MemberlistProvider, EnforceOverrides):

def __init__(self, system: System):
super().__init__(system)
self._memberlist = ["a", "b", "c"]
self._memberlist = [
Member(id="a", ip="10.0.0.1"),
Member(id="b", ip="10.0.0.2"),
Member(id="c", ip="10.0.0.3"),
]

@override
def get_memberlist(self) -> Memberlist:
Expand Down Expand Up @@ -203,7 +208,12 @@ def _parse_response_memberlist(
) -> Memberlist:
if "members" not in api_response_spec:
return []
return [m["member_id"] for m in api_response_spec["members"]]
parsed = []
for m in api_response_spec["members"]:
id = m["member_id"]
ip = m["member_ip"] if "member_ip" in m else ""
parsed.append(Member(id=id, ip=ip))
return parsed

def _notify(self, memberlist: Memberlist) -> None:
for callback in self.callbacks:
Expand Down Expand Up @@ -245,11 +255,23 @@ def get_segment_endpoint(self, segment: Segment) -> str:
raise ValueError("Memberlist is not initialized")
# Query to the same collection should end up on the same endpoint
assignment = assign(
segment["collection"].hex, self._curr_memberlist, murmur3hasher, 1
segment["collection"].hex,
[m.id for m in self._curr_memberlist],
murmur3hasher,
1,
)[0]
service_name = self.extract_service_name(assignment)
assignment = f"{assignment}.{service_name}.{KUBERNETES_NAMESPACE}.{HEADLESS_SERVICE}:50051" # TODO: make port configurable
return assignment

# If the memberlist has an ip, use it, otherwise use the member id with the headless service
# this is for backwards compatibility with the old memberlist which only had ids
for member in self._curr_memberlist:
if member.id == assignment:
if member.ip is not None and member.ip != "":
endpoint = f"{member.ip}:50051"
return endpoint

endpoint = f"{assignment}.{service_name}.{KUBERNETES_NAMESPACE}.{HEADLESS_SERVICE}:50051" # TODO: make port configurable
return endpoint

@override
def register_updated_segment_callback(
Expand All @@ -263,7 +285,9 @@ def register_updated_segment_callback(
)
def _update_memberlist(self, memberlist: Memberlist) -> None:
with self._curr_memberlist_mutex:
add_attributes_to_current_span({"new_memberlist": memberlist})
add_attributes_to_current_span(
{"new_memberlist": [m.id for m in memberlist]}
)
self._curr_memberlist = memberlist

def extract_service_name(self, pod_name: str) -> Optional[str]:
Expand Down
5 changes: 0 additions & 5 deletions chromadb/segment/impl/manager/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@
from chromadb.segment.distributed import SegmentDirectory
from chromadb.segment.impl.vector.hnsw_params import PersistentHnswParams
from chromadb.telemetry.opentelemetry import (
OpenTelemetryClient,
OpenTelemetryGranularity,
trace_method,
)
from chromadb.types import (
Collection,
CollectionAndSegments,
Operation,
Segment,
SegmentScope,
Expand All @@ -30,18 +28,15 @@
class DistributedSegmentManager(SegmentManager):
_sysdb: SysDB
_system: System
_opentelemetry_client: OpenTelemetryClient
_instances: Dict[UUID, SegmentImplementation]
_segment_directory: SegmentDirectory
_lock: Lock
# _segment_server_stubs: Dict[str, SegmentServerStub] # grpc_url -> grpc stub

def __init__(self, system: System):
super().__init__(system)
self._sysdb = self.require(SysDB)
self._segment_directory = self.require(SegmentDirectory)
self._system = system
self._opentelemetry_client = system.require(OpenTelemetryClient)
self._instances = {}
self._lock = Lock()

Expand Down
2 changes: 1 addition & 1 deletion chromadb/segment/impl/vector/hnsw_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(self, metadata: Metadata):
metadata = metadata or {}
self.space = str(metadata.get("hnsw:space", "l2"))
self.construction_ef = int(metadata.get("hnsw:construction_ef", 100))
self.search_ef = int(metadata.get("hnsw:search_ef", 10))
self.search_ef = int(metadata.get("hnsw:search_ef", 100))
self.M = int(metadata.get("hnsw:M", 16))
self.num_threads = int(
metadata.get("hnsw:num_threads", multiprocessing.cpu_count())
Expand Down
Loading

0 comments on commit 2b21771

Please sign in to comment.