diff --git a/bucketing/event_queue.go b/bucketing/event_queue.go index a97d031b..b91f1e4c 100644 --- a/bucketing/event_queue.go +++ b/bucketing/event_queue.go @@ -53,7 +53,7 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord { return records } -func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData) api.UserEventsBatchRecord { +func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID string, configEtag string) api.UserEventsBatchRecord { var aggregateEvents []api.Event userId, err := os.Hostname() if err != nil { @@ -82,6 +82,11 @@ func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData "_feature": feature, } } + + metaData["clientUUID"] = clientUUID + if configEtag != "" { + metaData["configEtag"] = configEtag + } event := api.Event{ Type_: _type, @@ -240,13 +245,13 @@ func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason str return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason) } -func (eq *EventQueue) FlushEventQueue() (map[string]api.FlushPayload, error) { +func (eq *EventQueue) FlushEventQueue(clientUUID string, configEtag string) (map[string]api.FlushPayload, error) { eq.stateMutex.Lock() defer eq.stateMutex.Unlock() var records []api.UserEventsBatchRecord - records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData)) + records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData, clientUUID, configEtag)) records = append(records, eq.userEventQueue.BuildBatchRecords()...) eq.aggEventQueue = make(AggregateEventQueue) eq.userEventQueue = make(UserEventQueue) diff --git a/bucketing/event_queue_test.go b/bucketing/event_queue_test.go index 25d3ed87..241070be 100644 --- a/bucketing/event_queue_test.go +++ b/bucketing/event_queue_test.go @@ -197,7 +197,7 @@ func TestEventQueue_QueueAndFlush(t *testing.T) { require.Equal(t, 2, len(eq.userEventQueue)) require.Equal(t, 0, len(eq.userEventQueueRaw)) - payloads, err := eq.FlushEventQueue() + payloads, err := eq.FlushEventQueue("","" ) require.NoError(t, err) require.Equal(t, 2, len(payloads)) require.Equal(t, 0, len(eq.userEventQueue)) diff --git a/bucketing/model_config_body.go b/bucketing/model_config_body.go index 00a4a446..2442b570 100644 --- a/bucketing/model_config_body.go +++ b/bucketing/model_config_body.go @@ -30,7 +30,7 @@ type configBody struct { Environment api.Environment `json:"environment" validate:"required"` Features []*ConfigFeature `json:"features" validate:"required"` Variables []*Variable `json:"variables" validate:"required,dive"` - etag string // TODO: remove etag + etag string variableIdMap map[string]*Variable variableKeyMap map[string]*Variable variableIdToFeatureMap map[string]*ConfigFeature diff --git a/client.go b/client.go index 4fc798d3..e005b9d0 100644 --- a/client.go +++ b/client.go @@ -62,6 +62,7 @@ type LocalBucketing interface { InternalEventQueue GenerateBucketedConfigForUser(user User) (ret *BucketedUserConfig, err error) SetClientCustomData(map[string]interface{}) error + GetClientUUID() string Variable(user User, key string, variableType string) (variable Variable, err error) Close() } @@ -151,6 +152,10 @@ func (c *Client) IsLocalBucketing() bool { func (c *Client) handleInitialization() { c.isInitialized = true + + if(c.IsLocalBucketing()){ + util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetClientUUID()) + } if c.DevCycleOptions.OnInitializedChannel != nil { go func() { c.DevCycleOptions.OnInitializedChannel <- true diff --git a/client_native_bucketing.go b/client_native_bucketing.go index 99476f77..1db0e1b1 100644 --- a/client_native_bucketing.go +++ b/client_native_bucketing.go @@ -6,6 +6,7 @@ import ( "time" "github.com/devcyclehq/go-server-sdk/v2/util" + "github.com/google/uuid" "github.com/devcyclehq/go-server-sdk/v2/api" @@ -33,9 +34,12 @@ type NativeLocalBucketing struct { configMutex sync.RWMutex platformData *api.PlatformData eventQueue *bucketing.EventQueue + clientUUID string } func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options) (*NativeLocalBucketing, error) { + clientUUID := uuid.New().String() + eq, err := bucketing.NewEventQueue(sdkKey, options.eventQueueOptions(), platformData) if err != nil { return nil, err @@ -45,11 +49,17 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti options: options, platformData: platformData, eventQueue: eq, + clientUUID: clientUUID, }, err } func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag string) error { - err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, n.eventQueue) + oldETag := bucketing.GetEtag(n.sdkKey) + _,err := n.eventQueue.FlushEventQueue(n.clientUUID, oldETag) + if err != nil { + return fmt.Errorf("Error flushing events for %s: %w", oldETag, err) + } + err = bucketing.SetConfig(configJSON, n.sdkKey, eTag, n.eventQueue) if err != nil { return fmt.Errorf("Error parsing config: %w", err) } @@ -68,6 +78,10 @@ func (n *NativeLocalBucketing) HasConfig() bool { return bucketing.HasConfig(n.sdkKey) } +func (n *NativeLocalBucketing) GetClientUUID() (string) { + return n.clientUUID +} + func (n *NativeLocalBucketing) GenerateBucketedConfigForUser(user User) (ret *BucketedUserConfig, err error) { populatedUser := user.GetPopulatedUserWithTime(n.platformData, DEFAULT_USER_TIME) clientCustomData := bucketing.GetClientCustomData(n.sdkKey) @@ -92,7 +106,6 @@ func (n *NativeLocalBucketing) Variable(user User, variableKey string, variableT } clientCustomData := bucketing.GetClientCustomData(n.sdkKey) populatedUser := user.GetPopulatedUserWithTime(n.platformData, DEFAULT_USER_TIME) - resultVariableType, resultValue, err := bucketing.VariableForUser(n.sdkKey, populatedUser, variableKey, variableType, n.eventQueue, clientCustomData) if err != nil { return defaultVar, nil @@ -128,7 +141,8 @@ func (n *NativeLocalBucketing) UserQueueLength() (int, error) { } func (n *NativeLocalBucketing) FlushEventQueue(callback EventFlushCallback) error { - payloads, err := n.eventQueue.FlushEventQueue() + configEtag := bucketing.GetEtag(n.sdkKey) + payloads, err := n.eventQueue.FlushEventQueue(n.clientUUID, configEtag) if err != nil { return fmt.Errorf("Error flushing event queue: %w", err) } diff --git a/configmanager.go b/configmanager.go index c9688217..d6250298 100644 --- a/configmanager.go +++ b/configmanager.go @@ -137,6 +137,7 @@ func (e *EnvironmentConfigManager) fetchConfig(numRetriesRemaining int) (err err func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response) error { config, err := io.ReadAll(response.Body) + if err != nil { return err } @@ -146,7 +147,7 @@ func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response return fmt.Errorf("invalid JSON data received for config") } - err = e.setConfig(config, response.Header.Get("ETag")) + err = e.setConfig(config, response.Header.Get("Etag")) if err != nil { return err diff --git a/event_manager.go b/event_manager.go index d71f4507..e2ba139f 100644 --- a/event_manager.go +++ b/event_manager.go @@ -118,7 +118,7 @@ func (e *EventManager) FlushEvents() (err error) { defer e.flushMutex.Unlock() util.Debugf("Started flushing events") - + defer func() { if r := recover(); r != nil { // get the stack trace and potentially log it here diff --git a/example/cloud/main.go b/example/cloud/main.go index 92b0d6fb..82cf0b42 100644 --- a/example/cloud/main.go +++ b/example/cloud/main.go @@ -17,7 +17,7 @@ func main() { if variableKey == "" { log.Fatal("DEVCYCLE_VARIABLE_KEY env var not set: set it to a variable key") } - user := devcycle.User{UserId: "test"} + user := devcycle.User{UserId: "test-user"} dvcOptions := devcycle.Options{ EnableEdgeDB: false, EnableCloudBucketing: true, diff --git a/example/local/main.go b/example/local/main.go index 4af111cb..925d5d8e 100644 --- a/example/local/main.go +++ b/example/local/main.go @@ -32,18 +32,20 @@ func main() { client, err := devcycle.NewClient(sdkKey, &dvcOptions) time.Sleep(10 * time.Second) - fmt.Println("Error? ", err) + if(err != nil) { + log.Fatalf("Error initializing client: %v", err) + } fmt.Println(client.GetRawConfig()) log.Printf("client initialized") features, _ := client.AllFeatures(user) for key, feature := range features { - log.Printf("Key:%s, feature:%#v", key, feature) + log.Printf("features Key:%s, feature:%#v", key, feature) } variables, _ := client.AllVariables(user) for key, variable := range variables { - log.Printf("Key:%s, variable:%#v", key, variable) + log.Printf("variables Key:%s, variable:%#v", key, variable) } existingVariable, err := client.Variable(user, variableKey, "DEFAULT") @@ -78,4 +80,14 @@ func main() { if err != nil { log.Fatalf("Error tracking event: %v", err) } + + err = client.FlushEvents() + if(err != nil) { + log.Fatalf("Error flushing events: %v", err) + } + + time.Sleep(2 * time.Second) + + client.Close() + }