Skip to content

Commit

Permalink
clean up meta cache
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Nov 8, 2024
1 parent 6088fb1 commit 615252c
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 408 deletions.
118 changes: 35 additions & 83 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
Expand Down Expand Up @@ -83,10 +82,9 @@ type Cache interface {
UpdateCredential(credInfo *internalpb.CredentialInfo)

GetPrivilegeInfo(ctx context.Context) []string
GetGroupPrivileges(groupName string) map[string]struct{}
GetUserRole(username string) []string
RefreshPolicyInfo(op typeutil.CacheOp) error
InitPolicyInfo(info []string, userRoles []string, privilegeGroups []*milvuspb.PrivilegeGroupInfo) error
InitPolicyInfo(info []string, userRoles []string)

RemoveDatabase(ctx context.Context, database string)
HasDatabase(ctx context.Context, database string) bool
Expand Down Expand Up @@ -342,19 +340,18 @@ type MetaCache struct {
rootCoord types.RootCoordClient
queryCoord types.QueryCoordClient

dbInfo map[string]*databaseInfo // database -> db_info
collInfo map[string]map[string]*collectionInfo // database -> collectionName -> collection_info
collLeader map[string]map[string]*shardLeaders // database -> collectionName -> collection_leaders
credMap map[string]*internalpb.CredentialInfo // cache for credential, lazy load
privilegeInfos map[string]struct{} // privileges cache
privilegeGroups map[string]map[string]struct{} // privilege group -> privileges
userToRoles map[string]map[string]struct{} // user to role cache
mu sync.RWMutex
credMut sync.RWMutex
leaderMut sync.RWMutex
shardMgr shardClientMgr
sfGlobal conc.Singleflight[*collectionInfo]
sfDB conc.Singleflight[*databaseInfo]
dbInfo map[string]*databaseInfo // database -> db_info
collInfo map[string]map[string]*collectionInfo // database -> collectionName -> collection_info
collLeader map[string]map[string]*shardLeaders // database -> collectionName -> collection_leaders
credMap map[string]*internalpb.CredentialInfo // cache for credential, lazy load
privilegeInfos map[string]struct{} // privileges cache
userToRoles map[string]map[string]struct{} // user to role cache
mu sync.RWMutex
credMut sync.RWMutex
leaderMut sync.RWMutex
shardMgr shardClientMgr
sfGlobal conc.Singleflight[*collectionInfo]
sfDB conc.Singleflight[*databaseInfo]

IDStart int64
IDCount int64
Expand All @@ -379,27 +376,23 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo
log.Error("fail to init meta cache", zap.Error(err))
return err
}
err = globalMetaCache.InitPolicyInfo(resp.PolicyInfos, resp.UserRoles, resp.PrivilegeGroups)
if err != nil {
return err
}
globalMetaCache.InitPolicyInfo(resp.PolicyInfos, resp.UserRoles)
log.Info("success to init meta cache", zap.Strings("policy_infos", resp.PolicyInfos))
return nil
}

// NewMetaCache creates a MetaCache with provided RootCoord and QueryNode
func NewMetaCache(rootCoord types.RootCoordClient, queryCoord types.QueryCoordClient, shardMgr shardClientMgr) (*MetaCache, error) {
return &MetaCache{
rootCoord: rootCoord,
queryCoord: queryCoord,
dbInfo: map[string]*databaseInfo{},
collInfo: map[string]map[string]*collectionInfo{},
collLeader: map[string]map[string]*shardLeaders{},
credMap: map[string]*internalpb.CredentialInfo{},
shardMgr: shardMgr,
privilegeInfos: map[string]struct{}{},
privilegeGroups: map[string]map[string]struct{}{},
userToRoles: map[string]map[string]struct{}{},
rootCoord: rootCoord,
queryCoord: queryCoord,
dbInfo: map[string]*databaseInfo{},
collInfo: map[string]map[string]*collectionInfo{},
collLeader: map[string]map[string]*shardLeaders{},
credMap: map[string]*internalpb.CredentialInfo{},
shardMgr: shardMgr,
privilegeInfos: map[string]struct{}{},
userToRoles: map[string]map[string]struct{}{},
}, nil
}

Expand Down Expand Up @@ -1069,7 +1062,7 @@ func (m *MetaCache) InvalidateShardLeaderCache(collections []int64) {
}
}

func (m *MetaCache) InitPolicyInfo(info []string, userRoles []string, privilegeGroups []*milvuspb.PrivilegeGroupInfo) error {
func (m *MetaCache) InitPolicyInfo(info []string, userRoles []string) {
defer func() {
err := getEnforcer().LoadPolicy()
if err != nil {
Expand All @@ -1078,13 +1071,10 @@ func (m *MetaCache) InitPolicyInfo(info []string, userRoles []string, privilegeG
}()
m.mu.Lock()
defer m.mu.Unlock()
if err := m.unsafeInitPolicyInfo(info, userRoles, privilegeGroups); err != nil {
return err
}
return nil
m.unsafeInitPolicyInfo(info, userRoles)
}

func (m *MetaCache) unsafeInitPolicyInfo(info []string, userRoles []string, privilegeGroups []*milvuspb.PrivilegeGroupInfo) error {
func (m *MetaCache) unsafeInitPolicyInfo(info []string, userRoles []string) {
m.privilegeInfos = util.StringSet(info)
for _, userRole := range userRoles {
user, role, err := funcutil.DecodeUserRoleCache(userRole)
Expand All @@ -1097,16 +1087,6 @@ func (m *MetaCache) unsafeInitPolicyInfo(info []string, userRoles []string, priv
}
m.userToRoles[user][role] = struct{}{}
}
for _, g := range privilegeGroups {
if m.privilegeGroups[g.GroupName] == nil {
m.privilegeGroups[g.GroupName] = make(map[string]struct{})
}
for _, priv := range g.Privileges {
dbPrivName := util.PrivilegeNameForMetastore(priv.Name)
m.privilegeGroups[g.GroupName][dbPrivName] = struct{}{}
}
}
return nil
}

func (m *MetaCache) GetPrivilegeInfo(ctx context.Context) []string {
Expand All @@ -1116,12 +1096,6 @@ func (m *MetaCache) GetPrivilegeInfo(ctx context.Context) []string {
return util.StringList(m.privilegeInfos)
}

func (m *MetaCache) GetGroupPrivileges(groupName string) map[string]struct{} {
m.mu.RLock()
defer m.mu.RUnlock()
return m.privilegeGroups[groupName]
}

func (m *MetaCache) GetUserRole(user string) []string {
m.mu.RLock()
defer m.mu.RUnlock()
Expand Down Expand Up @@ -1149,9 +1123,15 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) {

switch op.OpType {
case typeutil.CacheGrantPrivilege:
m.privilegeInfos[op.OpKey] = struct{}{}
keys := funcutil.PrivilegesForPolicy(op.OpKey)
for _, key := range keys {
m.privilegeInfos[key] = struct{}{}
}
case typeutil.CacheRevokePrivilege:
delete(m.privilegeInfos, op.OpKey)
keys := funcutil.PrivilegesForPolicy(op.OpKey)
for _, key := range keys {
delete(m.privilegeInfos, key)
}
case typeutil.CacheAddUserToRole:
user, role, err := funcutil.DecodeUserRoleCache(op.OpKey)
if err != nil {
Expand All @@ -1169,33 +1149,6 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) {
if m.userToRoles[user] != nil {
delete(m.userToRoles[user], role)
}
case typeutil.CacheDropPrivilegeGroup:
delete(m.privilegeGroups, op.OpKey)
case typeutil.CacheAddPrivilegesToGroup:
groupInfo := &milvuspb.PrivilegeGroupInfo{}
err = proto.Unmarshal([]byte(op.OpKey), groupInfo)
if err != nil {
log.Error("failed to unmarshal privilege group info", zap.Error(err))
return err
}
if m.privilegeGroups[groupInfo.GroupName] == nil {
m.privilegeGroups[groupInfo.GroupName] = make(map[string]struct{})
}
for _, p := range groupInfo.Privileges {
m.privilegeGroups[groupInfo.GroupName][p.Name] = struct{}{}
}
case typeutil.CacheRemovePrivilegesFromGroup:
groupInfo := &milvuspb.PrivilegeGroupInfo{}
err = proto.Unmarshal([]byte(op.OpKey), groupInfo)
if err != nil {
log.Error("failed to unmarshal privilege group info", zap.Error(err))
return err
}
if m.privilegeGroups[groupInfo.GroupName] != nil {
for _, p := range groupInfo.Privileges {
delete(m.privilegeGroups[groupInfo.GroupName], p.Name)
}
}
case typeutil.CacheDeleteUser:
delete(m.userToRoles, op.OpKey)
case typeutil.CacheDropRole:
Expand Down Expand Up @@ -1225,9 +1178,8 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) {
m.mu.Lock()
defer m.mu.Unlock()
m.userToRoles = make(map[string]map[string]struct{})
m.privilegeGroups = make(map[string]map[string]struct{})
m.privilegeInfos = make(map[string]struct{})
m.unsafeInitPolicyInfo(resp.PolicyInfos, resp.UserRoles, resp.PrivilegeGroups)
m.unsafeInitPolicyInfo(resp.PolicyInfos, resp.UserRoles)
default:
return fmt.Errorf("invalid opType, op_type: %d, op_key: %s", int(op.OpType), op.OpKey)
}
Expand Down
50 changes: 9 additions & 41 deletions internal/proxy/meta_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/stretchr/testify/require"
uatomic "go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
Expand Down Expand Up @@ -627,10 +626,6 @@ func TestMetaCache_PolicyInfo(t *testing.T) {
client := &MockRootCoordClientInterface{}
qc := &mocks.MockQueryCoordClient{}
mgr := newShardClientMgr()
privGroups := []*milvuspb.PrivilegeGroupInfo{{
GroupName: "pg1",
Privileges: []*milvuspb.PrivilegeEntity{{Name: "CreateCollection"}, {Name: "DescribeCollection"}},
}}

t.Run("InitMetaCache", func(t *testing.T) {
client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
Expand All @@ -641,9 +636,8 @@ func TestMetaCache_PolicyInfo(t *testing.T) {

client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
return &internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
PrivilegeGroups: privGroups,
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
}, nil
}
err = InitMetaCache(context.Background(), client, qc, mgr)
Expand All @@ -653,10 +647,9 @@ func TestMetaCache_PolicyInfo(t *testing.T) {
t.Run("GetPrivilegeInfo", func(t *testing.T) {
client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
return &internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2")},
PrivilegeGroups: privGroups,
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2")},
}, nil
}
err := InitMetaCache(context.Background(), client, qc, mgr)
Expand All @@ -665,17 +658,14 @@ func TestMetaCache_PolicyInfo(t *testing.T) {
assert.Equal(t, 3, len(policyInfos))
roles := globalMetaCache.GetUserRole("foo")
assert.Equal(t, 2, len(roles))
groups := globalMetaCache.GetGroupPrivileges("pg1")
assert.Equal(t, 2, len(groups))
})

t.Run("GetPrivilegeInfo", func(t *testing.T) {
client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
return &internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2")},
PrivilegeGroups: privGroups,
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2")},
}, nil
}
err := InitMetaCache(context.Background(), client, qc, mgr)
Expand All @@ -701,27 +691,6 @@ func TestMetaCache_PolicyInfo(t *testing.T) {
roles = globalMetaCache.GetUserRole("foo")
assert.Equal(t, 2, len(roles))

groupInfo := &milvuspb.PrivilegeGroupInfo{GroupName: "pg1", Privileges: []*milvuspb.PrivilegeEntity{{Name: "DropCollection"}}}
v, err := proto.Marshal(groupInfo)
assert.NoError(t, err)
err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheAddPrivilegesToGroup, OpKey: string(v)})
assert.NoError(t, err)
groups := globalMetaCache.GetGroupPrivileges("pg1")
assert.Equal(t, 3, len(groups))

groupInfo = &milvuspb.PrivilegeGroupInfo{GroupName: "pg1", Privileges: []*milvuspb.PrivilegeEntity{{Name: "DropCollection"}, {Name: "RenameCollection"}}}
v, err = proto.Marshal(groupInfo)
assert.NoError(t, err)
err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheRemovePrivilegesFromGroup, OpKey: string(v)})
assert.NoError(t, err)
groups = globalMetaCache.GetGroupPrivileges("pg1")
assert.Equal(t, 2, len(groups))

err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheDropPrivilegeGroup, OpKey: "pg1"})
assert.NoError(t, err)
groups = globalMetaCache.GetGroupPrivileges("pg1")
assert.Equal(t, 0, len(groups))

err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheGrantPrivilege, OpKey: ""})
assert.Error(t, err)
err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: 100, OpKey: "policyX"})
Expand All @@ -737,8 +706,7 @@ func TestMetaCache_PolicyInfo(t *testing.T) {
"policy2",
"policy3",
},
UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")},
PrivilegeGroups: privGroups,
UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")},
}, nil
}
err := InitMetaCache(context.Background(), client, qc, mgr)
Expand Down
Loading

0 comments on commit 615252c

Please sign in to comment.