From fc8953cc6f88cb382dc1f40b29ce4d17fc12b384 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 6 Aug 2024 15:08:34 -0700 Subject: [PATCH] feat: livestream service tests (#24209) --- .github/workflows/go.yml | 22 +++ livestream/.mockery.yaml | 12 ++ livestream/configs.go | 2 +- livestream/event_test.go | 58 +++++++ livestream/filter_test.go | 183 ++++++++++++++++++++ livestream/geoip.go | 12 +- livestream/geoip_test.go | 55 ++++++ livestream/go.mod | 6 +- livestream/go.sum | 2 + livestream/jwt_test.go | 99 +++++++++++ livestream/kafka.go | 25 ++- livestream/kafka_test.go | 98 +++++++++++ livestream/main.go | 4 +- livestream/main_test.go | 61 +++++++ livestream/mocks/GeoLocator.go | 95 +++++++++++ livestream/mocks/KafkaConsumer.go | 96 +++++++++++ livestream/mocks/KafkaConsumerInterface.go | 187 +++++++++++++++++++++ 17 files changed, 1002 insertions(+), 15 deletions(-) create mode 100644 .github/workflows/go.yml create mode 100644 livestream/.mockery.yaml create mode 100644 livestream/event_test.go create mode 100644 livestream/filter_test.go create mode 100644 livestream/geoip_test.go create mode 100644 livestream/jwt_test.go create mode 100644 livestream/kafka_test.go create mode 100644 livestream/main_test.go create mode 100644 livestream/mocks/GeoLocator.go create mode 100644 livestream/mocks/KafkaConsumer.go create mode 100644 livestream/mocks/KafkaConsumerInterface.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000000000..37c4e75ddb9bc --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,22 @@ +name: Go Test (for livestream service) + +on: + pull_request: + paths: + - 'livestream/**' + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.22 + + - name: Run tests + run: cd livestream && go test -v diff --git a/livestream/.mockery.yaml b/livestream/.mockery.yaml new file mode 100644 index 0000000000000..2e4daa22c99cc --- /dev/null +++ b/livestream/.mockery.yaml @@ -0,0 +1,12 @@ +quiet: False +with-expecter: True +dir: mocks/{{ replaceAll .InterfaceDirRelative "internal" "internal_" }} +mockname: '{{.InterfaceName}}' +outpkg: '{{.PackageName}}' +filename: '{{.InterfaceName}}.go' +all: True +packages: + github.com/posthog/posthog/livestream: + config: + recursive: True + outpkg: mocks diff --git a/livestream/configs.go b/livestream/configs.go index 6c879663270b0..be1f806101de9 100644 --- a/livestream/configs.go +++ b/livestream/configs.go @@ -20,7 +20,7 @@ func loadConfigs() { err := viper.ReadInConfig() if err != nil { sentry.CaptureException(err) - log.Fatalf("fatal error config file: %w", err) + log.Fatalf("fatal error config file: %v", err) } viper.OnConfigChange(func(e fsnotify.Event) { diff --git a/livestream/event_test.go b/livestream/event_test.go new file mode 100644 index 0000000000000..08b705cf0cbdf --- /dev/null +++ b/livestream/event_test.go @@ -0,0 +1,58 @@ +package main + +import ( + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventWriteTo(t *testing.T) { + tests := []struct { + name string + event Event + expected string + }{ + { + name: "Full event", + event: Event{ + ID: []byte("1"), + Data: []byte("test data"), + Event: []byte("message"), + Retry: []byte("3000"), + }, + expected: "id: 1\ndata: test data\nevent: message\nretry: 3000\n\n", + }, + { + name: "Event with multiline data", + event: Event{ + ID: []byte("2"), + Data: []byte("line1\nline2\nline3"), + }, + expected: "id: 2\ndata: line1\ndata: line2\ndata: line3\n\n", + }, + { + name: "Event with comment only", + event: Event{ + Comment: []byte("This is a comment"), + }, + expected: ": This is a comment\n\n", + }, + { + name: "Empty event", + event: Event{}, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := httptest.NewRecorder() + err := tt.event.WriteTo(w) + + require.NoError(t, err, "WriteTo() should not return an error") + assert.Equal(t, tt.expected, w.Body.String(), "WriteTo() output should match expected") + }) + } +} diff --git a/livestream/filter_test.go b/livestream/filter_test.go new file mode 100644 index 0000000000000..4c4189aa10fad --- /dev/null +++ b/livestream/filter_test.go @@ -0,0 +1,183 @@ +package main + +import ( + "testing" + "time" + + "sync/atomic" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewFilter(t *testing.T) { + subChan := make(chan Subscription) + unSubChan := make(chan Subscription) + inboundChan := make(chan PostHogEvent) + + filter := NewFilter(subChan, unSubChan, inboundChan) + + assert.NotNil(t, filter) + assert.Equal(t, subChan, filter.subChan) + assert.Equal(t, unSubChan, filter.unSubChan) + assert.Equal(t, inboundChan, filter.inboundChan) + assert.Empty(t, filter.subs) +} + +func TestRemoveSubscription(t *testing.T) { + subs := []Subscription{ + {ClientId: "1"}, + {ClientId: "2"}, + {ClientId: "3"}, + } + + result := removeSubscription("2", subs) + + assert.Len(t, result, 2) + assert.Equal(t, "1", result[0].ClientId) + assert.Equal(t, "3", result[1].ClientId) +} + +func TestUuidFromDistinctId(t *testing.T) { + result1 := uuidFromDistinctId(1, "user1") + result2 := uuidFromDistinctId(1, "user1") + result3 := uuidFromDistinctId(2, "user1") + + assert.NotEmpty(t, result1) + assert.Equal(t, result1, result2) + assert.NotEqual(t, result1, result3) + assert.Empty(t, uuidFromDistinctId(0, "user1")) + assert.Empty(t, uuidFromDistinctId(1, "")) +} + +func TestConvertToResponseGeoEvent(t *testing.T) { + event := PostHogEvent{ + Lat: 40.7128, + Lng: -74.0060, + } + + result := convertToResponseGeoEvent(event) + + assert.Equal(t, 40.7128, result.Lat) + assert.Equal(t, -74.0060, result.Lng) + assert.Equal(t, uint(1), result.Count) +} + +func TestConvertToResponsePostHogEvent(t *testing.T) { + event := PostHogEvent{ + Uuid: "123", + Timestamp: "2023-01-01T00:00:00Z", + DistinctId: "user1", + Event: "pageview", + Properties: map[string]interface{}{"url": "https://example.com"}, + } + + result := convertToResponsePostHogEvent(event, 1) + + assert.Equal(t, "123", result.Uuid) + assert.Equal(t, "2023-01-01T00:00:00Z", result.Timestamp) + assert.Equal(t, "user1", result.DistinctId) + assert.NotEmpty(t, result.PersonId) + assert.Equal(t, "pageview", result.Event) + assert.Equal(t, "https://example.com", result.Properties["url"]) +} + +func TestFilterRun(t *testing.T) { + subChan := make(chan Subscription) + unSubChan := make(chan Subscription) + inboundChan := make(chan PostHogEvent) + + filter := NewFilter(subChan, unSubChan, inboundChan) + + go filter.Run() + + // Test subscription + eventChan := make(chan interface{}, 1) + sub := Subscription{ + ClientId: "1", + TeamId: 1, + Token: "token1", + DistinctId: "user1", + EventTypes: []string{"pageview"}, + EventChan: eventChan, + ShouldClose: &atomic.Bool{}, + } + subChan <- sub + + // Wait for subscription to be processed + time.Sleep(10 * time.Millisecond) + + // Test event filtering + event := PostHogEvent{ + Uuid: "123", + Timestamp: "2023-01-01T00:00:00Z", + DistinctId: "user1", + Token: "token1", + Event: "pageview", + Properties: map[string]interface{}{"url": "https://example.com"}, + } + inboundChan <- event + + // Wait for event to be processed + select { + case receivedEvent := <-eventChan: + responseEvent, ok := receivedEvent.(ResponsePostHogEvent) + require.True(t, ok) + assert.Equal(t, "123", responseEvent.Uuid) + assert.Equal(t, "user1", responseEvent.DistinctId) + assert.Equal(t, "pageview", responseEvent.Event) + case <-time.After(100 * time.Millisecond): + t.Fatal("Timed out waiting for event") + } + + // Test unsubscription + unSubChan <- sub + + // Wait for unsubscription to be processed + time.Sleep(10 * time.Millisecond) + + assert.Empty(t, filter.subs) +} + +func TestFilterRunWithGeoEvent(t *testing.T) { + subChan := make(chan Subscription) + unSubChan := make(chan Subscription) + inboundChan := make(chan PostHogEvent) + + filter := NewFilter(subChan, unSubChan, inboundChan) + + go filter.Run() + + // Test subscription with Geo enabled + eventChan := make(chan interface{}, 1) + sub := Subscription{ + ClientId: "1", + TeamId: 1, + Geo: true, + EventChan: eventChan, + ShouldClose: &atomic.Bool{}, + } + subChan <- sub + + // Wait for subscription to be processed + time.Sleep(10 * time.Millisecond) + + // Test geo event filtering + event := PostHogEvent{ + Lat: 40.7128, + Lng: -74.0060, + } + inboundChan <- event + + // Wait for event to be processed + select { + case receivedEvent := <-eventChan: + geoEvent, ok := receivedEvent.(ResponseGeoEvent) + require.True(t, ok) + assert.Equal(t, 40.7128, geoEvent.Lat) + assert.Equal(t, -74.0060, geoEvent.Lng) + assert.Equal(t, uint(1), geoEvent.Count) + case <-time.After(100 * time.Millisecond): + t.Fatal("Timed out waiting for geo event") + } +} diff --git a/livestream/geoip.go b/livestream/geoip.go index 8f026d335adca..56a27837dea84 100644 --- a/livestream/geoip.go +++ b/livestream/geoip.go @@ -7,22 +7,26 @@ import ( "github.com/oschwald/maxminddb-golang" ) -type GeoLocator struct { +type MaxMindLocator struct { db *maxminddb.Reader } -func NewGeoLocator(dbPath string) (*GeoLocator, error) { +type GeoLocator interface { + Lookup(ipString string) (float64, float64, error) +} + +func NewMaxMindGeoLocator(dbPath string) (*MaxMindLocator, error) { db, err := maxminddb.Open(dbPath) if err != nil { return nil, err } - return &GeoLocator{ + return &MaxMindLocator{ db: db, }, nil } -func (g *GeoLocator) Lookup(ipString string) (float64, float64, error) { +func (g *MaxMindLocator) Lookup(ipString string) (float64, float64, error) { ip := net.ParseIP(ipString) if ip == nil { return 0, 0, errors.New("invalid IP address") diff --git a/livestream/geoip_test.go b/livestream/geoip_test.go new file mode 100644 index 0000000000000..0da54c33b342f --- /dev/null +++ b/livestream/geoip_test.go @@ -0,0 +1,55 @@ +package main + +import ( + "errors" + "testing" + + "github.com/posthog/posthog/livestream/mocks" + "github.com/stretchr/testify/assert" +) + +func TestMaxMindLocator_Lookup_Success(t *testing.T) { + mockLocator := mocks.NewGeoLocator(t) + mockLocator.EXPECT().Lookup("192.0.2.1").Return(40.7128, -74.0060, nil) + + latitude, longitude, err := mockLocator.Lookup("192.0.2.1") + + assert.NoError(t, err) + assert.Equal(t, 40.7128, latitude) + assert.Equal(t, -74.0060, longitude) +} + +func TestMaxMindLocator_Lookup_InvalidIP(t *testing.T) { + mockLocator := mocks.NewGeoLocator(t) + mockLocator.EXPECT().Lookup("invalid_ip").Return(0.0, 0.0, errors.New("invalid IP address")) + + latitude, longitude, err := mockLocator.Lookup("invalid_ip") + + assert.Error(t, err) + assert.Equal(t, "invalid IP address", err.Error()) + assert.Equal(t, 0.0, latitude) + assert.Equal(t, 0.0, longitude) +} + +func TestMaxMindLocator_Lookup_DatabaseError(t *testing.T) { + mockLocator := mocks.NewGeoLocator(t) + mockLocator.EXPECT().Lookup("192.0.2.1").Return(0.0, 0.0, errors.New("database error")) + + latitude, longitude, err := mockLocator.Lookup("192.0.2.1") + + assert.Error(t, err) + assert.Equal(t, "database error", err.Error()) + assert.Equal(t, 0.0, latitude) + assert.Equal(t, 0.0, longitude) +} + +func TestNewMaxMindGeoLocator_Success(t *testing.T) { + // This test would require mocking the maxminddb.Open function, which is not possible with the current setup. + // In a real scenario, you might use a test database file or mock the file system. + t.Skip("Skipping NewMaxMindGeoLocator test as it requires filesystem interaction") +} + +func TestNewMaxMindGeoLocator_Error(t *testing.T) { + // Similar to the success case, this test would require mocking filesystem operations. + t.Skip("Skipping NewMaxMindGeoLocator error test as it requires filesystem interaction") +} diff --git a/livestream/go.mod b/livestream/go.mod index 350f86ffd6162..0efed981a97d6 100644 --- a/livestream/go.mod +++ b/livestream/go.mod @@ -1,4 +1,4 @@ -module github.com/posthog/livestream +module github.com/posthog/posthog/livestream go 1.22.2 @@ -13,11 +13,13 @@ require ( github.com/labstack/echo/v4 v4.12.0 github.com/oschwald/maxminddb-golang v1.12.0 github.com/spf13/viper v1.18.2 + github.com/stretchr/testify v1.9.0 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a ) require ( github.com/aws/aws-sdk-go-v2/config v1.26.5 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/docker/docker v25.0.5+incompatible // indirect github.com/docker/docker-credential-helpers v0.8.1 // indirect github.com/emicklei/go-restful/v3 v3.11.2 // indirect @@ -36,6 +38,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.18.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.46.0 // indirect @@ -47,6 +50,7 @@ require ( github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect diff --git a/livestream/go.sum b/livestream/go.sum index 445095455f04d..4a523d7facffd 100644 --- a/livestream/go.sum +++ b/livestream/go.sum @@ -309,6 +309,8 @@ github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMV github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/livestream/jwt_test.go b/livestream/jwt_test.go new file mode 100644 index 0000000000000..deaa67163c22b --- /dev/null +++ b/livestream/jwt_test.go @@ -0,0 +1,99 @@ +package main + +import ( + "testing" + "time" + + "github.com/golang-jwt/jwt" + "github.com/spf13/viper" +) + +func TestDecodeAuthToken(t *testing.T) { + // Set up a mock secret for testing + viper.Set("jwt.secret", "test-secret") + + tests := []struct { + name string + authHeader string + expectError bool + expectedAud string + }{ + { + name: "Valid token", + authHeader: "Bearer " + createValidToken(ExpectedScope), + expectError: false, + expectedAud: ExpectedScope, + }, + { + name: "Invalid token format", + authHeader: "InvalidToken", + expectError: true, + }, + { + name: "Missing Bearer prefix", + authHeader: createValidToken(ExpectedScope), + expectError: true, + }, + { + name: "Invalid audience", + authHeader: "Bearer " + createValidToken("invalid:scope"), + expectError: true, + }, + { + name: "Expired token", + authHeader: "Bearer " + createExpiredToken(), + expectError: true, + }, + { + name: "Invalid signature", + authHeader: "Bearer " + createTokenWithInvalidSignature(), + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + claims, err := decodeAuthToken(tt.authHeader) + + if tt.expectError { + if err == nil { + t.Errorf("Expected an error, but got nil") + } + } else { + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if claims["aud"] != tt.expectedAud { + t.Errorf("Expected audience %s, but got %s", tt.expectedAud, claims["aud"]) + } + } + }) + } +} + +func createValidToken(audience string) string { + token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{ + "aud": audience, + "exp": time.Now().Add(time.Hour).Unix(), + }) + tokenString, _ := token.SignedString([]byte(viper.GetString("jwt.secret"))) + return tokenString +} + +func createExpiredToken() string { + token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{ + "aud": ExpectedScope, + "exp": time.Now().Add(-time.Hour).Unix(), + }) + tokenString, _ := token.SignedString([]byte(viper.GetString("jwt.secret"))) + return tokenString +} + +func createTokenWithInvalidSignature() string { + token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{ + "aud": ExpectedScope, + "exp": time.Now().Add(time.Hour).Unix(), + }) + tokenString, _ := token.SignedString([]byte("wrong-secret")) + return tokenString +} diff --git a/livestream/kafka.go b/livestream/kafka.go index 0668a4abce037..cd4704e878414 100644 --- a/livestream/kafka.go +++ b/livestream/kafka.go @@ -28,15 +28,26 @@ type PostHogEvent struct { Lng float64 } -type KafkaConsumer struct { - consumer *kafka.Consumer +type KafkaConsumerInterface interface { + SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error + ReadMessage(timeout time.Duration) (*kafka.Message, error) + Close() error +} + +type KafkaConsumer interface { + Consume() + Close() +} + +type PostHogKafkaConsumer struct { + consumer KafkaConsumerInterface topic string - geolocator *GeoLocator + geolocator GeoLocator outgoingChan chan PostHogEvent statsChan chan PostHogEvent } -func NewKafkaConsumer(brokers string, securityProtocol string, groupID string, topic string, geolocator *GeoLocator, outgoingChan chan PostHogEvent, statsChan chan PostHogEvent) (*KafkaConsumer, error) { +func NewPostHogKafkaConsumer(brokers string, securityProtocol string, groupID string, topic string, geolocator GeoLocator, outgoingChan chan PostHogEvent, statsChan chan PostHogEvent) (*PostHogKafkaConsumer, error) { config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "group.id": groupID, @@ -50,7 +61,7 @@ func NewKafkaConsumer(brokers string, securityProtocol string, groupID string, t return nil, err } - return &KafkaConsumer{ + return &PostHogKafkaConsumer{ consumer: consumer, topic: topic, geolocator: geolocator, @@ -59,7 +70,7 @@ func NewKafkaConsumer(brokers string, securityProtocol string, groupID string, t }, nil } -func (c *KafkaConsumer) Consume() { +func (c *PostHogKafkaConsumer) Consume() { err := c.consumer.SubscribeTopics([]string{c.topic}, nil) if err != nil { sentry.CaptureException(err) @@ -126,6 +137,6 @@ func (c *KafkaConsumer) Consume() { } } -func (c *KafkaConsumer) Close() { +func (c *PostHogKafkaConsumer) Close() { c.consumer.Close() } diff --git a/livestream/kafka_test.go b/livestream/kafka_test.go new file mode 100644 index 0000000000000..40118c9828d1e --- /dev/null +++ b/livestream/kafka_test.go @@ -0,0 +1,98 @@ +package main + +import ( + "encoding/json" + "errors" + "testing" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/posthog/posthog/livestream/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestPostHogKafkaConsumer_Consume(t *testing.T) { + // Create mock objects + mockConsumer := new(mocks.KafkaConsumerInterface) + mockGeoLocator := new(mocks.GeoLocator) + + // Create channels + outgoingChan := make(chan PostHogEvent, 1) + statsChan := make(chan PostHogEvent, 1) + + // Create PostHogKafkaConsumer + consumer := &PostHogKafkaConsumer{ + consumer: mockConsumer, + topic: "test-topic", + geolocator: mockGeoLocator, + outgoingChan: outgoingChan, + statsChan: statsChan, + } + + // Mock SubscribeTopics + mockConsumer.On("SubscribeTopics", []string{"test-topic"}, mock.AnythingOfType("kafka.RebalanceCb")).Return(nil) + + // Create a test message + testWrapper := PostHogEventWrapper{ + Uuid: "test-uuid", + DistinctId: "test-distinct-id", + Ip: "192.0.2.1", + Data: `{"event": "test-event", "properties": {"token": "test-token"}}`, + } + testMessageValue, _ := json.Marshal(testWrapper) + testMessage := &kafka.Message{ + Value: testMessageValue, + } + + // Mock ReadMessage + mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(testMessage, nil).Maybe() + + // Mock GeoLocator Lookup + mockGeoLocator.On("Lookup", "192.0.2.1").Return(37.7749, -122.4194, nil) + + // Run Consume in a goroutine + go consumer.Consume() + + // Wait for the message to be processed + select { + case event := <-outgoingChan: + assert.Equal(t, "test-uuid", event.Uuid) + assert.Equal(t, "test-distinct-id", event.DistinctId) + assert.Equal(t, "test-event", event.Event) + assert.Equal(t, "test-token", event.Token) + assert.Equal(t, 37.7749, event.Lat) + assert.Equal(t, -122.4194, event.Lng) + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Check if the message was also sent to statsChan + select { + case <-statsChan: + // Message received in statsChan + case <-time.After(time.Second): + t.Fatal("Timed out waiting for stats message") + } + + // Test error handling + mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(nil, errors.New("read error")).Maybe() + time.Sleep(time.Millisecond * 100) // Give some time for the error to be processed + + // Assert that all expectations were met + mockConsumer.AssertExpectations(t) + mockGeoLocator.AssertExpectations(t) +} + +func TestPostHogKafkaConsumer_Close(t *testing.T) { + mockConsumer := new(mocks.KafkaConsumerInterface) + consumer := &PostHogKafkaConsumer{ + consumer: mockConsumer, + } + + mockConsumer.On("Close").Return(nil) + + consumer.Close() + + mockConsumer.AssertExpectations(t) +} diff --git a/livestream/main.go b/livestream/main.go index b14d3f94e7069..32c624cdd79e1 100644 --- a/livestream/main.go +++ b/livestream/main.go @@ -57,7 +57,7 @@ func main() { log.Fatal("kafka.group_id must be set") } - geolocator, err := NewGeoLocator(mmdb) + geolocator, err := NewMaxMindGeoLocator(mmdb) if err != nil { sentry.CaptureException(err) log.Fatalf("Failed to open MMDB: %v", err) @@ -78,7 +78,7 @@ func main() { if !isProd { kafkaSecurityProtocol = "PLAINTEXT" } - consumer, err := NewKafkaConsumer(brokers, kafkaSecurityProtocol, groupID, topic, geolocator, phEventChan, statsChan) + consumer, err := NewPostHogKafkaConsumer(brokers, kafkaSecurityProtocol, groupID, topic, geolocator, phEventChan, statsChan) if err != nil { sentry.CaptureException(err) log.Fatalf("Failed to create Kafka consumer: %v", err) diff --git a/livestream/main_test.go b/livestream/main_test.go new file mode 100644 index 0000000000000..4bc1296fbacb9 --- /dev/null +++ b/livestream/main_test.go @@ -0,0 +1,61 @@ +// go:generate mockery +package main + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/labstack/echo/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIndex(t *testing.T) { + e := echo.New() + req := httptest.NewRequest(http.MethodGet, "/", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + if assert.NoError(t, index(c)) { + assert.Equal(t, http.StatusOK, rec.Code) + assert.Equal(t, "RealTime Hog 3000", rec.Body.String()) + } +} + +func TestStatsHandler(t *testing.T) { + e := echo.New() + req := httptest.NewRequest(http.MethodGet, "/stats", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + // Mock the authorization header + req.Header.Set("Authorization", "Bearer mock_token") + + // Create a mock TeamStats + teamStats := &TeamStats{ + Store: make(map[string]*expirable.LRU[string, string]), + } + teamStats.Store["mock_token"] = expirable.NewLRU[string, string](100, nil, time.Minute) + teamStats.Store["mock_token"].Add("user1", "data1") + + // Add the teamStats to the context + c.Set("teamStats", teamStats) + + handler := func(c echo.Context) error { + return c.JSON(http.StatusOK, map[string]interface{}{ + "users_on_product": teamStats.Store["mock_token"].Len(), + }) + } + + if assert.NoError(t, handler(c)) { + assert.Equal(t, http.StatusOK, rec.Code) + var response map[string]int + err := json.Unmarshal(rec.Body.Bytes(), &response) + require.NoError(t, err) + assert.Equal(t, 1, response["users_on_product"]) + } +} diff --git a/livestream/mocks/GeoLocator.go b/livestream/mocks/GeoLocator.go new file mode 100644 index 0000000000000..db6e847e248e4 --- /dev/null +++ b/livestream/mocks/GeoLocator.go @@ -0,0 +1,95 @@ +// Code generated by mockery v2.44.1. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// GeoLocator is an autogenerated mock type for the GeoLocator type +type GeoLocator struct { + mock.Mock +} + +type GeoLocator_Expecter struct { + mock *mock.Mock +} + +func (_m *GeoLocator) EXPECT() *GeoLocator_Expecter { + return &GeoLocator_Expecter{mock: &_m.Mock} +} + +// Lookup provides a mock function with given fields: ipString +func (_m *GeoLocator) Lookup(ipString string) (float64, float64, error) { + ret := _m.Called(ipString) + + if len(ret) == 0 { + panic("no return value specified for Lookup") + } + + var r0 float64 + var r1 float64 + var r2 error + if rf, ok := ret.Get(0).(func(string) (float64, float64, error)); ok { + return rf(ipString) + } + if rf, ok := ret.Get(0).(func(string) float64); ok { + r0 = rf(ipString) + } else { + r0 = ret.Get(0).(float64) + } + + if rf, ok := ret.Get(1).(func(string) float64); ok { + r1 = rf(ipString) + } else { + r1 = ret.Get(1).(float64) + } + + if rf, ok := ret.Get(2).(func(string) error); ok { + r2 = rf(ipString) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// GeoLocator_Lookup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Lookup' +type GeoLocator_Lookup_Call struct { + *mock.Call +} + +// Lookup is a helper method to define mock.On call +// - ipString string +func (_e *GeoLocator_Expecter) Lookup(ipString interface{}) *GeoLocator_Lookup_Call { + return &GeoLocator_Lookup_Call{Call: _e.mock.On("Lookup", ipString)} +} + +func (_c *GeoLocator_Lookup_Call) Run(run func(ipString string)) *GeoLocator_Lookup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *GeoLocator_Lookup_Call) Return(_a0 float64, _a1 float64, _a2 error) *GeoLocator_Lookup_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *GeoLocator_Lookup_Call) RunAndReturn(run func(string) (float64, float64, error)) *GeoLocator_Lookup_Call { + _c.Call.Return(run) + return _c +} + +// NewGeoLocator creates a new instance of GeoLocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewGeoLocator(t interface { + mock.TestingT + Cleanup(func()) +}) *GeoLocator { + mock := &GeoLocator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/livestream/mocks/KafkaConsumer.go b/livestream/mocks/KafkaConsumer.go new file mode 100644 index 0000000000000..a5440f002f9e4 --- /dev/null +++ b/livestream/mocks/KafkaConsumer.go @@ -0,0 +1,96 @@ +// Code generated by mockery v2.44.1. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// KafkaConsumer is an autogenerated mock type for the KafkaConsumer type +type KafkaConsumer struct { + mock.Mock +} + +type KafkaConsumer_Expecter struct { + mock *mock.Mock +} + +func (_m *KafkaConsumer) EXPECT() *KafkaConsumer_Expecter { + return &KafkaConsumer_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *KafkaConsumer) Close() { + _m.Called() +} + +// KafkaConsumer_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type KafkaConsumer_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *KafkaConsumer_Expecter) Close() *KafkaConsumer_Close_Call { + return &KafkaConsumer_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *KafkaConsumer_Close_Call) Run(run func()) *KafkaConsumer_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *KafkaConsumer_Close_Call) Return() *KafkaConsumer_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *KafkaConsumer_Close_Call) RunAndReturn(run func()) *KafkaConsumer_Close_Call { + _c.Call.Return(run) + return _c +} + +// Consume provides a mock function with given fields: +func (_m *KafkaConsumer) Consume() { + _m.Called() +} + +// KafkaConsumer_Consume_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Consume' +type KafkaConsumer_Consume_Call struct { + *mock.Call +} + +// Consume is a helper method to define mock.On call +func (_e *KafkaConsumer_Expecter) Consume() *KafkaConsumer_Consume_Call { + return &KafkaConsumer_Consume_Call{Call: _e.mock.On("Consume")} +} + +func (_c *KafkaConsumer_Consume_Call) Run(run func()) *KafkaConsumer_Consume_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *KafkaConsumer_Consume_Call) Return() *KafkaConsumer_Consume_Call { + _c.Call.Return() + return _c +} + +func (_c *KafkaConsumer_Consume_Call) RunAndReturn(run func()) *KafkaConsumer_Consume_Call { + _c.Call.Return(run) + return _c +} + +// NewKafkaConsumer creates a new instance of KafkaConsumer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewKafkaConsumer(t interface { + mock.TestingT + Cleanup(func()) +}) *KafkaConsumer { + mock := &KafkaConsumer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/livestream/mocks/KafkaConsumerInterface.go b/livestream/mocks/KafkaConsumerInterface.go new file mode 100644 index 0000000000000..90f94c0e05c03 --- /dev/null +++ b/livestream/mocks/KafkaConsumerInterface.go @@ -0,0 +1,187 @@ +// Code generated by mockery v2.44.1. DO NOT EDIT. + +package mocks + +import ( + kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// KafkaConsumerInterface is an autogenerated mock type for the KafkaConsumerInterface type +type KafkaConsumerInterface struct { + mock.Mock +} + +type KafkaConsumerInterface_Expecter struct { + mock *mock.Mock +} + +func (_m *KafkaConsumerInterface) EXPECT() *KafkaConsumerInterface_Expecter { + return &KafkaConsumerInterface_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *KafkaConsumerInterface) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// KafkaConsumerInterface_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type KafkaConsumerInterface_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *KafkaConsumerInterface_Expecter) Close() *KafkaConsumerInterface_Close_Call { + return &KafkaConsumerInterface_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *KafkaConsumerInterface_Close_Call) Run(run func()) *KafkaConsumerInterface_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *KafkaConsumerInterface_Close_Call) Return(_a0 error) *KafkaConsumerInterface_Close_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *KafkaConsumerInterface_Close_Call) RunAndReturn(run func() error) *KafkaConsumerInterface_Close_Call { + _c.Call.Return(run) + return _c +} + +// ReadMessage provides a mock function with given fields: timeout +func (_m *KafkaConsumerInterface) ReadMessage(timeout time.Duration) (*kafka.Message, error) { + ret := _m.Called(timeout) + + if len(ret) == 0 { + panic("no return value specified for ReadMessage") + } + + var r0 *kafka.Message + var r1 error + if rf, ok := ret.Get(0).(func(time.Duration) (*kafka.Message, error)); ok { + return rf(timeout) + } + if rf, ok := ret.Get(0).(func(time.Duration) *kafka.Message); ok { + r0 = rf(timeout) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*kafka.Message) + } + } + + if rf, ok := ret.Get(1).(func(time.Duration) error); ok { + r1 = rf(timeout) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// KafkaConsumerInterface_ReadMessage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadMessage' +type KafkaConsumerInterface_ReadMessage_Call struct { + *mock.Call +} + +// ReadMessage is a helper method to define mock.On call +// - timeout time.Duration +func (_e *KafkaConsumerInterface_Expecter) ReadMessage(timeout interface{}) *KafkaConsumerInterface_ReadMessage_Call { + return &KafkaConsumerInterface_ReadMessage_Call{Call: _e.mock.On("ReadMessage", timeout)} +} + +func (_c *KafkaConsumerInterface_ReadMessage_Call) Run(run func(timeout time.Duration)) *KafkaConsumerInterface_ReadMessage_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Duration)) + }) + return _c +} + +func (_c *KafkaConsumerInterface_ReadMessage_Call) Return(_a0 *kafka.Message, _a1 error) *KafkaConsumerInterface_ReadMessage_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *KafkaConsumerInterface_ReadMessage_Call) RunAndReturn(run func(time.Duration) (*kafka.Message, error)) *KafkaConsumerInterface_ReadMessage_Call { + _c.Call.Return(run) + return _c +} + +// SubscribeTopics provides a mock function with given fields: topics, rebalanceCb +func (_m *KafkaConsumerInterface) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error { + ret := _m.Called(topics, rebalanceCb) + + if len(ret) == 0 { + panic("no return value specified for SubscribeTopics") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]string, kafka.RebalanceCb) error); ok { + r0 = rf(topics, rebalanceCb) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// KafkaConsumerInterface_SubscribeTopics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubscribeTopics' +type KafkaConsumerInterface_SubscribeTopics_Call struct { + *mock.Call +} + +// SubscribeTopics is a helper method to define mock.On call +// - topics []string +// - rebalanceCb kafka.RebalanceCb +func (_e *KafkaConsumerInterface_Expecter) SubscribeTopics(topics interface{}, rebalanceCb interface{}) *KafkaConsumerInterface_SubscribeTopics_Call { + return &KafkaConsumerInterface_SubscribeTopics_Call{Call: _e.mock.On("SubscribeTopics", topics, rebalanceCb)} +} + +func (_c *KafkaConsumerInterface_SubscribeTopics_Call) Run(run func(topics []string, rebalanceCb kafka.RebalanceCb)) *KafkaConsumerInterface_SubscribeTopics_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]string), args[1].(kafka.RebalanceCb)) + }) + return _c +} + +func (_c *KafkaConsumerInterface_SubscribeTopics_Call) Return(_a0 error) *KafkaConsumerInterface_SubscribeTopics_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *KafkaConsumerInterface_SubscribeTopics_Call) RunAndReturn(run func([]string, kafka.RebalanceCb) error) *KafkaConsumerInterface_SubscribeTopics_Call { + _c.Call.Return(run) + return _c +} + +// NewKafkaConsumerInterface creates a new instance of KafkaConsumerInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewKafkaConsumerInterface(t interface { + mock.TestingT + Cleanup(func()) +}) *KafkaConsumerInterface { + mock := &KafkaConsumerInterface{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}