Skip to content

Commit

Permalink
Add unit tests for the event functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
zivkovicmilos committed Dec 25, 2023
1 parent 32d74d0 commit 974f088
Show file tree
Hide file tree
Showing 5 changed files with 373 additions and 5 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.20.x
go-version: 1.21.x

- name: Checkout code
uses: actions/checkout@v4
Expand All @@ -23,7 +23,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.20.x
go-version: 1.21.x

- name: Checkout code
uses: actions/checkout@v4
Expand Down
266 changes: 266 additions & 0 deletions events/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package events

import (
"crypto/rand"
"math/big"
mathRand "math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestManager_SubscribeCancel(t *testing.T) {
t.Parallel()

numSubscriptions := 10
subscriptions := make([]*Subscription, numSubscriptions)
defaultEvents := []Type{"dummy"}
IDMap := make(map[SubscriptionID]bool)

m := NewManager()
defer m.Close()

// Create the subscriptions
for i := 0; i < numSubscriptions; i++ {
subscriptions[i] = m.Subscribe(defaultEvents)

// Check that the number is up-to-date
assert.Equal(t, int64(i+1), m.numSubscriptions)

// Check if a duplicate ID has been issued
if _, ok := IDMap[subscriptions[i].ID]; ok {
t.Fatalf("Duplicate ID entry")
} else {
IDMap[subscriptions[i].ID] = true
}
}

// Cancel them one by one
for indx, subscription := range subscriptions {
m.CancelSubscription(subscription.ID)

// Check that the number is up-to-date
assert.Equal(t, int64(numSubscriptions-indx-1), m.numSubscriptions)

// Check that the appropriate channel is closed
if _, more := <-subscription.SubCh; more {
t.Fatalf("Subscription channel not closed for index %d", indx)
}
}
}

func TestManager_SubscribeClose(t *testing.T) {
t.Parallel()

numSubscriptions := 10
subscriptions := make([]*Subscription, numSubscriptions)
defaultEvents := []Type{"dummy"}

m := NewManager()

// Create the subscriptions
for i := 0; i < numSubscriptions; i++ {
subscriptions[i] = m.Subscribe(defaultEvents)

// Check that the number is up-to-date
assert.Equal(t, int64(i+1), m.numSubscriptions)
}

// Close off the event manager
m.Close()
assert.Equal(t, int64(0), m.numSubscriptions)

// Check if the subscription channels are closed
for indx, subscription := range subscriptions {
if _, more := <-subscription.SubCh; more {
t.Fatalf("Subscription channel not closed for index %d", indx)
}
}
}

func TestManager_SignalEvent(t *testing.T) {
t.Parallel()

totalEvents := 10
invalidEvents := 3
validEvents := totalEvents - invalidEvents
supportedEventTypes := []Type{"dummy1", "dummy2"}

m := NewManager()
defer m.Close()

subscription := m.Subscribe(supportedEventTypes)

eventSupported := func(eventType Type) bool {
for _, supportedType := range supportedEventTypes {
if supportedType == eventType {
return true
}
}

return false
}

mockEvents := getMockEvents(
t,
supportedEventTypes,
totalEvents,
invalidEvents,
)

// Send the events
for _, mockEvent := range mockEvents {
m.SignalEvent(mockEvent)
}

// Make sure all valid events get processed
eventsProcessed := 0
supportedEventsProcessed := 0

completed := false
for !completed {
select {
case event := <-subscription.SubCh:
eventsProcessed++

if eventSupported(event.GetType()) {
supportedEventsProcessed++
}

if eventsProcessed == validEvents ||
supportedEventsProcessed == validEvents {
completed = true
}
case <-time.After(time.Second * 5):
completed = true
}
}

assert.Equal(t, validEvents, eventsProcessed)
assert.Equal(t, validEvents, supportedEventsProcessed)
}

func TestManager_SignalEventOrder(t *testing.T) {
t.Parallel()

totalEvents := 1000
supportedEventTypes := []Type{
"dummy 1",
"dummy 2",
"dummy 3",
"dummy 4",
"dummy 5",
}

m := NewManager()
defer m.Close()

subscription := m.Subscribe(supportedEventTypes)

mockEvents := getMockEvents(t, supportedEventTypes, totalEvents, 0)
eventsProcessed := 0

var wg sync.WaitGroup

wg.Add(totalEvents)

go func() {
for {
select {
case event, more := <-subscription.SubCh:
if more {
assert.Equal(t, mockEvents[eventsProcessed].GetType(), event.GetType())

eventsProcessed++

wg.Done()
}
case <-time.After(time.Second * 5):
for i := 0; i < totalEvents-eventsProcessed; i++ {
wg.Done()
}
}
}
}()

// Send the events
for _, mockEvent := range mockEvents {
m.SignalEvent(mockEvent)
}

// Make sure all valid events get processed
wg.Wait()

assert.Equal(t, totalEvents, eventsProcessed)
}

// getMockEvents generates mock events
// of supported types, and unsupported types,
// shuffling them in the process
func getMockEvents(
t *testing.T,
supportedTypes []Type,
count int,
numInvalid int,
) []*mockEvent {
t.Helper()

if count == 0 || len(supportedTypes) == 0 {
return []*mockEvent{}
}

if numInvalid > count {
numInvalid = count
}

allEvents := []Type{
"random type 1",
"random type 2",
"random type 3",
}

allEvents = append(allEvents, supportedTypes...)

tempSubscription := &eventSubscription{eventTypes: supportedTypes}

randomEventType := func(supported bool) Type {
for {
randNum, err := rand.Int(rand.Reader, big.NewInt(int64(len(allEvents))))
require.NoError(t, err)

randType := allEvents[randNum.Int64()]
if tempSubscription.eventSupported(randType) == supported {
return randType
}
}
}

events := make([]*mockEvent, 0)

// Fill in the unsupported events first
for invalidFilled := 0; invalidFilled < numInvalid; invalidFilled++ {
events = append(events, &mockEvent{
eventType: randomEventType(false),
data: []byte("data"),
})
}

// Fill in the supported events
for validFilled := 0; validFilled < count-numInvalid; validFilled++ {
events = append(events, &mockEvent{
eventType: randomEventType(true),
data: []byte("data"),
},
)
}

// Shuffle the events
mathRand.Shuffle(len(events), func(i, j int) {
events[i], events[j] = events[j], events[i]
})

return events
}
14 changes: 14 additions & 0 deletions events/mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package events

type mockEvent struct {
data any
eventType Type
}

func (m *mockEvent) GetType() Type {
return m.eventType
}

func (m *mockEvent) GetData() any {
return m.data
}
6 changes: 3 additions & 3 deletions events/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func (es *eventSubscription) runLoop() {
}

// eventSupported checks if any notification event needs to be triggered
func (es *eventSubscription) eventSupported(event Event) bool {
func (es *eventSubscription) eventSupported(eventType Type) bool {
for _, supportedType := range es.eventTypes {
if supportedType == event.GetType() {
if supportedType == eventType {
return true
}
}
Expand All @@ -62,7 +62,7 @@ func (es *eventSubscription) eventSupported(event Event) bool {

// pushEvent sends the event off for processing by the subscription. [NON-BLOCKING]
func (es *eventSubscription) pushEvent(event Event) {
if !es.eventSupported(event) {
if !es.eventSupported(event.GetType()) {
return
}

Expand Down
Loading

0 comments on commit 974f088

Please sign in to comment.