Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] begin should use eventFactory #1580

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 17 additions & 43 deletions pkg/networkservice/common/begin/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -51,40 +51,28 @@ func (b *beginClient) Request(ctx context.Context, request *networkservice.Netwo
eventFactoryClient, _ := b.LoadOrStore(request.GetConnection().GetId(),
newEventFactoryClient(
ctx,
func() *eventFactoryClient {
currentEventFactoryClient, _ := b.Load(request.GetConnection().GetId())
return currentEventFactoryClient
},
func() {
b.Delete(request.GetRequestConnection().GetId())
},
opts...,
),
)
<-eventFactoryClient.executor.AsyncExec(func() {
// If the eventFactory has changed, usually because the connection has been Closed and re-established
// go back to the beginning and try again.
currentEventFactoryClient, _ := b.Load(request.GetConnection().GetId())
if currentEventFactoryClient != eventFactoryClient {
err = <-eventFactoryClient.Request(
withContext(ctx),
withUserRequest(request),
withGRPCOpts(opts),
withConnectionToReturn(&conn),
)
if err != nil {
if errors.Is(err, &errorEventFactoryInconsistency{}) {
log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryClient != eventFactoryClient")
conn, err = b.Request(ctx, request, opts...)
return
}

withEventFactoryCtx := withEventFactory(ctx, eventFactoryClient)
request.Connection = mergeConnection(eventFactoryClient.returnedConnection, request.GetConnection(), eventFactoryClient.request.GetConnection())
conn, err = next.Client(withEventFactoryCtx).Request(withEventFactoryCtx, request, opts...)
if err != nil {
if eventFactoryClient.state != established {
eventFactoryClient.state = closed
b.Delete(request.GetConnection().GetId())
}
return
}
eventFactoryClient.request = request.Clone()
eventFactoryClient.request.Connection = conn.Clone()
eventFactoryClient.opts = opts
eventFactoryClient.state = established
}

eventFactoryClient.returnedConnection = conn.Clone()
eventFactoryClient.updateContext(ctx)
})
return conn, err
}

Expand All @@ -98,23 +86,9 @@ func (b *beginClient) Close(ctx context.Context, conn *networkservice.Connection
// If we don't have a connection to Close, just let it be
return
}
<-eventFactoryClient.executor.AsyncExec(func() {
// If the connection is not established, don't do anything
if eventFactoryClient.state != established || eventFactoryClient.client == nil || eventFactoryClient.request == nil {
return
}
err = <-eventFactoryClient.Close(
withGRPCOpts(opts),
)

// If this isn't the connection we started with, do nothing
currentEventFactoryClient, _ := b.Load(conn.GetId())
if currentEventFactoryClient != eventFactoryClient {
return
}
// Always close with the last valid Connection we got
conn = eventFactoryClient.request.Connection
withEventFactoryCtx := withEventFactory(ctx, eventFactoryClient)
emp, err = next.Client(withEventFactoryCtx).Close(withEventFactoryCtx, conn, opts...)
// afterCloseFunc() is used to cleanup things like the entry in the Map for EventFactories
eventFactoryClient.afterCloseFunc()
})
return emp, err
}
131 changes: 109 additions & 22 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -29,6 +29,12 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type errorEventFactoryInconsistency struct{}

func (e *errorEventFactoryInconsistency) Error() string {
return "errorEventFactoryInconsistency error"
}

type connectionState int

const (
Expand All @@ -52,19 +58,26 @@ type eventFactoryClient struct {
ctxFunc func() (context.Context, context.CancelFunc)
request *networkservice.NetworkServiceRequest
returnedConnection *networkservice.Connection
opts []grpc.CallOption
grpcOpts []grpc.CallOption
client networkservice.NetworkServiceClient
beforeRequestFunc func() error
afterCloseFunc func()
}

func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.CallOption) *eventFactoryClient {
func newEventFactoryClient(ctx context.Context, actualEventFactoryFunc func() *eventFactoryClient, afterClose func()) *eventFactoryClient {
f := &eventFactoryClient{
client: next.Client(ctx),
initialCtxFunc: postpone.Context(ctx),
opts: opts,
}
f.updateContext(ctx)

f.beforeRequestFunc = func() error {
if actualEventFactoryFunc != nil && actualEventFactoryFunc() != f {
return &errorEventFactoryInconsistency{}
}
return nil
}

f.afterCloseFunc = func() {
f.state = closed
if afterClose != nil {
Expand All @@ -90,18 +103,29 @@ func (f *eventFactoryClient) Request(opts ...Option) <-chan error {
opt(o)
}
ch := make(chan error, 1)

f.executor.AsyncExec(func() {
defer close(ch)
if f.state != established {
if err := f.beforeRequestFunc(); err != nil {
ch <- err
return
}
select {
case <-o.cancelCtx.Done():
default:
request := f.request.Clone()
grpcOpts := f.grpcOpts
if o.userRequest != nil {
request = o.userRequest
request.Connection = mergeConnection(f.returnedConnection, o.userRequest.Connection, f.request.GetConnection())
}
if o.grpcOpts != nil {
grpcOpts = o.grpcOpts
}

if o.reselect {
ctx, cancel := f.ctxFunc()
_, _ = f.client.Close(ctx, request.GetConnection(), f.opts...)
_, _ = f.client.Close(ctx, request.GetConnection(), grpcOpts...)
if request.GetConnection() != nil {
request.GetConnection().Mechanism = nil
request.GetConnection().NetworkServiceEndpointName = ""
Expand All @@ -110,12 +134,30 @@ func (f *eventFactoryClient) Request(opts ...Option) <-chan error {
}
cancel()
}
ctx, cancel := f.ctxFunc()
defer cancel()
conn, err := f.client.Request(ctx, request, f.opts...)
if err == nil && f.request != nil {
f.request.Connection = conn

var ctx context.Context
if o.ctx != nil {
ctx = withEventFactory(o.ctx, f)
} else {
var cancel context.CancelFunc
ctx, cancel = f.ctxFunc()
defer cancel()
}

conn, err := f.client.Request(ctx, request, grpcOpts...)
if err == nil {
f.request = request.Clone()
f.request.Connection = conn.Clone()
f.grpcOpts = grpcOpts
f.state = established
f.request.Connection.State = networkservice.State_UP
if o.connectionToReturn != nil {
f.returnedConnection = conn.Clone()
*o.connectionToReturn = conn.Clone()
}
f.updateContext(ctx)
} else if f.state != established {
f.afterCloseFunc()
}
ch <- err
}
Expand All @@ -130,18 +172,26 @@ func (f *eventFactoryClient) Close(opts ...Option) <-chan error {
for _, opt := range opts {
opt(o)
}

ch := make(chan error, 1)
f.executor.AsyncExec(func() {
defer close(ch)
if f.request == nil {
if f.request == nil || f.state != established {
return
}

select {
case <-o.cancelCtx.Done():
default:
grpcOpts := f.grpcOpts
if o.grpcOpts != nil {
grpcOpts = o.grpcOpts
}

ctx, cancel := f.ctxFunc()
defer cancel()
_, err := f.client.Close(ctx, f.request.GetConnection(), f.opts...)

_, err := f.client.Close(ctx, f.request.GetConnection(), grpcOpts...)
f.afterCloseFunc()
ch <- err
}
Expand All @@ -158,17 +208,25 @@ type eventFactoryServer struct {
ctxFunc func() (context.Context, context.CancelFunc)
request *networkservice.NetworkServiceRequest
returnedConnection *networkservice.Connection
afterCloseFunc func()
server networkservice.NetworkServiceServer
beforeRequestFunc func() error
afterCloseFunc func()
}

func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactoryServer {
func newEventFactoryServer(ctx context.Context, actualEventFactoryFunc func() *eventFactoryServer, afterClose func()) *eventFactoryServer {
f := &eventFactoryServer{
server: next.Server(ctx),
initialCtxFunc: postpone.Context(ctx),
}
f.updateContext(ctx)

f.beforeRequestFunc = func() error {
if actualEventFactoryFunc != nil && actualEventFactoryFunc() != f {
return &errorEventFactoryInconsistency{}
}
return nil
}

f.afterCloseFunc = func() {
f.state = closed
afterClose()
Expand All @@ -194,17 +252,46 @@ func (f *eventFactoryServer) Request(opts ...Option) <-chan error {
ch := make(chan error, 1)
f.executor.AsyncExec(func() {
defer close(ch)
if f.state != established {
if err := f.beforeRequestFunc(); err != nil {
ch <- err
return
}
select {
case <-o.cancelCtx.Done():
default:
ctx, cancel := f.ctxFunc()
defer cancel()
conn, err := f.server.Request(ctx, f.request)
if err == nil && f.request != nil {
f.request.Connection = conn
request := f.request.Clone()
if o.userRequest != nil {
request = o.userRequest
}
if f.state == established && request.GetConnection().GetState() == networkservice.State_RESELECT_REQUESTED {
ctx, cancel := f.ctxFunc()
_, _ = f.server.Close(ctx, f.request.GetConnection())
f.state = closed
cancel()
}

var ctx context.Context
if o.ctx != nil {
ctx = withEventFactory(o.ctx, f)
} else {
var cancel context.CancelFunc
ctx, cancel = f.ctxFunc()
defer cancel()
}

conn, err := f.server.Request(ctx, request)
if err == nil {
f.request = request.Clone()
f.request.Connection = conn.Clone()
f.state = established
f.request.Connection.State = networkservice.State_UP
if o.connectionToReturn != nil {
f.returnedConnection = conn.Clone()
*o.connectionToReturn = conn.Clone()
}
f.updateContext(ctx)
} else if f.state != established {
f.afterCloseFunc()
}
ch <- err
}
Expand All @@ -222,7 +309,7 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error {
ch := make(chan error, 1)
f.executor.AsyncExec(func() {
defer close(ch)
if f.request == nil {
if f.request == nil || f.state != established {
return
}
select {
Expand Down
38 changes: 37 additions & 1 deletion pkg/networkservice/common/begin/options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,11 +18,19 @@ package begin

import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
)

type option struct {
cancelCtx context.Context
reselect bool

ctx context.Context
userRequest *networkservice.NetworkServiceRequest
grpcOpts []grpc.CallOption
connectionToReturn **networkservice.Connection
}

// Option - event option
Expand All @@ -41,3 +49,31 @@ func WithReselect() Option {
o.reselect = true
}
}

// withUserRequest - optionally clear Mechanism and NetworkServiceName to force reselect
func withContext(ctx context.Context) Option {
return func(o *option) {
o.ctx = ctx
}
}

// withUserRequest - optionally clear Mechanism and NetworkServiceName to force reselect
func withUserRequest(r *networkservice.NetworkServiceRequest) Option {
return func(o *option) {
o.userRequest = r
}
}

// withOpts - optionally clear Mechanism and NetworkServiceName to force reselect
func withGRPCOpts(opts []grpc.CallOption) Option {
return func(o *option) {
o.grpcOpts = opts
}
}

// withConnectionToReturn - optionally clear Mechanism and NetworkServiceName to force reselect
func withConnectionToReturn(conn **networkservice.Connection) Option {
return func(o *option) {
o.connectionToReturn = conn
}
}
Loading
Loading