From 3dadc67a3a550b507857e83c03c795f5b3a4af34 Mon Sep 17 00:00:00 2001 From: denis-tingaikin Date: Thu, 17 Oct 2024 19:04:04 +0300 Subject: [PATCH] add cache Signed-off-by: denis-tingaikin --- go.mod | 1 + go.sum | 3 + pkg/networkservice/chains/nsmgr/server.go | 2 + pkg/registry/common/querycache/cache.go | 125 ------------------ pkg/registry/common/querycache/nse_client.go | 107 +++++---------- .../common/querycache/nse_client_test.go | 43 +++--- pkg/registry/common/querycache/option.go | 29 ---- 7 files changed, 58 insertions(+), 252 deletions(-) delete mode 100644 pkg/registry/common/querycache/cache.go delete mode 100644 pkg/registry/common/querycache/option.go diff --git a/go.mod b/go.mod index c31f44d92..8725ecda8 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/edwarnicke/serialize v1.0.7 github.com/fsnotify/fsnotify v1.5.4 github.com/ghodss/yaml v1.0.0 + github.com/go-pkgz/expirable-cache/v3 v3.0.0 github.com/golang-jwt/jwt/v4 v4.2.0 github.com/golang/protobuf v1.5.3 github.com/google/go-cmp v0.6.0 diff --git a/go.sum b/go.sum index 877fa311e..8a8660fb3 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,8 @@ github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-pkgz/expirable-cache/v3 v3.0.0 h1:u3/gcu3sabLYiTCevoRKv+WzjIn5oo7P8XtiXBeRDLw= +github.com/go-pkgz/expirable-cache/v3 v3.0.0/go.mod h1:2OQiDyEGQalYecLWmXprm3maPXeVb5/6/X7yRPYTzec= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= @@ -143,6 +145,7 @@ github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/C github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/raft v1.3.9 h1:9yuo1aR0bFTr1cw7pj3S2Bk6MhJCsnr2NAxvIBrP2x4= github.com/hashicorp/raft v1.3.9/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= diff --git a/pkg/networkservice/chains/nsmgr/server.go b/pkg/networkservice/chains/nsmgr/server.go index c1196ec56..e4c45f14e 100644 --- a/pkg/networkservice/chains/nsmgr/server.go +++ b/pkg/networkservice/chains/nsmgr/server.go @@ -55,6 +55,7 @@ import ( "github.com/networkservicemesh/sdk/pkg/registry/common/grpcmetadata" "github.com/networkservicemesh/sdk/pkg/registry/common/localbypass" "github.com/networkservicemesh/sdk/pkg/registry/common/memory" + "github.com/networkservicemesh/sdk/pkg/registry/common/querycache" registryrecvfd "github.com/networkservicemesh/sdk/pkg/registry/common/recvfd" registrysendfd "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd" "github.com/networkservicemesh/sdk/pkg/registry/common/updatepath" @@ -267,6 +268,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options clientconn.NewNetworkServiceEndpointRegistryClient(), opts.authorizeNSERegistryClient, grpcmetadata.NewNetworkServiceEndpointRegistryClient(), + querycache.NewClient(ctx), dial.NewNetworkServiceEndpointRegistryClient(ctx, dial.WithDialTimeout(opts.dialTimeout), dial.WithDialOptions(opts.dialOptions...), diff --git a/pkg/registry/common/querycache/cache.go b/pkg/registry/common/querycache/cache.go deleted file mode 100644 index f9558dc42..000000000 --- a/pkg/registry/common/querycache/cache.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// Copyright (c) 2023 Cisco and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package querycache - -import ( - "context" - "sync" - "time" - - "github.com/edwarnicke/genericsync" - "github.com/networkservicemesh/api/pkg/api/registry" - - "github.com/networkservicemesh/sdk/pkg/tools/clock" -) - -type cache struct { - expireTimeout time.Duration - entries genericsync.Map[string, *cacheEntry] - clockTime clock.Clock -} - -func newCache(ctx context.Context, opts ...Option) *cache { - c := &cache{ - expireTimeout: time.Minute, - clockTime: clock.FromContext(ctx), - } - - for _, opt := range opts { - opt(c) - } - - ticker := c.clockTime.Ticker(c.expireTimeout) - go func() { - for { - select { - case <-ctx.Done(): - ticker.Stop() - return - case <-ticker.C(): - c.entries.Range(func(_ string, e *cacheEntry) bool { - e.lock.Lock() - defer e.lock.Unlock() - - if c.clockTime.Until(e.expirationTime) < 0 { - e.cleanup() - } - - return true - }) - } - } - }() - - return c -} - -func (c *cache) LoadOrStore(key string, nse *registry.NetworkServiceEndpoint, cancel context.CancelFunc) (*cacheEntry, bool) { - var once sync.Once - return c.entries.LoadOrStore(key, &cacheEntry{ - nse: nse, - expirationTime: c.clockTime.Now().Add(c.expireTimeout), - cleanup: func() { - once.Do(func() { - c.entries.Delete(key) - cancel() - }) - }, - }) -} - -func (c *cache) Load(key string) (*registry.NetworkServiceEndpoint, bool) { - e, ok := c.entries.Load(key) - if !ok { - return nil, false - } - - e.lock.Lock() - defer e.lock.Unlock() - - if c.clockTime.Until(e.expirationTime) < 0 { - e.cleanup() - return nil, false - } - - e.expirationTime = c.clockTime.Now().Add(c.expireTimeout) - - return e.nse, true -} - -type cacheEntry struct { - nse *registry.NetworkServiceEndpoint - expirationTime time.Time - lock sync.Mutex - cleanup func() -} - -func (e *cacheEntry) Update(nse *registry.NetworkServiceEndpoint) { - e.lock.Lock() - defer e.lock.Unlock() - - e.nse = nse -} - -func (e *cacheEntry) Cleanup() { - e.lock.Lock() - defer e.lock.Unlock() - - e.cleanup() -} diff --git a/pkg/registry/common/querycache/nse_client.go b/pkg/registry/common/querycache/nse_client.go index efc37f204..188932a86 100644 --- a/pkg/registry/common/querycache/nse_client.go +++ b/pkg/registry/common/querycache/nse_client.go @@ -19,7 +19,9 @@ package querycache import ( "context" + "time" + cache "github.com/go-pkgz/expirable-cache/v3" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" @@ -31,19 +33,24 @@ import ( type queryCacheNSEClient struct { ctx context.Context - cache *cache + cache cache.Cache[string, []*registry.NetworkServiceEndpoint] } // NewClient creates new querycache NSE registry client that caches all resolved NSEs -func NewClient(ctx context.Context, opts ...Option) registry.NetworkServiceEndpointRegistryClient { - return &queryCacheNSEClient{ +func NewClient(ctx context.Context) registry.NetworkServiceEndpointRegistryClient { + var res = &queryCacheNSEClient{ ctx: ctx, - cache: newCache(ctx, opts...), + cache: cache.NewCache[string, []*registry.NetworkServiceEndpoint]().WithLRU().WithMaxKeys(32).WithTTL(time.Millisecond * 300), } + return res } func (q *queryCacheNSEClient) Register(ctx context.Context, nse *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) (*registry.NetworkServiceEndpoint, error) { - return next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, nse, opts...) + resp, err := next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, nse, opts...) + if err == nil { + q.cache.Add(resp.GetName(), []*registry.NetworkServiceEndpoint{resp}) + } + return resp, err } func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkServiceEndpointQuery, opts ...grpc.CallOption) (registry.NetworkServiceEndpointRegistry_FindClient, error) { @@ -51,80 +58,30 @@ func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkS return next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) } - if client, ok := q.findInCache(ctx, query.String()); ok { - return client, nil - } - - client, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) - if err != nil { - return nil, err - } - - nses := registry.ReadNetworkServiceEndpointList(client) - - resultCh := make(chan *registry.NetworkServiceEndpointResponse, len(nses)) - for _, nse := range nses { - resultCh <- ®istry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse} - q.storeInCache(ctx, nse.Clone(), opts...) - } - close(resultCh) - - return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), nil -} - -func (q *queryCacheNSEClient) findInCache(ctx context.Context, key string) (registry.NetworkServiceEndpointRegistry_FindClient, bool) { - nse, ok := q.cache.Load(key) - if !ok { - return nil, false - } - - resultCh := make(chan *registry.NetworkServiceEndpointResponse, 1) - resultCh <- ®istry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse.Clone()} - close(resultCh) - - return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), true -} - -func (q *queryCacheNSEClient) storeInCache(ctx context.Context, nse *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) { - nseQuery := ®istry.NetworkServiceEndpointQuery{ - NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ - Name: nse.Name, - }, - } - - key := nseQuery.String() - - findCtx, cancel := context.WithCancel(q.ctx) - - entry, loaded := q.cache.LoadOrStore(key, nse, cancel) - if loaded { - cancel() - return - } - - go func() { - defer entry.Cleanup() - - nseQuery.Watch = true - - stream, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(findCtx, nseQuery, opts...) + var list []*registry.NetworkServiceEndpoint + if v, ok := q.cache.Get(query.GetNetworkServiceEndpoint().GetName()); ok { + list = v + } else { + var streamClient, err = next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) if err != nil { - return + return streamClient, err } + list = registry.ReadNetworkServiceEndpointList(streamClient) + q.cache.Add(query.GetNetworkServiceEndpoint().GetName(), list) - for nseResp, err := stream.Recv(); err == nil; nseResp, err = stream.Recv() { - if nseResp.NetworkServiceEndpoint.Name != nseQuery.NetworkServiceEndpoint.Name { - continue - } - if nseResp.Deleted { - break - } - - entry.Update(nseResp.NetworkServiceEndpoint) - } - }() + } + var resultStreamChannel = make(chan *registry.NetworkServiceEndpointResponse, len(list)) + for _, item := range list { + resultStreamChannel <- ®istry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: item} + } + close(resultStreamChannel) + return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultStreamChannel), nil } func (q *queryCacheNSEClient) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) (*empty.Empty, error) { - return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...) + resp, err := next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...) + if err == nil { + q.cache.Remove(in.GetName()) + } + return resp, err } diff --git a/pkg/registry/common/querycache/nse_client_test.go b/pkg/registry/common/querycache/nse_client_test.go index 516a911ba..fc1e196fd 100644 --- a/pkg/registry/common/querycache/nse_client_test.go +++ b/pkg/registry/common/querycache/nse_client_test.go @@ -18,6 +18,7 @@ package querycache_test import ( "context" + "fmt" "sync/atomic" "testing" "time" @@ -38,12 +39,12 @@ import ( ) const ( - expireTimeout = time.Minute + expireTimeout = time.Second name = "nse" url1 = "tcp://1.1.1.1" url2 = "tcp://2.2.2.2" - testWait = 100 * time.Millisecond - testTick = testWait / 100 + testWait = time.Second + testTick = time.Second / 15 ) func testNSEQuery(nseName string) *registry.NetworkServiceEndpointQuery { @@ -53,7 +54,6 @@ func testNSEQuery(nseName string) *registry.NetworkServiceEndpointQuery { }, } } - func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) @@ -64,7 +64,7 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewClient(ctx, querycache.WithExpireTimeout(expireTimeout)), + querycache.NewClient(ctx), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) @@ -75,9 +75,6 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { }) require.NoError(t, err) - // Goroutines should be cleaned up on NSE unregister - t.Cleanup(func() { goleak.VerifyNone(t) }) - // 1. Find from memory atomic.StoreInt32(&failureClient.shouldFail, 0) @@ -86,28 +83,24 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { nseResp, err := stream.Recv() require.NoError(t, err) - require.Equal(t, name, nseResp.NetworkServiceEndpoint.Name) require.Equal(t, url1, nseResp.NetworkServiceEndpoint.Url) // 2. Find from cache atomic.StoreInt32(&failureClient.shouldFail, 1) - require.Eventually(t, func() bool { - if stream, err = c.Find(ctx, testNSEQuery(name)); err != nil { - return false - } - if nseResp, err = stream.Recv(); err != nil { - return false - } - return name == nseResp.NetworkServiceEndpoint.Name && url1 == nseResp.NetworkServiceEndpoint.Url - }, testWait, testTick) + stream, err = c.Find(ctx, testNSEQuery(name)) + require.NoError(t, err) + nseResp, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, name, nseResp.NetworkServiceEndpoint.Name) + require.Equal(t, url1, nseResp.NetworkServiceEndpoint.Url) // 3. Update NSE in memory reg.Url = url2 - reg, err = mem.Register(ctx, reg) require.NoError(t, err) + atomic.StoreInt32(&failureClient.shouldFail, 0) require.Eventually(t, func() bool { if stream, err = c.Find(ctx, testNSEQuery(name)); err != nil { @@ -116,6 +109,7 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { if nseResp, err = stream.Recv(); err != nil { return false } + fmt.Println(name == nseResp.NetworkServiceEndpoint.Name, url2 == nseResp.NetworkServiceEndpoint.Url) return name == nseResp.NetworkServiceEndpoint.Name && url2 == nseResp.NetworkServiceEndpoint.Url }, testWait, testTick) @@ -124,8 +118,11 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - _, err = c.Find(ctx, testNSEQuery(name)) - return err != nil + s, err := c.Find(ctx, testNSEQuery(name)) + if err != nil { + return false + } + return len(registry.ReadNetworkServiceEndpointList(s)) == 0 }, testWait, testTick) } @@ -142,7 +139,7 @@ func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewClient(ctx, querycache.WithExpireTimeout(expireTimeout)), + querycache.NewClient(ctx), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) @@ -184,7 +181,7 @@ func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) { } // 4. Wait for the expire to happen - clockMock.Add(expireTimeout) + time.Sleep(expireTimeout) _, err = c.Find(ctx, testNSEQuery(name)) require.Errorf(t, err, "find error") diff --git a/pkg/registry/common/querycache/option.go b/pkg/registry/common/querycache/option.go deleted file mode 100644 index f70220737..000000000 --- a/pkg/registry/common/querycache/option.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package querycache - -import "time" - -// Option is an option for cache -type Option func(c *cache) - -// WithExpireTimeout sets cache expire timeout -func WithExpireTimeout(expireTimeout time.Duration) Option { - return func(c *cache) { - c.expireTimeout = expireTimeout - } -}