Skip to content

Commit

Permalink
[ENH] Add node name to memberlist (#3450)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
   - Adds node name to the memberlist to allow for node-level routing
 - New functionality
   - None

## Test plan
*How are these changes tested?*
Existing tests are modified
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
HammadB authored Jan 15, 2025
1 parent f564b0b commit 23d99d2
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 42 deletions.
4 changes: 3 additions & 1 deletion chromadb/execution/executor/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ def _grpc_executor_stub(self, scan: Scan) -> QueryExecutorStub:
grpc_url = self._manager.get_endpoint(scan.record)
with self._mtx:
if grpc_url not in self._grpc_stub_pool:
channel = grpc.insecure_channel(grpc_url, options=[("grpc.max_concurrent_streams", 1000)])
channel = grpc.insecure_channel(
grpc_url, options=[("grpc.max_concurrent_streams", 1000)]
)
interceptors = [OtelInterceptor(), RetryOnRpcErrorClientInterceptor()]
channel = grpc.intercept_channel(channel, *interceptors)
self._grpc_stub_pool[grpc_url] = QueryExecutorStub(channel)
Expand Down
1 change: 1 addition & 0 deletions chromadb/segment/distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def register_updated_segment_callback(
class Member:
id: str
ip: str
node: str


Memberlist = List[Member]
Expand Down
9 changes: 5 additions & 4 deletions chromadb/segment/impl/distributed/segment_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class MockMemberlistProvider(MemberlistProvider, EnforceOverrides):
def __init__(self, system: System):
super().__init__(system)
self._memberlist = [
Member(id="a", ip="10.0.0.1"),
Member(id="b", ip="10.0.0.2"),
Member(id="c", ip="10.0.0.3"),
Member(id="a", ip="10.0.0.1", node="node1"),
Member(id="b", ip="10.0.0.2", node="node2"),
Member(id="c", ip="10.0.0.3", node="node3"),
]

@override
Expand Down Expand Up @@ -212,7 +212,8 @@ def _parse_response_memberlist(
for m in api_response_spec["members"]:
id = m["member_id"]
ip = m["member_ip"] if "member_ip" in m else ""
parsed.append(Member(id=id, ip=ip))
node = m["member_node_name"] if "member_node_name" in m else ""
parsed.append(Member(id=id, ip=ip, node=node))
return parsed

def _notify(self, memberlist: Memberlist) -> None:
Expand Down
12 changes: 10 additions & 2 deletions chromadb/test/segment/distributed/test_memberlist_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ def update_memberlist(n: int, memberlist_name: str = "test-memberlist") -> Membe
config.load_config()
api_instance = client.CustomObjectsApi()

members = [Member(id=f"test-{i}", ip=f"10.0.0.{i}") for i in range(1, n + 1)]
members = [
Member(id=f"test-{i}", ip=f"10.0.0.{i}", node="node-{i}")
for i in range(1, n + 1)
]

body = {
"kind": "MemberList",
"metadata": {"name": memberlist_name},
"spec": {"members": [{"member_id": m.id, "member_ip": m.ip} for m in members]},
"spec": {
"members": [
{"member_id": m.id, "member_ip": m.ip, "member_node_name": m.node}
for m in members
]
},
}

_ = api_instance.patch_namespaced_custom_object(
Expand Down
30 changes: 14 additions & 16 deletions go/pkg/memberlist_manager/memberlist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package memberlist_manager
import (
"context"
"time"
"sort"

"github.com/chroma-core/chroma/go/pkg/common"
"github.com/pingcap/log"
Expand Down Expand Up @@ -128,26 +129,23 @@ func memberlistSame(oldMemberlist Memberlist, newMemberlist Memberlist) bool {
if len(oldMemberlist) != len(newMemberlist) {
return false
}
oldMemberlistIps := make(map[string]string)
for _, member := range oldMemberlist {
oldMemberlistIps[member.id] = member.ip
}
for _, member := range newMemberlist {
if ip, ok := oldMemberlistIps[member.id]; !ok || ip != member.ip {
return false
}
}

// use a map to check if the new memberlist contains all the old members
newMemberlistMap := make(map[string]bool)
for _, member := range newMemberlist {
newMemberlistMap[member.id] = true
}
for _, member := range oldMemberlist {
if _, ok := newMemberlistMap[member.id]; !ok {
// make a copy of the slices to avoid modifying the original
oldMemberlistClone := make(Memberlist, len(oldMemberlist))
newMemberlistClone := make(Memberlist, len(newMemberlist))
copy(oldMemberlistClone, oldMemberlist)
copy(newMemberlistClone, newMemberlist)

// sort the slices to ensure that the order of the elements does not matter
sort.Sort(oldMemberlistClone)
sort.Sort(newMemberlistClone)

for i := range oldMemberlistClone {
if oldMemberlistClone[i] != newMemberlistClone[i] {
return false
}
}

return true
}

Expand Down
55 changes: 39 additions & 16 deletions go/pkg/memberlist_manager/memberlist_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func TestNodeWatcher(t *testing.T) {
},
},
},
Spec: v1.PodSpec{
NodeName: "test-node-0",
},
}, metav1.CreateOptions{})

// Get the status of the node
Expand All @@ -52,7 +55,7 @@ func TestNodeWatcher(t *testing.T) {
t.Fatalf("Error getting node status: %v", err)
}

return reflect.DeepEqual(memberlist, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1"}})
return reflect.DeepEqual(memberlist, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}})
}, 10, 1*time.Second)
if !ok {
t.Fatalf("Node status did not update after adding a pod")
Expand Down Expand Up @@ -83,7 +86,7 @@ func TestNodeWatcher(t *testing.T) {
if err != nil {
t.Fatalf("Error getting node status: %v", err)
}
return reflect.DeepEqual(memberlist, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1"}})
return reflect.DeepEqual(memberlist, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}})
}, 10, 1*time.Second)
if !ok {
t.Fatalf("Node status did not update after adding a not ready pod")
Expand All @@ -108,18 +111,18 @@ func TestMemberlistStore(t *testing.T) {
assert.Equal(t, Memberlist{}, memberlist)

// Add a member to the memberlist
memberlist_store.UpdateMemberlist(context.Background(), Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1"}, Member{id: "test-pod-1", ip: "10.0.0.2"}}, "0")
memberlist_store.UpdateMemberlist(context.Background(), Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}, Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}, "0")
memberlist, _, err = memberlist_store.GetMemberlist(context.Background())
if err != nil {
t.Fatalf("Error getting memberlist: %v", err)
}
// assert the memberlist has the correct members
if !memberlistSame(memberlist, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1"}, Member{id: "test-pod-1", ip: "10.0.0.2"}}) {
if !memberlistSame(memberlist, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}, Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}) {
t.Fatalf("Memberlist did not update after adding a member")
}
}

func createFakePod(memberId string, podIp string, clientset kubernetes.Interface) {
func createFakePod(memberId string, podIp string, node string, clientset kubernetes.Interface) {
clientset.CoreV1().Pods("chroma").Create(context.Background(), &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: memberId,
Expand All @@ -137,6 +140,9 @@ func createFakePod(memberId string, podIp string, clientset kubernetes.Interface
},
},
},
Spec: v1.PodSpec{
NodeName: node,
},
}, metav1.CreateOptions{})
}

Expand Down Expand Up @@ -180,22 +186,22 @@ func TestMemberlistManager(t *testing.T) {
}

// Add a ready pod
createFakePod("test-pod-0", "10.0.0.49", clientset)
createFakePod("test-pod-0", "10.0.0.49", "test-node-0", clientset)

// Get the memberlist
ok := retryUntilCondition(func() bool {
return getMemberlistAndCompare(t, memberlistStore, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.49"}})
return getMemberlistAndCompare(t, memberlistStore, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.49", node: "test-node-0"}})
}, 30, 1*time.Second)
if !ok {
t.Fatalf("Memberlist did not update after adding a pod")
}

// Add another ready pod
createFakePod("test-pod-1", "10.0.0.50", clientset)
createFakePod("test-pod-1", "10.0.0.50", "test-node-1", clientset)

// Get the memberlist
ok = retryUntilCondition(func() bool {
return getMemberlistAndCompare(t, memberlistStore, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.49"}, Member{id: "test-pod-1", ip: "10.0.0.50"}})
return getMemberlistAndCompare(t, memberlistStore, Memberlist{Member{id: "test-pod-0", ip: "10.0.0.49", node: "test-node-0"}, Member{id: "test-pod-1", ip: "10.0.0.50", node: "test-node-1"}})
}, 30, 1*time.Second)
if !ok {
t.Fatalf("Memberlist did not update after adding a pod")
Expand All @@ -206,7 +212,7 @@ func TestMemberlistManager(t *testing.T) {

// Get the memberlist
ok = retryUntilCondition(func() bool {
return getMemberlistAndCompare(t, memberlistStore, Memberlist{Member{id: "test-pod-1", ip: "10.0.0.50"}})
return getMemberlistAndCompare(t, memberlistStore, Memberlist{Member{id: "test-pod-1", ip: "10.0.0.50", node: "test-node-1"}})
}, 30, 1*time.Second)
if !ok {
t.Fatalf("Memberlist did not update after deleting a pod")
Expand All @@ -217,25 +223,42 @@ func TestMemberlistSame(t *testing.T) {
memberlist := Memberlist{}
assert.True(t, memberlistSame(memberlist, memberlist))

newMemberlist := Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1"}}
newMemberlist := Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}}
assert.False(t, memberlistSame(memberlist, newMemberlist))
assert.False(t, memberlistSame(newMemberlist, memberlist))
assert.True(t, memberlistSame(newMemberlist, newMemberlist))

memberlist = Memberlist{Member{id: "test-pod-1", ip: "10.0.0.2"}}
memberlist = Memberlist{Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}
assert.False(t, memberlistSame(newMemberlist, memberlist))
assert.False(t, memberlistSame(memberlist, newMemberlist))
assert.True(t, memberlistSame(memberlist, memberlist))

memberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1"}, Member{id: "test-pod-1", ip: "10.0.0.2"}}
newMemberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1"}, Member{id: "test-pod-1", ip: "10.0.0.2"}}
memberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}, Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}
newMemberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}, Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}
assert.True(t, memberlistSame(memberlist, newMemberlist))
assert.True(t, memberlistSame(newMemberlist, memberlist))

memberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1"}, Member{id: "test-pod-1", ip: "10.0.0.2"}}
newMemberlist = Memberlist{Member{id: "test-pod-1", ip: "10.0.0.2"}, Member{id: "test-pod-0", ip: "10.0.0.1"}}
memberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}, Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}
newMemberlist = Memberlist{Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}, Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}}
assert.True(t, memberlistSame(memberlist, newMemberlist))
assert.True(t, memberlistSame(newMemberlist, memberlist))

memberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.2", node: "test-node-0"}, Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}
newMemberlist = Memberlist{Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}, Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}}
assert.False(t, memberlistSame(memberlist, newMemberlist))
assert.False(t, memberlistSame(newMemberlist, memberlist))

// Just one ip wrong
memberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.2", node: "test-node-0"}, Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}
newMemberlist = Memberlist{Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}, Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}}
assert.False(t, memberlistSame(memberlist, newMemberlist))
assert.False(t, memberlistSame(newMemberlist, memberlist))

// Just one node wrong
memberlist = Memberlist{Member{id: "test-pod-0", ip: "10.0.0.2", node: "test-node-2"}, Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}}
newMemberlist = Memberlist{Member{id: "test-pod-1", ip: "10.0.0.2", node: "test-node-1"}, Member{id: "test-pod-0", ip: "10.0.0.1", node: "test-node-0"}}
assert.False(t, memberlistSame(memberlist, newMemberlist))
assert.False(t, memberlistSame(newMemberlist, memberlist))
}

func retryUntilCondition(f func() bool, retry_count int, retry_interval time.Duration) bool {
Expand Down
15 changes: 14 additions & 1 deletion go/pkg/memberlist_manager/memberlist_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@ type IMemberlistStore interface {
type Member struct {
id string
ip string
node string
}

// MarshalLogObject implements the zapcore.ObjectMarshaler interface
func (m Member) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("id", m.id)
enc.AddString("ip", m.ip)
enc.AddString("node", m.node)
return nil
}

type Memberlist []Member

func (p Memberlist) Len() int { return len(p) }
func (p Memberlist) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p Memberlist) Less(i, j int) bool { return p[i].id < p[j].id }

// MarshalLogArray implements the zapcore.ArrayMarshaler interface
func (ml Memberlist) MarshalLogArray(enc zapcore.ArrayEncoder) error {
for _, member := range ml {
Expand Down Expand Up @@ -88,8 +94,14 @@ func (s *CRMemberlistStore) GetMemberlist(ctx context.Context) (return_memberlis
if !ok {
member_ip = ""
}
// If the member_node_name is in the CR, extract it, otherwise set it to empty string
// This is for backwards compatibility with older CRs that don't have member_node_name
member_node_name, ok := member_map["member_node_name"].(string)
if !ok {
member_node_name = ""
}

memberlist = append(memberlist, Member{member_id, member_ip})
memberlist = append(memberlist, Member{member_id, member_ip, member_node_name})
}
return memberlist, unstrucuted.GetResourceVersion(), nil
}
Expand Down Expand Up @@ -117,6 +129,7 @@ func (list Memberlist) toCr(namespace string, memberlistName string, resourceVer
members[i] = map[string]interface{}{
"member_id": member.id,
"member_ip": member.ip,
"member_node_name": member.node,
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/pkg/memberlist_manager/node_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (w *KubernetesWatcher) ListReadyMembers() (Memberlist, error) {
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodReady {
if condition.Status == v1.ConditionTrue {
memberlist = append(memberlist, Member{pod.Name, pod.Status.PodIP})
memberlist = append(memberlist, Member{pod.Name, pod.Status.PodIP, pod.Spec.NodeName})
}
break
}
Expand Down
2 changes: 1 addition & 1 deletion k8s/distributed-chroma/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ apiVersion: v2
name: distributed-chroma
description: A helm chart for distributed Chroma
type: application
version: 0.1.13
version: 0.1.14
appVersion: "0.4.24"
keywords:
- chroma
Expand Down
2 changes: 2 additions & 0 deletions k8s/distributed-chroma/crds/memberlist_crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ spec:
type: string
member_ip:
type: string
member_node_name:
type: string
scope: Namespaced
names:
plural: memberlists
Expand Down

0 comments on commit 23d99d2

Please sign in to comment.