Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[merge]merge dev to master #317

Merged
merged 10 commits into from
Jan 25, 2024
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: v1.51.2
version: v1.55.2
args: --enable gofmt,gocyclo,goimports,dupl,gosec --timeout 5m --skip-dirs=examples,test --skip-files=.*_test.go$
static-checks:
runs-on: ubuntu-latest
Expand Down
62 changes: 36 additions & 26 deletions server/datasource/etcd/kv/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
"sync"
"time"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
"github.com/go-chassis/openlog"
"github.com/little-cui/etcdadpt"
goCache "github.com/patrickmn/go-cache"
"go.etcd.io/etcd/api/v3/mvccpb"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
)

func Init() {
Expand All @@ -35,8 +36,6 @@ const (
backOffMinInterval = 5 * time.Second
)

type IDSet map[string]struct{}

type Cache struct {
timeOut time.Duration
client etcdadpt.Client
Expand Down Expand Up @@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
if !ok {
kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}})
z := &sync.Map{}
z.Store(kvDoc.ID, struct{}{})
kc.StoreKvIDSet(cacheKey, z)
openlog.Info("cacheKey " + cacheKey + "not exists")
continue
}
m[kvDoc.ID] = struct{}{}
m.Store(kvDoc.ID, struct{}{})
}
}

Expand All @@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) {
openlog.Error("cacheKey " + cacheKey + "not exists")
continue
}
delete(m, kvDoc.ID)
m.Delete(kvDoc.ID)
}
}

func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) {
func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) {
val, ok := kc.kvIDCache.Load(cacheKey)
if !ok {
return nil, false
}
kvIds, ok := val.(IDSet)
kvIds, ok := val.(*sync.Map)
if !ok {
return nil, false
}
return kvIds, true
}

func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) {
func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) {
kc.kvIDCache.Store(cacheKey, kvIds)
}

Expand All @@ -220,9 +221,9 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
kc.kvDocCache.Delete(kvID)
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) {
func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
if !req.Opts.ExactLabels {
return nil, false
return nil, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
Expand All @@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool)
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true
kvCache.StoreKvIDSet(cacheKey, &sync.Map{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
kvIds.Range(func(kvID, value any) bool {
if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok {
docs = append(docs, doc)
continue
} else {
kvIdsLeft = append(kvIdsLeft, kvID.(string))
}
kvIdsLeft = append(kvIdsLeft, kvID)
return true
})
tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
if err != nil {
return nil, true, err
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
Expand All @@ -257,17 +261,18 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool)
}
}
result.Total = len(result.Data)
return result, true
return result, true, nil
}

func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) {
if len(kvIdsLeft) == 0 {
return nil
return nil, nil
}

openlog.Debug("get kv from etcd by kvId")
wg := sync.WaitGroup{}
docs := make([]*model.KVDoc, len(kvIdsLeft))
var getKvErr error
for i, kvID := range kvIdsLeft {
wg.Add(1)
go func(kvID string, cnt int) {
Expand All @@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
kv, err := etcdadpt.Get(ctx, docKey)
if err != nil {
openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err))
getKvErr = err
return
}

doc, err := kc.GetKvDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
getKvErr = err
return
}

Expand All @@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
}(kvID, i)
}
wg.Wait()
return docs
if getKvErr != nil {
return nil, getKvErr
}
return docs, nil
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
Expand Down
7 changes: 5 additions & 2 deletions server/datasource/etcd/kv/kv_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,15 +524,18 @@ func (s *Dao) listData(ctx context.Context, project, domain string, options ...d
}

if Enabled() {
result, useCache := Search(ctx, &CacheSearchReq{
result, useCache, err := Search(ctx, &CacheSearchReq{
Domain: domain,
Project: project,
Opts: &opts,
Regex: regex,
})
if useCache {
if useCache && err == nil {
return result, opts, nil
}
if useCache && err != nil {
openlog.Error("using cache to search kv failed: " + err.Error())
}
}

result, err := matchLabelsSearch(ctx, domain, project, regex, opts)
Expand Down
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package server

import (
chassis "github.com/go-chassis/go-chassis/v2"
"github.com/go-chassis/go-chassis/v2/core/common"
"github.com/go-chassis/openlog"

"github.com/apache/servicecomb-kie/pkg/validator"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/db"
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/rbac"
v1 "github.com/apache/servicecomb-kie/server/resource/v1"
"github.com/go-chassis/go-chassis/v2"
"github.com/go-chassis/go-chassis/v2/core/common"
"github.com/go-chassis/openlog"
)

func Run() {
Expand Down
Loading