Skip to content

Commit

Permalink
Rely on the NATS cluster connection status when reporting the Eventin…
Browse files Browse the repository at this point in the history
…g CR status
  • Loading branch information
marcobebway committed Jan 15, 2024
1 parent 0df1b52 commit 31c731c
Show file tree
Hide file tree
Showing 22 changed files with 774 additions and 126 deletions.
8 changes: 8 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ linters-settings:
alias: istio$1$2
- pkg: github.com/nats-io/nats-server/v2/(\w+)$
alias: natsio$1
- pkg: github.com/nats-io/nats.go
alias: natsio
- pkg: github.com/kyma-project/eventing-manager/internal/controller/(\w+)$
alias: controller$1
- pkg: github.com/kyma-project/kyma/common/logging/logger
Expand All @@ -159,6 +161,12 @@ linters-settings:
alias: natsv1alpha1
- pkg: github.com/kyma-project/nats-manager/testutils
alias: natstestutils
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats
alias: natsconnection
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats/errors
alias: natsconnectionerrors
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats/mocks
alias: natsconnectionmocks
- pkg: github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/(\w+)$
alias: subscriptioncontroller$1
- pkg: github.com/kyma-project/eventing-manager/internal/controller/operator/eventing
Expand Down
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ RUN go mod download
# Copy the go source
COPY cmd/main.go cmd/main.go
COPY api/ api/
COPY internal/controller/ internal/controller/
COPY internal/label/ internal/label/
COPY internal/ internal/
COPY pkg/ pkg/
COPY testing/ testing/
COPY options/ options/
Expand Down
4 changes: 4 additions & 0 deletions api/operator/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ func (e *Eventing) SyncStatusActiveBackend() {
e.Status.ActiveBackend = e.Spec.Backend.Type
}

func (e *Eventing) IsPreviousBackendEmpty() bool {
return e.Status.ActiveBackend == ""
}

func (e *Eventing) IsSpecBackendTypeChanged() bool {
return e.Status.ActiveBackend != e.Spec.Backend.Type
}
39 changes: 39 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"flag"
"log"
"os"
"time"

"github.com/go-logr/zapr"
apigatewayv1beta1 "github.com/kyma-project/api-gateway/apis/gateway/v1beta1"
natsio "github.com/nats-io/nats.go"
kapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
kapixclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -40,6 +42,7 @@ import (
eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha1"
eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2"
operatorv1alpha1 "github.com/kyma-project/eventing-manager/api/operator/v1alpha1"
natsconnection "github.com/kyma-project/eventing-manager/internal/connection/nats"
controllercache "github.com/kyma-project/eventing-manager/internal/controller/cache"
controllerclient "github.com/kyma-project/eventing-manager/internal/controller/client"
eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/operator/eventing"
Expand Down Expand Up @@ -164,6 +167,14 @@ func main() { //nolint:funlen // main function needs to initialize many object
ctrLogger,
)

// init NATS connection builder
natsConnectionBuilder, err := getNATSConnectionBuilder()
if err != nil {
setupLog.Error(err, "failed to get a NATS connection builder")
syncLogger(ctrLogger)
os.Exit(1)
}

// create Eventing reconciler instance
eventingReconciler := eventingcontroller.NewReconciler(
k8sClient,
Expand All @@ -182,6 +193,7 @@ func main() { //nolint:funlen // main function needs to initialize many object
Namespace: backendConfig.EventingCRNamespace,
},
},
natsConnectionBuilder,
)

if err = (eventingReconciler).SetupWithManager(mgr); err != nil {
Expand Down Expand Up @@ -230,3 +242,30 @@ func main() { //nolint:funlen // main function needs to initialize many object
}
syncLogger(ctrLogger)
}

func getNATSConnectionBuilder() (natsconnection.Builder, error) {
const (
// connectionURL is the NATS connection URL.
// It should be configured as part of https://github.com/kyma-project/eventing-manager/issues/272.
connectionURL = "nats://eventing-nats.kyma-system.svc.cluster.local:4222"

// connectionName is the name to identify the NATS connection.
connectionName = "Eventing Reconciler"
)

// The following constants are used to configure the NATS client re-connectivity.
// Please do not change these values to not change the intended behavior.
const (
maxReconnects = -1
retryOnFailedConnect = true
reconnectWait = time.Second
)

return natsconnection.NewBuilder(
connectionURL,
connectionName,
natsio.MaxReconnects(maxReconnects),
natsio.RetryOnFailedConnect(retryOnFailedConnect),
natsio.ReconnectWait(reconnectWait),
)
}
13 changes: 13 additions & 0 deletions cmd/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

import (
"testing"

"github.com/stretchr/testify/require"
)

func Test_getNATSConnectionBuilder(t *testing.T) {
natsConnectionBuilder, err := getNATSConnectionBuilder()
require.NoError(t, err)
require.NotNil(t, natsConnectionBuilder)
}
41 changes: 41 additions & 0 deletions internal/connection/nats/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package nats

import (
"strings"

natsio "github.com/nats-io/nats.go"

"github.com/kyma-project/eventing-manager/internal/connection/nats/errors"
)

type Builder interface {
Build() NATS
}

type ConnectionBuilder struct {
url string
opts []natsio.Option
}

func NewBuilder(url, name string, opts ...natsio.Option) (*ConnectionBuilder, error) {
if len(strings.TrimSpace(url)) == 0 {
return nil, errors.ErrEmptyConnectionURL
}

if len(strings.TrimSpace(name)) == 0 {
return nil, errors.ErrEmptyConnectionName
}

opts = append(opts, natsio.Name(name)) // enforce configuring the connection name
return &ConnectionBuilder{url: url, opts: opts}, nil
}

func (b *ConnectionBuilder) Build() NATS {
return &connection{
url: b.url,
conn: nil,
opts: b.opts,
reconnectHandlerRegistered: false,
disconnectErrHandlerRegistered: false,
}
}
150 changes: 150 additions & 0 deletions internal/connection/nats/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package nats

import (
"reflect"
"testing"
"time"

natsio "github.com/nats-io/nats.go"
"github.com/stretchr/testify/require"

"github.com/kyma-project/eventing-manager/internal/connection/nats/errors"
)

func TestNewBuilder(t *testing.T) {
t.Parallel()

// given
const (
url = "url"
name = "name"
maxReconnects = 10
retryOnFailedConnect = true
reconnectWait = time.Minute
)

type args struct {
url string
name string
opts []natsio.Option
}
tests := []struct {
name string
args args
wantBuilder *ConnectionBuilder
wantErr error
}{
{
name: "should return an error if the URL is empty",
args: args{
url: "",
name: name,
opts: nil,
},
wantBuilder: nil,
wantErr: errors.ErrEmptyConnectionURL,
},
{
name: "should return an error if the name is empty",
args: args{
url: url,
name: "",
opts: nil,
},
wantBuilder: nil,
wantErr: errors.ErrEmptyConnectionName,
},
{
name: "should return a connection builder instance with no errors if there are no given options",
args: args{
url: url,
name: name,
opts: nil, // no given options
},
wantBuilder: &ConnectionBuilder{
url: url,
opts: []natsio.Option{
natsio.Name(name),
},
},
wantErr: nil,
},
{
name: "should return a connection builder instance with no errors if there are given options",
args: args{
url: url,
name: name,
opts: []natsio.Option{
natsio.MaxReconnects(maxReconnects),
natsio.RetryOnFailedConnect(retryOnFailedConnect),
natsio.ReconnectWait(reconnectWait),
},
},
wantBuilder: &ConnectionBuilder{
url: url,
opts: []natsio.Option{
natsio.Name(name),
natsio.MaxReconnects(10),
natsio.RetryOnFailedConnect(true),
natsio.ReconnectWait(time.Minute),
},
},
wantErr: nil,
},
}
for _, tt := range tests {
ttc := tt

t.Run(ttc.name, func(t *testing.T) {
t.Parallel()

// when
gotBuilder, gotErr := NewBuilder(ttc.args.url, ttc.args.name, ttc.args.opts...)

// then
require.Equal(t, ttc.wantErr, gotErr)
require.True(t, builderDeepEqual(t, ttc.wantBuilder, gotBuilder))
})
}
}

func TestConnectionBuilder_Build(t *testing.T) {
b := ConnectionBuilder{
url: "url",
opts: []natsio.Option{
natsio.Name("name"),
natsio.MaxReconnects(10),
natsio.RetryOnFailedConnect(true),
natsio.ReconnectWait(time.Minute),
},
}
require.NotNil(t, b.Build())
}

func builderDeepEqual(t *testing.T, a, b *ConnectionBuilder) bool {
t.Helper()

if a == b {
return true
}

if a.url != b.url {
return false
}

if len(a.opts) != len(b.opts) {
return false
}

aOpts := &natsio.Options{}
for _, opt := range a.opts {
require.NoError(t, opt(aOpts))
}

bOpts := &natsio.Options{}
for _, opt := range b.opts {
require.NoError(t, opt(bOpts))
}

return reflect.DeepEqual(aOpts, bOpts)
}
66 changes: 66 additions & 0 deletions internal/connection/nats/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package nats

import (
natsio "github.com/nats-io/nats.go"

natsconnectionerrors "github.com/kyma-project/eventing-manager/internal/connection/nats/errors"
)

// compile-time check.
var _ NATS = &connection{}

// connection represents a NATS connection.
type connection struct {
url string
conn *natsio.Conn
opts []natsio.Option
reconnectHandlerRegistered bool
disconnectErrHandlerRegistered bool
}

func (c *connection) Connect() error {
if c.isConnected() {
return nil
}

var err error
if c.conn, err = natsio.Connect(c.url, c.opts...); err != nil {
return err
}

if c.isConnected() {
return nil
}

return natsconnectionerrors.ErrCannotConnect
}

func (c *connection) Disconnect() {
if c.conn == nil || c.conn.IsClosed() {
return
}
c.conn.Close()
}

func (c *connection) isConnected() bool {
if c.conn == nil {
return false
}
return c.conn.IsConnected()
}

func (c *connection) RegisterReconnectHandlerIfNotRegistered(handler natsio.ConnHandler) {
if c.conn == nil || c.reconnectHandlerRegistered {
return
}
c.conn.SetReconnectHandler(handler)
c.reconnectHandlerRegistered = true
}

func (c *connection) RegisterDisconnectErrHandlerIfNotRegistered(handler natsio.ConnErrHandler) {
if c.conn == nil || c.disconnectErrHandlerRegistered {
return
}
c.conn.SetDisconnectErrHandler(handler)
c.disconnectErrHandlerRegistered = true
}
Loading

0 comments on commit 31c731c

Please sign in to comment.