Skip to content

Commit

Permalink
add a cache for NS/NSE registry
Browse files Browse the repository at this point in the history
Signed-off-by: NikitaSkrynnik <[email protected]>
  • Loading branch information
NikitaSkrynnik committed Oct 7, 2024
1 parent 95c7ff7 commit f7ffef5
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
// Copyright (c) 2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -24,19 +22,19 @@ import (
"time"

"github.com/edwarnicke/genericsync"
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/api/pkg/api/registry"

Check failure on line 26 in pkg/registry/common/querycache/ns_cache.go

View workflow job for this annotation

GitHub Actions / golangci-lint / golangci-lint

File is not `goimports`-ed with -local github.com/networkservicemesh/sdk (goimports)
"github.com/networkservicemesh/sdk/pkg/tools/clock"
)

type cache struct {
type nsCache struct {
expireTimeout time.Duration
entries genericsync.Map[string, *cacheEntry]
entries genericsync.Map[string, *cacheEntry[registry.NetworkService]]
clockTime clock.Clock
}

func newCache(ctx context.Context, opts ...Option) *cache {
c := &cache{
func newNSCache(ctx context.Context, opts ...Option) *nsCache {
c := &nsCache{
expireTimeout: time.Minute,
clockTime: clock.FromContext(ctx),
}
Expand All @@ -53,14 +51,13 @@ func newCache(ctx context.Context, opts ...Option) *cache {
ticker.Stop()
return
case <-ticker.C():
c.entries.Range(func(_ string, e *cacheEntry) bool {
c.entries.Range(func(_ string, e *cacheEntry[registry.NetworkService]) bool {
e.lock.Lock()
defer e.lock.Unlock()

if c.clockTime.Until(e.expirationTime) < 0 {
e.cleanup()
}

return true
})
}
Expand All @@ -70,54 +67,53 @@ func newCache(ctx context.Context, opts ...Option) *cache {
return c
}

func (c *cache) LoadOrStore(key string, nse *registry.NetworkServiceEndpoint, cancel context.CancelFunc) (*cacheEntry, bool) {
func (c *nsCache) LoadOrStore(value *registry.NetworkService, cancel context.CancelFunc) (*cacheEntry[registry.NetworkService], bool) {
var once sync.Once
return c.entries.LoadOrStore(key, &cacheEntry{
nse: nse,

entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry[registry.NetworkService]{
value: value,
expirationTime: c.clockTime.Now().Add(c.expireTimeout),
cleanup: func() {
once.Do(func() {
c.entries.Delete(key)
c.entries.Delete(value.GetName())
cancel()
})
},
})
}
}})

func (c *cache) Load(key string) (*registry.NetworkServiceEndpoint, bool) {
e, ok := c.entries.Load(key)
if !ok {
return nil, false
}

e.lock.Lock()
defer e.lock.Unlock()
return entry, ok
}

if c.clockTime.Until(e.expirationTime) < 0 {
e.cleanup()
return nil, false
func (c *nsCache) Load(ctx context.Context, query *registry.NetworkService) *registry.NetworkService {
entry, ok := c.entries.Load(query.Name)
if ok {
entry.lock.Lock()
defer entry.lock.Unlock()
if c.clockTime.Until(entry.expirationTime) < 0 {
entry.cleanup()
} else {
entry.expirationTime = c.clockTime.Now().Add(c.expireTimeout)
return entry.value
}
}

e.expirationTime = c.clockTime.Now().Add(c.expireTimeout)

return e.nse, true
return nil
}

type cacheEntry struct {
nse *registry.NetworkServiceEndpoint
type cacheEntry[T registry.NetworkService | registry.NetworkServiceEndpoint] struct {
value *T

Check failure on line 103 in pkg/registry/common/querycache/ns_cache.go

View workflow job for this annotation

GitHub Actions / golangci-lint / golangci-lint

`value` is unused (structcheck)
expirationTime time.Time

Check failure on line 104 in pkg/registry/common/querycache/ns_cache.go

View workflow job for this annotation

GitHub Actions / golangci-lint / golangci-lint

`expirationTime` is unused (structcheck)
lock sync.Mutex
cleanup func()
}

func (e *cacheEntry) Update(nse *registry.NetworkServiceEndpoint) {
func (e *cacheEntry[T]) Update(value *T) {
e.lock.Lock()
defer e.lock.Unlock()

e.nse = nse
e.value = value
}

func (e *cacheEntry) Cleanup() {
func (e *cacheEntry[_]) Cleanup() {
e.lock.Lock()
defer e.lock.Unlock()

Expand Down
138 changes: 138 additions & 0 deletions pkg/registry/common/querycache/ns_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.

Check failure on line 1 in pkg/registry/common/querycache/ns_client.go

View workflow job for this annotation

GitHub Actions / golangci-lint / golangci-lint

Pattern (Copyright \(c\) (((20\d\d\-2024)|(2024))) .*\n\n)+ doesn't match. (goheader)
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package querycache adds possibility to cache Find queries
package querycache

import (
"context"
"time"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type queryCacheNSClient struct {
ctx context.Context
cache *nsCache
}

// NewNetworkServiceClient creates new querycache NS registry client that caches all resolved NSs
func NewNetworkServiceClient(ctx context.Context, opts ...Option) registry.NetworkServiceRegistryClient {
return &queryCacheNSClient{
ctx: ctx,
cache: newNSCache(ctx, opts...),
}
}

func (q *queryCacheNSClient) Register(ctx context.Context, nse *registry.NetworkService, opts ...grpc.CallOption) (*registry.NetworkService, error) {
return next.NetworkServiceRegistryClient(ctx).Register(ctx, nse, opts...)
}

func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) {
log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient forth")
if query.Watch {
return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
}

log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient search in cache")
if client, ok := q.findInCache(ctx, query); ok {
log.FromContext(ctx).Info("queryCacheNSClient found in cache")
return client, nil
}

log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient not found in cache")

client, err := next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
if err != nil {
return nil, err
}

nses := registry.ReadNetworkServiceList(client)

resultCh := make(chan *registry.NetworkServiceResponse, len(nses))
for _, nse := range nses {
resultCh <- &registry.NetworkServiceResponse{NetworkService: nse}
q.storeInCache(ctx, nse.Clone(), opts...)
}
close(resultCh)

return streamchannel.NewNetworkServiceFindClient(ctx, resultCh), nil
}

func (q *queryCacheNSClient) findInCache(ctx context.Context, query *registry.NetworkServiceQuery) (registry.NetworkServiceRegistry_FindClient, bool) {
log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient checking key: %v", query.NetworkService)
ns := q.cache.Load(ctx, query.NetworkService)
if ns == nil {
return nil, false
}

log.FromContext(ctx).WithField("time", time.Now()).Infof("found NS in cache: %v", ns)

resultCh := make(chan *registry.NetworkServiceResponse, 1)
resultCh <- &registry.NetworkServiceResponse{NetworkService: ns.Clone()}
close(resultCh)

return streamchannel.NewNetworkServiceFindClient(ctx, resultCh), true
}

func (q *queryCacheNSClient) storeInCache(ctx context.Context, ns *registry.NetworkService, opts ...grpc.CallOption) {
nsQuery := &registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{
Name: ns.Name,
},
}

findCtx, cancel := context.WithCancel(q.ctx)

entry, loaded := q.cache.LoadOrStore(ns, cancel)
if loaded {
cancel()
return
}

go func() {
defer entry.Cleanup()

nsQuery.Watch = true

stream, err := next.NetworkServiceRegistryClient(ctx).Find(findCtx, nsQuery, opts...)
if err != nil {
return
}

for nsResp, err := stream.Recv(); err == nil; nsResp, err = stream.Recv() {
if nsResp.NetworkService.Name != nsQuery.NetworkService.Name {
continue
}
if nsResp.Deleted {
break
}

entry.Update(nsResp.NetworkService)
}
}()
}

func (q *queryCacheNSClient) Unregister(ctx context.Context, in *registry.NetworkService, opts ...grpc.CallOption) (*empty.Empty, error) {
return next.NetworkServiceRegistryClient(ctx).Unregister(ctx, in, opts...)
}
Loading

0 comments on commit f7ffef5

Please sign in to comment.