Skip to content

Commit

Permalink
fix: dont sent empty string as etag
Browse files Browse the repository at this point in the history
  • Loading branch information
suthar26 committed Apr 22, 2024
1 parent 6d84704 commit 98a7214
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 25 deletions.
5 changes: 3 additions & 2 deletions bucketing/bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func VariableForUser(sdkKey string, user api.PopulatedUser, variableKey string,
eventErr := eventQueue.QueueVariableEvaluatedEvent(variableKey, featureId, variationId)
if eventErr != nil {
util.Warnf("Failed to queue variable evaluated event: %s", eventErr)

}

return
Expand Down Expand Up @@ -266,12 +267,12 @@ func generateBucketedVariableForUser(sdkKey string, user api.PopulatedUser, key
featForVariable := config.GetFeatureForVariableId(variable.Id)
if featForVariable == nil {
err = ErrMissingFeature
return "", nil, "", "", err
return "", nil, "", "",err
}

th, err := doesUserQualifyForFeature(config, featForVariable, user, clientCustomData)
if err != nil {
return "", nil, "", "", err
return "", nil, "", "",err
}
variation, err := bucketUserForVariation(featForVariable, th)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions bucketing/bucketing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ func TestClientData(t *testing.T) {
// Ensure bucketed config has a feature variation map that's empty
bucketedUserConfig, err := GenerateBucketedConfig("test", user, nil)
require.NoError(t, err)

_, _, _, _, err = generateBucketedVariableForUser("test", user, "num-var", nil)
require.ErrorContainsf(t, err, "does not qualify", "does not qualify")
require.Equal(t, map[string]string{}, bucketedUserConfig.FeatureVariationMap)
Expand Down
2 changes: 1 addition & 1 deletion bucketing/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func GetEtag(sdkKey string) string {
if err != nil {
return ""
}
return config.etag
return config.eTag
}

func GetRawConfig(sdkKey string) []byte {
Expand Down
8 changes: 8 additions & 0 deletions bucketing/datamanager_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ func GetClientCustomData(sdkKey string) map[string]interface{} {
func SetClientCustomData(sdkKey string, data map[string]interface{}) {
clientCustomData[sdkKey] = data
}

func GetConfigEtag(sdkKey string) string {
config, err := getConfig(sdkKey)
if err != nil {
return ""
}
return config.eTag
}
15 changes: 11 additions & 4 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord {
return records
}

func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData) api.UserEventsBatchRecord {
func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID string, configEtag string) api.UserEventsBatchRecord {
var aggregateEvents []api.Event
userId, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -82,6 +82,11 @@ func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData
"_feature": feature,
}
}

metaData["clientUUID"] = clientUUID
if configEtag != "" {
metaData["configEtag"] = configEtag
}

event := api.Event{
Type_: _type,
Expand Down Expand Up @@ -184,6 +189,7 @@ func (eq *EventQueue) MergeAggEventQueueKeys(config *configBody) {
}
}


func (eq *EventQueue) queueAggregateEventInternal(variableKey, featureId, variationId, eventType string, defaultReason string) error {
if eq.options != nil && eq.options.IsEventLoggingDisabled(eventType) {
return nil
Expand Down Expand Up @@ -225,11 +231,12 @@ func (eq *EventQueue) QueueEvent(user api.User, event api.Event) error {
}

func (eq *EventQueue) QueueVariableEvaluatedEvent(variableKey, featureId, variationId string) error {

if eq.options.DisableAutomaticEventLogging {
return nil
}

return eq.queueAggregateEventInternal(variableKey, featureId, variationId, api.EventType_AggVariableEvaluated, "")
return eq.queueAggregateEventInternal(variableKey, featureId, variationId, api.EventType_AggVariableEvaluated, "" )
}

func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason string) error {
Expand All @@ -240,13 +247,13 @@ func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason str
return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason)
}

func (eq *EventQueue) FlushEventQueue() (map[string]api.FlushPayload, error) {
func (eq *EventQueue) FlushEventQueue(clientUUID string, configEtag string) (map[string]api.FlushPayload, error) {
eq.stateMutex.Lock()
defer eq.stateMutex.Unlock()

var records []api.UserEventsBatchRecord

records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData))
records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData, clientUUID, configEtag))
records = append(records, eq.userEventQueue.BuildBatchRecords()...)
eq.aggEventQueue = make(AggregateEventQueue)
eq.userEventQueue = make(UserEventQueue)
Expand Down
2 changes: 1 addition & 1 deletion bucketing/event_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestEventQueue_QueueAndFlush(t *testing.T) {
require.Equal(t, 2, len(eq.userEventQueue))
require.Equal(t, 0, len(eq.userEventQueueRaw))

payloads, err := eq.FlushEventQueue()
payloads, err := eq.FlushEventQueue("","" )
require.NoError(t, err)
require.Equal(t, 2, len(payloads))
require.Equal(t, 0, len(eq.userEventQueue))
Expand Down
4 changes: 2 additions & 2 deletions bucketing/model_config_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type configBody struct {
Environment api.Environment `json:"environment" validate:"required"`
Features []*ConfigFeature `json:"features" validate:"required"`
Variables []*Variable `json:"variables" validate:"required,dive"`
etag string // TODO: remove etag
eTag string
variableIdMap map[string]*Variable
variableKeyMap map[string]*Variable
variableIdToFeatureMap map[string]*ConfigFeature
Expand Down Expand Up @@ -95,7 +95,7 @@ func (c *configBody) compile(etag string) {
c.variableIdToFeatureMap = variableIdToFeatureMap
c.variableIdMap = variableIdMap
c.variableKeyMap = variableKeyMap
c.etag = etag
c.eTag = etag

// Sort the feature distributions by "_variation" attribute in descending alphabetical order
for _, feature := range c.Features {
Expand Down
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Client struct {
// Set to true when the client has been initialized, regardless of whether the config has loaded successfully.
isInitialized bool
internalOnInitializedChannel chan bool
clientUUID string
}

type LocalBucketing interface {
Expand Down Expand Up @@ -151,6 +152,10 @@ func (c *Client) IsLocalBucketing() bool {

func (c *Client) handleInitialization() {
c.isInitialized = true

if(c.IsLocalBucketing()){
util.Infof("Client initialized with local bucketing %v", c.clientUUID)
}
if c.DevCycleOptions.OnInitializedChannel != nil {
go func() {
c.DevCycleOptions.OnInitializedChannel <- true
Expand Down
21 changes: 16 additions & 5 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/devcyclehq/go-server-sdk/v2/util"
"github.com/google/uuid"

"github.com/devcyclehq/go-server-sdk/v2/api"

Expand All @@ -18,11 +19,13 @@ const NATIVE_SDK = true
var DEFAULT_USER_TIME = time.Time{}

func (c *Client) setLBClient(sdkKey string, options *Options) error {
localBucketing, err := NewNativeLocalBucketing(sdkKey, c.platformData, options)
clientUUID := uuid.New().String()
localBucketing, err := NewNativeLocalBucketing(sdkKey, c.platformData, options, clientUUID)
if err != nil {
return err
}
c.localBucketing = localBucketing
c.clientUUID = clientUUID

return nil
}
Expand All @@ -33,9 +36,10 @@ type NativeLocalBucketing struct {
configMutex sync.RWMutex
platformData *api.PlatformData
eventQueue *bucketing.EventQueue
clientUUID string
}

func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options) (*NativeLocalBucketing, error) {
func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options, clientUUID string) (*NativeLocalBucketing, error) {
eq, err := bucketing.NewEventQueue(sdkKey, options.eventQueueOptions(), platformData)
if err != nil {
return nil, err
Expand All @@ -45,11 +49,17 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti
options: options,
platformData: platformData,
eventQueue: eq,
clientUUID: clientUUID,
}, err
}

func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag string) error {
err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, n.eventQueue)
oldETag := bucketing.GetEtag(n.sdkKey)
_,err := n.eventQueue.FlushEventQueue(n.clientUUID, oldETag)
if err != nil {
return fmt.Errorf("Error flushing events for %s: %w", oldETag, err)
}
err = bucketing.SetConfig(configJSON, n.sdkKey, eTag, n.eventQueue)
if err != nil {
return fmt.Errorf("Error parsing config: %w", err)
}
Expand Down Expand Up @@ -92,7 +102,7 @@ func (n *NativeLocalBucketing) Variable(user User, variableKey string, variableT
}
clientCustomData := bucketing.GetClientCustomData(n.sdkKey)
populatedUser := user.GetPopulatedUserWithTime(n.platformData, DEFAULT_USER_TIME)

resultVariableType, resultValue, err := bucketing.VariableForUser(n.sdkKey, populatedUser, variableKey, variableType, n.eventQueue, clientCustomData)
if err != nil {
return defaultVar, nil
Expand Down Expand Up @@ -128,7 +138,8 @@ func (n *NativeLocalBucketing) UserQueueLength() (int, error) {
}

func (n *NativeLocalBucketing) FlushEventQueue(callback EventFlushCallback) error {
payloads, err := n.eventQueue.FlushEventQueue()
configEtag := bucketing.GetConfigEtag(n.sdkKey)
payloads, err := n.eventQueue.FlushEventQueue(n.clientUUID, configEtag)
if err != nil {
return fmt.Errorf("Error flushing event queue: %w", err)
}
Expand Down
17 changes: 15 additions & 2 deletions configmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,29 @@ func (e *EnvironmentConfigManager) fetchConfig(numRetriesRemaining int) (err err

func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response) error {
config, err := io.ReadAll(response.Body)

if err != nil {
return err
}

configMap := make(map[string]interface{})
err = json.Unmarshal(config, &configMap)
if err != nil {
return err
}

configMap["eTag"] = response.Header.Get("Etag")
configWithTag, err := json.Marshal(configMap)
if err != nil {
return err
}
// Check
valid := json.Valid(config)
valid := json.Valid(configWithTag)
if !valid {
return fmt.Errorf("invalid JSON data received for config")
}

err = e.setConfig(config, response.Header.Get("ETag"))
err = e.setConfig(configWithTag, response.Header.Get("Etag"))

if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func (e *EventManager) FlushEvents() (err error) {
e.flushMutex.Lock()
defer e.flushMutex.Unlock()

util.Debugf("Started flushing events")

defer func() {
if r := recover(); r != nil {
Expand Down
2 changes: 1 addition & 1 deletion example/cloud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func main() {
if variableKey == "" {
log.Fatal("DEVCYCLE_VARIABLE_KEY env var not set: set it to a variable key")
}
user := devcycle.User{UserId: "test"}
user := devcycle.User{UserId: "suthar-test-user"}
dvcOptions := devcycle.Options{
EnableEdgeDB: false,
EnableCloudBucketing: true,
Expand Down
54 changes: 48 additions & 6 deletions example/local/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ func main() {
if sdkKey == "" {
log.Fatal("DEVCYCLE_SERVER_SDK_KEY env var not set: set it to your SDK key")
}
variableKey := os.Getenv("DEVCYCLE_VARIABLE_KEY")
variableKey :="rel"
if variableKey == "" {
log.Fatal("DEVCYCLE_VARIABLE_KEY env var not set: set it to a variable key")
}

user := devcycle.User{UserId: "test"}
user := devcycle.User{UserId: "test-user"}
dvcOptions := devcycle.Options{
EnableEdgeDB: false,
EnableCloudBucketing: false,
EventFlushIntervalMS: 0,
EventFlushIntervalMS: 5,
ConfigPollingIntervalMS: 10 * time.Second,
RequestTimeout: 10 * time.Second,
DisableAutomaticEventLogging: false,
Expand All @@ -32,18 +32,20 @@ func main() {

client, err := devcycle.NewClient(sdkKey, &dvcOptions)
time.Sleep(10 * time.Second)
fmt.Println("Error? ", err)
if(err != nil) {
log.Fatalf("Error initializing client: %v", err)
}
fmt.Println(client.GetRawConfig())
log.Printf("client initialized")

features, _ := client.AllFeatures(user)
for key, feature := range features {
log.Printf("Key:%s, feature:%#v", key, feature)
log.Printf("features Key:%s, feature:%#v", key, feature)
}

variables, _ := client.AllVariables(user)
for key, variable := range variables {
log.Printf("Key:%s, variable:%#v", key, variable)
log.Printf("variables Key:%s, variable:%#v", key, variable)
}

existingVariable, err := client.Variable(user, variableKey, "DEFAULT")
Expand All @@ -70,6 +72,12 @@ func main() {
log.Printf("Warning: variable %v should be defaulted", missingVariable.Key)
}

anotherVariable, _ := client.Variable(user, "anotherfea", false)
if err != nil {
log.Fatalf("Error getting variable: %v", err)
}
log.Printf("variable %v: value=%v (%v) defaulted=%t", anotherVariable.Key, anotherVariable.Value, anotherVariable.Type_, anotherVariable.IsDefaulted)

event := devcycle.Event{
Type_: "customEvent",
Target: "somevariable.key",
Expand All @@ -78,4 +86,38 @@ func main() {
if err != nil {
log.Fatalf("Error tracking event: %v", err)
}

err = client.FlushEvents()

if err != nil {
log.Fatalf("Error flushing events: %v", err)
}
time.Sleep(10 * time.Second * 60)

existingVariable, err = client.Variable(user, variableKey, "DEFAULT")
if err != nil {
log.Fatalf("Error getting variable %v: %v", variableKey, err)
}
log.Printf("variable %v: value=%v (%v) defaulted=%t", existingVariable.Key, existingVariable.Value, existingVariable.Type_, existingVariable.IsDefaulted)
if existingVariable.IsDefaulted {
log.Printf("Warning: variable %v should be defaulted", existingVariable.Key)
}

anotherVariable, _ = client.Variable(user, "anotherfea", false)
if err != nil {
log.Fatalf("Error getting variable: %v", err)
}
log.Printf("variable %v: value=%v (%v) defaulted=%t", anotherVariable.Key, anotherVariable.Value, anotherVariable.Type_, anotherVariable.IsDefaulted)

err = client.FlushEvents()
if(err != nil) {
log.Fatalf("Error flushing events: %v", err)
}

time.Sleep(10 * time.Second * 60)
client.Close()




}

0 comments on commit 98a7214

Please sign in to comment.