From 98a7214275bbcbdb89a3d7424ce636fb06822520 Mon Sep 17 00:00:00 2001 From: Parth Suthar Date: Thu, 18 Apr 2024 11:46:12 -0400 Subject: [PATCH] fix: dont sent empty string as etag --- bucketing/bucketing.go | 5 +-- bucketing/bucketing_test.go | 1 + bucketing/config_manager.go | 2 +- bucketing/datamanager_platform.go | 8 +++++ bucketing/event_queue.go | 15 ++++++--- bucketing/event_queue_test.go | 2 +- bucketing/model_config_body.go | 4 +-- client.go | 5 +++ client_native_bucketing.go | 21 +++++++++--- configmanager.go | 17 ++++++++-- event_manager.go | 1 - example/cloud/main.go | 2 +- example/local/main.go | 54 +++++++++++++++++++++++++++---- 13 files changed, 112 insertions(+), 25 deletions(-) diff --git a/bucketing/bucketing.go b/bucketing/bucketing.go index f4d1bd19..ce68edcf 100644 --- a/bucketing/bucketing.go +++ b/bucketing/bucketing.go @@ -234,6 +234,7 @@ func VariableForUser(sdkKey string, user api.PopulatedUser, variableKey string, eventErr := eventQueue.QueueVariableEvaluatedEvent(variableKey, featureId, variationId) if eventErr != nil { util.Warnf("Failed to queue variable evaluated event: %s", eventErr) + } return @@ -266,12 +267,12 @@ func generateBucketedVariableForUser(sdkKey string, user api.PopulatedUser, key featForVariable := config.GetFeatureForVariableId(variable.Id) if featForVariable == nil { err = ErrMissingFeature - return "", nil, "", "", err + return "", nil, "", "",err } th, err := doesUserQualifyForFeature(config, featForVariable, user, clientCustomData) if err != nil { - return "", nil, "", "", err + return "", nil, "", "",err } variation, err := bucketUserForVariation(featForVariable, th) if err != nil { diff --git a/bucketing/bucketing_test.go b/bucketing/bucketing_test.go index 084eb783..8c34dc9b 100644 --- a/bucketing/bucketing_test.go +++ b/bucketing/bucketing_test.go @@ -472,6 +472,7 @@ func TestClientData(t *testing.T) { // Ensure bucketed config has a feature variation map that's empty bucketedUserConfig, err := GenerateBucketedConfig("test", user, nil) require.NoError(t, err) + _, _, _, _, err = generateBucketedVariableForUser("test", user, "num-var", nil) require.ErrorContainsf(t, err, "does not qualify", "does not qualify") require.Equal(t, map[string]string{}, bucketedUserConfig.FeatureVariationMap) diff --git a/bucketing/config_manager.go b/bucketing/config_manager.go index 6bbfd74b..d98f2183 100644 --- a/bucketing/config_manager.go +++ b/bucketing/config_manager.go @@ -23,7 +23,7 @@ func GetEtag(sdkKey string) string { if err != nil { return "" } - return config.etag + return config.eTag } func GetRawConfig(sdkKey string) []byte { diff --git a/bucketing/datamanager_platform.go b/bucketing/datamanager_platform.go index c5e077f9..c90cddeb 100644 --- a/bucketing/datamanager_platform.go +++ b/bucketing/datamanager_platform.go @@ -13,3 +13,11 @@ func GetClientCustomData(sdkKey string) map[string]interface{} { func SetClientCustomData(sdkKey string, data map[string]interface{}) { clientCustomData[sdkKey] = data } + +func GetConfigEtag(sdkKey string) string { + config, err := getConfig(sdkKey) + if err != nil { + return "" + } + return config.eTag +} \ No newline at end of file diff --git a/bucketing/event_queue.go b/bucketing/event_queue.go index a97d031b..32f107a5 100644 --- a/bucketing/event_queue.go +++ b/bucketing/event_queue.go @@ -53,7 +53,7 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord { return records } -func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData) api.UserEventsBatchRecord { +func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID string, configEtag string) api.UserEventsBatchRecord { var aggregateEvents []api.Event userId, err := os.Hostname() if err != nil { @@ -82,6 +82,11 @@ func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData "_feature": feature, } } + + metaData["clientUUID"] = clientUUID + if configEtag != "" { + metaData["configEtag"] = configEtag + } event := api.Event{ Type_: _type, @@ -184,6 +189,7 @@ func (eq *EventQueue) MergeAggEventQueueKeys(config *configBody) { } } + func (eq *EventQueue) queueAggregateEventInternal(variableKey, featureId, variationId, eventType string, defaultReason string) error { if eq.options != nil && eq.options.IsEventLoggingDisabled(eventType) { return nil @@ -225,11 +231,12 @@ func (eq *EventQueue) QueueEvent(user api.User, event api.Event) error { } func (eq *EventQueue) QueueVariableEvaluatedEvent(variableKey, featureId, variationId string) error { + if eq.options.DisableAutomaticEventLogging { return nil } - return eq.queueAggregateEventInternal(variableKey, featureId, variationId, api.EventType_AggVariableEvaluated, "") + return eq.queueAggregateEventInternal(variableKey, featureId, variationId, api.EventType_AggVariableEvaluated, "" ) } func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason string) error { @@ -240,13 +247,13 @@ func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason str return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason) } -func (eq *EventQueue) FlushEventQueue() (map[string]api.FlushPayload, error) { +func (eq *EventQueue) FlushEventQueue(clientUUID string, configEtag string) (map[string]api.FlushPayload, error) { eq.stateMutex.Lock() defer eq.stateMutex.Unlock() var records []api.UserEventsBatchRecord - records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData)) + records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData, clientUUID, configEtag)) records = append(records, eq.userEventQueue.BuildBatchRecords()...) eq.aggEventQueue = make(AggregateEventQueue) eq.userEventQueue = make(UserEventQueue) diff --git a/bucketing/event_queue_test.go b/bucketing/event_queue_test.go index 25d3ed87..241070be 100644 --- a/bucketing/event_queue_test.go +++ b/bucketing/event_queue_test.go @@ -197,7 +197,7 @@ func TestEventQueue_QueueAndFlush(t *testing.T) { require.Equal(t, 2, len(eq.userEventQueue)) require.Equal(t, 0, len(eq.userEventQueueRaw)) - payloads, err := eq.FlushEventQueue() + payloads, err := eq.FlushEventQueue("","" ) require.NoError(t, err) require.Equal(t, 2, len(payloads)) require.Equal(t, 0, len(eq.userEventQueue)) diff --git a/bucketing/model_config_body.go b/bucketing/model_config_body.go index 00a4a446..f80251e9 100644 --- a/bucketing/model_config_body.go +++ b/bucketing/model_config_body.go @@ -30,7 +30,7 @@ type configBody struct { Environment api.Environment `json:"environment" validate:"required"` Features []*ConfigFeature `json:"features" validate:"required"` Variables []*Variable `json:"variables" validate:"required,dive"` - etag string // TODO: remove etag + eTag string variableIdMap map[string]*Variable variableKeyMap map[string]*Variable variableIdToFeatureMap map[string]*ConfigFeature @@ -95,7 +95,7 @@ func (c *configBody) compile(etag string) { c.variableIdToFeatureMap = variableIdToFeatureMap c.variableIdMap = variableIdMap c.variableKeyMap = variableKeyMap - c.etag = etag + c.eTag = etag // Sort the feature distributions by "_variation" attribute in descending alphabetical order for _, feature := range c.Features { diff --git a/client.go b/client.go index 4fc798d3..bafb4fa3 100644 --- a/client.go +++ b/client.go @@ -55,6 +55,7 @@ type Client struct { // Set to true when the client has been initialized, regardless of whether the config has loaded successfully. isInitialized bool internalOnInitializedChannel chan bool + clientUUID string } type LocalBucketing interface { @@ -151,6 +152,10 @@ func (c *Client) IsLocalBucketing() bool { func (c *Client) handleInitialization() { c.isInitialized = true + + if(c.IsLocalBucketing()){ + util.Infof("Client initialized with local bucketing %v", c.clientUUID) + } if c.DevCycleOptions.OnInitializedChannel != nil { go func() { c.DevCycleOptions.OnInitializedChannel <- true diff --git a/client_native_bucketing.go b/client_native_bucketing.go index 99476f77..1d4e1b65 100644 --- a/client_native_bucketing.go +++ b/client_native_bucketing.go @@ -6,6 +6,7 @@ import ( "time" "github.com/devcyclehq/go-server-sdk/v2/util" + "github.com/google/uuid" "github.com/devcyclehq/go-server-sdk/v2/api" @@ -18,11 +19,13 @@ const NATIVE_SDK = true var DEFAULT_USER_TIME = time.Time{} func (c *Client) setLBClient(sdkKey string, options *Options) error { - localBucketing, err := NewNativeLocalBucketing(sdkKey, c.platformData, options) + clientUUID := uuid.New().String() + localBucketing, err := NewNativeLocalBucketing(sdkKey, c.platformData, options, clientUUID) if err != nil { return err } c.localBucketing = localBucketing + c.clientUUID = clientUUID return nil } @@ -33,9 +36,10 @@ type NativeLocalBucketing struct { configMutex sync.RWMutex platformData *api.PlatformData eventQueue *bucketing.EventQueue + clientUUID string } -func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options) (*NativeLocalBucketing, error) { +func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options, clientUUID string) (*NativeLocalBucketing, error) { eq, err := bucketing.NewEventQueue(sdkKey, options.eventQueueOptions(), platformData) if err != nil { return nil, err @@ -45,11 +49,17 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti options: options, platformData: platformData, eventQueue: eq, + clientUUID: clientUUID, }, err } func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag string) error { - err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, n.eventQueue) + oldETag := bucketing.GetEtag(n.sdkKey) + _,err := n.eventQueue.FlushEventQueue(n.clientUUID, oldETag) + if err != nil { + return fmt.Errorf("Error flushing events for %s: %w", oldETag, err) + } + err = bucketing.SetConfig(configJSON, n.sdkKey, eTag, n.eventQueue) if err != nil { return fmt.Errorf("Error parsing config: %w", err) } @@ -92,7 +102,7 @@ func (n *NativeLocalBucketing) Variable(user User, variableKey string, variableT } clientCustomData := bucketing.GetClientCustomData(n.sdkKey) populatedUser := user.GetPopulatedUserWithTime(n.platformData, DEFAULT_USER_TIME) - + resultVariableType, resultValue, err := bucketing.VariableForUser(n.sdkKey, populatedUser, variableKey, variableType, n.eventQueue, clientCustomData) if err != nil { return defaultVar, nil @@ -128,7 +138,8 @@ func (n *NativeLocalBucketing) UserQueueLength() (int, error) { } func (n *NativeLocalBucketing) FlushEventQueue(callback EventFlushCallback) error { - payloads, err := n.eventQueue.FlushEventQueue() + configEtag := bucketing.GetConfigEtag(n.sdkKey) + payloads, err := n.eventQueue.FlushEventQueue(n.clientUUID, configEtag) if err != nil { return fmt.Errorf("Error flushing event queue: %w", err) } diff --git a/configmanager.go b/configmanager.go index c9688217..b548f1d9 100644 --- a/configmanager.go +++ b/configmanager.go @@ -137,16 +137,29 @@ func (e *EnvironmentConfigManager) fetchConfig(numRetriesRemaining int) (err err func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response) error { config, err := io.ReadAll(response.Body) + + if err != nil { + return err + } + + configMap := make(map[string]interface{}) + err = json.Unmarshal(config, &configMap) + if err != nil { + return err + } + + configMap["eTag"] = response.Header.Get("Etag") + configWithTag, err := json.Marshal(configMap) if err != nil { return err } // Check - valid := json.Valid(config) + valid := json.Valid(configWithTag) if !valid { return fmt.Errorf("invalid JSON data received for config") } - err = e.setConfig(config, response.Header.Get("ETag")) + err = e.setConfig(configWithTag, response.Header.Get("Etag")) if err != nil { return err diff --git a/event_manager.go b/event_manager.go index d71f4507..76165375 100644 --- a/event_manager.go +++ b/event_manager.go @@ -117,7 +117,6 @@ func (e *EventManager) FlushEvents() (err error) { e.flushMutex.Lock() defer e.flushMutex.Unlock() - util.Debugf("Started flushing events") defer func() { if r := recover(); r != nil { diff --git a/example/cloud/main.go b/example/cloud/main.go index 92b0d6fb..d5b3842d 100644 --- a/example/cloud/main.go +++ b/example/cloud/main.go @@ -17,7 +17,7 @@ func main() { if variableKey == "" { log.Fatal("DEVCYCLE_VARIABLE_KEY env var not set: set it to a variable key") } - user := devcycle.User{UserId: "test"} + user := devcycle.User{UserId: "suthar-test-user"} dvcOptions := devcycle.Options{ EnableEdgeDB: false, EnableCloudBucketing: true, diff --git a/example/local/main.go b/example/local/main.go index 4af111cb..7359047d 100644 --- a/example/local/main.go +++ b/example/local/main.go @@ -14,16 +14,16 @@ func main() { if sdkKey == "" { log.Fatal("DEVCYCLE_SERVER_SDK_KEY env var not set: set it to your SDK key") } - variableKey := os.Getenv("DEVCYCLE_VARIABLE_KEY") + variableKey :="rel" if variableKey == "" { log.Fatal("DEVCYCLE_VARIABLE_KEY env var not set: set it to a variable key") } - user := devcycle.User{UserId: "test"} + user := devcycle.User{UserId: "test-user"} dvcOptions := devcycle.Options{ EnableEdgeDB: false, EnableCloudBucketing: false, - EventFlushIntervalMS: 0, + EventFlushIntervalMS: 5, ConfigPollingIntervalMS: 10 * time.Second, RequestTimeout: 10 * time.Second, DisableAutomaticEventLogging: false, @@ -32,18 +32,20 @@ func main() { client, err := devcycle.NewClient(sdkKey, &dvcOptions) time.Sleep(10 * time.Second) - fmt.Println("Error? ", err) + if(err != nil) { + log.Fatalf("Error initializing client: %v", err) + } fmt.Println(client.GetRawConfig()) log.Printf("client initialized") features, _ := client.AllFeatures(user) for key, feature := range features { - log.Printf("Key:%s, feature:%#v", key, feature) + log.Printf("features Key:%s, feature:%#v", key, feature) } variables, _ := client.AllVariables(user) for key, variable := range variables { - log.Printf("Key:%s, variable:%#v", key, variable) + log.Printf("variables Key:%s, variable:%#v", key, variable) } existingVariable, err := client.Variable(user, variableKey, "DEFAULT") @@ -70,6 +72,12 @@ func main() { log.Printf("Warning: variable %v should be defaulted", missingVariable.Key) } + anotherVariable, _ := client.Variable(user, "anotherfea", false) + if err != nil { + log.Fatalf("Error getting variable: %v", err) + } + log.Printf("variable %v: value=%v (%v) defaulted=%t", anotherVariable.Key, anotherVariable.Value, anotherVariable.Type_, anotherVariable.IsDefaulted) + event := devcycle.Event{ Type_: "customEvent", Target: "somevariable.key", @@ -78,4 +86,38 @@ func main() { if err != nil { log.Fatalf("Error tracking event: %v", err) } + + err = client.FlushEvents() + + if err != nil { + log.Fatalf("Error flushing events: %v", err) + } + time.Sleep(10 * time.Second * 60) + + existingVariable, err = client.Variable(user, variableKey, "DEFAULT") + if err != nil { + log.Fatalf("Error getting variable %v: %v", variableKey, err) + } + log.Printf("variable %v: value=%v (%v) defaulted=%t", existingVariable.Key, existingVariable.Value, existingVariable.Type_, existingVariable.IsDefaulted) + if existingVariable.IsDefaulted { + log.Printf("Warning: variable %v should be defaulted", existingVariable.Key) + } + + anotherVariable, _ = client.Variable(user, "anotherfea", false) + if err != nil { + log.Fatalf("Error getting variable: %v", err) + } + log.Printf("variable %v: value=%v (%v) defaulted=%t", anotherVariable.Key, anotherVariable.Value, anotherVariable.Type_, anotherVariable.IsDefaulted) + + err = client.FlushEvents() + if(err != nil) { + log.Fatalf("Error flushing events: %v", err) + } + + time.Sleep(10 * time.Second * 60) + client.Close() + + + + }