Skip to content

Commit

Permalink
add cache
Browse files Browse the repository at this point in the history
Signed-off-by: denis-tingaikin <[email protected]>
  • Loading branch information
denis-tingaikin committed Oct 17, 2024
1 parent 3d46f1e commit 3dadc67
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 252 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/edwarnicke/serialize v1.0.7
github.com/fsnotify/fsnotify v1.5.4
github.com/ghodss/yaml v1.0.0
github.com/go-pkgz/expirable-cache/v3 v3.0.0
github.com/golang-jwt/jwt/v4 v4.2.0
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.6.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-pkgz/expirable-cache/v3 v3.0.0 h1:u3/gcu3sabLYiTCevoRKv+WzjIn5oo7P8XtiXBeRDLw=
github.com/go-pkgz/expirable-cache/v3 v3.0.0/go.mod h1:2OQiDyEGQalYecLWmXprm3maPXeVb5/6/X7yRPYTzec=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
Expand Down Expand Up @@ -143,6 +145,7 @@ github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/C
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/raft v1.3.9 h1:9yuo1aR0bFTr1cw7pj3S2Bk6MhJCsnr2NAxvIBrP2x4=
github.com/hashicorp/raft v1.3.9/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
Expand Down
2 changes: 2 additions & 0 deletions pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/registry/common/grpcmetadata"
"github.com/networkservicemesh/sdk/pkg/registry/common/localbypass"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/querycache"
registryrecvfd "github.com/networkservicemesh/sdk/pkg/registry/common/recvfd"
registrysendfd "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd"
"github.com/networkservicemesh/sdk/pkg/registry/common/updatepath"
Expand Down Expand Up @@ -267,6 +268,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
clientconn.NewNetworkServiceEndpointRegistryClient(),
opts.authorizeNSERegistryClient,
grpcmetadata.NewNetworkServiceEndpointRegistryClient(),
querycache.NewClient(ctx),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialTimeout(opts.dialTimeout),
dial.WithDialOptions(opts.dialOptions...),
Expand Down
125 changes: 0 additions & 125 deletions pkg/registry/common/querycache/cache.go

This file was deleted.

107 changes: 32 additions & 75 deletions pkg/registry/common/querycache/nse_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package querycache

import (
"context"
"time"

cache "github.com/go-pkgz/expirable-cache/v3"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

Expand All @@ -31,100 +33,55 @@ import (

type queryCacheNSEClient struct {
ctx context.Context
cache *cache
cache cache.Cache[string, []*registry.NetworkServiceEndpoint]
}

// NewClient creates new querycache NSE registry client that caches all resolved NSEs
func NewClient(ctx context.Context, opts ...Option) registry.NetworkServiceEndpointRegistryClient {
return &queryCacheNSEClient{
func NewClient(ctx context.Context) registry.NetworkServiceEndpointRegistryClient {
var res = &queryCacheNSEClient{
ctx: ctx,
cache: newCache(ctx, opts...),
cache: cache.NewCache[string, []*registry.NetworkServiceEndpoint]().WithLRU().WithMaxKeys(32).WithTTL(time.Millisecond * 300),
}
return res
}

func (q *queryCacheNSEClient) Register(ctx context.Context, nse *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) (*registry.NetworkServiceEndpoint, error) {
return next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, nse, opts...)
resp, err := next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, nse, opts...)
if err == nil {
q.cache.Add(resp.GetName(), []*registry.NetworkServiceEndpoint{resp})
}
return resp, err
}

func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkServiceEndpointQuery, opts ...grpc.CallOption) (registry.NetworkServiceEndpointRegistry_FindClient, error) {
if query.Watch {
return next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...)
}

if client, ok := q.findInCache(ctx, query.String()); ok {
return client, nil
}

client, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...)
if err != nil {
return nil, err
}

nses := registry.ReadNetworkServiceEndpointList(client)

resultCh := make(chan *registry.NetworkServiceEndpointResponse, len(nses))
for _, nse := range nses {
resultCh <- &registry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse}
q.storeInCache(ctx, nse.Clone(), opts...)
}
close(resultCh)

return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), nil
}

func (q *queryCacheNSEClient) findInCache(ctx context.Context, key string) (registry.NetworkServiceEndpointRegistry_FindClient, bool) {
nse, ok := q.cache.Load(key)
if !ok {
return nil, false
}

resultCh := make(chan *registry.NetworkServiceEndpointResponse, 1)
resultCh <- &registry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse.Clone()}
close(resultCh)

return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), true
}

func (q *queryCacheNSEClient) storeInCache(ctx context.Context, nse *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) {
nseQuery := &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: nse.Name,
},
}

key := nseQuery.String()

findCtx, cancel := context.WithCancel(q.ctx)

entry, loaded := q.cache.LoadOrStore(key, nse, cancel)
if loaded {
cancel()
return
}

go func() {
defer entry.Cleanup()

nseQuery.Watch = true

stream, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(findCtx, nseQuery, opts...)
var list []*registry.NetworkServiceEndpoint
if v, ok := q.cache.Get(query.GetNetworkServiceEndpoint().GetName()); ok {
list = v
} else {
var streamClient, err = next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...)
if err != nil {
return
return streamClient, err
}
list = registry.ReadNetworkServiceEndpointList(streamClient)
q.cache.Add(query.GetNetworkServiceEndpoint().GetName(), list)

Check failure on line 71 in pkg/registry/common/querycache/nse_client.go

View workflow job for this annotation

GitHub Actions / golangci-lint / golangci-lint

unnecessary trailing newline (whitespace)
for nseResp, err := stream.Recv(); err == nil; nseResp, err = stream.Recv() {
if nseResp.NetworkServiceEndpoint.Name != nseQuery.NetworkServiceEndpoint.Name {
continue
}
if nseResp.Deleted {
break
}

entry.Update(nseResp.NetworkServiceEndpoint)
}
}()
}
var resultStreamChannel = make(chan *registry.NetworkServiceEndpointResponse, len(list))
for _, item := range list {
resultStreamChannel <- &registry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: item}
}
close(resultStreamChannel)
return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultStreamChannel), nil
}

func (q *queryCacheNSEClient) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) (*empty.Empty, error) {
return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...)
resp, err := next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...)
if err == nil {
q.cache.Remove(in.GetName())
}
return resp, err
}
Loading

0 comments on commit 3dadc67

Please sign in to comment.