diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go index 61d017b2..7cf65297 100644 --- a/server/datasource/etcd/kv/kv_cache.go +++ b/server/datasource/etcd/kv/kv_cache.go @@ -9,15 +9,16 @@ import ( "sync" "time" - "github.com/apache/servicecomb-kie/pkg/model" - "github.com/apache/servicecomb-kie/pkg/stringutil" - "github.com/apache/servicecomb-kie/server/datasource" - "github.com/apache/servicecomb-kie/server/datasource/etcd/key" "github.com/go-chassis/foundation/backoff" "github.com/go-chassis/openlog" "github.com/little-cui/etcdadpt" goCache "github.com/patrickmn/go-cache" "go.etcd.io/etcd/api/v3/mvccpb" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/pkg/stringutil" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/etcd/key" ) func Init() { @@ -35,8 +36,6 @@ const ( backOffMinInterval = 5 * time.Second ) -type IDSet map[string]struct{} - type Cache struct { timeOut time.Duration client etcdadpt.Client @@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) { cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) m, ok := kc.LoadKvIDSet(cacheKey) if !ok { - kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}}) + z := &sync.Map{} + z.Store(kvDoc.ID, struct{}{}) + kc.StoreKvIDSet(cacheKey, z) openlog.Info("cacheKey " + cacheKey + "not exists") continue } - m[kvDoc.ID] = struct{}{} + m.Store(kvDoc.ID, struct{}{}) } } @@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) { openlog.Error("cacheKey " + cacheKey + "not exists") continue } - delete(m, kvDoc.ID) + m.Delete(kvDoc.ID) } } -func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) { +func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) { val, ok := kc.kvIDCache.Load(cacheKey) if !ok { return nil, false } - kvIds, ok := val.(IDSet) + kvIds, ok := val.(*sync.Map) if !ok { return nil, false } return kvIds, true } -func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) { +func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) { kc.kvIDCache.Store(cacheKey, kvIds) } @@ -220,9 +221,9 @@ func (kc *Cache) DeleteKvDoc(kvID string) { kc.kvDocCache.Delete(kvID) } -func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) { +func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { if !req.Opts.ExactLabels { - return nil, false + return nil, false, nil } openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) @@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) kvIds, ok := kvCache.LoadKvIDSet(cacheKey) if !ok { - kvCache.StoreKvIDSet(cacheKey, IDSet{}) - return result, true + kvCache.StoreKvIDSet(cacheKey, &sync.Map{}) + return result, true, nil } var docs []*model.KVDoc var kvIdsLeft []string - for kvID := range kvIds { - if doc, ok := kvCache.LoadKvDoc(kvID); ok { + kvIds.Range(func(kvID, value any) bool { + if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok { docs = append(docs, doc) - continue + } else { + kvIdsLeft = append(kvIdsLeft, kvID.(string)) } - kvIdsLeft = append(kvIdsLeft, kvID) + return true + }) + tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) + if err != nil { + return nil, true, err } - - tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) docs = append(docs, tpData...) for _, doc := range docs { @@ -257,17 +261,18 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) } } result.Total = len(result.Data) - return result, true + return result, true, nil } -func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc { +func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) { if len(kvIdsLeft) == 0 { - return nil + return nil, nil } openlog.Debug("get kv from etcd by kvId") wg := sync.WaitGroup{} docs := make([]*model.KVDoc, len(kvIdsLeft)) + var getKvErr error for i, kvID := range kvIdsLeft { wg.Add(1) go func(kvID string, cnt int) { @@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe kv, err := etcdadpt.Get(ctx, docKey) if err != nil { openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err)) + getKvErr = err return } doc, err := kc.GetKvDoc(kv) if err != nil { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) + getKvErr = err return } @@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe }(kvID, i) } wg.Wait() - return docs + if getKvErr != nil { + return nil, getKvErr + } + return docs, nil } func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { diff --git a/server/datasource/etcd/kv/kv_dao.go b/server/datasource/etcd/kv/kv_dao.go index 2332b586..d7260dfd 100644 --- a/server/datasource/etcd/kv/kv_dao.go +++ b/server/datasource/etcd/kv/kv_dao.go @@ -524,15 +524,18 @@ func (s *Dao) listData(ctx context.Context, project, domain string, options ...d } if Enabled() { - result, useCache := Search(ctx, &CacheSearchReq{ + result, useCache, err := Search(ctx, &CacheSearchReq{ Domain: domain, Project: project, Opts: &opts, Regex: regex, }) - if useCache { + if useCache && err == nil { return result, opts, nil } + if useCache && err != nil { + openlog.Error("using cache to search kv failed: " + err.Error()) + } } result, err := matchLabelsSearch(ctx, domain, project, regex, opts)