diff --git a/.golangci.yaml b/.golangci.yaml index 795fa3485..f5db2e13d 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -151,6 +151,8 @@ linters-settings: alias: istio$1$2 - pkg: github.com/nats-io/nats-server/v2/(\w+)$ alias: natsio$1 + - pkg: github.com/nats-io/nats.go + alias: natsio - pkg: github.com/kyma-project/eventing-manager/internal/controller/(\w+)$ alias: controller$1 - pkg: github.com/kyma-project/kyma/common/logging/logger @@ -159,6 +161,12 @@ linters-settings: alias: natsv1alpha1 - pkg: github.com/kyma-project/nats-manager/testutils alias: natstestutils + - pkg: github.com/kyma-project/eventing-manager/internal/connection/nats + alias: natsconnection + - pkg: github.com/kyma-project/eventing-manager/internal/connection/nats/errors + alias: natsconnectionerrors + - pkg: github.com/kyma-project/eventing-manager/internal/connection/nats/mocks + alias: natsconnectionmocks - pkg: github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/(\w+)$ alias: subscriptioncontroller$1 - pkg: github.com/kyma-project/eventing-manager/internal/controller/operator/eventing diff --git a/Dockerfile b/Dockerfile index b3da9b6e6..acdb21e54 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,8 +14,7 @@ RUN go mod download # Copy the go source COPY cmd/main.go cmd/main.go COPY api/ api/ -COPY internal/controller/ internal/controller/ -COPY internal/label/ internal/label/ +COPY internal/ internal/ COPY pkg/ pkg/ COPY testing/ testing/ COPY options/ options/ diff --git a/api/operator/v1alpha1/eventing_types.go b/api/operator/v1alpha1/eventing_types.go index 5ad0717d6..5c0c853e6 100644 --- a/api/operator/v1alpha1/eventing_types.go +++ b/api/operator/v1alpha1/eventing_types.go @@ -225,6 +225,10 @@ func (e *Eventing) SyncStatusActiveBackend() { e.Status.ActiveBackend = e.Spec.Backend.Type } +func (e *Eventing) IsPreviousBackendEmpty() bool { + return e.Status.ActiveBackend == "" +} + func (e *Eventing) IsSpecBackendTypeChanged() bool { return e.Status.ActiveBackend != e.Spec.Backend.Type } diff --git a/cmd/main.go b/cmd/main.go index ddc915642..379139f54 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -21,9 +21,11 @@ import ( "flag" "log" "os" + "time" "github.com/go-logr/zapr" apigatewayv1beta1 "github.com/kyma-project/api-gateway/apis/gateway/v1beta1" + natsio "github.com/nats-io/nats.go" kapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" kapixclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -40,6 +42,7 @@ import ( eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha1" eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" operatorv1alpha1 "github.com/kyma-project/eventing-manager/api/operator/v1alpha1" + natsconnection "github.com/kyma-project/eventing-manager/internal/connection/nats" controllercache "github.com/kyma-project/eventing-manager/internal/controller/cache" controllerclient "github.com/kyma-project/eventing-manager/internal/controller/client" eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/operator/eventing" @@ -164,6 +167,14 @@ func main() { //nolint:funlen // main function needs to initialize many object ctrLogger, ) + // init NATS connection builder + natsConnectionBuilder, err := getNATSConnectionBuilder() + if err != nil { + setupLog.Error(err, "failed to get a NATS connection builder") + syncLogger(ctrLogger) + os.Exit(1) + } + // create Eventing reconciler instance eventingReconciler := eventingcontroller.NewReconciler( k8sClient, @@ -182,6 +193,7 @@ func main() { //nolint:funlen // main function needs to initialize many object Namespace: backendConfig.EventingCRNamespace, }, }, + natsConnectionBuilder, ) if err = (eventingReconciler).SetupWithManager(mgr); err != nil { @@ -230,3 +242,30 @@ func main() { //nolint:funlen // main function needs to initialize many object } syncLogger(ctrLogger) } + +func getNATSConnectionBuilder() (natsconnection.Builder, error) { + const ( + // connectionURL is the NATS connection URL. + // It should be configured as part of https://github.com/kyma-project/eventing-manager/issues/272. + connectionURL = "nats://eventing-nats.kyma-system.svc.cluster.local:4222" + + // connectionName is the name to identify the NATS connection. + connectionName = "Eventing Reconciler" + ) + + // The following constants are used to configure the NATS client re-connectivity. + // Please do not change these values to not change the intended behavior. + const ( + maxReconnects = -1 + retryOnFailedConnect = true + reconnectWait = time.Second + ) + + return natsconnection.NewBuilder( + connectionURL, + connectionName, + natsio.MaxReconnects(maxReconnects), + natsio.RetryOnFailedConnect(retryOnFailedConnect), + natsio.ReconnectWait(reconnectWait), + ) +} diff --git a/cmd/main_test.go b/cmd/main_test.go new file mode 100644 index 000000000..38c271467 --- /dev/null +++ b/cmd/main_test.go @@ -0,0 +1,13 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_getNATSConnectionBuilder(t *testing.T) { + natsConnectionBuilder, err := getNATSConnectionBuilder() + require.NoError(t, err) + require.NotNil(t, natsConnectionBuilder) +} diff --git a/internal/connection/nats/builder.go b/internal/connection/nats/builder.go new file mode 100644 index 000000000..1594ddb86 --- /dev/null +++ b/internal/connection/nats/builder.go @@ -0,0 +1,41 @@ +package nats + +import ( + "strings" + + natsio "github.com/nats-io/nats.go" + + "github.com/kyma-project/eventing-manager/internal/connection/nats/errors" +) + +type Builder interface { + Build() NATS +} + +type ConnectionBuilder struct { + url string + opts []natsio.Option +} + +func NewBuilder(url, name string, opts ...natsio.Option) (*ConnectionBuilder, error) { + if len(strings.TrimSpace(url)) == 0 { + return nil, errors.ErrEmptyConnectionURL + } + + if len(strings.TrimSpace(name)) == 0 { + return nil, errors.ErrEmptyConnectionName + } + + opts = append(opts, natsio.Name(name)) // enforce configuring the connection name + return &ConnectionBuilder{url: url, opts: opts}, nil +} + +func (b *ConnectionBuilder) Build() NATS { + return &connection{ + url: b.url, + conn: nil, + opts: b.opts, + reconnectHandlerRegistered: false, + disconnectErrHandlerRegistered: false, + } +} diff --git a/internal/connection/nats/builder_test.go b/internal/connection/nats/builder_test.go new file mode 100644 index 000000000..bd9c6a538 --- /dev/null +++ b/internal/connection/nats/builder_test.go @@ -0,0 +1,150 @@ +package nats + +import ( + "reflect" + "testing" + "time" + + natsio "github.com/nats-io/nats.go" + "github.com/stretchr/testify/require" + + "github.com/kyma-project/eventing-manager/internal/connection/nats/errors" +) + +func TestNewBuilder(t *testing.T) { + t.Parallel() + + // given + const ( + url = "url" + name = "name" + maxReconnects = 10 + retryOnFailedConnect = true + reconnectWait = time.Minute + ) + + type args struct { + url string + name string + opts []natsio.Option + } + tests := []struct { + name string + args args + wantBuilder *ConnectionBuilder + wantErr error + }{ + { + name: "should return an error if the URL is empty", + args: args{ + url: "", + name: name, + opts: nil, + }, + wantBuilder: nil, + wantErr: errors.ErrEmptyConnectionURL, + }, + { + name: "should return an error if the name is empty", + args: args{ + url: url, + name: "", + opts: nil, + }, + wantBuilder: nil, + wantErr: errors.ErrEmptyConnectionName, + }, + { + name: "should return a connection builder instance with no errors if there are no given options", + args: args{ + url: url, + name: name, + opts: nil, // no given options + }, + wantBuilder: &ConnectionBuilder{ + url: url, + opts: []natsio.Option{ + natsio.Name(name), + }, + }, + wantErr: nil, + }, + { + name: "should return a connection builder instance with no errors if there are given options", + args: args{ + url: url, + name: name, + opts: []natsio.Option{ + natsio.MaxReconnects(maxReconnects), + natsio.RetryOnFailedConnect(retryOnFailedConnect), + natsio.ReconnectWait(reconnectWait), + }, + }, + wantBuilder: &ConnectionBuilder{ + url: url, + opts: []natsio.Option{ + natsio.Name(name), + natsio.MaxReconnects(10), + natsio.RetryOnFailedConnect(true), + natsio.ReconnectWait(time.Minute), + }, + }, + wantErr: nil, + }, + } + for _, tt := range tests { + ttc := tt + + t.Run(ttc.name, func(t *testing.T) { + t.Parallel() + + // when + gotBuilder, gotErr := NewBuilder(ttc.args.url, ttc.args.name, ttc.args.opts...) + + // then + require.Equal(t, ttc.wantErr, gotErr) + require.True(t, builderDeepEqual(t, ttc.wantBuilder, gotBuilder)) + }) + } +} + +func TestConnectionBuilder_Build(t *testing.T) { + b := ConnectionBuilder{ + url: "url", + opts: []natsio.Option{ + natsio.Name("name"), + natsio.MaxReconnects(10), + natsio.RetryOnFailedConnect(true), + natsio.ReconnectWait(time.Minute), + }, + } + require.NotNil(t, b.Build()) +} + +func builderDeepEqual(t *testing.T, a, b *ConnectionBuilder) bool { + t.Helper() + + if a == b { + return true + } + + if a.url != b.url { + return false + } + + if len(a.opts) != len(b.opts) { + return false + } + + aOpts := &natsio.Options{} + for _, opt := range a.opts { + require.NoError(t, opt(aOpts)) + } + + bOpts := &natsio.Options{} + for _, opt := range b.opts { + require.NoError(t, opt(bOpts)) + } + + return reflect.DeepEqual(aOpts, bOpts) +} diff --git a/internal/connection/nats/connection.go b/internal/connection/nats/connection.go new file mode 100644 index 000000000..ce7df9815 --- /dev/null +++ b/internal/connection/nats/connection.go @@ -0,0 +1,66 @@ +package nats + +import ( + natsio "github.com/nats-io/nats.go" + + natsconnectionerrors "github.com/kyma-project/eventing-manager/internal/connection/nats/errors" +) + +// compile-time check. +var _ NATS = &connection{} + +// connection represents a NATS connection. +type connection struct { + url string + conn *natsio.Conn + opts []natsio.Option + reconnectHandlerRegistered bool + disconnectErrHandlerRegistered bool +} + +func (c *connection) Connect() error { + if c.isConnected() { + return nil + } + + var err error + if c.conn, err = natsio.Connect(c.url, c.opts...); err != nil { + return err + } + + if c.isConnected() { + return nil + } + + return natsconnectionerrors.ErrCannotConnect +} + +func (c *connection) Disconnect() { + if c.conn == nil || c.conn.IsClosed() { + return + } + c.conn.Close() +} + +func (c *connection) isConnected() bool { + if c.conn == nil { + return false + } + return c.conn.IsConnected() +} + +func (c *connection) RegisterReconnectHandlerIfNotRegistered(handler natsio.ConnHandler) { + if c.conn == nil || c.reconnectHandlerRegistered { + return + } + c.conn.SetReconnectHandler(handler) + c.reconnectHandlerRegistered = true +} + +func (c *connection) RegisterDisconnectErrHandlerIfNotRegistered(handler natsio.ConnErrHandler) { + if c.conn == nil || c.disconnectErrHandlerRegistered { + return + } + c.conn.SetDisconnectErrHandler(handler) + c.disconnectErrHandlerRegistered = true +} diff --git a/internal/connection/nats/errors/errors.go b/internal/connection/nats/errors/errors.go new file mode 100644 index 000000000..f0d97cddb --- /dev/null +++ b/internal/connection/nats/errors/errors.go @@ -0,0 +1,20 @@ +package errors + +import ( + "errors" +) + +const ( + errCannotConnect = "cannot connect to NATS" + errEmptyConnectionURL = "empty NATS connection URL" + errEmptyConnectionName = "empty NATS connection name" +) + +// ErrCannotConnect represents an error when NATS connection failed. +var ErrCannotConnect = errors.New(errCannotConnect) + +// ErrEmptyConnectionURL represents an error when NATS connection URL is empty. +var ErrEmptyConnectionURL = errors.New(errEmptyConnectionURL) + +// ErrEmptyConnectionName represents an error when NATS connection name is empty. +var ErrEmptyConnectionName = errors.New(errEmptyConnectionName) diff --git a/internal/connection/nats/interface.go b/internal/connection/nats/interface.go new file mode 100644 index 000000000..e6d92f43b --- /dev/null +++ b/internal/connection/nats/interface.go @@ -0,0 +1,20 @@ +package nats + +import ( + natsio "github.com/nats-io/nats.go" +) + +//go:generate go run github.com/vektra/mockery/v2 --name=NATS --filename=nats.go +type NATS interface { + // Connect connects to NATS and returns an error if it cannot. + Connect() error + + // Disconnect disconnects the active connection. + Disconnect() + + // RegisterReconnectHandlerIfNotRegistered registers a ReconnectHandler only if not registered. + RegisterReconnectHandlerIfNotRegistered(natsio.ConnHandler) + + // RegisterDisconnectErrHandlerIfNotRegistered registers a DisconnectErrHandler only if not registered. + RegisterDisconnectErrHandlerIfNotRegistered(natsio.ConnErrHandler) +} diff --git a/internal/connection/nats/mocks/builder.go b/internal/connection/nats/mocks/builder.go new file mode 100644 index 000000000..109550dbe --- /dev/null +++ b/internal/connection/nats/mocks/builder.go @@ -0,0 +1,17 @@ +package mocks + +import ( + natsconnection "github.com/kyma-project/eventing-manager/internal/connection/nats" +) + +type Builder struct { + mock *NATS +} + +func NewBuilder(mock *NATS) *Builder { + return &Builder{mock: mock} +} + +func (b *Builder) Build() natsconnection.NATS { + return b.mock +} diff --git a/internal/connection/nats/mocks/nats.go b/internal/connection/nats/mocks/nats.go new file mode 100644 index 000000000..1c2c02437 --- /dev/null +++ b/internal/connection/nats/mocks/nats.go @@ -0,0 +1,179 @@ +// Code generated by mockery v2.39.2. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + nats_go "github.com/nats-io/nats.go" +) + +// NATS is an autogenerated mock type for the NATS type +type NATS struct { + mock.Mock +} + +type NATS_Expecter struct { + mock *mock.Mock +} + +func (_m *NATS) EXPECT() *NATS_Expecter { + return &NATS_Expecter{mock: &_m.Mock} +} + +// Connect provides a mock function with given fields: +func (_m *NATS) Connect() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Connect") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NATS_Connect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Connect' +type NATS_Connect_Call struct { + *mock.Call +} + +// Connect is a helper method to define mock.On call +func (_e *NATS_Expecter) Connect() *NATS_Connect_Call { + return &NATS_Connect_Call{Call: _e.mock.On("Connect")} +} + +func (_c *NATS_Connect_Call) Run(run func()) *NATS_Connect_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *NATS_Connect_Call) Return(_a0 error) *NATS_Connect_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *NATS_Connect_Call) RunAndReturn(run func() error) *NATS_Connect_Call { + _c.Call.Return(run) + return _c +} + +// Disconnect provides a mock function with given fields: +func (_m *NATS) Disconnect() { + _m.Called() +} + +// NATS_Disconnect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Disconnect' +type NATS_Disconnect_Call struct { + *mock.Call +} + +// Disconnect is a helper method to define mock.On call +func (_e *NATS_Expecter) Disconnect() *NATS_Disconnect_Call { + return &NATS_Disconnect_Call{Call: _e.mock.On("Disconnect")} +} + +func (_c *NATS_Disconnect_Call) Run(run func()) *NATS_Disconnect_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *NATS_Disconnect_Call) Return() *NATS_Disconnect_Call { + _c.Call.Return() + return _c +} + +func (_c *NATS_Disconnect_Call) RunAndReturn(run func()) *NATS_Disconnect_Call { + _c.Call.Return(run) + return _c +} + +// RegisterDisconnectErrHandlerIfNotRegistered provides a mock function with given fields: _a0 +func (_m *NATS) RegisterDisconnectErrHandlerIfNotRegistered(_a0 nats_go.ConnErrHandler) { + _m.Called(_a0) +} + +// NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterDisconnectErrHandlerIfNotRegistered' +type NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call struct { + *mock.Call +} + +// RegisterDisconnectErrHandlerIfNotRegistered is a helper method to define mock.On call +// - _a0 nats_go.ConnErrHandler +func (_e *NATS_Expecter) RegisterDisconnectErrHandlerIfNotRegistered(_a0 interface{}) *NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call { + return &NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call{Call: _e.mock.On("RegisterDisconnectErrHandlerIfNotRegistered", _a0)} +} + +func (_c *NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call) Run(run func(_a0 nats_go.ConnErrHandler)) *NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(nats_go.ConnErrHandler)) + }) + return _c +} + +func (_c *NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call) Return() *NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call { + _c.Call.Return() + return _c +} + +func (_c *NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call) RunAndReturn(run func(nats_go.ConnErrHandler)) *NATS_RegisterDisconnectErrHandlerIfNotRegistered_Call { + _c.Call.Return(run) + return _c +} + +// RegisterReconnectHandlerIfNotRegistered provides a mock function with given fields: _a0 +func (_m *NATS) RegisterReconnectHandlerIfNotRegistered(_a0 nats_go.ConnHandler) { + _m.Called(_a0) +} + +// NATS_RegisterReconnectHandlerIfNotRegistered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterReconnectHandlerIfNotRegistered' +type NATS_RegisterReconnectHandlerIfNotRegistered_Call struct { + *mock.Call +} + +// RegisterReconnectHandlerIfNotRegistered is a helper method to define mock.On call +// - _a0 nats_go.ConnHandler +func (_e *NATS_Expecter) RegisterReconnectHandlerIfNotRegistered(_a0 interface{}) *NATS_RegisterReconnectHandlerIfNotRegistered_Call { + return &NATS_RegisterReconnectHandlerIfNotRegistered_Call{Call: _e.mock.On("RegisterReconnectHandlerIfNotRegistered", _a0)} +} + +func (_c *NATS_RegisterReconnectHandlerIfNotRegistered_Call) Run(run func(_a0 nats_go.ConnHandler)) *NATS_RegisterReconnectHandlerIfNotRegistered_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(nats_go.ConnHandler)) + }) + return _c +} + +func (_c *NATS_RegisterReconnectHandlerIfNotRegistered_Call) Return() *NATS_RegisterReconnectHandlerIfNotRegistered_Call { + _c.Call.Return() + return _c +} + +func (_c *NATS_RegisterReconnectHandlerIfNotRegistered_Call) RunAndReturn(run func(nats_go.ConnHandler)) *NATS_RegisterReconnectHandlerIfNotRegistered_Call { + _c.Call.Return(run) + return _c +} + +// NewNATS creates a new instance of NATS. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewNATS(t interface { + mock.TestingT + Cleanup(func()) +}) *NATS { + mock := &NATS{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/controller/operator/eventing/controller.go b/internal/controller/operator/eventing/controller.go index 505a0916f..423dd5559 100644 --- a/internal/controller/operator/eventing/controller.go +++ b/internal/controller/operator/eventing/controller.go @@ -20,8 +20,10 @@ import ( "context" "errors" "fmt" + "os" "strings" + natsio "github.com/nats-io/nats.go" "go.uber.org/zap" kappsv1 "k8s.io/api/apps/v1" kautoscalingv2 "k8s.io/api/autoscaling/v2" @@ -44,6 +46,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" operatorv1alpha1 "github.com/kyma-project/eventing-manager/api/operator/v1alpha1" + natsconnection "github.com/kyma-project/eventing-manager/internal/connection/nats" + natsconnectionerrors "github.com/kyma-project/eventing-manager/internal/connection/nats/errors" "github.com/kyma-project/eventing-manager/options" "github.com/kyma-project/eventing-manager/pkg/env" "github.com/kyma-project/eventing-manager/pkg/eventing" @@ -72,11 +76,11 @@ const ( ) var ( - ErrSubscriptionExists = errors.New(SubscriptionExistsErrMessage) - ErrUnsupportedBackedType = errors.New("backend type not supported") - ErrNatsModuleMissing = errors.New("NATS module has to be installed") - ErrNATSServerUnavailable = errors.New(NatsServerNotAvailableMsg) - ErrAPIGatewayModuleMissing = errors.New("API-Gateway module is needed for EventMesh backend. APIRules CRD is not installed") + ErrSubscriptionExists = errors.New(SubscriptionExistsErrMessage) + ErrUnsupportedBackedType = errors.New("backend type not supported") + ErrNatsModuleMissing = errors.New("NATS module has to be installed") + ErrAPIGatewayModuleMissing = errors.New("API-Gateway module is needed for EventMesh backend. APIRules CRD is not installed") + ErrNilNATSConnectionBuilder = errors.New("connection builder for NATS backend is nil") ) // Reconciler reconciles an Eventing object @@ -105,6 +109,9 @@ type Reconciler struct { clusterScopedResourcesWatched bool natsCRWatchStarted bool natsWatchers map[string]watcher.Watcher + natsConnection natsconnection.NATS + genericEvents chan event.GenericEvent + natsConnectionBuilder natsconnection.Builder } func NewReconciler( @@ -119,6 +126,7 @@ func NewReconciler( subManagerFactory subscriptionmanager.ManagerFactory, opts *options.Options, allowedEventingCR *operatorv1alpha1.Eventing, + natsConnectionBuilder natsconnection.Builder, ) *Reconciler { return &Reconciler{ Client: client, @@ -137,6 +145,9 @@ func NewReconciler( natsConfigHandler: NewNatsConfigHandler(kubeClient, opts), allowedEventingCR: allowedEventingCR, natsWatchers: make(map[string]watcher.Watcher), + natsConnection: nil, // This will be initialized only when reconciling the NATS backend. + genericEvents: make(chan event.GenericEvent), + natsConnectionBuilder: natsConnectionBuilder, } } @@ -300,6 +311,7 @@ func (r *Reconciler) SetupWithManager(mgr kctrl.Manager) error { }, ), ). + WatchesRawSource(&source.Channel{Source: r.genericEvents}, &handler.EnqueueRequestForObject{}). WithOptions(controller.Options{ MaxConcurrentReconciles: 0, }). @@ -467,11 +479,9 @@ func (r *Reconciler) handleEventingReconcile(ctx context.Context, // set webhook condition to true. eventing.Status.SetWebhookReadyConditionToTrue() - // handle switching of backend. - if eventing.Status.ActiveBackend != "" { - if err := r.handleBackendSwitching(ctx, eventing, log); err != nil { - return kctrl.Result{}, r.syncStatusWithSubscriptionManagerErr(ctx, eventing, err, log) - } + // handle backend switching. + if err := r.handleBackendSwitching(ctx, eventing, log); err != nil { + return kctrl.Result{}, r.syncStatusWithSubscriptionManagerErr(ctx, eventing, err, log) } // update ActiveBackend in status. @@ -492,38 +502,45 @@ func (r *Reconciler) handleEventingReconcile(ctx context.Context, case operatorv1alpha1.EventMeshBackendType: return r.reconcileEventMeshBackend(ctx, eventing, log) default: - return kctrl.Result{Requeue: false}, fmt.Errorf("%w: %s", ErrUnsupportedBackedType, eventing.Spec.Backend.Type) + return kctrl.Result{}, reconcile.TerminalError(fmt.Errorf("%w: %s", ErrUnsupportedBackedType, eventing.Spec.Backend.Type)) } } +// handleBackendSwitching handles backend switching. +// It stops the previously active backend if needed. +// It updates the Eventing CR status properly. func (r *Reconciler) handleBackendSwitching(ctx context.Context, - eventing *operatorv1alpha1.Eventing, log *zap.SugaredLogger, + eventingCR *operatorv1alpha1.Eventing, log *zap.SugaredLogger, ) error { - // check if the backend was changed. - if !eventing.IsSpecBackendTypeChanged() { + if eventingCR.IsPreviousBackendEmpty() || !eventingCR.IsSpecBackendTypeChanged() { return nil } - // stop the previously active backend. - if eventing.Status.ActiveBackend == operatorv1alpha1.NatsBackendType { + // stop the previous active backend. + previousBackend := eventingCR.Status.ActiveBackend + if previousBackend == operatorv1alpha1.NatsBackendType { log.Info("Stopping the NATS subscription manager because backend is switched") if err := r.stopNATSSubManager(true, log); err != nil { return err } - r.stopNATSCRWatch(eventing) - } else if eventing.Status.ActiveBackend == operatorv1alpha1.EventMeshBackendType { + r.stopNATSCRWatch(eventingCR) + if r.natsConnection != nil { + r.natsConnection.Disconnect() + } + } else { + log.Info("Stopping the EventMesh subscription manager because backend is switched") if err := r.stopEventMeshSubManager(true, log); err != nil { return err } } // update the Eventing CR status. - eventing.Status.SetStateProcessing() - eventing.Status.ClearConditions() + eventingCR.Status.SetStateProcessing() + eventingCR.Status.ClearConditions() // delete publisher proxy resources. log.Infof("deleting publisher proxy resources") - err := r.eventingManager.DeletePublisherProxyResources(ctx, eventing) + err := r.eventingManager.DeletePublisherProxyResources(ctx, eventingCR) if err != nil { return err } @@ -531,7 +548,9 @@ func (r *Reconciler) handleBackendSwitching(ctx context.Context, return nil } -func (r *Reconciler) reconcileNATSBackend(ctx context.Context, eventing *operatorv1alpha1.Eventing, log *zap.SugaredLogger) (kctrl.Result, error) { +func (r *Reconciler) reconcileNATSBackend(ctx context.Context, + eventingCR *operatorv1alpha1.Eventing, log *zap.SugaredLogger, +) (kctrl.Result, error) { // retrieves the NATS CRD _, err := r.kubeClient.GetCRD(ctx, k8s.NatsGVK().GroupResource().String()) if err != nil { @@ -539,52 +558,81 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context, eventing *operato // delete the publisher proxy resources, because the publisher deployment will go // into CrashLoopBackOff. log.Infof("NATS module not enabled, deleting publisher proxy resources") - delErr := r.eventingManager.DeletePublisherProxyResources(ctx, eventing) + delErr := r.eventingManager.DeletePublisherProxyResources(ctx, eventingCR) if delErr != nil { return kctrl.Result{}, delErr } // update the Eventing CR status. notFoundErr := fmt.Errorf("%w: %w", ErrNatsModuleMissing, err) - return kctrl.Result{}, r.syncStatusWithNATSState(ctx, operatorv1alpha1.StateWarning, eventing, + return kctrl.Result{}, r.syncStatusWithNATSState(ctx, operatorv1alpha1.StateWarning, eventingCR, notFoundErr, log) } return kctrl.Result{}, err } - if err = r.startNATSCRWatch(eventing); err != nil { - return kctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventing, err, log) + if err = r.startNATSCRWatch(eventingCR); err != nil { + return kctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventingCR, err, log) } - // check nats CR if it exists and is in natsAvailable state - err = r.checkNATSAvailability(ctx, eventing) - if err != nil { - return kctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventing, err, log) + if err = r.connectToNATS(eventingCR); err != nil { + if errors.Is(err, ErrNilNATSConnectionBuilder) { + r.namedLogger().Error(err) + os.Exit(1) + } + + if errors.Is(err, natsconnectionerrors.ErrCannotConnect) { + return kctrl.Result{}, reconcile.TerminalError( + r.syncStatusWithNATSErr(ctx, eventingCR, err, log), + ) + } + + return kctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventingCR, err, log) } // set NATSAvailable condition to true and update status - eventing.Status.SetNATSAvailableConditionToTrue() + eventingCR.Status.SetNATSAvailableConditionToTrue() - deployment, err := r.handlePublisherProxy(ctx, eventing, eventing.Spec.Backend.Type) + deployment, err := r.handlePublisherProxy(ctx, eventingCR, eventingCR.Spec.Backend.Type) if err != nil { - return kctrl.Result{}, r.syncStatusWithPublisherProxyErr(ctx, eventing, err, log) + return kctrl.Result{}, r.syncStatusWithPublisherProxyErr(ctx, eventingCR, err, log) } // start NATS subscription manager - if err := r.reconcileNATSSubManager(eventing, log); err != nil { - return kctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventing, err, log) + if err := r.reconcileNATSSubManager(eventingCR, log); err != nil { + return kctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventingCR, err, log) } - return r.handleEventingState(ctx, deployment, eventing, log) + return r.handleEventingState(ctx, deployment, eventingCR, log) } -func (r *Reconciler) checkNATSAvailability(ctx context.Context, eventing *operatorv1alpha1.Eventing) error { - natsAvailable, err := r.eventingManager.IsNATSAvailable(ctx, eventing.Namespace) - if err != nil { - return err +// connectToNATS connects to NATS and returns an error if it failed. +// It also registers handlers for reconnection and disconnection. +func (r *Reconciler) connectToNATS(eventingCR *operatorv1alpha1.Eventing) error { + if r.natsConnectionBuilder == nil { + return ErrNilNATSConnectionBuilder } - if !natsAvailable { - return ErrNATSServerUnavailable + + if r.natsConnection == nil { + r.natsConnection = r.natsConnectionBuilder.Build() + } + + if err := r.natsConnection.Connect(); err != nil { + return err } + + // At this point, it is safe to register handlers + // because the internal nats connection is initialized. + r.natsConnection.RegisterReconnectHandlerIfNotRegistered( + func(_ *natsio.Conn) { + r.genericEvents <- event.GenericEvent{Object: eventingCR} + }, + ) + r.natsConnection.RegisterDisconnectErrHandlerIfNotRegistered( + func(_ *natsio.Conn, _ error) { + r.genericEvents <- event.GenericEvent{Object: eventingCR} + }, + ) + return nil } diff --git a/internal/controller/operator/eventing/integrationtests/controller/integration_test.go b/internal/controller/operator/eventing/integrationtests/controller/integration_test.go index 292dedfa5..d8afad635 100644 --- a/internal/controller/operator/eventing/integrationtests/controller/integration_test.go +++ b/internal/controller/operator/eventing/integrationtests/controller/integration_test.go @@ -51,7 +51,7 @@ func TestMain(m *testing.M) { ApplicationRuleCRDEnabled: true, NATSCRDEnabled: true, AllowedEventingCR: nil, - }) + }, nil) if err != nil { panic(err) } @@ -78,23 +78,6 @@ func Test_CreateEventingCR_NATS(t *testing.T) { wantMatches gomegatypes.GomegaMatcher wantEnsureK8sObjects bool }{ - { - name: "Eventing CR should error state due to NATS is unavailable", - givenEventing: utils.NewEventingCR( - utils.WithEventingCRMinimal(), - utils.WithEventingStreamData("Memory", "1M", 1, 1), - utils.WithEventingPublisherData(1, 1, "199m", "99Mi", "399m", "199Mi"), - ), - givenNATS: natstestutils.NewNATSCR( - natstestutils.WithNATSCRDefaults(), - ), - givenNATSReady: false, - wantMatches: gomega.And( - matchers.HaveStatusError(), - matchers.HaveNATSNotAvailableCondition(), - matchers.HaveFinalizer(), - ), - }, { name: "Eventing CR should have ready state when all deployment replicas are ready", givenEventing: utils.NewEventingCR( @@ -895,60 +878,6 @@ func Test_WatcherNATSResource(t *testing.T) { wantOriginalEventingMatches gomegatypes.GomegaMatcher wantTargetEventingMatches gomegatypes.GomegaMatcher }{ - { - name: "should update Eventing CR state if NATS CR state changes from ready to error", - givenOriginalNATS: natstestutils.NewNATSCR( - natstestutils.WithNATSCRDefaults(), - natstestutils.WithNATSStateReady(), - ), - givenTargetNATS: natstestutils.NewNATSCR( - natstestutils.WithNATSCRDefaults(), - natstestutils.WithNATSStateError(), - ), - wantOriginalEventingMatches: gomega.And( - matchers.HaveStatusReady(), - matchers.HaveNATSAvailableCondition(), - ), - wantTargetEventingMatches: gomega.And( - matchers.HaveStatusError(), - matchers.HaveNATSNotAvailableCondition(), - ), - }, - { - name: "should update Eventing CR state if NATS CR state changes from error to ready", - givenOriginalNATS: natstestutils.NewNATSCR( - natstestutils.WithNATSCRDefaults(), - natstestutils.WithNATSStateError(), - ), - givenTargetNATS: natstestutils.NewNATSCR( - natstestutils.WithNATSCRDefaults(), - natstestutils.WithNATSStateReady(), - ), - wantOriginalEventingMatches: gomega.And( - matchers.HaveStatusError(), - matchers.HaveNATSNotAvailableCondition(), - ), - wantTargetEventingMatches: gomega.And( - matchers.HaveStatusReady(), - matchers.HaveNATSAvailableCondition(), - ), - }, - { - name: "should update Eventing CR state to error when NATS CR is deleted", - givenOriginalNATS: natstestutils.NewNATSCR( - natstestutils.WithNATSCRDefaults(), - natstestutils.WithNATSStateReady(), - ), - givenTargetNATS: nil, // means, NATS CR is deleted. - wantOriginalEventingMatches: gomega.And( - matchers.HaveStatusReady(), - matchers.HaveNATSAvailableCondition(), - ), - wantTargetEventingMatches: gomega.And( - matchers.HaveStatusError(), - matchers.HaveNATSNotAvailableCondition(), - ), - }, { name: "should not reconcile Eventing CR in EventMesh mode", givenOriginalNATS: natstestutils.NewNATSCR( diff --git a/internal/controller/operator/eventing/integrationtests/controllersinglecr/integration_test.go b/internal/controller/operator/eventing/integrationtests/controllersinglecr/integration_test.go index 467c23105..df8b7c7c8 100644 --- a/internal/controller/operator/eventing/integrationtests/controllersinglecr/integration_test.go +++ b/internal/controller/operator/eventing/integrationtests/controllersinglecr/integration_test.go @@ -43,7 +43,7 @@ func TestMain(m *testing.M) { ApplicationRuleCRDEnabled: true, NATSCRDEnabled: true, AllowedEventingCR: givenAllowedEventingCR, - }) + }, nil) if err != nil { panic(err) } diff --git a/internal/controller/operator/eventing/integrationtests/controllerswitching/integration_test.go b/internal/controller/operator/eventing/integrationtests/controllerswitching/integration_test.go index 87e852b5e..7a788fa7c 100644 --- a/internal/controller/operator/eventing/integrationtests/controllerswitching/integration_test.go +++ b/internal/controller/operator/eventing/integrationtests/controllerswitching/integration_test.go @@ -40,7 +40,7 @@ func TestMain(m *testing.M) { ApplicationRuleCRDEnabled: true, NATSCRDEnabled: true, AllowedEventingCR: nil, - }) + }, nil) if err != nil { panic(err) } diff --git a/internal/controller/operator/eventing/integrationtests/nats_disabled/integration_test.go b/internal/controller/operator/eventing/integrationtests/nats_disabled/integration_test.go index 747a328a5..631096eae 100644 --- a/internal/controller/operator/eventing/integrationtests/nats_disabled/integration_test.go +++ b/internal/controller/operator/eventing/integrationtests/nats_disabled/integration_test.go @@ -39,7 +39,7 @@ func TestMain(m *testing.M) { ApplicationRuleCRDEnabled: true, NATSCRDEnabled: true, AllowedEventingCR: nil, - }) + }, nil) if err != nil { panic(err) } diff --git a/internal/controller/operator/eventing/integrationtests/natsconnection/integration_test.go b/internal/controller/operator/eventing/integrationtests/natsconnection/integration_test.go new file mode 100644 index 000000000..d3c9abbd1 --- /dev/null +++ b/internal/controller/operator/eventing/integrationtests/natsconnection/integration_test.go @@ -0,0 +1,100 @@ +package natsconnection + +import ( + "testing" + + natstestutils "github.com/kyma-project/nats-manager/testutils" + "github.com/onsi/gomega" + gomegatypes "github.com/onsi/gomega/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + kappsv1 "k8s.io/api/apps/v1" + + operatorv1alpha1 "github.com/kyma-project/eventing-manager/api/operator/v1alpha1" + natsconnectionerrors "github.com/kyma-project/eventing-manager/internal/connection/nats/errors" + natsconnectionmocks "github.com/kyma-project/eventing-manager/internal/connection/nats/mocks" + eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/operator/eventing" + "github.com/kyma-project/eventing-manager/test/matchers" + "github.com/kyma-project/eventing-manager/test/utils" + testutilsintegration "github.com/kyma-project/eventing-manager/test/utils/integration" +) + +// Test_NATSConnection tests the Eventing CR status when connecting to NATS. +func Test_NATSConnection(t *testing.T) { + // given + testCases := []struct { + name string + givenNATSConnectionMock func() *natsconnectionmocks.NATS + wantMatches gomegatypes.GomegaMatcher + }{ + { + name: "Eventing CR should be in ready state if connected to NATS", + givenNATSConnectionMock: func() *natsconnectionmocks.NATS { + conn := &natsconnectionmocks.NATS{} + conn.On("Connect").Return(nil) + conn.On("IsConnected").Return(true) + conn.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return() + conn.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return() + return conn + }, + wantMatches: gomega.And( + matchers.HaveStatusReady(), + matchers.HaveNATSAvailableCondition(), + matchers.HavePublisherProxyReadyConditionDeployed(), + matchers.HaveFinalizer(), + ), + }, + { + name: "Eventing CR should be in error state if not connected to NATS", + givenNATSConnectionMock: func() *natsconnectionmocks.NATS { + conn := &natsconnectionmocks.NATS{} + conn.On("Connect").Return(natsconnectionerrors.ErrCannotConnect) + conn.On("IsConnected").Return(false) + conn.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return() + conn.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return() + return conn + }, + wantMatches: gomega.And( + matchers.HaveStatusError(), + matchers.HaveBackendNotAvailableConditionWith( + natsconnectionerrors.ErrCannotConnect.Error(), + operatorv1alpha1.ConditionReasonNATSNotAvailable, + ), + matchers.HaveFinalizer(), + ), + }, + } + + for _, tc := range testCases { + tcc := tc + + t.Run(tcc.name, func(t *testing.T) { + t.Parallel() + + // setup environment + te, err := testutilsintegration.NewTestEnvironment( + testutilsintegration.TestEnvironmentConfig{ + NATSCRDEnabled: true, + ProjectRootDir: "../../../../../../", + }, + tcc.givenNATSConnectionMock(), + ) + require.NoError(t, err) + defer func() { require.NoError(t, te.TearDown()) }() // always cleanup + eventingcontroller.IsDeploymentReady = func(deployment *kappsv1.Deployment) bool { return true } + + // prepare resources + natsCR := natstestutils.NewNATSCR(natstestutils.WithNATSCRDefaults()) + eventingCR := utils.NewEventingCR(utils.WithEventingCRMinimal(), utils.WithEventingDomain(utils.Domain)) + natsCR.SetNamespace(eventingCR.Namespace) + + // create resources + te.EnsureNamespaceCreation(t, eventingCR.Namespace) + te.EnsureK8sResourceCreated(t, natsCR) + te.EnsureK8sResourceCreated(t, eventingCR) + + // then + te.GetEventingAssert(gomega.NewWithT(t), eventingCR).Should(tcc.wantMatches) + }) + } +} diff --git a/internal/controller/operator/eventing/integrationtests/validation/integration_test.go b/internal/controller/operator/eventing/integrationtests/validation/integration_test.go index ad78e2b83..115487e5a 100644 --- a/internal/controller/operator/eventing/integrationtests/validation/integration_test.go +++ b/internal/controller/operator/eventing/integrationtests/validation/integration_test.go @@ -91,7 +91,7 @@ func TestMain(m *testing.M) { ApplicationRuleCRDEnabled: true, NATSCRDEnabled: true, AllowedEventingCR: nil, - }) + }, nil) if err != nil { panic(err) } diff --git a/internal/controller/operator/eventing/integrationtests/without_apirule_crd/integration_test.go b/internal/controller/operator/eventing/integrationtests/without_apirule_crd/integration_test.go index 502ec12b8..72254d719 100644 --- a/internal/controller/operator/eventing/integrationtests/without_apirule_crd/integration_test.go +++ b/internal/controller/operator/eventing/integrationtests/without_apirule_crd/integration_test.go @@ -33,7 +33,7 @@ func TestMain(m *testing.M) { ApplicationRuleCRDEnabled: true, NATSCRDEnabled: true, AllowedEventingCR: nil, - }) + }, nil) if err != nil { panic(err) } diff --git a/internal/controller/operator/eventing/unit_test.go b/internal/controller/operator/eventing/unit_test.go index 54200e88f..f32c0e5a7 100644 --- a/internal/controller/operator/eventing/unit_test.go +++ b/internal/controller/operator/eventing/unit_test.go @@ -84,6 +84,7 @@ func NewMockedUnitTestEnvironment(t *testing.T, objs ...client.Object) *MockedUn nil, opts, nil, + nil, ) reconciler.ctrlManager = mockManager diff --git a/test/utils/integration/integration.go b/test/utils/integration/integration.go index 6a9a3f311..f9062d902 100644 --- a/test/utils/integration/integration.go +++ b/test/utils/integration/integration.go @@ -42,6 +42,7 @@ import ( eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" "github.com/kyma-project/eventing-manager/api/operator/v1alpha1" + natsconnectionmocks "github.com/kyma-project/eventing-manager/internal/connection/nats/mocks" eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/operator/eventing" "github.com/kyma-project/eventing-manager/options" "github.com/kyma-project/eventing-manager/pkg/env" @@ -91,14 +92,16 @@ type TestEnvironmentConfig struct { } //nolint:funlen // Used in testing -func NewTestEnvironment(config TestEnvironmentConfig) (*TestEnvironment, error) { +func NewTestEnvironment(config TestEnvironmentConfig, connMock *natsconnectionmocks.NATS) (*TestEnvironment, error) { var err error // setup context ctx := context.Background() - opts := options.New() - if err := opts.Parse(); err != nil { - return nil, err + opts := &options.Options{ + Env: options.Env{ + LogFormat: "json", + LogLevel: "info", + }, } // setup logger @@ -190,6 +193,16 @@ func NewTestEnvironment(config TestEnvironmentConfig) (*TestEnvironment, error) subManagerFactoryMock.On("NewJetStreamManager", mock.Anything, mock.Anything).Return(jetStreamSubManagerMock) subManagerFactoryMock.On("NewEventMeshManager", mock.Anything).Return(eventMeshSubManagerMock, nil) + // setup default mock + if connMock == nil { + connMock = &natsconnectionmocks.NATS{} + connMock.On("Connect").Return(nil) + connMock.On("IsConnected").Return(true) + connMock.On("Disconnect").Return() + connMock.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return() + connMock.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return() + } + // create a new watcher eventingReconciler := eventingcontroller.NewReconciler( k8sClient, @@ -203,6 +216,7 @@ func NewTestEnvironment(config TestEnvironmentConfig) (*TestEnvironment, error) subManagerFactoryMock, opts, config.AllowedEventingCR, + natsconnectionmocks.NewBuilder(connMock), ) if err = (eventingReconciler).SetupWithManager(ctrlMgr); err != nil { @@ -213,7 +227,7 @@ func NewTestEnvironment(config TestEnvironmentConfig) (*TestEnvironment, error) var cancelCtx context.CancelFunc go func() { var mgrCtx context.Context - mgrCtx, cancelCtx = context.WithCancel(kctrl.SetupSignalHandler()) + mgrCtx, cancelCtx = context.WithCancel(context.Background()) err = ctrlMgr.Start(mgrCtx) if err != nil { panic(err)