Skip to content

Commit

Permalink
feat: FDN-278 - SDKConfig Event (#259)
Browse files Browse the repository at this point in the history
* SDKConfig Event

* Add cf ray headers

* Check that the event is in the user queue before flushing

* Setup more explicit checks for flushing checks

* Cleanup unused

* remove unused testing code from debug

* Fix uuid not being the correct username

* Set target for sdkconfig event path

* when aggregate user ids match existing user events - set the records

* Add warning for sdkconfig event failing to write

* Capabilities defined in env var test

* lint

* fix userid
  • Loading branch information
JamieSinn authored Jun 7, 2024
1 parent 616be37 commit 6d7c11a
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/run-test-harness.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jobs:
labels: ubuntu-latest-4-core
steps:
- uses: DevCycleHQ/test-harness@main
env:
SDK_CAPABILITIES: '["cloud","edgeDB","clientCustomData","multithreading","defaultReason","etagReporting","lastModifiedHeader","sdkConfigEvent","clientUUID"]'
with:
sdks-to-test: '["go"]'
sdk-github-sha: ${{github.event.pull_request.head.sha}}
Expand Down
14 changes: 12 additions & 2 deletions api/model_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
EventType_AggVariableEvaluated = "aggVariableEvaluated"
EventType_VariableDefaulted = "variableDefaulted"
EventType_AggVariableDefaulted = "aggVariableDefaulted"
EventType_SDKConfig = "sdkConfig"
EventType_CustomEvent = "customEvent"
)

Expand Down Expand Up @@ -51,6 +52,7 @@ func (fp *FlushPayload) AddBatchRecordForUser(record UserEventsBatchRecord, chun
for _, chunk := range chunkedEvents {
userRecord.Events = append(userRecord.Events, chunk...)
}
fp.setRecordForUser(record.User.UserId, *userRecord)
} else {
for _, chunk := range chunkedEvents {
fp.Records = append(fp.Records, UserEventsBatchRecord{
Expand All @@ -59,7 +61,6 @@ func (fp *FlushPayload) AddBatchRecordForUser(record UserEventsBatchRecord, chun
})
}
}

}

func (fp *FlushPayload) getRecordForUser(userId string) *UserEventsBatchRecord {
Expand All @@ -71,6 +72,15 @@ func (fp *FlushPayload) getRecordForUser(userId string) *UserEventsBatchRecord {
return nil
}

func (fp *FlushPayload) setRecordForUser(userId string, record UserEventsBatchRecord) {
for i, r := range fp.Records {
if r.User.UserId == userId {
fp.Records[i] = record
return
}
}
}

type BatchEventsBody struct {
Batch []UserEventsBatchRecord `json:"batch"`
}
Expand Down Expand Up @@ -103,7 +113,7 @@ func (o *EventQueueOptions) CheckBounds() {

func (o *EventQueueOptions) IsEventLoggingDisabled(eventType string) bool {
switch eventType {
case EventType_VariableEvaluated, EventType_AggVariableEvaluated, EventType_VariableDefaulted, EventType_AggVariableDefaulted:
case EventType_VariableEvaluated, EventType_AggVariableEvaluated, EventType_VariableDefaulted, EventType_AggVariableDefaulted, EventType_SDKConfig:
return o.DisableAutomaticEventLogging
default:
return o.DisableCustomEventLogging
Expand Down
20 changes: 15 additions & 5 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord {

func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID, configEtag, rayId, lastModified string) api.UserEventsBatchRecord {
var aggregateEvents []api.Event
userId, err := os.Hostname()
hostname, err := os.Hostname()
if err != nil {
userId = "aggregate"
hostname = "aggregate"
}
userId := fmt.Sprintf("%s@%s", clientUUID, hostname)
emptyFeatureVars := make(map[string]string)

// type is either aggVariableEvaluated or aggVariableDefaulted
Expand Down Expand Up @@ -324,6 +325,7 @@ func (eq *EventQueue) HandleFlushResults(successPayloads []string, failurePayloa
eq.eventsReported.Add(reported)
}

// Metrics returns the number of events flushed, reported, and dropped
func (eq *EventQueue) Metrics() (int32, int32, int32) {
return eq.eventsFlushed.Load(), eq.eventsReported.Load(), eq.eventsDropped.Load()
}
Expand Down Expand Up @@ -382,12 +384,20 @@ func (eq *EventQueue) processEvents(ctx context.Context) {
close(eq.userEventQueueRaw)
close(eq.aggEventQueueRaw)
return
case userEvent := <-eq.userEventQueueRaw:
case userEvent, ok := <-eq.userEventQueueRaw:
// if the channel is closed - ok will be false
if !ok {
return
}
err := eq.processUserEvent(userEvent)
if err != nil {
return
}
case aggEvent := <-eq.aggEventQueueRaw:
case aggEvent, ok := <-eq.aggEventQueueRaw:
// if the channel is closed - ok will be false
if !ok {
return
}
err := eq.processAggregateEvent(aggEvent)
if err != nil {
return
Expand Down Expand Up @@ -420,7 +430,7 @@ func (eq *EventQueue) processUserEvent(event userEventData) (err error) {
event.event.FeatureVars = bucketedConfig.FeatureVariationMap

switch event.event.Type_ {
case api.EventType_AggVariableDefaulted, api.EventType_VariableDefaulted, api.EventType_AggVariableEvaluated, api.EventType_VariableEvaluated:
case api.EventType_AggVariableDefaulted, api.EventType_VariableDefaulted, api.EventType_AggVariableEvaluated, api.EventType_VariableEvaluated, api.EventType_SDKConfig:
break
default:
event.event.CustomType = event.event.Type_
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type LocalBucketing interface {
InternalEventQueue
GenerateBucketedConfigForUser(user User) (ret *BucketedUserConfig, err error)
SetClientCustomData(map[string]interface{}) error
GetClientUUID() string
Variable(user User, key string, variableType string) (variable Variable, err error)
Close()
}
Expand Down Expand Up @@ -120,7 +119,8 @@ func NewClient(sdkKey string, options *Options) (*Client, error) {
return c, fmt.Errorf("Error initializing event queue: %w", err)
}

c.configManager = NewEnvironmentConfigManager(sdkKey, c.localBucketing, options, c.cfg)
c.configManager = NewEnvironmentConfigManager(sdkKey, c.localBucketing, c.eventQueue, options, c.cfg)

c.configManager.StartPolling(options.ConfigPollingIntervalMS)

if c.DevCycleOptions.OnInitializedChannel != nil {
Expand Down Expand Up @@ -153,7 +153,7 @@ func (c *Client) handleInitialization() {
c.isInitialized = true

if c.IsLocalBucketing() {
util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetClientUUID())
util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetUUID())
}
if c.DevCycleOptions.OnInitializedChannel != nil {
go func() {
Expand Down
15 changes: 5 additions & 10 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti
}, err
}

func (n *NativeLocalBucketing) GetUUID() string {
return n.clientUUID
}

func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag, rayId, lastModified string) error {
oldETag := bucketing.GetEtag(n.sdkKey)
_, err := n.eventQueue.FlushEventQueue(n.clientUUID, oldETag, n.GetRayId(), n.GetLastModified())
if err != nil {
return fmt.Errorf("Error flushing events for %s: %w", oldETag, err)
}
err = bucketing.SetConfig(configJSON, n.sdkKey, eTag, rayId, lastModified, n.eventQueue)
err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, rayId, lastModified, n.eventQueue)
if err != nil {
return fmt.Errorf("Error parsing config: %w", err)
}
Expand All @@ -82,10 +81,6 @@ func (n *NativeLocalBucketing) HasConfig() bool {
return bucketing.HasConfig(n.sdkKey)
}

func (n *NativeLocalBucketing) GetClientUUID() string {
return n.clientUUID
}

func (n *NativeLocalBucketing) GetLastModified() string {
return bucketing.GetLastModified(n.sdkKey)
}
Expand Down
69 changes: 65 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package devcycle
import (
"flag"
"fmt"
"github.com/devcyclehq/go-server-sdk/v2/api"
"github.com/devcyclehq/go-server-sdk/v2/util"
"github.com/stretchr/testify/require"
"io"
"log"
"net/http"
"os"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -434,12 +436,71 @@ func TestClient_Validate_OnInitializedChannel_EnableCloudBucketing_Options(t *te
t.Fatal("Expected config to be loaded")
}
}
func TestClient_ConfigUpdatedEvent(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()
httpConfigMock(200)
responder := func(req *http.Request) (*http.Response, error) {
reqBody, err := io.ReadAll(req.Body)
fmt.Println(string(reqBody))
if err != nil {
return httpmock.NewStringResponse(500, `{}`), err
}
if !strings.Contains(string(reqBody), api.EventType_SDKConfig) {
t.Fatal("Expected config updated event in request body")
}
return httpmock.NewStringResponse(201, `{}`), nil
}
httpmock.RegisterResponder("POST", "https://config-updated.devcycle.com/v1/events/batch", responder)
c, err := NewClient(test_environmentKey, &Options{EventsAPIURI: "https://config-updated.devcycle.com", EventFlushIntervalMS: time.Millisecond * 500})
fatalErr(t, err)
if !c.isInitialized {
t.Fatal("Expected client to be initialized")
}
if !c.hasConfig() {
t.Fatal("Expected client to have config")
}

func fatalErr(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatal(err)
require.Eventually(t, func() bool {
return httpmock.GetCallCountInfo()["POST https://config-updated.devcycle.com/v1/events/batch"] >= 1
}, 1*time.Second, 100*time.Millisecond)
}

func TestClient_ConfigUpdatedEvent_VariableEval(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()
httpConfigMock(200)
responder := func(req *http.Request) (*http.Response, error) {
reqBody, err := io.ReadAll(req.Body)
fmt.Println(string(reqBody))
if err != nil {
return httpmock.NewStringResponse(500, `{}`), err
}
if !strings.Contains(string(reqBody), api.EventType_SDKConfig) || !strings.Contains(string(reqBody), api.EventType_AggVariableDefaulted) {
fmt.Println("Expected config updated event and defaulted event in request body")
}
return httpmock.NewStringResponse(201, `{}`), nil
}
httpmock.RegisterResponder("POST", "https://config-updated.devcycle.com/v1/events/batch", responder)
c, err := NewClient(test_environmentKey, &Options{EventsAPIURI: "https://config-updated.devcycle.com", EventFlushIntervalMS: time.Millisecond * 500})
fatalErr(t, err)
if !c.isInitialized {
t.Fatal("Expected client to be initialized")
}
if !c.hasConfig() {
t.Fatal("Expected client to have config")
}

user := User{UserId: "j_test", DeviceModel: "testing"}
variable, _ := c.Variable(user, "variableThatShouldBeDefaulted", true)

if !variable.IsDefaulted {
t.Fatal("Expected variable to be defaulted")
}

require.Eventually(t, func() bool {
return httpmock.GetCallCountInfo()["POST https://config-updated.devcycle.com/v1/events/batch"] >= 1
}, 1*time.Second, 100*time.Millisecond)
}

var (
Expand Down
12 changes: 11 additions & 1 deletion configmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ type EnvironmentConfigManager struct {
stopPolling context.CancelFunc
httpClient *http.Client
cfg *HTTPConfiguration
eventManager *EventManager
ticker *time.Ticker
}

func NewEnvironmentConfigManager(
sdkKey string,
localBucketing ConfigReceiver,
manager *EventManager,
options *Options,
cfg *HTTPConfiguration,
) (e *EnvironmentConfigManager) {
Expand All @@ -47,7 +49,8 @@ func NewEnvironmentConfigManager(
// Use the configurable timeout because fetching the first config can block SDK initialization.
Timeout: options.RequestTimeout,
},
firstLoad: true,
eventManager: manager,
firstLoad: true,
}

configManager.context, configManager.stopPolling = context.WithCancel(context.Background())
Expand Down Expand Up @@ -114,6 +117,7 @@ func (e *EnvironmentConfigManager) fetchConfig(numRetriesRemaining int) (err err
defer resp.Body.Close()
switch statusCode := resp.StatusCode; {
case statusCode == http.StatusOK:
resp.Request = req
return e.setConfigFromResponse(resp)
case statusCode == http.StatusNotModified:
return nil
Expand Down Expand Up @@ -164,6 +168,12 @@ func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response
}

util.Infof("Config set. ETag: %s Last-Modified: %s\n", e.localBucketing.GetETag(), e.localBucketing.GetLastModified())
if e.eventManager != nil {
err = e.eventManager.QueueSDKConfigEvent(*response.Request, *response)
if err != nil {
util.Warnf("Error queuing SDK config event: %s\n", err)
}
}

if e.firstLoad {
e.firstLoad = false
Expand Down
8 changes: 4 additions & 4 deletions configmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestEnvironmentConfigManager_fetchConfig_success(t *testing.T) {
httpConfigMock(200)

localBucketing := &recordingConfigReceiver{}
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, test_options, NewConfiguration(test_options))
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, nil, test_options, NewConfiguration(test_options))

err := manager.initialFetch()
if err != nil {
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestEnvironmentConfigManager_fetchConfig_retries500(t *testing.T) {
)

localBucketing := &recordingConfigReceiver{}
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, test_options, NewConfiguration(test_options))
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, nil, test_options, NewConfiguration(test_options))

err := manager.initialFetch()
if err != nil {
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestEnvironmentConfigManager_fetchConfig_retries_errors(t *testing.T) {
)

localBucketing := &recordingConfigReceiver{}
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, test_options, NewConfiguration(test_options))
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, nil, test_options, NewConfiguration(test_options))

err := manager.initialFetch()
if err != nil {
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestEnvironmentConfigManager_fetchConfig_returns_errors(t *testing.T) {
)

localBucketing := &recordingConfigReceiver{}
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, test_options, NewConfiguration(test_options))
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, nil, test_options, NewConfiguration(test_options))

err := manager.initialFetch()
if err == nil {
Expand Down
Loading

0 comments on commit 6d7c11a

Please sign in to comment.