Skip to content

Commit

Permalink
feat: livestream service tests (#24209)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuziontech authored Aug 6, 2024
1 parent 6ec9210 commit fc8953c
Show file tree
Hide file tree
Showing 17 changed files with 1,002 additions and 15 deletions.
22 changes: 22 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: Go Test (for livestream service)

on:
pull_request:
paths:
- 'livestream/**'

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.22

- name: Run tests
run: cd livestream && go test -v
12 changes: 12 additions & 0 deletions livestream/.mockery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
quiet: False
with-expecter: True
dir: mocks/{{ replaceAll .InterfaceDirRelative "internal" "internal_" }}
mockname: '{{.InterfaceName}}'
outpkg: '{{.PackageName}}'
filename: '{{.InterfaceName}}.go'
all: True
packages:
github.com/posthog/posthog/livestream:
config:
recursive: True
outpkg: mocks
2 changes: 1 addition & 1 deletion livestream/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func loadConfigs() {
err := viper.ReadInConfig()
if err != nil {
sentry.CaptureException(err)
log.Fatalf("fatal error config file: %w", err)
log.Fatalf("fatal error config file: %v", err)
}

viper.OnConfigChange(func(e fsnotify.Event) {
Expand Down
58 changes: 58 additions & 0 deletions livestream/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestEventWriteTo(t *testing.T) {
tests := []struct {
name string
event Event
expected string
}{
{
name: "Full event",
event: Event{
ID: []byte("1"),
Data: []byte("test data"),
Event: []byte("message"),
Retry: []byte("3000"),
},
expected: "id: 1\ndata: test data\nevent: message\nretry: 3000\n\n",
},
{
name: "Event with multiline data",
event: Event{
ID: []byte("2"),
Data: []byte("line1\nline2\nline3"),
},
expected: "id: 2\ndata: line1\ndata: line2\ndata: line3\n\n",
},
{
name: "Event with comment only",
event: Event{
Comment: []byte("This is a comment"),
},
expected: ": This is a comment\n\n",
},
{
name: "Empty event",
event: Event{},
expected: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := httptest.NewRecorder()
err := tt.event.WriteTo(w)

require.NoError(t, err, "WriteTo() should not return an error")
assert.Equal(t, tt.expected, w.Body.String(), "WriteTo() output should match expected")
})
}
}
183 changes: 183 additions & 0 deletions livestream/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package main

import (
"testing"
"time"

"sync/atomic"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewFilter(t *testing.T) {
subChan := make(chan Subscription)
unSubChan := make(chan Subscription)
inboundChan := make(chan PostHogEvent)

filter := NewFilter(subChan, unSubChan, inboundChan)

assert.NotNil(t, filter)
assert.Equal(t, subChan, filter.subChan)
assert.Equal(t, unSubChan, filter.unSubChan)
assert.Equal(t, inboundChan, filter.inboundChan)
assert.Empty(t, filter.subs)
}

func TestRemoveSubscription(t *testing.T) {
subs := []Subscription{
{ClientId: "1"},
{ClientId: "2"},
{ClientId: "3"},
}

result := removeSubscription("2", subs)

assert.Len(t, result, 2)
assert.Equal(t, "1", result[0].ClientId)
assert.Equal(t, "3", result[1].ClientId)
}

func TestUuidFromDistinctId(t *testing.T) {
result1 := uuidFromDistinctId(1, "user1")
result2 := uuidFromDistinctId(1, "user1")
result3 := uuidFromDistinctId(2, "user1")

assert.NotEmpty(t, result1)
assert.Equal(t, result1, result2)
assert.NotEqual(t, result1, result3)
assert.Empty(t, uuidFromDistinctId(0, "user1"))
assert.Empty(t, uuidFromDistinctId(1, ""))
}

func TestConvertToResponseGeoEvent(t *testing.T) {
event := PostHogEvent{
Lat: 40.7128,
Lng: -74.0060,
}

result := convertToResponseGeoEvent(event)

assert.Equal(t, 40.7128, result.Lat)
assert.Equal(t, -74.0060, result.Lng)
assert.Equal(t, uint(1), result.Count)
}

func TestConvertToResponsePostHogEvent(t *testing.T) {
event := PostHogEvent{
Uuid: "123",
Timestamp: "2023-01-01T00:00:00Z",
DistinctId: "user1",
Event: "pageview",
Properties: map[string]interface{}{"url": "https://example.com"},
}

result := convertToResponsePostHogEvent(event, 1)

assert.Equal(t, "123", result.Uuid)
assert.Equal(t, "2023-01-01T00:00:00Z", result.Timestamp)
assert.Equal(t, "user1", result.DistinctId)
assert.NotEmpty(t, result.PersonId)
assert.Equal(t, "pageview", result.Event)
assert.Equal(t, "https://example.com", result.Properties["url"])
}

func TestFilterRun(t *testing.T) {
subChan := make(chan Subscription)
unSubChan := make(chan Subscription)
inboundChan := make(chan PostHogEvent)

filter := NewFilter(subChan, unSubChan, inboundChan)

go filter.Run()

// Test subscription
eventChan := make(chan interface{}, 1)
sub := Subscription{
ClientId: "1",
TeamId: 1,
Token: "token1",
DistinctId: "user1",
EventTypes: []string{"pageview"},
EventChan: eventChan,
ShouldClose: &atomic.Bool{},
}
subChan <- sub

// Wait for subscription to be processed
time.Sleep(10 * time.Millisecond)

// Test event filtering
event := PostHogEvent{
Uuid: "123",
Timestamp: "2023-01-01T00:00:00Z",
DistinctId: "user1",
Token: "token1",
Event: "pageview",
Properties: map[string]interface{}{"url": "https://example.com"},
}
inboundChan <- event

// Wait for event to be processed
select {
case receivedEvent := <-eventChan:
responseEvent, ok := receivedEvent.(ResponsePostHogEvent)
require.True(t, ok)
assert.Equal(t, "123", responseEvent.Uuid)
assert.Equal(t, "user1", responseEvent.DistinctId)
assert.Equal(t, "pageview", responseEvent.Event)
case <-time.After(100 * time.Millisecond):
t.Fatal("Timed out waiting for event")
}

// Test unsubscription
unSubChan <- sub

// Wait for unsubscription to be processed
time.Sleep(10 * time.Millisecond)

assert.Empty(t, filter.subs)
}

func TestFilterRunWithGeoEvent(t *testing.T) {
subChan := make(chan Subscription)
unSubChan := make(chan Subscription)
inboundChan := make(chan PostHogEvent)

filter := NewFilter(subChan, unSubChan, inboundChan)

go filter.Run()

// Test subscription with Geo enabled
eventChan := make(chan interface{}, 1)
sub := Subscription{
ClientId: "1",
TeamId: 1,
Geo: true,
EventChan: eventChan,
ShouldClose: &atomic.Bool{},
}
subChan <- sub

// Wait for subscription to be processed
time.Sleep(10 * time.Millisecond)

// Test geo event filtering
event := PostHogEvent{
Lat: 40.7128,
Lng: -74.0060,
}
inboundChan <- event

// Wait for event to be processed
select {
case receivedEvent := <-eventChan:
geoEvent, ok := receivedEvent.(ResponseGeoEvent)
require.True(t, ok)
assert.Equal(t, 40.7128, geoEvent.Lat)
assert.Equal(t, -74.0060, geoEvent.Lng)
assert.Equal(t, uint(1), geoEvent.Count)
case <-time.After(100 * time.Millisecond):
t.Fatal("Timed out waiting for geo event")
}
}
12 changes: 8 additions & 4 deletions livestream/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,26 @@ import (
"github.com/oschwald/maxminddb-golang"
)

type GeoLocator struct {
type MaxMindLocator struct {
db *maxminddb.Reader
}

func NewGeoLocator(dbPath string) (*GeoLocator, error) {
type GeoLocator interface {
Lookup(ipString string) (float64, float64, error)
}

func NewMaxMindGeoLocator(dbPath string) (*MaxMindLocator, error) {
db, err := maxminddb.Open(dbPath)
if err != nil {
return nil, err
}

return &GeoLocator{
return &MaxMindLocator{
db: db,
}, nil
}

func (g *GeoLocator) Lookup(ipString string) (float64, float64, error) {
func (g *MaxMindLocator) Lookup(ipString string) (float64, float64, error) {
ip := net.ParseIP(ipString)
if ip == nil {
return 0, 0, errors.New("invalid IP address")
Expand Down
55 changes: 55 additions & 0 deletions livestream/geoip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"errors"
"testing"

"github.com/posthog/posthog/livestream/mocks"
"github.com/stretchr/testify/assert"
)

func TestMaxMindLocator_Lookup_Success(t *testing.T) {
mockLocator := mocks.NewGeoLocator(t)
mockLocator.EXPECT().Lookup("192.0.2.1").Return(40.7128, -74.0060, nil)

latitude, longitude, err := mockLocator.Lookup("192.0.2.1")

assert.NoError(t, err)
assert.Equal(t, 40.7128, latitude)
assert.Equal(t, -74.0060, longitude)
}

func TestMaxMindLocator_Lookup_InvalidIP(t *testing.T) {
mockLocator := mocks.NewGeoLocator(t)
mockLocator.EXPECT().Lookup("invalid_ip").Return(0.0, 0.0, errors.New("invalid IP address"))

latitude, longitude, err := mockLocator.Lookup("invalid_ip")

assert.Error(t, err)
assert.Equal(t, "invalid IP address", err.Error())
assert.Equal(t, 0.0, latitude)
assert.Equal(t, 0.0, longitude)
}

func TestMaxMindLocator_Lookup_DatabaseError(t *testing.T) {
mockLocator := mocks.NewGeoLocator(t)
mockLocator.EXPECT().Lookup("192.0.2.1").Return(0.0, 0.0, errors.New("database error"))

latitude, longitude, err := mockLocator.Lookup("192.0.2.1")

assert.Error(t, err)
assert.Equal(t, "database error", err.Error())
assert.Equal(t, 0.0, latitude)
assert.Equal(t, 0.0, longitude)
}

func TestNewMaxMindGeoLocator_Success(t *testing.T) {
// This test would require mocking the maxminddb.Open function, which is not possible with the current setup.
// In a real scenario, you might use a test database file or mock the file system.
t.Skip("Skipping NewMaxMindGeoLocator test as it requires filesystem interaction")
}

func TestNewMaxMindGeoLocator_Error(t *testing.T) {
// Similar to the success case, this test would require mocking filesystem operations.
t.Skip("Skipping NewMaxMindGeoLocator error test as it requires filesystem interaction")
}
Loading

0 comments on commit fc8953c

Please sign in to comment.