Skip to content

Commit

Permalink
feat: SSE Client (#234)
Browse files Browse the repository at this point in the history
* SDKConfig Event

* SSE - Squash

* Cleanup rebase

* fix rebase

* add flush interval to configupdated base test
  • Loading branch information
JamieSinn authored Jun 11, 2024
1 parent faa5a8c commit fccc783
Show file tree
Hide file tree
Showing 23 changed files with 1,528 additions and 197 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test_examples.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: Test Examples

## For anyone looking to change this (Internal to DevCycle) - the project is here: https://app.devcycle.com/o/org_U9F8YMaTChTEndWw/p/git-hub-actions-integration-tests/features/6642210af1c941418857b237
on:
pull_request:
branches: [ main ]
Expand All @@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
env:
DEVCYCLE_SERVER_SDK_KEY: ${{ secrets.DEVCYCLE_SERVER_SDK_KEY }}
DEVCYCLE_VARIABLE_KEY: test-boolean-variable
DEVCYCLE_VARIABLE_KEY: go-example-tests
steps:
- uses: actions/checkout@v4

Expand Down
27 changes: 19 additions & 8 deletions api/model_event.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
/*
* DevCycle Bucketing API
*
* Documents the DevCycle Bucketing API which provides and API interface to User Bucketing and for generated SDKs.
*
* API version: 1.0.0
* Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
*/
package api

import (
Expand All @@ -21,6 +13,25 @@ const (
EventType_CustomEvent = "customEvent"
)

type ClientEvent struct {
EventType ClientEventType `json:"eventType"`
EventData interface{} `json:"eventData"`
Status string `json:"status"`
Error error `json:"error"`
}

type ClientEventType string

const (
ClientEventType_Initialized ClientEventType = "initialized"
ClientEventType_Error ClientEventType = "error"
ClientEventType_ConfigUpdated ClientEventType = "configUpdated"
ClientEventType_RealtimeUpdates ClientEventType = "realtimeUpdates"
ClientEventType_InternalSSEFailure ClientEventType = "internalSSEFailure"
ClientEventType_InternalNewConfigAvailable ClientEventType = "internalNewConfigAvailable"
ClientEventType_InternalSSEConnected ClientEventType = "internalSSEConnected"
)

type Event struct {
Type_ string `json:"type"`
Target string `json:"target,omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions api/model_sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package api

type MinimalConfig struct {
SSE *SSEHost `json:"sse,omitempty"`
}

type SSEHost struct {
Hostname string `json:"hostname,omitempty"`
Path string `json:"path,omitempty"`
}
3 changes: 1 addition & 2 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ func (eq *EventQueue) FlushEventQueue(clientUUID, configEtag, rayId, lastModifie
records = append(records, eq.userEventQueue.BuildBatchRecords()...)
eq.aggEventQueue = make(AggregateEventQueue)
eq.userEventQueue = make(UserEventQueue)
eq.userEventQueueCount = 0

for _, record := range records {
var payload *api.FlushPayload
Expand All @@ -286,7 +285,7 @@ func (eq *EventQueue) FlushEventQueue(clientUUID, configEtag, rayId, lastModifie
}
eq.pendingPayloads[payload.PayloadId] = *payload
}

eq.userEventQueueCount = 0
eq.updateFailedPayloads()

eq.eventsFlushed.Add(int32(len(eq.pendingPayloads)))
Expand Down
1 change: 1 addition & 0 deletions bucketing/model_config_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type configBody struct {
etag string
rayId string
lastModified string
SSE api.SSEHost `json:"sse,omitempty"`
variableIdMap map[string]*Variable
variableKeyMap map[string]*Variable
variableIdToFeatureMap map[string]*ConfigFeature
Expand Down
66 changes: 39 additions & 27 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type Client struct {
localBucketing LocalBucketing
platformData *PlatformData
// Set to true when the client has been initialized, regardless of whether the config has loaded successfully.
isInitialized bool
internalOnInitializedChannel chan bool
isInitialized bool
internalClientEventChannel chan api.ClientEvent
}

type LocalBucketing interface {
Expand Down Expand Up @@ -84,8 +84,11 @@ func NewClient(sdkKey string, options *Options) (*Client, error) {
util.Errorf("%v", err)
return nil, err
}
if options == nil {
return nil, errors.New("missing options! Call NewClient with valid options")
}
if !sdkKeyIsValid(sdkKey) {
return nil, fmt.Errorf("Invalid sdk key. Call NewClient with a valid sdk key.")
return nil, fmt.Errorf("invalid sdk key %s. Call NewClient with a valid sdk key", sdkKey)
}
options.CheckDefaults()
cfg := NewConfiguration(options)
Expand All @@ -99,15 +102,14 @@ func NewClient(sdkKey string, options *Options) (*Client, error) {
} else {
c.platformData = GeneratePlatformData()
}
c.internalClientEventChannel = make(chan api.ClientEvent, 1)

if c.DevCycleOptions.Logger != nil {
util.SetLogger(c.DevCycleOptions.Logger)
}
if c.IsLocalBucketing() {
util.Infof("Using Native Bucketing")

c.internalOnInitializedChannel = make(chan bool, 1)

err := c.setLBClient(sdkKey, options)
if err != nil {
return c, fmt.Errorf("Error setting up local bucketing: %w", err)
Expand All @@ -119,29 +121,27 @@ func NewClient(sdkKey string, options *Options) (*Client, error) {
return c, fmt.Errorf("Error initializing event queue: %w", err)
}

c.configManager = NewEnvironmentConfigManager(sdkKey, c.localBucketing, c.eventQueue, options, c.cfg)

c.configManager.StartPolling(options.ConfigPollingIntervalMS)
c.configManager, err = NewEnvironmentConfigManager(sdkKey, c.localBucketing, c.eventQueue, options, c.cfg)

if c.DevCycleOptions.OnInitializedChannel != nil {
// TODO: Pass this error back via a channel internally
if err != nil {
return nil, fmt.Errorf("Error initializing config manager: %w", err)
}
if c.DevCycleOptions.ClientEventHandler != nil {
go func() {
_ = c.configManager.initialFetch()
c.handleInitialization()
}()
} else {
err := c.configManager.initialFetch()
err = c.configManager.initialFetch()
c.handleInitialization()
return c, err
}
} else {
util.Infof("Using Cloud Bucketing")
if c.DevCycleOptions.OnInitializedChannel != nil {
go func() {
c.DevCycleOptions.OnInitializedChannel <- true
}()
if err != nil {
return c, err
}
}
return c, err
}

c.handleInitialization()
return c, nil
}

Expand All @@ -150,18 +150,26 @@ func (c *Client) IsLocalBucketing() bool {
}

func (c *Client) handleInitialization() {
c.isInitialized = true

bucketingInitMessage := "Using cloud bucketing with hostname: " + c.DevCycleOptions.BucketingAPIURI
if c.IsLocalBucketing() {
util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetUUID())
bucketingInitMessage = fmt.Sprintf("Client initialized with local bucketing %v", c.localBucketing.GetUUID())
}
if c.DevCycleOptions.OnInitializedChannel != nil {
initEvent := api.ClientEvent{
EventType: api.ClientEventType_Initialized,
EventData: bucketingInitMessage,
Status: "success",
Error: nil,
}
c.internalClientEventChannel <- initEvent
c.isInitialized = true

if c.DevCycleOptions.ClientEventHandler != nil {
go func() {
c.DevCycleOptions.OnInitializedChannel <- true
c.DevCycleOptions.ClientEventHandler <- initEvent
}()

}
c.internalOnInitializedChannel <- true
util.Infof(bucketingInitMessage)

}

func (c *Client) generateBucketedConfig(user User) (config *BucketedUserConfig, err error) {
Expand Down Expand Up @@ -508,7 +516,11 @@ func (c *Client) Close() (err error) {

if !c.isInitialized {
util.Infof("Awaiting client initialization before closing")
<-c.internalOnInitializedChannel
for event := range c.internalClientEventChannel {
if event.EventType == api.ClientEventType_Initialized {
break
}
}
}

if c.eventQueue != nil {
Expand Down
8 changes: 4 additions & 4 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type NativeLocalBucketing struct {
clientUUID string
}

func (n *NativeLocalBucketing) GetUUID() string {
return n.clientUUID
}

func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options) (*NativeLocalBucketing, error) {
clientUUID := uuid.New().String()

Expand All @@ -53,10 +57,6 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti
}, err
}

func (n *NativeLocalBucketing) GetUUID() string {
return n.clientUUID
}

func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag, rayId, lastModified string) error {
err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, rayId, lastModified, n.eventQueue)
if err != nil {
Expand Down
Loading

0 comments on commit fccc783

Please sign in to comment.