Skip to content

Commit

Permalink
[merge]merge dev to master (#317)
Browse files Browse the repository at this point in the history
* fix the concurrent bug of KvIdCache

* fix the bug of do not report the error which occured in action of get kvdocs from etcd

* fix the bug of do not report the error which occured in action of get kvdocs from etcd

* resolve conflicts in master

* [fix] fix golangci-lint (#318)

Co-authored-by: songshiyuan 00649746 <[email protected]>

* fix the concurrent bug of KvIdCache

* fix the bug of do not report the error which occured in action of get kvdocs from etcd

* fix the bug of do not report the error which occured in action of get kvdocs from etcd

* resolve conflicts in master

---------

Co-authored-by: songshiyuan 00649746 <[email protected]>
  • Loading branch information
tornado-ssy and songshiyuan 00649746 authored Jan 25, 2024
1 parent 577408a commit 3e0f1fb
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 28 deletions.
62 changes: 36 additions & 26 deletions server/datasource/etcd/kv/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
"sync"
"time"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
"github.com/go-chassis/openlog"
"github.com/little-cui/etcdadpt"
goCache "github.com/patrickmn/go-cache"
"go.etcd.io/etcd/api/v3/mvccpb"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
)

func Init() {
Expand All @@ -35,8 +36,6 @@ const (
backOffMinInterval = 5 * time.Second
)

type IDSet map[string]struct{}

type Cache struct {
timeOut time.Duration
client etcdadpt.Client
Expand Down Expand Up @@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
if !ok {
kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}})
z := &sync.Map{}
z.Store(kvDoc.ID, struct{}{})
kc.StoreKvIDSet(cacheKey, z)
openlog.Info("cacheKey " + cacheKey + "not exists")
continue
}
m[kvDoc.ID] = struct{}{}
m.Store(kvDoc.ID, struct{}{})
}
}

Expand All @@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) {
openlog.Error("cacheKey " + cacheKey + "not exists")
continue
}
delete(m, kvDoc.ID)
m.Delete(kvDoc.ID)
}
}

func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) {
func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) {
val, ok := kc.kvIDCache.Load(cacheKey)
if !ok {
return nil, false
}
kvIds, ok := val.(IDSet)
kvIds, ok := val.(*sync.Map)
if !ok {
return nil, false
}
return kvIds, true
}

func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) {
func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) {
kc.kvIDCache.Store(cacheKey, kvIds)
}

Expand All @@ -220,9 +221,9 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
kc.kvDocCache.Delete(kvID)
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) {
func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
if !req.Opts.ExactLabels {
return nil, false
return nil, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
Expand All @@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool)
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true
kvCache.StoreKvIDSet(cacheKey, &sync.Map{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
kvIds.Range(func(kvID, value any) bool {
if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok {
docs = append(docs, doc)
continue
} else {
kvIdsLeft = append(kvIdsLeft, kvID.(string))
}
kvIdsLeft = append(kvIdsLeft, kvID)
return true
})
tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
if err != nil {
return nil, true, err
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
Expand All @@ -257,17 +261,18 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool)
}
}
result.Total = len(result.Data)
return result, true
return result, true, nil
}

func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) {
if len(kvIdsLeft) == 0 {
return nil
return nil, nil
}

openlog.Debug("get kv from etcd by kvId")
wg := sync.WaitGroup{}
docs := make([]*model.KVDoc, len(kvIdsLeft))
var getKvErr error
for i, kvID := range kvIdsLeft {
wg.Add(1)
go func(kvID string, cnt int) {
Expand All @@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
kv, err := etcdadpt.Get(ctx, docKey)
if err != nil {
openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err))
getKvErr = err
return
}

doc, err := kc.GetKvDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
getKvErr = err
return
}

Expand All @@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
}(kvID, i)
}
wg.Wait()
return docs
if getKvErr != nil {
return nil, getKvErr
}
return docs, nil
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
Expand Down
7 changes: 5 additions & 2 deletions server/datasource/etcd/kv/kv_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,15 +524,18 @@ func (s *Dao) listData(ctx context.Context, project, domain string, options ...d
}

if Enabled() {
result, useCache := Search(ctx, &CacheSearchReq{
result, useCache, err := Search(ctx, &CacheSearchReq{
Domain: domain,
Project: project,
Opts: &opts,
Regex: regex,
})
if useCache {
if useCache && err == nil {
return result, opts, nil
}
if useCache && err != nil {
openlog.Error("using cache to search kv failed: " + err.Error())
}
}

result, err := matchLabelsSearch(ctx, domain, project, regex, opts)
Expand Down

0 comments on commit 3e0f1fb

Please sign in to comment.