Skip to content

Commit

Permalink
begin should use eventFactory
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <[email protected]>
  • Loading branch information
glazychev-art committed Jan 31, 2024
1 parent 90f0c79 commit 12325a1
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 122 deletions.
60 changes: 17 additions & 43 deletions pkg/networkservice/common/begin/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -51,40 +51,28 @@ func (b *beginClient) Request(ctx context.Context, request *networkservice.Netwo
eventFactoryClient, _ := b.LoadOrStore(request.GetConnection().GetId(),
newEventFactoryClient(
ctx,
func() *eventFactoryClient {
currentEventFactoryClient, _ := b.Load(request.GetConnection().GetId())
return currentEventFactoryClient
},
func() {
b.Delete(request.GetRequestConnection().GetId())
},
opts...,
),
)
<-eventFactoryClient.executor.AsyncExec(func() {
// If the eventFactory has changed, usually because the connection has been Closed and re-established
// go back to the beginning and try again.
currentEventFactoryClient, _ := b.Load(request.GetConnection().GetId())
if currentEventFactoryClient != eventFactoryClient {
err = <-eventFactoryClient.Request(
withContext(ctx),
withUserRequest(request),
withGRPCOpts(opts),
withConnectionToReturn(&conn),
)
if err != nil {
if errors.Is(err, &errorEventFactoryInconsistency{}) {
log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryClient != eventFactoryClient")
conn, err = b.Request(ctx, request, opts...)
return
}

withEventFactoryCtx := withEventFactory(ctx, eventFactoryClient)
request.Connection = mergeConnection(eventFactoryClient.returnedConnection, request.GetConnection(), eventFactoryClient.request.GetConnection())
conn, err = next.Client(withEventFactoryCtx).Request(withEventFactoryCtx, request, opts...)
if err != nil {
if eventFactoryClient.state != established {
eventFactoryClient.state = closed
b.Delete(request.GetConnection().GetId())
}
return
}
eventFactoryClient.request = request.Clone()
eventFactoryClient.request.Connection = conn.Clone()
eventFactoryClient.opts = opts
eventFactoryClient.state = established
}

eventFactoryClient.returnedConnection = conn.Clone()
eventFactoryClient.updateContext(ctx)
})
return conn, err
}

Expand All @@ -98,23 +86,9 @@ func (b *beginClient) Close(ctx context.Context, conn *networkservice.Connection
// If we don't have a connection to Close, just let it be
return
}
<-eventFactoryClient.executor.AsyncExec(func() {
// If the connection is not established, don't do anything
if eventFactoryClient.state != established || eventFactoryClient.client == nil || eventFactoryClient.request == nil {
return
}
err = <-eventFactoryClient.Close(
withGRPCOpts(opts),
)

// If this isn't the connection we started with, do nothing
currentEventFactoryClient, _ := b.Load(conn.GetId())
if currentEventFactoryClient != eventFactoryClient {
return
}
// Always close with the last valid Connection we got
conn = eventFactoryClient.request.Connection
withEventFactoryCtx := withEventFactory(ctx, eventFactoryClient)
emp, err = next.Client(withEventFactoryCtx).Close(withEventFactoryCtx, conn, opts...)
// afterCloseFunc() is used to cleanup things like the entry in the Map for EventFactories
eventFactoryClient.afterCloseFunc()
})
return emp, err
}
131 changes: 109 additions & 22 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -29,6 +29,12 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type errorEventFactoryInconsistency struct{}

func (e *errorEventFactoryInconsistency) Error() string {
return "errorEventFactoryInconsistency error"
}

type connectionState int

const (
Expand All @@ -52,19 +58,26 @@ type eventFactoryClient struct {
ctxFunc func() (context.Context, context.CancelFunc)
request *networkservice.NetworkServiceRequest
returnedConnection *networkservice.Connection
opts []grpc.CallOption
grpcOpts []grpc.CallOption
client networkservice.NetworkServiceClient
beforeRequestFunc func() error
afterCloseFunc func()
}

func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.CallOption) *eventFactoryClient {
func newEventFactoryClient(ctx context.Context, actualEventFactoryFunc func() *eventFactoryClient, afterClose func()) *eventFactoryClient {
f := &eventFactoryClient{
client: next.Client(ctx),
initialCtxFunc: postpone.Context(ctx),
opts: opts,
}
f.updateContext(ctx)

f.beforeRequestFunc = func() error {
if actualEventFactoryFunc != nil && actualEventFactoryFunc() != f {
return &errorEventFactoryInconsistency{}
}
return nil
}

f.afterCloseFunc = func() {
f.state = closed
if afterClose != nil {
Expand All @@ -90,18 +103,29 @@ func (f *eventFactoryClient) Request(opts ...Option) <-chan error {
opt(o)
}
ch := make(chan error, 1)

f.executor.AsyncExec(func() {
defer close(ch)
if f.state != established {
if err := f.beforeRequestFunc(); err != nil {
ch <- err
return
}
select {
case <-o.cancelCtx.Done():
default:
request := f.request.Clone()
grpcOpts := f.grpcOpts
if o.userRequest != nil {
request = o.userRequest
request.Connection = mergeConnection(f.returnedConnection, o.userRequest.Connection, f.request.GetConnection())
}
if o.grpcOpts != nil {
grpcOpts = o.grpcOpts
}

if o.reselect {
ctx, cancel := f.ctxFunc()
_, _ = f.client.Close(ctx, request.GetConnection(), f.opts...)
_, _ = f.client.Close(ctx, request.GetConnection(), grpcOpts...)
if request.GetConnection() != nil {
request.GetConnection().Mechanism = nil
request.GetConnection().NetworkServiceEndpointName = ""
Expand All @@ -110,12 +134,30 @@ func (f *eventFactoryClient) Request(opts ...Option) <-chan error {
}
cancel()
}
ctx, cancel := f.ctxFunc()
defer cancel()
conn, err := f.client.Request(ctx, request, f.opts...)
if err == nil && f.request != nil {
f.request.Connection = conn

var ctx context.Context
if o.ctx != nil {
ctx = withEventFactory(o.ctx, f)
} else {
var cancel context.CancelFunc
ctx, cancel = f.ctxFunc()
defer cancel()
}

conn, err := f.client.Request(ctx, request, grpcOpts...)
if err == nil {
f.request = request.Clone()
f.request.Connection = conn.Clone()
f.grpcOpts = grpcOpts
f.state = established
f.request.Connection.State = networkservice.State_UP
if o.connectionToReturn != nil {
f.returnedConnection = conn.Clone()
*o.connectionToReturn = conn.Clone()
}
f.updateContext(ctx)
} else if f.state != established {
f.afterCloseFunc()
}
ch <- err
}
Expand All @@ -130,18 +172,26 @@ func (f *eventFactoryClient) Close(opts ...Option) <-chan error {
for _, opt := range opts {
opt(o)
}

ch := make(chan error, 1)
f.executor.AsyncExec(func() {
defer close(ch)
if f.request == nil {
if f.request == nil || f.state != established {
return
}

select {
case <-o.cancelCtx.Done():
default:
grpcOpts := f.grpcOpts
if o.grpcOpts != nil {
grpcOpts = o.grpcOpts
}

ctx, cancel := f.ctxFunc()
defer cancel()
_, err := f.client.Close(ctx, f.request.GetConnection(), f.opts...)

_, err := f.client.Close(ctx, f.request.GetConnection(), grpcOpts...)
f.afterCloseFunc()
ch <- err
}
Expand All @@ -158,17 +208,25 @@ type eventFactoryServer struct {
ctxFunc func() (context.Context, context.CancelFunc)
request *networkservice.NetworkServiceRequest
returnedConnection *networkservice.Connection
afterCloseFunc func()
server networkservice.NetworkServiceServer
beforeRequestFunc func() error
afterCloseFunc func()
}

func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactoryServer {
func newEventFactoryServer(ctx context.Context, actualEventFactoryFunc func() *eventFactoryServer, afterClose func()) *eventFactoryServer {
f := &eventFactoryServer{
server: next.Server(ctx),
initialCtxFunc: postpone.Context(ctx),
}
f.updateContext(ctx)

f.beforeRequestFunc = func() error {
if actualEventFactoryFunc != nil && actualEventFactoryFunc() != f {
return &errorEventFactoryInconsistency{}
}
return nil
}

f.afterCloseFunc = func() {
f.state = closed
afterClose()
Expand All @@ -194,17 +252,46 @@ func (f *eventFactoryServer) Request(opts ...Option) <-chan error {
ch := make(chan error, 1)
f.executor.AsyncExec(func() {
defer close(ch)
if f.state != established {
if err := f.beforeRequestFunc(); err != nil {
ch <- err
return
}
select {
case <-o.cancelCtx.Done():
default:
ctx, cancel := f.ctxFunc()
defer cancel()
conn, err := f.server.Request(ctx, f.request)
if err == nil && f.request != nil {
f.request.Connection = conn
request := f.request.Clone()
if o.userRequest != nil {
request = o.userRequest
}
if f.state == established && request.GetConnection().GetState() == networkservice.State_RESELECT_REQUESTED {
ctx, cancel := f.ctxFunc()
_, _ = f.server.Close(ctx, f.request.GetConnection())
f.state = closed
cancel()
}

var ctx context.Context
if o.ctx != nil {
ctx = withEventFactory(o.ctx, f)
} else {
var cancel context.CancelFunc
ctx, cancel = f.ctxFunc()
defer cancel()
}

conn, err := f.server.Request(ctx, request)
if err == nil {
f.request = request.Clone()
f.request.Connection = conn.Clone()
f.state = established
f.request.Connection.State = networkservice.State_UP
if o.connectionToReturn != nil {
f.returnedConnection = conn.Clone()
*o.connectionToReturn = conn.Clone()
}
f.updateContext(ctx)
} else if f.state != established {
f.afterCloseFunc()
}
ch <- err
}
Expand All @@ -222,7 +309,7 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error {
ch := make(chan error, 1)
f.executor.AsyncExec(func() {
defer close(ch)
if f.request == nil {
if f.request == nil || f.state != established {
return
}
select {
Expand Down
38 changes: 37 additions & 1 deletion pkg/networkservice/common/begin/options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,11 +18,19 @@ package begin

import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
)

type option struct {
cancelCtx context.Context
reselect bool

ctx context.Context
userRequest *networkservice.NetworkServiceRequest
grpcOpts []grpc.CallOption
connectionToReturn **networkservice.Connection
}

// Option - event option
Expand All @@ -41,3 +49,31 @@ func WithReselect() Option {
o.reselect = true
}
}

// withUserRequest - optionally clear Mechanism and NetworkServiceName to force reselect
func withContext(ctx context.Context) Option {
return func(o *option) {
o.ctx = ctx
}
}

// withUserRequest - optionally clear Mechanism and NetworkServiceName to force reselect
func withUserRequest(r *networkservice.NetworkServiceRequest) Option {
return func(o *option) {
o.userRequest = r
}
}

// withOpts - optionally clear Mechanism and NetworkServiceName to force reselect
func withGRPCOpts(opts []grpc.CallOption) Option {
return func(o *option) {
o.grpcOpts = opts
}
}

// withConnectionToReturn - optionally clear Mechanism and NetworkServiceName to force reselect
func withConnectionToReturn(conn **networkservice.Connection) Option {
return func(o *option) {
o.connectionToReturn = conn
}
}
Loading

0 comments on commit 12325a1

Please sign in to comment.