Skip to content

Commit

Permalink
linter issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Freeman committed Oct 5, 2024
1 parent ecf587a commit 8b1b145
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 54 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ linters:
- errcheck
- errorlint
- exhaustive
- exportloopref
- copyloopvar
- funlen
- gochecknoglobals
- gochecknoinits
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventrunner/app_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"gofr.dev/pkg/gofr/migration"
)

// AppWrapper wraps a *gofr.App and implements AppInterface
// AppWrapper wraps a *gofr.App and implements AppInterface.
type AppWrapper struct {
app *gofr.App
}
Expand Down
27 changes: 22 additions & 5 deletions pkg/eventrunner/cassandra_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,32 @@ func NewCassandraEventSink() *CassandraEventSink {
return &CassandraEventSink{}
}

func (s *CassandraEventSink) ConsumeEvent(ctx *gofr.Context, event *cloudevents.Event) error {
// CassandraInsertError is a custom error type for Cassandra insertion errors.
type CassandraInsertError struct {
OriginalError error
}

// Error implements the error interface for CassandraInsertError.
func (cie *CassandraInsertError) Error() string {
return fmt.Sprintf("failed to insert event into Cassandra: %v", cie.OriginalError)
}

// Unwrap allows errors.Is and errors.As to work with CassandraInsertError.
func (cie *CassandraInsertError) Unwrap() error {
return cie.OriginalError
}

func (*CassandraEventSink) ConsumeEvent(ctx *gofr.Context, event *cloudevents.Event) error {
if ctx == nil {
return fmt.Errorf("nil context provided to CassandraEventSink")
return errNilContext
}

if event == nil {
return fmt.Errorf("nil event provided to CassandraEventSink")
return errNilEvent
}

if ctx.Cassandra == nil {
return fmt.Errorf("cassandra client is nil in CassandraEventSink")
return errNilCassandra
}

// Get the event data as []byte
Expand Down Expand Up @@ -56,7 +73,7 @@ func (s *CassandraEventSink) ConsumeEvent(ctx *gofr.Context, event *cloudevents.
event.SpecVersion(),
)
if err != nil {
return fmt.Errorf("failed to insert event into Cassandra: %w", err)
return &CassandraInsertError{OriginalError: err}
}

return nil
Expand Down
42 changes: 29 additions & 13 deletions pkg/eventrunner/cassandra_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,59 +27,67 @@ func (m *MockCassandra) Exec(stmt string, values ...any) error {
if m.ExecFunc != nil {
return m.ExecFunc(stmt, values...)
}

return nil
}

func (m *MockCassandra) Query(dest any, stmt string, values ...any) error {
if m.QueryFunc != nil {
return m.QueryFunc(dest, stmt, values...)
}

return nil
}

func (m *MockCassandra) ExecCAS(dest any, stmt string, values ...any) (bool, error) {
if m.ExecCASFunc != nil {
return m.ExecCASFunc(dest, stmt, values...)
}

return true, nil
}

func (m *MockCassandra) NewBatch(name string, batchType int) error {
if m.NewBatchFunc != nil {
return m.NewBatchFunc(name, batchType)
}

return nil
}

func (m *MockCassandra) BatchQuery(name, stmt string, values ...any) error {
if m.BatchQueryFunc != nil {
m.BatchQueryFunc(name, stmt, values...)
}

return nil
}

func (m *MockCassandra) ExecuteBatch(name string) error {
if m.ExecuteBatchFunc != nil {
return m.ExecuteBatchFunc(name)
}

return nil
}

func (m *MockCassandra) ExecuteBatchCAS(name string, dest ...any) (bool, error) {
if m.ExecuteBatchCASFunc != nil {
return m.ExecuteBatchCASFunc(name, dest...)
}

return true, nil
}

func (m *MockCassandra) HealthCheck(ctx context.Context) (any, error) {
if m.HealthCheckFunc != nil {
return m.HealthCheckFunc(ctx)
}

return "OK", nil
}

// MockContext attempts to mimic the structure of gofr.Context
// MockContext attempts to mimic the structure of gofr.Context.
type MockContext struct {
*container.Container
}
Expand All @@ -99,28 +107,30 @@ func TestCassandraEventSink_ConsumeEvent(t *testing.T) {
event.SetSubject("test-subject")
event.SetTime(time.Now())
event.SetDataContentType("application/json")
event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"})
err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"})
require.NoError(t, err)

mockCassandra := &MockCassandra{}

mockContext := NewMockContext()
mockContext.Container.Cassandra = mockCassandra

err := sink.ConsumeEvent(mockContext, &event)
err = sink.ConsumeEvent(mockContext, &event)
require.NoError(t, err)
}

func TestCassandraEventSink_ConsumeEvent_NilContext(t *testing.T) {
sink := NewCassandraEventSink()
err := sink.ConsumeEvent(nil, &cloudevents.Event{})
assert.Error(t, err)
require.Error(t, err)
assert.Contains(t, err.Error(), "nil context provided")
}

func TestCassandraEventSink_ConsumeEvent_NilEvent(t *testing.T) {
sink := NewCassandraEventSink()
mockContext := NewMockContext()
err := sink.ConsumeEvent(mockContext, nil)
assert.Error(t, err)
require.Error(t, err)
assert.Contains(t, err.Error(), "nil event provided")
}

Expand All @@ -129,19 +139,24 @@ func TestCassandraEventSink_ConsumeEvent_NilCassandra(t *testing.T) {
mockContext := NewMockContext()
mockContext.Container.Cassandra = nil
err := sink.ConsumeEvent(mockContext, &cloudevents.Event{})
assert.Error(t, err)
require.Error(t, err)
assert.Contains(t, err.Error(), "cassandra client is nil")
}

func TestCassandraEventSink_ConsumeEvent_InvalidJSON(t *testing.T) {
sink := NewCassandraEventSink()

event := cloudevents.NewEvent()
event.SetData(cloudevents.ApplicationJSON, []byte("invalid json"))
err := event.SetData(cloudevents.ApplicationJSON, []byte("invalid json"))
require.NoError(t, err)

mockCassandra := &MockCassandra{}

mockContext := NewMockContext()
mockContext.Container.Cassandra = mockCassandra
err := sink.ConsumeEvent(mockContext, &event)
assert.Error(t, err)

err = sink.ConsumeEvent(mockContext, &event)
require.Error(t, err)
assert.Contains(t, err.Error(), "event data is not valid JSON")
}

Expand All @@ -154,17 +169,18 @@ func TestCassandraEventSink_ConsumeEvent_CassandraError(t *testing.T) {
event.SetSubject("test-subject")
event.SetTime(time.Now())
event.SetDataContentType("application/json")
event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"})
err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"})
require.NoError(t, err)

mockCassandra := &MockCassandra{
ExecFunc: func(stmt string, values ...any) error {
ExecFunc: func(string, ...any) error {
return assert.AnError
},
}
mockContext := NewMockContext()
mockContext.Container.Cassandra = mockCassandra

err := sink.ConsumeEvent(mockContext, &event)
assert.Error(t, err)
err = sink.ConsumeEvent(mockContext, &event)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to insert event into Cassandra")
}
36 changes: 30 additions & 6 deletions pkg/eventrunner/consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventrunner

import (
"fmt"
"strings"
"sync"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -31,31 +32,54 @@ func (cm *ConsumerManager) AddConsumer(name string, consumer EventConsumer) {
cm.consumers[name] = consumer
}

// ConsumerErrors is a custom error type that holds multiple errors.
type ConsumerErrors struct {
Errors []error
}

// Error implements the error interface for ConsumerErrors.
func (ce *ConsumerErrors) Error() string {
if len(ce.Errors) == 0 {
return "no errors occurred"
}

errorStrings := make([]string, len(ce.Errors))

for i, err := range ce.Errors {
errorStrings[i] = err.Error()
}

return fmt.Sprintf("errors occurred while consuming event: %s", strings.Join(errorStrings, "; "))
}

func (cm *ConsumerManager) ConsumeEvent(c *gofr.Context, event *cloudevents.Event) error {
cm.mu.RLock()
defer cm.mu.RUnlock()

if c == nil {
return fmt.Errorf("nil context provided to ConsumerManager")
return errNilContext
}

if event == nil {
return fmt.Errorf("nil event provided to ConsumerManager")
return errNilEvent
}

var errors []error
var consumerErrors ConsumerErrors

for name, consumer := range cm.consumers {
if consumer == nil {
cm.logger.Logf("EventConsumer %s is nil, skipping", name)
continue
}

if err := consumer.ConsumeEvent(c, event); err != nil {
cm.logger.Errorf("EventConsumer %s failed: %v", name, err)
errors = append(errors, fmt.Errorf("consumer %s failed: %w", name, err))
consumerErrors.Errors = append(consumerErrors.Errors, fmt.Errorf("consumer %s failed: %w", name, err))
}
}

if len(errors) > 0 {
return fmt.Errorf("errors occurred while consuming event: %v", errors)
if len(consumerErrors.Errors) > 0 {
return &consumerErrors
}

return nil
Expand Down
11 changes: 5 additions & 6 deletions pkg/eventrunner/consumer_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package eventrunner

import (
"fmt"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -60,7 +59,7 @@ func TestConsumerManager_ConsumeEvent(t *testing.T) {
cm.AddConsumer("test", consumer)

err := cm.ConsumeEvent(mockContext, &mockEvent)
assert.NoError(t, err)
require.NoError(t, err)

cm.consumers = make(map[string]EventConsumer) // Reset consumers
})
Expand All @@ -70,7 +69,7 @@ func TestConsumerManager_ConsumeEvent(t *testing.T) {
successConsumer.EXPECT().ConsumeEvent(mockContext, &mockEvent).Return(nil)

failConsumer := NewMockEventConsumer(ctrl)
failConsumer.EXPECT().ConsumeEvent(mockContext, &mockEvent).Return(fmt.Errorf("consumer failed"))
failConsumer.EXPECT().ConsumeEvent(mockContext, &mockEvent).Return(errConsumerFailed)

cm.AddConsumer("success", successConsumer)
cm.AddConsumer("fail", failConsumer)
Expand All @@ -86,20 +85,20 @@ func TestConsumerManager_ConsumeEvent(t *testing.T) {
cm.AddConsumer("nil", nil)

err := cm.ConsumeEvent(mockContext, &mockEvent)
assert.NoError(t, err)
require.NoError(t, err)

cm.consumers = make(map[string]EventConsumer) // Reset consumers
})

t.Run("Nil context", func(t *testing.T) {
err := cm.ConsumeEvent(nil, &mockEvent)
assert.Error(t, err)
require.Error(t, err)
assert.Contains(t, err.Error(), "nil context provided")
})

t.Run("Nil event", func(t *testing.T) {
err := cm.ConsumeEvent(mockContext, nil)
assert.Error(t, err)
require.Error(t, err)
assert.Contains(t, err.Error(), "nil event provided")
})
}
13 changes: 13 additions & 0 deletions pkg/eventrunner/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package eventrunner

import "errors"

var (
errBufferForcedWriteError = errors.New("forced write error")
errPublishError = errors.New("publish error")
errConsumeEventError = errors.New("consume event error")
errConsumerFailed = errors.New("consumer failed")
errNilContext = errors.New("nil context provided")
errNilEvent = errors.New("nil event provided")
errNilCassandra = errors.New("cassandra client is nil in CassandraEventSink")
)
2 changes: 1 addition & 1 deletion pkg/eventrunner/eventsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type LogEventSink struct {
// Add any necessary fields (e.g., database connection)
}

func (s *LogEventSink) LogEvent(ctx context.Context, event *cloudevents.Event) error {
func (*LogEventSink) LogEvent(context.Context, *cloudevents.Event) error {
// Implement event logging logic here
return nil
}
Loading

0 comments on commit 8b1b145

Please sign in to comment.