From cf680903e78951a7d5596da658db06f0e4606fb4 Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Mon, 7 Oct 2024 13:05:38 +0700 Subject: [PATCH 1/5] add a cache for NS/NSE registry Signed-off-by: NikitaSkrynnik --- .../querycache/{cache.go => ns_cache.go} | 68 ++++---- pkg/registry/common/querycache/ns_client.go | 138 +++++++++++++++++ pkg/registry/common/querycache/nse_cache.go | 145 ++++++++++++++++++ pkg/registry/common/querycache/nse_client.go | 42 +++-- .../common/querycache/nse_client_test.go | 4 +- pkg/registry/common/querycache/option.go | 4 +- 6 files changed, 348 insertions(+), 53 deletions(-) rename pkg/registry/common/querycache/{cache.go => ns_cache.go} (57%) create mode 100644 pkg/registry/common/querycache/ns_client.go create mode 100644 pkg/registry/common/querycache/nse_cache.go diff --git a/pkg/registry/common/querycache/cache.go b/pkg/registry/common/querycache/ns_cache.go similarity index 57% rename from pkg/registry/common/querycache/cache.go rename to pkg/registry/common/querycache/ns_cache.go index f9558dc42..f9606afbf 100644 --- a/pkg/registry/common/querycache/cache.go +++ b/pkg/registry/common/querycache/ns_cache.go @@ -1,6 +1,4 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// Copyright (c) 2023 Cisco and/or its affiliates. +// Copyright (c) 2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,19 +22,19 @@ import ( "time" "github.com/edwarnicke/genericsync" - "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/api/pkg/api/registry" "github.com/networkservicemesh/sdk/pkg/tools/clock" ) -type cache struct { +type nsCache struct { expireTimeout time.Duration - entries genericsync.Map[string, *cacheEntry] + entries genericsync.Map[string, *cacheEntry[registry.NetworkService]] clockTime clock.Clock } -func newCache(ctx context.Context, opts ...Option) *cache { - c := &cache{ +func newNSCache(ctx context.Context, opts ...Option) *nsCache { + c := &nsCache{ expireTimeout: time.Minute, clockTime: clock.FromContext(ctx), } @@ -53,14 +51,13 @@ func newCache(ctx context.Context, opts ...Option) *cache { ticker.Stop() return case <-ticker.C(): - c.entries.Range(func(_ string, e *cacheEntry) bool { + c.entries.Range(func(_ string, e *cacheEntry[registry.NetworkService]) bool { e.lock.Lock() defer e.lock.Unlock() if c.clockTime.Until(e.expirationTime) < 0 { e.cleanup() } - return true }) } @@ -70,54 +67,53 @@ func newCache(ctx context.Context, opts ...Option) *cache { return c } -func (c *cache) LoadOrStore(key string, nse *registry.NetworkServiceEndpoint, cancel context.CancelFunc) (*cacheEntry, bool) { +func (c *nsCache) LoadOrStore(value *registry.NetworkService, cancel context.CancelFunc) (*cacheEntry[registry.NetworkService], bool) { var once sync.Once - return c.entries.LoadOrStore(key, &cacheEntry{ - nse: nse, + + entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry[registry.NetworkService]{ + value: value, expirationTime: c.clockTime.Now().Add(c.expireTimeout), cleanup: func() { once.Do(func() { - c.entries.Delete(key) + c.entries.Delete(value.GetName()) cancel() }) - }, - }) -} + }}) -func (c *cache) Load(key string) (*registry.NetworkServiceEndpoint, bool) { - e, ok := c.entries.Load(key) - if !ok { - return nil, false - } - - e.lock.Lock() - defer e.lock.Unlock() + return entry, ok +} - if c.clockTime.Until(e.expirationTime) < 0 { - e.cleanup() - return nil, false +func (c *nsCache) Load(ctx context.Context, query *registry.NetworkService) *registry.NetworkService { + entry, ok := c.entries.Load(query.Name) + if ok { + entry.lock.Lock() + defer entry.lock.Unlock() + if c.clockTime.Until(entry.expirationTime) < 0 { + entry.cleanup() + } else { + entry.expirationTime = c.clockTime.Now().Add(c.expireTimeout) + return entry.value + } } - e.expirationTime = c.clockTime.Now().Add(c.expireTimeout) - - return e.nse, true + return nil } -type cacheEntry struct { - nse *registry.NetworkServiceEndpoint +type cacheEntry[T registry.NetworkService | registry.NetworkServiceEndpoint] struct { + value *T expirationTime time.Time lock sync.Mutex cleanup func() } -func (e *cacheEntry) Update(nse *registry.NetworkServiceEndpoint) { +func (e *cacheEntry[T]) Update(value *T) { e.lock.Lock() defer e.lock.Unlock() - e.nse = nse + e.value = value } -func (e *cacheEntry) Cleanup() { +func (e *cacheEntry[_]) Cleanup() { e.lock.Lock() defer e.lock.Unlock() diff --git a/pkg/registry/common/querycache/ns_client.go b/pkg/registry/common/querycache/ns_client.go new file mode 100644 index 000000000..93b61f647 --- /dev/null +++ b/pkg/registry/common/querycache/ns_client.go @@ -0,0 +1,138 @@ +// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package querycache adds possibility to cache Find queries +package querycache + +import ( + "context" + "time" + + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc" + + "github.com/networkservicemesh/api/pkg/api/registry" + + "github.com/networkservicemesh/sdk/pkg/registry/core/next" + "github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel" + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +type queryCacheNSClient struct { + ctx context.Context + cache *nsCache +} + +// NewNetworkServiceClient creates new querycache NS registry client that caches all resolved NSs +func NewNetworkServiceClient(ctx context.Context, opts ...Option) registry.NetworkServiceRegistryClient { + return &queryCacheNSClient{ + ctx: ctx, + cache: newNSCache(ctx, opts...), + } +} + +func (q *queryCacheNSClient) Register(ctx context.Context, nse *registry.NetworkService, opts ...grpc.CallOption) (*registry.NetworkService, error) { + return next.NetworkServiceRegistryClient(ctx).Register(ctx, nse, opts...) +} + +func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) { + log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient forth") + if query.Watch { + return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) + } + + log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient search in cache") + if client, ok := q.findInCache(ctx, query); ok { + log.FromContext(ctx).Info("queryCacheNSClient found in cache") + return client, nil + } + + log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient not found in cache") + + client, err := next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) + if err != nil { + return nil, err + } + + nses := registry.ReadNetworkServiceList(client) + + resultCh := make(chan *registry.NetworkServiceResponse, len(nses)) + for _, nse := range nses { + resultCh <- ®istry.NetworkServiceResponse{NetworkService: nse} + q.storeInCache(ctx, nse.Clone(), opts...) + } + close(resultCh) + + return streamchannel.NewNetworkServiceFindClient(ctx, resultCh), nil +} + +func (q *queryCacheNSClient) findInCache(ctx context.Context, query *registry.NetworkServiceQuery) (registry.NetworkServiceRegistry_FindClient, bool) { + log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient checking key: %v", query.NetworkService) + ns := q.cache.Load(ctx, query.NetworkService) + if ns == nil { + return nil, false + } + + log.FromContext(ctx).WithField("time", time.Now()).Infof("found NS in cache: %v", ns) + + resultCh := make(chan *registry.NetworkServiceResponse, 1) + resultCh <- ®istry.NetworkServiceResponse{NetworkService: ns.Clone()} + close(resultCh) + + return streamchannel.NewNetworkServiceFindClient(ctx, resultCh), true +} + +func (q *queryCacheNSClient) storeInCache(ctx context.Context, ns *registry.NetworkService, opts ...grpc.CallOption) { + nsQuery := ®istry.NetworkServiceQuery{ + NetworkService: ®istry.NetworkService{ + Name: ns.Name, + }, + } + + findCtx, cancel := context.WithCancel(q.ctx) + + entry, loaded := q.cache.LoadOrStore(ns, cancel) + if loaded { + cancel() + return + } + + go func() { + defer entry.Cleanup() + + nsQuery.Watch = true + + stream, err := next.NetworkServiceRegistryClient(ctx).Find(findCtx, nsQuery, opts...) + if err != nil { + return + } + + for nsResp, err := stream.Recv(); err == nil; nsResp, err = stream.Recv() { + if nsResp.NetworkService.Name != nsQuery.NetworkService.Name { + continue + } + if nsResp.Deleted { + break + } + + entry.Update(nsResp.NetworkService) + } + }() +} + +func (q *queryCacheNSClient) Unregister(ctx context.Context, in *registry.NetworkService, opts ...grpc.CallOption) (*empty.Empty, error) { + return next.NetworkServiceRegistryClient(ctx).Unregister(ctx, in, opts...) +} diff --git a/pkg/registry/common/querycache/nse_cache.go b/pkg/registry/common/querycache/nse_cache.go new file mode 100644 index 000000000..cc6c07ab2 --- /dev/null +++ b/pkg/registry/common/querycache/nse_cache.go @@ -0,0 +1,145 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querycache + +import ( + "context" + "sync" + "time" + + "github.com/edwarnicke/genericsync" + + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/sdk/pkg/tools/clock" + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +type nseCache struct { + expireTimeout time.Duration + entries genericsync.Map[string, *cacheEntry[registry.NetworkServiceEndpoint]] + clockTime clock.Clock +} + +func newNSECache(ctx context.Context, opts ...Option) *nseCache { + c := &nseCache{ + expireTimeout: time.Minute, + clockTime: clock.FromContext(ctx), + } + + // for _, opt := range opts { + // opt(c) + // } + + ticker := c.clockTime.Ticker(c.expireTimeout) + go func() { + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C(): + c.entries.Range(func(_ string, e *cacheEntry[registry.NetworkServiceEndpoint]) bool { + e.lock.Lock() + defer e.lock.Unlock() + + if c.clockTime.Until(e.expirationTime) < 0 { + e.cleanup() + } + return true + }) + } + } + }() + + return c +} + +func (c *nseCache) LoadOrStore(value *registry.NetworkServiceEndpoint, cancel context.CancelFunc) (*cacheEntry[registry.NetworkServiceEndpoint], bool) { + var once sync.Once + + entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry[registry.NetworkServiceEndpoint]{ + value: value, + expirationTime: c.clockTime.Now().Add(c.expireTimeout), + cleanup: func() { + once.Do(func() { + c.entries.Delete(value.GetName()) + cancel() + }) + }}) + + return entry, ok +} + +func (c *nseCache) add(entry *cacheEntry[registry.NetworkServiceEndpoint], values []*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint { + entry.lock.Lock() + defer entry.lock.Unlock() + if c.clockTime.Until(entry.expirationTime) < 0 { + entry.cleanup() + } else { + entry.expirationTime = c.clockTime.Now().Add(c.expireTimeout) + values = append(values, entry.value) + } + + return values +} + +// Checks if a is a subset of b +func subset(a, b []string) bool { + set := make(map[string]struct{}) + for _, value := range a { + set[value] = struct{}{} + } + + for _, value := range b { + if _, found := set[value]; !found { + return false + } + } + + return true +} + +func (c *nseCache) Load(ctx context.Context, query *registry.NetworkServiceEndpointQuery) []*registry.NetworkServiceEndpoint { + values := make([]*registry.NetworkServiceEndpoint, 0) + + log.FromContext(ctx).WithField("time", time.Now()).Infof("query: %v\n", query) + + if query.NetworkServiceEndpoint.Name != "" { + entry, ok := c.entries.Load(query.NetworkServiceEndpoint.Name) + if ok { + values = c.add(entry, values) + } + return values + } + + log.FromContext(ctx).WithField("time", time.Now()).Infof("Range") + c.entries.Range(func(key string, entry *cacheEntry[registry.NetworkServiceEndpoint]) bool { + log.FromContext(ctx).WithField("time", time.Now()).Infof("key: %v\n", key) + log.FromContext(ctx).WithField("time", time.Now()).Infof("entry.value: %v\n", entry.value) + if subset(query.NetworkServiceEndpoint.NetworkServiceNames, entry.value.NetworkServiceNames) { + log.FromContext(ctx).WithField("time", time.Now()).Infof("adding entry to nses\n") + values = c.add(entry, values) + } + return true + }) + + log.FromContext(ctx).WithField("time", time.Now()).Infof("values: %v\n", values) + + return values +} diff --git a/pkg/registry/common/querycache/nse_client.go b/pkg/registry/common/querycache/nse_client.go index efc37f204..b7a7797af 100644 --- a/pkg/registry/common/querycache/nse_client.go +++ b/pkg/registry/common/querycache/nse_client.go @@ -27,18 +27,19 @@ import ( "github.com/networkservicemesh/sdk/pkg/registry/core/next" "github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel" + "github.com/networkservicemesh/sdk/pkg/tools/log" ) type queryCacheNSEClient struct { ctx context.Context - cache *cache + cache *nseCache } -// NewClient creates new querycache NSE registry client that caches all resolved NSEs -func NewClient(ctx context.Context, opts ...Option) registry.NetworkServiceEndpointRegistryClient { +// NewNetworkServiceEndpointClient creates new querycache NSE registry client that caches all resolved NSEs +func NewNetworkServiceEndpointClient(ctx context.Context, opts ...Option) registry.NetworkServiceEndpointRegistryClient { return &queryCacheNSEClient{ ctx: ctx, - cache: newCache(ctx, opts...), + cache: newNSECache(ctx, opts...), } } @@ -47,14 +48,19 @@ func (q *queryCacheNSEClient) Register(ctx context.Context, nse *registry.Networ } func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkServiceEndpointQuery, opts ...grpc.CallOption) (registry.NetworkServiceEndpointRegistry_FindClient, error) { + log.FromContext(ctx).Infof("queryCacheNSEClient forth") if query.Watch { return next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) } - if client, ok := q.findInCache(ctx, query.String()); ok { + log.FromContext(ctx).Info("queryCacheNSEClient search in cache") + if client, ok := q.findInCache(ctx, query); ok { + log.FromContext(ctx).Info("queryCacheNSEClient found in cache") return client, nil } + log.FromContext(ctx).Info("queryCacheNSEClient not found in cache") + client, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) if err != nil { return nil, err @@ -72,14 +78,26 @@ func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkS return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), nil } -func (q *queryCacheNSEClient) findInCache(ctx context.Context, key string) (registry.NetworkServiceEndpointRegistry_FindClient, bool) { - nse, ok := q.cache.Load(key) - if !ok { +func (q *queryCacheNSEClient) findInCache(ctx context.Context, query *registry.NetworkServiceEndpointQuery) (registry.NetworkServiceEndpointRegistry_FindClient, bool) { + log.FromContext(ctx).Infof("queryCacheNSEClient checking key: %v", query.NetworkServiceEndpoint) + + q.cache.entries.Range(func(key string, value *cacheEntry[registry.NetworkServiceEndpoint]) bool { + log.FromContext(ctx).Infof("Entries: %v, %v", key, value.value) + + return true + }) + + nses := q.cache.Load(ctx, query) + if len(nses) == 0 { return nil, false } - resultCh := make(chan *registry.NetworkServiceEndpointResponse, 1) - resultCh <- ®istry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse.Clone()} + log.FromContext(ctx).Infof("found NSEs in cache: %v", nses) + + resultCh := make(chan *registry.NetworkServiceEndpointResponse, len(nses)) + for _, nse := range nses { + resultCh <- ®istry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse.Clone()} + } close(resultCh) return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), true @@ -92,11 +110,9 @@ func (q *queryCacheNSEClient) storeInCache(ctx context.Context, nse *registry.Ne }, } - key := nseQuery.String() - findCtx, cancel := context.WithCancel(q.ctx) - entry, loaded := q.cache.LoadOrStore(key, nse, cancel) + entry, loaded := q.cache.LoadOrStore(nse, cancel) if loaded { cancel() return diff --git a/pkg/registry/common/querycache/nse_client_test.go b/pkg/registry/common/querycache/nse_client_test.go index 516a911ba..10a5c8800 100644 --- a/pkg/registry/common/querycache/nse_client_test.go +++ b/pkg/registry/common/querycache/nse_client_test.go @@ -64,7 +64,7 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewClient(ctx, querycache.WithExpireTimeout(expireTimeout)), + querycache.NewNetworkServiceEndpointClient(ctx, querycache.WithExpireTimeout(expireTimeout)), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) @@ -142,7 +142,7 @@ func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewClient(ctx, querycache.WithExpireTimeout(expireTimeout)), + querycache.NewNetworkServiceEndpointClient(ctx, querycache.WithExpireTimeout(expireTimeout)), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) diff --git a/pkg/registry/common/querycache/option.go b/pkg/registry/common/querycache/option.go index f70220737..fd1288f48 100644 --- a/pkg/registry/common/querycache/option.go +++ b/pkg/registry/common/querycache/option.go @@ -19,11 +19,11 @@ package querycache import "time" // Option is an option for cache -type Option func(c *cache) +type Option func(c *nsCache) // WithExpireTimeout sets cache expire timeout func WithExpireTimeout(expireTimeout time.Duration) Option { - return func(c *cache) { + return func(c *nsCache) { c.expireTimeout = expireTimeout } } From 53a4ed0629a9a85b753fa5020e97729e9ca3d538 Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Mon, 7 Oct 2024 14:57:05 +0700 Subject: [PATCH 2/5] add unit tests + cleanup Signed-off-by: NikitaSkrynnik --- pkg/registry/common/querycache/ns_cache.go | 2 +- pkg/registry/common/querycache/ns_client.go | 14 +- .../common/querycache/ns_client_test.go | 197 ++++++++++++++++++ pkg/registry/common/querycache/nse_cache.go | 17 +- pkg/registry/common/querycache/nse_client.go | 32 +-- .../common/querycache/nse_client_test.go | 23 +- pkg/registry/common/querycache/option.go | 18 +- 7 files changed, 233 insertions(+), 70 deletions(-) create mode 100644 pkg/registry/common/querycache/ns_client_test.go diff --git a/pkg/registry/common/querycache/ns_cache.go b/pkg/registry/common/querycache/ns_cache.go index f9606afbf..d80463735 100644 --- a/pkg/registry/common/querycache/ns_cache.go +++ b/pkg/registry/common/querycache/ns_cache.go @@ -33,7 +33,7 @@ type nsCache struct { clockTime clock.Clock } -func newNSCache(ctx context.Context, opts ...Option) *nsCache { +func newNSCache(ctx context.Context, opts ...NSCacheOption) *nsCache { c := &nsCache{ expireTimeout: time.Minute, clockTime: clock.FromContext(ctx), diff --git a/pkg/registry/common/querycache/ns_client.go b/pkg/registry/common/querycache/ns_client.go index 93b61f647..5273f8fcd 100644 --- a/pkg/registry/common/querycache/ns_client.go +++ b/pkg/registry/common/querycache/ns_client.go @@ -19,7 +19,6 @@ package querycache import ( "context" - "time" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" @@ -28,7 +27,6 @@ import ( "github.com/networkservicemesh/sdk/pkg/registry/core/next" "github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel" - "github.com/networkservicemesh/sdk/pkg/tools/log" ) type queryCacheNSClient struct { @@ -37,7 +35,7 @@ type queryCacheNSClient struct { } // NewNetworkServiceClient creates new querycache NS registry client that caches all resolved NSs -func NewNetworkServiceClient(ctx context.Context, opts ...Option) registry.NetworkServiceRegistryClient { +func NewNetworkServiceClient(ctx context.Context, opts ...NSCacheOption) registry.NetworkServiceRegistryClient { return &queryCacheNSClient{ ctx: ctx, cache: newNSCache(ctx, opts...), @@ -49,19 +47,14 @@ func (q *queryCacheNSClient) Register(ctx context.Context, nse *registry.Network } func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) { - log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient forth") if query.Watch { return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) } - log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient search in cache") if client, ok := q.findInCache(ctx, query); ok { - log.FromContext(ctx).Info("queryCacheNSClient found in cache") return client, nil } - log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient not found in cache") - client, err := next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) if err != nil { return nil, err @@ -80,14 +73,11 @@ func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkSe } func (q *queryCacheNSClient) findInCache(ctx context.Context, query *registry.NetworkServiceQuery) (registry.NetworkServiceRegistry_FindClient, bool) { - log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient checking key: %v", query.NetworkService) ns := q.cache.Load(ctx, query.NetworkService) if ns == nil { return nil, false } - log.FromContext(ctx).WithField("time", time.Now()).Infof("found NS in cache: %v", ns) - resultCh := make(chan *registry.NetworkServiceResponse, 1) resultCh <- ®istry.NetworkServiceResponse{NetworkService: ns.Clone()} close(resultCh) @@ -103,7 +93,6 @@ func (q *queryCacheNSClient) storeInCache(ctx context.Context, ns *registry.Netw } findCtx, cancel := context.WithCancel(q.ctx) - entry, loaded := q.cache.LoadOrStore(ns, cancel) if loaded { cancel() @@ -114,7 +103,6 @@ func (q *queryCacheNSClient) storeInCache(ctx context.Context, ns *registry.Netw defer entry.Cleanup() nsQuery.Watch = true - stream, err := next.NetworkServiceRegistryClient(ctx).Find(findCtx, nsQuery, opts...) if err != nil { return diff --git a/pkg/registry/common/querycache/ns_client_test.go b/pkg/registry/common/querycache/ns_client_test.go new file mode 100644 index 000000000..2bcca942d --- /dev/null +++ b/pkg/registry/common/querycache/ns_client_test.go @@ -0,0 +1,197 @@ +// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// +// SPDX-Licens-Identifier: Apache-2.0 +// +// Licensd under the Apache Licens, Version 2.0 (the "Licens"); +// you may not use this file except in compliance with the Licens. +// You may obtain a copy of the Licens at: +// +// http://www.apache.org/licenss/LICEns-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Licens is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the Licens for the specific language governing permissions and +// limitations under the Licens. + +package querycache_test + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/registry/common/memory" + "github.com/networkservicemesh/sdk/pkg/registry/common/querycache" + "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" + "github.com/networkservicemesh/sdk/pkg/registry/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/clock" + "github.com/networkservicemesh/sdk/pkg/tools/clockmock" +) + +const ( + payload1 = "ethernet" + payload2 = "ip" +) + +func testNSQuery(nsName string) *registry.NetworkServiceQuery { + return ®istry.NetworkServiceQuery{ + NetworkService: ®istry.NetworkService{ + Name: nsName, + }, + } +} + +func Test_QueryCacheClient_ShouldCacheNSs(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mem := memory.NewNetworkServiceRegistryServer() + + failureClient := new(failureNSClient) + c := next.NewNetworkServiceRegistryClient( + querycache.NewNetworkServiceClient(ctx, querycache.WithNSExpireTimeout(expireTimeout)), + failureClient, + adapters.NetworkServiceServerToClient(mem), + ) + + reg, err := mem.Register(ctx, ®istry.NetworkService{ + Name: name, + Payload: payload1, + }) + require.NoError(t, err) + + // Goroutines should be cleaned up on ns unregister + t.Cleanup(func() { goleak.VerifyNone(t) }) + + // 1. Find from memory + atomic.StoreInt32(&failureClient.shouldFail, 0) + + stream, err := c.Find(ctx, testNSQuery("")) + require.NoError(t, err) + nsResp, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, name, nsResp.NetworkService.Name) + + // 2. Find from cache + atomic.StoreInt32(&failureClient.shouldFail, 1) + + stream, err = c.Find(ctx, testNSQuery(name)) + require.NoError(t, err) + nsResp, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, name, nsResp.NetworkService.Name) + + // 3. Update NS in memory + reg.Payload = payload2 + reg, err = mem.Register(ctx, reg) + require.NoError(t, err) + + require.Eventually(t, func() bool { + if stream, err = c.Find(ctx, testNSQuery(name)); err != nil { + return false + } + if nsResp, err = stream.Recv(); err != nil { + return false + } + return name == nsResp.NetworkService.Name && payload2 == nsResp.NetworkService.Payload + }, testWait, testTick) + + // 4. Delete ns from memory + _, err = mem.Unregister(ctx, reg) + require.NoError(t, err) + + require.Eventually(t, func() bool { + _, err = c.Find(ctx, testNSQuery(name)) + return err != nil + }, testWait, testTick) +} + +func Test_QueryCacheClient_ShouldCleanUpNSOnTimeout(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clockMock := clockmock.New(ctx) + ctx = clock.WithClock(ctx, clockMock) + + mem := memory.NewNetworkServiceRegistryServer() + + failureClient := new(failureNSClient) + c := next.NewNetworkServiceRegistryClient( + querycache.NewNetworkServiceClient(ctx, querycache.WithNSExpireTimeout(expireTimeout)), + failureClient, + adapters.NetworkServiceServerToClient(mem), + ) + + _, err := mem.Register(ctx, ®istry.NetworkService{ + Name: name, + }) + require.NoError(t, err) + + // Goroutines should be cleaned up on cache entry expiration + t.Cleanup(func() { goleak.VerifyNone(t) }) + + // 1. Find from memory + atomic.StoreInt32(&failureClient.shouldFail, 0) + + stream, err := c.Find(ctx, testNSQuery("")) + require.NoError(t, err) + + _, err = stream.Recv() + require.NoError(t, err) + + // 2. Find from cache + atomic.StoreInt32(&failureClient.shouldFail, 1) + + require.Eventually(t, func() bool { + if stream, err = c.Find(ctx, testNSQuery(name)); err == nil { + _, err = stream.Recv() + } + return err == nil + }, testWait, testTick) + + // 3. Keep finding from cache to prevent expiration + for start := clockMock.Now(); clockMock.Since(start) < 2*expireTimeout; clockMock.Add(expireTimeout / 3) { + stream, err = c.Find(ctx, testNSQuery(name)) + require.NoError(t, err) + + _, err = stream.Recv() + require.NoError(t, err) + } + + // 4. Wait for the expire to happen + clockMock.Add(expireTimeout) + + _, err = c.Find(ctx, testNSQuery(name)) + require.Errorf(t, err, "find error") +} + +type failureNSClient struct { + shouldFail int32 +} + +func (c *failureNSClient) Register(ctx context.Context, ns *registry.NetworkService, opts ...grpc.CallOption) (*registry.NetworkService, error) { + return next.NetworkServiceRegistryClient(ctx).Register(ctx, ns, opts...) +} + +func (c *failureNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) { + if atomic.LoadInt32(&c.shouldFail) == 1 && !query.Watch { + return nil, errors.New("find error") + } + return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) +} + +func (c *failureNSClient) Unregister(ctx context.Context, ns *registry.NetworkService, opts ...grpc.CallOption) (*empty.Empty, error) { + return next.NetworkServiceRegistryClient(ctx).Unregister(ctx, ns, opts...) +} diff --git a/pkg/registry/common/querycache/nse_cache.go b/pkg/registry/common/querycache/nse_cache.go index cc6c07ab2..4de65314e 100644 --- a/pkg/registry/common/querycache/nse_cache.go +++ b/pkg/registry/common/querycache/nse_cache.go @@ -27,7 +27,6 @@ import ( "github.com/networkservicemesh/api/pkg/api/registry" "github.com/networkservicemesh/sdk/pkg/tools/clock" - "github.com/networkservicemesh/sdk/pkg/tools/log" ) type nseCache struct { @@ -36,15 +35,15 @@ type nseCache struct { clockTime clock.Clock } -func newNSECache(ctx context.Context, opts ...Option) *nseCache { +func newNSECache(ctx context.Context, opts ...NSECacheOption) *nseCache { c := &nseCache{ expireTimeout: time.Minute, clockTime: clock.FromContext(ctx), } - // for _, opt := range opts { - // opt(c) - // } + for _, opt := range opts { + opt(c) + } ticker := c.clockTime.Ticker(c.expireTimeout) go func() { @@ -118,8 +117,6 @@ func subset(a, b []string) bool { func (c *nseCache) Load(ctx context.Context, query *registry.NetworkServiceEndpointQuery) []*registry.NetworkServiceEndpoint { values := make([]*registry.NetworkServiceEndpoint, 0) - log.FromContext(ctx).WithField("time", time.Now()).Infof("query: %v\n", query) - if query.NetworkServiceEndpoint.Name != "" { entry, ok := c.entries.Load(query.NetworkServiceEndpoint.Name) if ok { @@ -128,18 +125,12 @@ func (c *nseCache) Load(ctx context.Context, query *registry.NetworkServiceEndpo return values } - log.FromContext(ctx).WithField("time", time.Now()).Infof("Range") c.entries.Range(func(key string, entry *cacheEntry[registry.NetworkServiceEndpoint]) bool { - log.FromContext(ctx).WithField("time", time.Now()).Infof("key: %v\n", key) - log.FromContext(ctx).WithField("time", time.Now()).Infof("entry.value: %v\n", entry.value) if subset(query.NetworkServiceEndpoint.NetworkServiceNames, entry.value.NetworkServiceNames) { - log.FromContext(ctx).WithField("time", time.Now()).Infof("adding entry to nses\n") values = c.add(entry, values) } return true }) - log.FromContext(ctx).WithField("time", time.Now()).Infof("values: %v\n", values) - return values } diff --git a/pkg/registry/common/querycache/nse_client.go b/pkg/registry/common/querycache/nse_client.go index b7a7797af..b274af1b7 100644 --- a/pkg/registry/common/querycache/nse_client.go +++ b/pkg/registry/common/querycache/nse_client.go @@ -27,7 +27,6 @@ import ( "github.com/networkservicemesh/sdk/pkg/registry/core/next" "github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel" - "github.com/networkservicemesh/sdk/pkg/tools/log" ) type queryCacheNSEClient struct { @@ -36,7 +35,7 @@ type queryCacheNSEClient struct { } // NewNetworkServiceEndpointClient creates new querycache NSE registry client that caches all resolved NSEs -func NewNetworkServiceEndpointClient(ctx context.Context, opts ...Option) registry.NetworkServiceEndpointRegistryClient { +func NewNetworkServiceEndpointClient(ctx context.Context, opts ...NSECacheOption) registry.NetworkServiceEndpointRegistryClient { return &queryCacheNSEClient{ ctx: ctx, cache: newNSECache(ctx, opts...), @@ -48,19 +47,14 @@ func (q *queryCacheNSEClient) Register(ctx context.Context, nse *registry.Networ } func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkServiceEndpointQuery, opts ...grpc.CallOption) (registry.NetworkServiceEndpointRegistry_FindClient, error) { - log.FromContext(ctx).Infof("queryCacheNSEClient forth") if query.Watch { return next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) } - log.FromContext(ctx).Info("queryCacheNSEClient search in cache") if client, ok := q.findInCache(ctx, query); ok { - log.FromContext(ctx).Info("queryCacheNSEClient found in cache") return client, nil } - log.FromContext(ctx).Info("queryCacheNSEClient not found in cache") - client, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) if err != nil { return nil, err @@ -79,21 +73,11 @@ func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkS } func (q *queryCacheNSEClient) findInCache(ctx context.Context, query *registry.NetworkServiceEndpointQuery) (registry.NetworkServiceEndpointRegistry_FindClient, bool) { - log.FromContext(ctx).Infof("queryCacheNSEClient checking key: %v", query.NetworkServiceEndpoint) - - q.cache.entries.Range(func(key string, value *cacheEntry[registry.NetworkServiceEndpoint]) bool { - log.FromContext(ctx).Infof("Entries: %v, %v", key, value.value) - - return true - }) - nses := q.cache.Load(ctx, query) if len(nses) == 0 { return nil, false } - log.FromContext(ctx).Infof("found NSEs in cache: %v", nses) - resultCh := make(chan *registry.NetworkServiceEndpointResponse, len(nses)) for _, nse := range nses { resultCh <- ®istry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse.Clone()} @@ -104,14 +88,7 @@ func (q *queryCacheNSEClient) findInCache(ctx context.Context, query *registry.N } func (q *queryCacheNSEClient) storeInCache(ctx context.Context, nse *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) { - nseQuery := ®istry.NetworkServiceEndpointQuery{ - NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ - Name: nse.Name, - }, - } - findCtx, cancel := context.WithCancel(q.ctx) - entry, loaded := q.cache.LoadOrStore(nse, cancel) if loaded { cancel() @@ -121,7 +98,12 @@ func (q *queryCacheNSEClient) storeInCache(ctx context.Context, nse *registry.Ne go func() { defer entry.Cleanup() - nseQuery.Watch = true + nseQuery := ®istry.NetworkServiceEndpointQuery{ + NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ + Name: nse.Name, + }, + Watch: true, + } stream, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(findCtx, nseQuery, opts...) if err != nil { diff --git a/pkg/registry/common/querycache/nse_client_test.go b/pkg/registry/common/querycache/nse_client_test.go index 10a5c8800..ba31cb736 100644 --- a/pkg/registry/common/querycache/nse_client_test.go +++ b/pkg/registry/common/querycache/nse_client_test.go @@ -64,7 +64,7 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewNetworkServiceEndpointClient(ctx, querycache.WithExpireTimeout(expireTimeout)), + querycache.NewNetworkServiceEndpointClient(ctx, querycache.WithNSEExpireTimeout(expireTimeout)), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) @@ -86,26 +86,21 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { nseResp, err := stream.Recv() require.NoError(t, err) - require.Equal(t, name, nseResp.NetworkServiceEndpoint.Name) require.Equal(t, url1, nseResp.NetworkServiceEndpoint.Url) // 2. Find from cache atomic.StoreInt32(&failureClient.shouldFail, 1) - require.Eventually(t, func() bool { - if stream, err = c.Find(ctx, testNSEQuery(name)); err != nil { - return false - } - if nseResp, err = stream.Recv(); err != nil { - return false - } - return name == nseResp.NetworkServiceEndpoint.Name && url1 == nseResp.NetworkServiceEndpoint.Url - }, testWait, testTick) + stream, err = c.Find(ctx, testNSEQuery(name)) + require.NoError(t, err) + nseResp, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, name, nseResp.NetworkServiceEndpoint.Name) + require.Equal(t, url1, nseResp.NetworkServiceEndpoint.Url) // 3. Update NSE in memory reg.Url = url2 - reg, err = mem.Register(ctx, reg) require.NoError(t, err) @@ -129,7 +124,7 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { }, testWait, testTick) } -func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) { +func Test_QueryCacheClient_ShouldCleanUpNSEOnTimeout(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) ctx, cancel := context.WithCancel(context.Background()) @@ -142,7 +137,7 @@ func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewNetworkServiceEndpointClient(ctx, querycache.WithExpireTimeout(expireTimeout)), + querycache.NewNetworkServiceEndpointClient(ctx, querycache.WithNSEExpireTimeout(expireTimeout)), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) diff --git a/pkg/registry/common/querycache/option.go b/pkg/registry/common/querycache/option.go index fd1288f48..b09fa7804 100644 --- a/pkg/registry/common/querycache/option.go +++ b/pkg/registry/common/querycache/option.go @@ -18,12 +18,22 @@ package querycache import "time" -// Option is an option for cache -type Option func(c *nsCache) +// NSCacheOption is an option for NS cache +type NSCacheOption func(c *nsCache) -// WithExpireTimeout sets cache expire timeout -func WithExpireTimeout(expireTimeout time.Duration) Option { +// NSECacheOption is an option for NSE cache +type NSECacheOption func(c *nseCache) + +// WithNSExpireTimeout sets NS cache expire timeout +func WithNSExpireTimeout(expireTimeout time.Duration) NSCacheOption { return func(c *nsCache) { c.expireTimeout = expireTimeout } } + +// WithNSEExpireTimeout sets NSE cache expire timeout +func WithNSEExpireTimeout(expireTimeout time.Duration) NSECacheOption { + return func(c *nseCache) { + c.expireTimeout = expireTimeout + } +} From 32790a295583e7f505104b4ddd100d04647b354e Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Wed, 9 Oct 2024 13:32:13 +0700 Subject: [PATCH 3/5] fix linter issues Signed-off-by: NikitaSkrynnik --- pkg/registry/common/querycache/ns_cache.go | 1 + pkg/registry/common/querycache/ns_client.go | 2 +- .../common/querycache/ns_client_test.go | 18 +++++++++--------- pkg/registry/common/querycache/nse_cache.go | 3 ++- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/registry/common/querycache/ns_cache.go b/pkg/registry/common/querycache/ns_cache.go index d80463735..6fb6a02e1 100644 --- a/pkg/registry/common/querycache/ns_cache.go +++ b/pkg/registry/common/querycache/ns_cache.go @@ -24,6 +24,7 @@ import ( "github.com/edwarnicke/genericsync" "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/sdk/pkg/tools/clock" ) diff --git a/pkg/registry/common/querycache/ns_client.go b/pkg/registry/common/querycache/ns_client.go index 5273f8fcd..c3ab9f582 100644 --- a/pkg/registry/common/querycache/ns_client.go +++ b/pkg/registry/common/querycache/ns_client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// Copyright (c) 2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/pkg/registry/common/querycache/ns_client_test.go b/pkg/registry/common/querycache/ns_client_test.go index 2bcca942d..303de22c6 100644 --- a/pkg/registry/common/querycache/ns_client_test.go +++ b/pkg/registry/common/querycache/ns_client_test.go @@ -1,18 +1,18 @@ -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// Copyright (c) 2024 Cisco and/or its affiliates. // -// SPDX-Licens-Identifier: Apache-2.0 +// SPDX-License-Identifier: Apache-2.0 // -// Licensd under the Apache Licens, Version 2.0 (the "Licens"); -// you may not use this file except in compliance with the Licens. -// You may obtain a copy of the Licens at: +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: // -// http://www.apache.org/licenss/LICEns-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software -// distributed under the Licens is distributed on an "AS IS" BASIS, +// distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the Licens for the specific language governing permissions and -// limitations under the Licens. +// See the License for the specific language governing permissions and +// limitations under the License. package querycache_test diff --git a/pkg/registry/common/querycache/nse_cache.go b/pkg/registry/common/querycache/nse_cache.go index 4de65314e..fa7f9546a 100644 --- a/pkg/registry/common/querycache/nse_cache.go +++ b/pkg/registry/common/querycache/nse_cache.go @@ -1,6 +1,6 @@ // Copyright (c) 2021 Doc.ai and/or its affiliates. // -// Copyright (c) 2023 Cisco and/or its affiliates. +// Copyright (c) 2023-2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -26,6 +26,7 @@ import ( "github.com/edwarnicke/genericsync" "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/sdk/pkg/tools/clock" ) From 8d700e5fb57b5ba9f04f065b5404856fedd1a8fd Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Thu, 10 Oct 2024 16:18:48 +0700 Subject: [PATCH 4/5] use interface{} instead of generics Signed-off-by: NikitaSkrynnik --- pkg/registry/common/querycache/ns_cache.go | 21 ++++++++++++--------- pkg/registry/common/querycache/nse_cache.go | 20 ++++++++++++-------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/pkg/registry/common/querycache/ns_cache.go b/pkg/registry/common/querycache/ns_cache.go index 6fb6a02e1..dc06ce306 100644 --- a/pkg/registry/common/querycache/ns_cache.go +++ b/pkg/registry/common/querycache/ns_cache.go @@ -30,7 +30,7 @@ import ( type nsCache struct { expireTimeout time.Duration - entries genericsync.Map[string, *cacheEntry[registry.NetworkService]] + entries genericsync.Map[string, *cacheEntry] clockTime clock.Clock } @@ -52,7 +52,7 @@ func newNSCache(ctx context.Context, opts ...NSCacheOption) *nsCache { ticker.Stop() return case <-ticker.C(): - c.entries.Range(func(_ string, e *cacheEntry[registry.NetworkService]) bool { + c.entries.Range(func(_ string, e *cacheEntry) bool { e.lock.Lock() defer e.lock.Unlock() @@ -68,10 +68,10 @@ func newNSCache(ctx context.Context, opts ...NSCacheOption) *nsCache { return c } -func (c *nsCache) LoadOrStore(value *registry.NetworkService, cancel context.CancelFunc) (*cacheEntry[registry.NetworkService], bool) { +func (c *nsCache) LoadOrStore(value *registry.NetworkService, cancel context.CancelFunc) (*cacheEntry, bool) { var once sync.Once - entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry[registry.NetworkService]{ + entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry{ value: value, expirationTime: c.clockTime.Now().Add(c.expireTimeout), cleanup: func() { @@ -93,28 +93,31 @@ func (c *nsCache) Load(ctx context.Context, query *registry.NetworkService) *reg entry.cleanup() } else { entry.expirationTime = c.clockTime.Now().Add(c.expireTimeout) - return entry.value + ns, ok := entry.value.(*registry.NetworkService) + if ok { + return ns + } } } return nil } -type cacheEntry[T registry.NetworkService | registry.NetworkServiceEndpoint] struct { - value *T +type cacheEntry struct { + value interface{} expirationTime time.Time lock sync.Mutex cleanup func() } -func (e *cacheEntry[T]) Update(value *T) { +func (e *cacheEntry) Update(value interface{}) { e.lock.Lock() defer e.lock.Unlock() e.value = value } -func (e *cacheEntry[_]) Cleanup() { +func (e *cacheEntry) Cleanup() { e.lock.Lock() defer e.lock.Unlock() diff --git a/pkg/registry/common/querycache/nse_cache.go b/pkg/registry/common/querycache/nse_cache.go index fa7f9546a..a3fc36040 100644 --- a/pkg/registry/common/querycache/nse_cache.go +++ b/pkg/registry/common/querycache/nse_cache.go @@ -32,7 +32,7 @@ import ( type nseCache struct { expireTimeout time.Duration - entries genericsync.Map[string, *cacheEntry[registry.NetworkServiceEndpoint]] + entries genericsync.Map[string, *cacheEntry] clockTime clock.Clock } @@ -54,7 +54,7 @@ func newNSECache(ctx context.Context, opts ...NSECacheOption) *nseCache { ticker.Stop() return case <-ticker.C(): - c.entries.Range(func(_ string, e *cacheEntry[registry.NetworkServiceEndpoint]) bool { + c.entries.Range(func(_ string, e *cacheEntry) bool { e.lock.Lock() defer e.lock.Unlock() @@ -70,10 +70,10 @@ func newNSECache(ctx context.Context, opts ...NSECacheOption) *nseCache { return c } -func (c *nseCache) LoadOrStore(value *registry.NetworkServiceEndpoint, cancel context.CancelFunc) (*cacheEntry[registry.NetworkServiceEndpoint], bool) { +func (c *nseCache) LoadOrStore(value *registry.NetworkServiceEndpoint, cancel context.CancelFunc) (*cacheEntry, bool) { var once sync.Once - entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry[registry.NetworkServiceEndpoint]{ + entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry{ value: value, expirationTime: c.clockTime.Now().Add(c.expireTimeout), cleanup: func() { @@ -86,14 +86,17 @@ func (c *nseCache) LoadOrStore(value *registry.NetworkServiceEndpoint, cancel co return entry, ok } -func (c *nseCache) add(entry *cacheEntry[registry.NetworkServiceEndpoint], values []*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint { +func (c *nseCache) add(entry *cacheEntry, values []*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint { entry.lock.Lock() defer entry.lock.Unlock() if c.clockTime.Until(entry.expirationTime) < 0 { entry.cleanup() } else { entry.expirationTime = c.clockTime.Now().Add(c.expireTimeout) - values = append(values, entry.value) + nse, ok := entry.value.(*registry.NetworkServiceEndpoint) + if ok { + values = append(values, nse) + } } return values @@ -126,8 +129,9 @@ func (c *nseCache) Load(ctx context.Context, query *registry.NetworkServiceEndpo return values } - c.entries.Range(func(key string, entry *cacheEntry[registry.NetworkServiceEndpoint]) bool { - if subset(query.NetworkServiceEndpoint.NetworkServiceNames, entry.value.NetworkServiceNames) { + c.entries.Range(func(key string, entry *cacheEntry) bool { + nse, ok := entry.value.(*registry.NetworkServiceEndpoint) + if ok && subset(query.NetworkServiceEndpoint.NetworkServiceNames, nse.NetworkServiceNames) { values = c.add(entry, values) } return true From 7237a6127239cf2796ff6906ae67acb49660bc11 Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Thu, 10 Oct 2024 21:45:55 +0700 Subject: [PATCH 5/5] fix linter issues Signed-off-by: NikitaSkrynnik --- pkg/registry/common/querycache/nse_client.go | 2 ++ pkg/registry/common/querycache/nse_client_test.go | 2 ++ pkg/registry/common/querycache/option.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/pkg/registry/common/querycache/nse_client.go b/pkg/registry/common/querycache/nse_client.go index b274af1b7..f441d257c 100644 --- a/pkg/registry/common/querycache/nse_client.go +++ b/pkg/registry/common/querycache/nse_client.go @@ -1,5 +1,7 @@ // Copyright (c) 2020-2021 Doc.ai and/or its affiliates. // +// Copyright (c) 2024 Cisco and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/pkg/registry/common/querycache/nse_client_test.go b/pkg/registry/common/querycache/nse_client_test.go index ba31cb736..700ffa52c 100644 --- a/pkg/registry/common/querycache/nse_client_test.go +++ b/pkg/registry/common/querycache/nse_client_test.go @@ -1,5 +1,7 @@ // Copyright (c) 2020-2021 Doc.ai and/or its affiliates. // +// Copyright (c) 2024 Cisco and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/pkg/registry/common/querycache/option.go b/pkg/registry/common/querycache/option.go index b09fa7804..55338a20d 100644 --- a/pkg/registry/common/querycache/option.go +++ b/pkg/registry/common/querycache/option.go @@ -1,5 +1,7 @@ // Copyright (c) 2021 Doc.ai and/or its affiliates. // +// Copyright (c) 2024 Cisco and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License");