Skip to content

Commit

Permalink
Address the review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
marcobebway committed Jan 26, 2024
1 parent df439d7 commit 7c9d981
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 129 deletions.
10 changes: 7 additions & 3 deletions internal/connection/nats/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type connection struct {
disconnectErrHandlerRegistered bool
}

func (c *connection) Connect() error {
func (c *connection) Connect(connHandler natsio.ConnHandler, connErrHandler natsio.ConnErrHandler) error {
if c.isConnected() {
return nil
}
Expand All @@ -28,6 +28,10 @@ func (c *connection) Connect() error {
return err
}

// register handlers
c.registerReconnectHandler(connHandler)
c.registerDisconnectErrHandler(connErrHandler)

if c.isConnected() {
return nil
}
Expand All @@ -49,15 +53,15 @@ func (c *connection) isConnected() bool {
return c.conn.IsConnected()
}

func (c *connection) RegisterReconnectHandlerIfNotRegistered(handler natsio.ConnHandler) {
func (c *connection) registerReconnectHandler(handler natsio.ConnHandler) {
if c.conn == nil || c.reconnectHandlerRegistered {
return
}
c.conn.SetReconnectHandler(handler)
c.reconnectHandlerRegistered = true
}

func (c *connection) RegisterDisconnectErrHandlerIfNotRegistered(handler natsio.ConnErrHandler) {
func (c *connection) registerDisconnectErrHandler(handler natsio.ConnErrHandler) {
if c.conn == nil || c.disconnectErrHandlerRegistered {
return
}
Expand Down
9 changes: 2 additions & 7 deletions internal/connection/nats/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,9 @@ import (
//go:generate go run github.com/vektra/mockery/v2 --name=Interface --structname=Connection --filename=connection.go
type Interface interface {
// Connect connects to NATS and returns an error if it cannot connect.
Connect() error
// It also registers both the connect and disconnect handlers.
Connect(natsio.ConnHandler, natsio.ConnErrHandler) error

// Disconnect disconnects the NATS connection.
Disconnect()

// RegisterReconnectHandlerIfNotRegistered registers a ReconnectHandler only if it was not registered before.
RegisterReconnectHandlerIfNotRegistered(natsio.ConnHandler)

// RegisterDisconnectErrHandlerIfNotRegistered registers a DisconnectErrHandler only if it was not registered before.
RegisterDisconnectErrHandlerIfNotRegistered(natsio.ConnErrHandler)
}
91 changes: 13 additions & 78 deletions internal/connection/nats/mocks/connection.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 12 additions & 32 deletions internal/controller/operator/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"

natsio "github.com/nats-io/nats.go"
Expand Down Expand Up @@ -76,11 +75,10 @@ const (
)

var (
ErrSubscriptionExists = errors.New(SubscriptionExistsErrMessage)
ErrUnsupportedBackedType = errors.New("backend type not supported")
ErrNatsModuleMissing = errors.New("NATS module has to be installed")
ErrAPIGatewayModuleMissing = errors.New("API-Gateway module is needed for EventMesh backend. APIRules CRD is not installed")
ErrNilNATSConnectionBuilder = errors.New("connection builder for NATS backend is nil")
ErrSubscriptionExists = errors.New(SubscriptionExistsErrMessage)
ErrUnsupportedBackedType = errors.New("backend type not supported")
ErrNatsModuleMissing = errors.New("NATS module has to be installed")
ErrAPIGatewayModuleMissing = errors.New("API-Gateway module is needed for EventMesh backend. APIRules CRD is not installed")
)

// Reconciler reconciles an Eventing object
Expand Down Expand Up @@ -575,11 +573,6 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context,
}

if connErr := r.connectToNATS(eventingCR); connErr != nil {
if errors.Is(connErr, ErrNilNATSConnectionBuilder) {
r.namedLogger().Error(connErr)
os.Exit(1)
}

if errors.Is(connErr, natsconnectionerrors.ErrCannotConnect) {
return kctrl.Result{}, reconcile.TerminalError(
r.syncStatusWithNATSErr(ctx, eventingCR, connErr, log),
Expand Down Expand Up @@ -608,34 +601,21 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context,
// connectToNATS connects to NATS and returns an error if it failed.
// It also registers handlers for reconnection and disconnection.
func (r *Reconciler) connectToNATS(eventingCR *operatorv1alpha1.Eventing) error {
if r.natsConnectionBuilder == nil {
return ErrNilNATSConnectionBuilder
}

if r.natsConnection == nil {
r.natsConnection = r.natsConnectionBuilder.Build()
}

if err := r.natsConnection.Connect(); err != nil {
return err
connHandler := func(_ *natsio.Conn) {
r.namedLogger().Debug("Handle NATS reconnection")
r.genericEvents <- event.GenericEvent{Object: eventingCR}
}

// At this point, it is safe to register handlers
// because the internal nats connection is initialized.
r.natsConnection.RegisterReconnectHandlerIfNotRegistered(
func(_ *natsio.Conn) {
r.namedLogger().Debug("Handle NATS reconnection")
r.genericEvents <- event.GenericEvent{Object: eventingCR}
},
)
r.natsConnection.RegisterDisconnectErrHandlerIfNotRegistered(
func(_ *natsio.Conn, _ error) {
r.namedLogger().Debug("Handle NATS disconnection")
r.genericEvents <- event.GenericEvent{Object: eventingCR}
},
)
connErrHandler := func(_ *natsio.Conn, _ error) {
r.namedLogger().Debug("Handle NATS disconnection")
r.genericEvents <- event.GenericEvent{Object: eventingCR}
}

return nil
return r.natsConnection.Connect(connHandler, connErrHandler)
}

func (r *Reconciler) handlePublisherProxy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func Test_NATSConnection(t *testing.T) {
name: "Eventing CR should be in ready state if connected to NATS",
givenNATSConnectionMock: func() *natsconnectionmocks.Connection {
conn := &natsconnectionmocks.Connection{}
conn.On("Connect").Return(nil)
conn.On("Connect", mock.Anything, mock.Anything).Return(nil)
conn.On("IsConnected").Return(true)
conn.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return()
conn.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return()
conn.On("RegisterReconnectHandler", mock.Anything).Return()
conn.On("RegisterDisconnectErrHandler", mock.Anything).Return()
return conn
},
wantMatches: gomega.And(
Expand All @@ -48,10 +48,10 @@ func Test_NATSConnection(t *testing.T) {
name: "Eventing CR should be in error state if not connected to NATS",
givenNATSConnectionMock: func() *natsconnectionmocks.Connection {
conn := &natsconnectionmocks.Connection{}
conn.On("Connect").Return(natsconnectionerrors.ErrCannotConnect)
conn.On("Connect", mock.Anything, mock.Anything).Return(natsconnectionerrors.ErrCannotConnect)
conn.On("IsConnected").Return(false)
conn.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return()
conn.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return()
conn.On("RegisterReconnectHandler", mock.Anything).Return()
conn.On("RegisterDisconnectErrHandler", mock.Anything).Return()
return conn
},
wantMatches: gomega.And(
Expand Down
6 changes: 3 additions & 3 deletions test/utils/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ func NewTestEnvironment(config TestEnvironmentConfig, connMock *natsconnectionmo
// setup default mock
if connMock == nil {
connMock = &natsconnectionmocks.Connection{}
connMock.On("Connect").Return(nil)
connMock.On("Connect", mock.Anything, mock.Anything).Return(nil)
connMock.On("IsConnected").Return(true)
connMock.On("Disconnect").Return()
connMock.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return()
connMock.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return()
connMock.On("RegisterReconnectHandler", mock.Anything).Return()
connMock.On("RegisterDisconnectErrHandler", mock.Anything).Return()
}

// create a new watcher
Expand Down

0 comments on commit 7c9d981

Please sign in to comment.