diff --git a/pkg/networkservice/chains/nsmgr/server.go b/pkg/networkservice/chains/nsmgr/server.go index e4c45f14e..92c4aaf1c 100644 --- a/pkg/networkservice/chains/nsmgr/server.go +++ b/pkg/networkservice/chains/nsmgr/server.go @@ -240,6 +240,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options chain.NewNetworkServiceRegistryClient( clienturl.NewNetworkServiceRegistryClient(opts.regURL), begin.NewNetworkServiceRegistryClient(), + //querycache.NewNetworkServiceRegistryClient(ctx), clientconn.NewNetworkServiceRegistryClient(), opts.authorizeNSRegistryClient, grpcmetadata.NewNetworkServiceRegistryClient(), @@ -264,11 +265,11 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options registryconnect.NewNetworkServiceEndpointRegistryServer( chain.NewNetworkServiceEndpointRegistryClient( begin.NewNetworkServiceEndpointRegistryClient(), + querycache.NewNetworkServiceEndpointRegistryClient(ctx), clienturl.NewNetworkServiceEndpointRegistryClient(opts.regURL), clientconn.NewNetworkServiceEndpointRegistryClient(), opts.authorizeNSERegistryClient, grpcmetadata.NewNetworkServiceEndpointRegistryClient(), - querycache.NewClient(ctx), dial.NewNetworkServiceEndpointRegistryClient(ctx, dial.WithDialTimeout(opts.dialTimeout), dial.WithDialOptions(opts.dialOptions...), diff --git a/pkg/registry/common/querycache/ns_client.go b/pkg/registry/common/querycache/ns_client.go new file mode 100644 index 000000000..743050925 --- /dev/null +++ b/pkg/registry/common/querycache/ns_client.go @@ -0,0 +1,88 @@ +// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package querycache adds possible to cache Find queries +package querycache + +import ( + "context" + "time" + + cache "github.com/go-pkgz/expirable-cache/v3" + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc" + + "github.com/networkservicemesh/api/pkg/api/registry" + + "github.com/networkservicemesh/sdk/pkg/registry/core/next" + "github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel" +) + +type queryCacheNSClient struct { + chainContext context.Context + cache cache.Cache[string, []*registry.NetworkService] +} + +// NewNetworkServiceRegistryClient creates new querycache NS registry client that caches all resolved NSs +func NewNetworkServiceRegistryClient(ctx context.Context) registry.NetworkServiceRegistryClient { + var res = &queryCacheNSClient{ + chainContext: ctx, + cache: cache.NewCache[string, []*registry.NetworkService]().WithLRU().WithMaxKeys(32).WithTTL(time.Millisecond * 100), + } + return res +} + +func (q *queryCacheNSClient) Register(ctx context.Context, nse *registry.NetworkService, opts ...grpc.CallOption) (*registry.NetworkService, error) { + resp, err := next.NetworkServiceRegistryClient(ctx).Register(ctx, nse, opts...) + if err == nil { + q.cache.Add(resp.GetName(), []*registry.NetworkService{resp}) + } + return resp, err +} + +func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) { + if query.Watch { + return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) + } + + var list []*registry.NetworkService + if v, ok := q.cache.Get(query.GetNetworkService().GetName()); ok { + list = v + } else { + var streamClient, err = next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) + if err != nil { + return streamClient, err + } + list = registry.ReadNetworkServiceList(streamClient) + for _, item := range list { + q.cache.Add(item.GetName(), []*registry.NetworkService{item.Clone()}) + } + } + var resultStreamChannel = make(chan *registry.NetworkServiceResponse, len(list)) + for _, item := range list { + resultStreamChannel <- ®istry.NetworkServiceResponse{NetworkService: item} + } + close(resultStreamChannel) + return streamchannel.NewNetworkServiceFindClient(ctx, resultStreamChannel), nil +} + +func (q *queryCacheNSClient) Unregister(ctx context.Context, in *registry.NetworkService, opts ...grpc.CallOption) (*empty.Empty, error) { + resp, err := next.NetworkServiceRegistryClient(ctx).Unregister(ctx, in, opts...) + if err == nil { + q.cache.Remove(in.GetName()) + } + return resp, err +} diff --git a/pkg/registry/common/querycache/nse_client.go b/pkg/registry/common/querycache/nse_client.go index 188932a86..890505d23 100644 --- a/pkg/registry/common/querycache/nse_client.go +++ b/pkg/registry/common/querycache/nse_client.go @@ -32,15 +32,15 @@ import ( ) type queryCacheNSEClient struct { - ctx context.Context - cache cache.Cache[string, []*registry.NetworkServiceEndpoint] + chainContext context.Context + cache cache.Cache[string, []*registry.NetworkServiceEndpoint] } // NewClient creates new querycache NSE registry client that caches all resolved NSEs -func NewClient(ctx context.Context) registry.NetworkServiceEndpointRegistryClient { +func NewNetworkServiceEndpointRegistryClient(ctx context.Context) registry.NetworkServiceEndpointRegistryClient { var res = &queryCacheNSEClient{ - ctx: ctx, - cache: cache.NewCache[string, []*registry.NetworkServiceEndpoint]().WithLRU().WithMaxKeys(32).WithTTL(time.Millisecond * 300), + chainContext: ctx, + cache: cache.NewCache[string, []*registry.NetworkServiceEndpoint]().WithLRU().WithMaxKeys(32).WithTTL(time.Millisecond * 300), } return res } @@ -67,8 +67,9 @@ func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkS return streamClient, err } list = registry.ReadNetworkServiceEndpointList(streamClient) - q.cache.Add(query.GetNetworkServiceEndpoint().GetName(), list) - + for _, item := range list { + q.cache.Add(item.GetName(), []*registry.NetworkServiceEndpoint{item.Clone()}) + } } var resultStreamChannel = make(chan *registry.NetworkServiceEndpointResponse, len(list)) for _, item := range list { diff --git a/pkg/registry/common/querycache/nse_client_test.go b/pkg/registry/common/querycache/nse_client_test.go index fc1e196fd..7f196566a 100644 --- a/pkg/registry/common/querycache/nse_client_test.go +++ b/pkg/registry/common/querycache/nse_client_test.go @@ -18,7 +18,6 @@ package querycache_test import ( "context" - "fmt" "sync/atomic" "testing" "time" @@ -64,7 +63,7 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewClient(ctx), + querycache.NewNetworkServiceEndpointRegistryClient(ctx), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) @@ -109,7 +108,6 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { if nseResp, err = stream.Recv(); err != nil { return false } - fmt.Println(name == nseResp.NetworkServiceEndpoint.Name, url2 == nseResp.NetworkServiceEndpoint.Url) return name == nseResp.NetworkServiceEndpoint.Name && url2 == nseResp.NetworkServiceEndpoint.Url }, testWait, testTick) @@ -139,7 +137,7 @@ func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewClient(ctx), + querycache.NewNetworkServiceEndpointRegistryClient(ctx), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), )