Skip to content

Commit

Permalink
chore: add client date
Browse files Browse the repository at this point in the history
  • Loading branch information
suthar26 committed Apr 19, 2024
1 parent 6d84704 commit 7953ea9
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 46 deletions.
1 change: 1 addition & 0 deletions api/model_bucketed_user_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type BucketedUserConfig struct {
VariableVariationMap map[string]FeatureVariation `json:"variableVariationMap"`
Variables map[string]ReadOnlyVariable `json:"variables"`
KnownVariableKeys []float64 `json:"knownVariableKeys"`
ETag string `json:"eTag"`

User *User `json:"-"`
}
26 changes: 14 additions & 12 deletions bucketing/bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ func GenerateBucketedConfig(sdkKey string, user api.PopulatedUser, clientCustomD
FeatureVariationMap: featureVariationMap,
VariableVariationMap: variableVariationMap,
Variables: variableMap,
ETag: config.eTag,
}, nil
}

func VariableForUser(sdkKey string, user api.PopulatedUser, variableKey string, expectedVariableType string, eventQueue *EventQueue, clientCustomData map[string]interface{}) (variableType string, variableValue any, err error) {
variableType, variableValue, featureId, variationId, err := generateBucketedVariableForUser(sdkKey, user, variableKey, clientCustomData)
variableType, variableValue, featureId, variationId, clientUUID, configEtag, err := generateBucketedVariableForUser(sdkKey, user, variableKey, clientCustomData)
if err != nil {
eventErr := eventQueue.QueueVariableDefaultedEvent(variableKey, BucketResultErrorToDefaultReason(err))
eventErr := eventQueue.QueueVariableDefaultedEvent(variableKey, BucketResultErrorToDefaultReason(err), clientUUID, configEtag)
if eventErr != nil {
util.Warnf("Failed to queue variable defaulted event: %s", eventErr)
}
Expand All @@ -224,16 +225,17 @@ func VariableForUser(sdkKey string, user api.PopulatedUser, variableKey string,

if !isVariableTypeValid(variableType, expectedVariableType) && expectedVariableType != "" {
err = ErrInvalidVariableType
eventErr := eventQueue.QueueVariableDefaultedEvent(variableKey, BucketResultErrorToDefaultReason(err))
eventErr := eventQueue.QueueVariableDefaultedEvent(variableKey, BucketResultErrorToDefaultReason(err), clientUUID, configEtag)
if eventErr != nil {
util.Warnf("Failed to queue variable defaulted event: %s", eventErr)
}
return "", nil, err
}

eventErr := eventQueue.QueueVariableEvaluatedEvent(variableKey, featureId, variationId)
eventErr := eventQueue.QueueVariableEvaluatedEvent(variableKey, featureId, variationId, clientUUID, configEtag)
if eventErr != nil {
util.Warnf("Failed to queue variable evaluated event: %s", eventErr)

}

return
Expand All @@ -252,37 +254,37 @@ func isVariableTypeValid(variableType string, expectedVariableType string) bool
return true
}

func generateBucketedVariableForUser(sdkKey string, user api.PopulatedUser, key string, clientCustomData map[string]interface{}) (variableType string, variableValue any, featureId string, variationId string, err error) {
func generateBucketedVariableForUser(sdkKey string, user api.PopulatedUser, key string, clientCustomData map[string]interface{}) (variableType string, variableValue any, featureId string, variationId string, clientUUID string, configEtag string, err error) {
config, err := getConfig(sdkKey)
if err != nil {
util.Warnf("Variable called before client initialized, returning default value")
return "", nil, "", "", ErrConfigMissing
return "", nil, "", "",clientCustomData["clientUUID"].(string), "", ErrConfigMissing
}
variable := config.GetVariableForKey(key)
if variable == nil {
err = ErrMissingVariable
return "", nil, "", "", err
return "", nil, "", "",clientCustomData["clientUUID"].(string), config.eTag, err
}
featForVariable := config.GetFeatureForVariableId(variable.Id)
if featForVariable == nil {
err = ErrMissingFeature
return "", nil, "", "", err
return "", nil, "", "", clientCustomData["clientUUID"].(string), config.eTag,err
}

th, err := doesUserQualifyForFeature(config, featForVariable, user, clientCustomData)
if err != nil {
return "", nil, "", "", err
return "", nil, "", "", clientCustomData["clientUUID"].(string), config.eTag,err
}
variation, err := bucketUserForVariation(featForVariable, th)
if err != nil {
return "", nil, "", "", err
return "", nil, "", "", clientCustomData["clientUUID"].(string), config.eTag, err
}
variationVariable := variation.GetVariableById(variable.Id)
if variationVariable == nil {
err = ErrMissingVariableForVariation
return "", nil, "", "", err
return "", nil, "", "",clientCustomData["clientUUID"].(string), config.eTag, err
}
return variable.Type, variationVariable.Value, featForVariable.Id, variation.Id, nil
return variable.Type, variationVariable.Value, featForVariable.Id, variation.Id, clientCustomData["clientUUID"].(string), config.eTag, nil
}

func BucketResultErrorToDefaultReason(err error) (defaultReason string) {
Expand Down
11 changes: 7 additions & 4 deletions bucketing/bucketing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,9 @@ func TestClientData(t *testing.T) {
// Ensure bucketed config has a feature variation map that's empty
bucketedUserConfig, err := GenerateBucketedConfig("test", user, nil)
require.NoError(t, err)
_, _, _, _, err = generateBucketedVariableForUser("test", user, "num-var", nil)
SetClientCustomData("test",map[string]interface{}{"clientUUID": "clientuuid"})

_, _, _, _,_,_, err = generateBucketedVariableForUser("test", user, "num-var", map[string]interface{}{"clientUUID": "clientuuid"})
require.ErrorContainsf(t, err, "does not qualify", "does not qualify")
require.Equal(t, map[string]string{}, bucketedUserConfig.FeatureVariationMap)

Expand All @@ -481,6 +483,7 @@ func TestClientData(t *testing.T) {
clientCustomData := map[string]interface{}{
"favouriteFood": "NOT PIZZA!!",
"favouriteDrink": "coffee",
"clientUUID":"clientuuid",
}

bucketedUserConfig, err = GenerateBucketedConfig("test", user, clientCustomData)
Expand All @@ -489,7 +492,7 @@ func TestClientData(t *testing.T) {
"614ef6aa473928459060721a": "615357cf7e9ebdca58446ed0",
"614ef6aa475928459060721a": "615382338424cb11646d7667",
}, bucketedUserConfig.FeatureVariationMap)
variableType, value, featureId, variationId, err := generateBucketedVariableForUser("test", user, "num-var", clientCustomData)
variableType, value, featureId, variationId, _, _, err := generateBucketedVariableForUser("test", user, "num-var", clientCustomData)
require.Equal(t, VariableTypesNumber, variableType)
require.Equal(t, "614ef6aa473928459060721a", featureId)
require.Equal(t, "615357cf7e9ebdca58446ed0", variationId)
Expand All @@ -512,7 +515,7 @@ func TestClientData(t *testing.T) {
"614ef6aa473928459060721a": "615357cf7e9ebdca58446ed0",
"614ef6aa475928459060721a": "615382338424cb11646d7667",
}, bucketedUserConfig.FeatureVariationMap)
variableType, value, featureId, variationId, err = generateBucketedVariableForUser("test", userWithPrivateCustomData, "num-var", clientCustomData)
variableType, value, featureId, variationId, _,_, err = generateBucketedVariableForUser("test", userWithPrivateCustomData, "num-var", clientCustomData)
require.Equal(t, VariableTypesNumber, variableType)
require.Equal(t, "614ef6aa473928459060721a", featureId)
require.Equal(t, "615357cf7e9ebdca58446ed0", variationId)
Expand Down Expand Up @@ -573,7 +576,7 @@ func TestVariableForUser(t *testing.T) {
err := SetConfig(test_config, "test", "")
require.NoError(t, err)

variableType, value, featureId, variationId, err := generateBucketedVariableForUser("test", user, "json-var", nil)
variableType, value, featureId, variationId, _, _, err := generateBucketedVariableForUser("test", user, "json-var", map[string]interface{}{"clientUUID": "clientuuid"})
require.NoError(t, err)
require.Equal(t, VariableTypesJSON, variableType)
require.Equal(t, "614ef6aa473928459060721a", featureId)
Expand Down
2 changes: 1 addition & 1 deletion bucketing/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func GetEtag(sdkKey string) string {
if err != nil {
return ""
}
return config.etag
return config.eTag
}

func GetRawConfig(sdkKey string) []byte {
Expand Down
8 changes: 8 additions & 0 deletions bucketing/datamanager_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ func GetClientCustomData(sdkKey string) map[string]interface{} {
func SetClientCustomData(sdkKey string, data map[string]interface{}) {
clientCustomData[sdkKey] = data
}

func GetConfigEtag(sdkKey string) string {
config, err := getConfig(sdkKey)
if err != nil {
return ""
}
return config.eTag
}
28 changes: 20 additions & 8 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type aggEventData struct {
featureId string
variationId string
defaultReason string
metaData map[string]interface{}

}

type userEventData struct {
Expand Down Expand Up @@ -53,7 +55,7 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord {
return records
}

func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData) api.UserEventsBatchRecord {
func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID string, configEtag string) api.UserEventsBatchRecord {
var aggregateEvents []api.Event
userId, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -82,6 +84,8 @@ func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData
"_feature": feature,
}
}
metaData["clientUUID"] = clientUUID
metaData["configEtag"] = configEtag

event := api.Event{
Type_: _type,
Expand Down Expand Up @@ -184,7 +188,8 @@ func (eq *EventQueue) MergeAggEventQueueKeys(config *configBody) {
}
}

func (eq *EventQueue) queueAggregateEventInternal(variableKey, featureId, variationId, eventType string, defaultReason string) error {

func (eq *EventQueue) queueAggregateEventInternal(variableKey, featureId, variationId, eventType string, defaultReason string, clientUUID string, configEtag string) error {
if eq.options != nil && eq.options.IsEventLoggingDisabled(eventType) {
return nil
}
Expand All @@ -200,6 +205,10 @@ func (eq *EventQueue) queueAggregateEventInternal(variableKey, featureId, variat
featureId: featureId,
variationId: variationId,
defaultReason: defaultReason,
metaData: map[string]interface{}{
"clientUUID": clientUUID,
"configEtag": configEtag,
},
}:
default:
eq.eventsDropped.Add(1)
Expand All @@ -224,30 +233,32 @@ func (eq *EventQueue) QueueEvent(user api.User, event api.Event) error {
return nil
}

func (eq *EventQueue) QueueVariableEvaluatedEvent(variableKey, featureId, variationId string) error {
func (eq *EventQueue) QueueVariableEvaluatedEvent(variableKey, featureId, variationId string, clientUUID string, configEtag string) error {

if eq.options.DisableAutomaticEventLogging {
return nil
}

return eq.queueAggregateEventInternal(variableKey, featureId, variationId, api.EventType_AggVariableEvaluated, "")
return eq.queueAggregateEventInternal(variableKey, featureId, variationId, api.EventType_AggVariableEvaluated, "", clientUUID , configEtag )
}

func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason string) error {
func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason string, clientUUID string, configEtag string) error {
if eq.options.DisableAutomaticEventLogging {
return nil
}

return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason)
return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason, clientUUID, configEtag)
}

func (eq *EventQueue) FlushEventQueue() (map[string]api.FlushPayload, error) {
func (eq *EventQueue) FlushEventQueue(clientUUID string, configEtag string) (map[string]api.FlushPayload, error) {
eq.stateMutex.Lock()
defer eq.stateMutex.Unlock()

var records []api.UserEventsBatchRecord

records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData))
records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData, clientUUID, configEtag))
records = append(records, eq.userEventQueue.BuildBatchRecords()...)
fmt.Println("****Records", records)
eq.aggEventQueue = make(AggregateEventQueue)
eq.userEventQueue = make(UserEventQueue)
eq.userEventQueueCount = 0
Expand All @@ -269,6 +280,7 @@ func (eq *EventQueue) FlushEventQueue() (map[string]api.FlushPayload, error) {
}
payload.AddBatchRecordForUser(record, eq.options.EventRequestChunkSize)
payload.EventCount = len(payload.Records)
fmt.Println("Payload", payload)
if payload.EventCount == 0 {
continue
}
Expand Down
10 changes: 8 additions & 2 deletions bucketing/event_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func TestEventQueue_ProcessUserEvent(t *testing.T) {
},
}
err := SetConfig(test_config, "dvc_server_token_hash", "")
SetClientCustomData("dvc_server_token_hash", map[string]interface{}{
"clientUUID": "clientuuid",
})
require.NoError(t, err)
eq, err := NewEventQueue("dvc_server_token_hash", &api.EventQueueOptions{}, (&api.PlatformData{}).Default())
require.NoError(t, err)
Expand Down Expand Up @@ -129,7 +132,7 @@ func TestEventQueue_AddToAggQueue(t *testing.T) {
require.NoError(t, err)
eq, err := NewEventQueue("dvc_server_token_hash", &api.EventQueueOptions{FlushEventsInterval: time.Hour}, (&api.PlatformData{}).Default())
require.NoError(t, err)
err = eq.QueueVariableEvaluatedEvent("somevariablekey", "featureId", "variationId")
err = eq.QueueVariableEvaluatedEvent("somevariablekey", "featureId", "variationId","","")
require.NoError(t, err)
require.Eventually(t, func() bool { return eq.aggQueueLength() == 1 }, 10*time.Second, time.Millisecond)
}
Expand Down Expand Up @@ -174,6 +177,9 @@ func TestEventQueue_QueueAndFlush(t *testing.T) {
}
err := SetConfig(test_config, "dvc_server_token_hash", "")
require.NoError(t, err)
SetClientCustomData("dvc_server_token_hash", map[string]interface{}{
"clientUUID": "clientuuid",
})
eq, err := NewEventQueue("dvc_server_token_hash", &api.EventQueueOptions{
FlushEventsInterval: time.Hour,
}, api.PlatformData{}.Default())
Expand All @@ -197,7 +203,7 @@ func TestEventQueue_QueueAndFlush(t *testing.T) {
require.Equal(t, 2, len(eq.userEventQueue))
require.Equal(t, 0, len(eq.userEventQueueRaw))

payloads, err := eq.FlushEventQueue()
payloads, err := eq.FlushEventQueue("","" )
require.NoError(t, err)
require.Equal(t, 2, len(payloads))
require.Equal(t, 0, len(eq.userEventQueue))
Expand Down
4 changes: 2 additions & 2 deletions bucketing/model_config_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type configBody struct {
Environment api.Environment `json:"environment" validate:"required"`
Features []*ConfigFeature `json:"features" validate:"required"`
Variables []*Variable `json:"variables" validate:"required,dive"`
etag string // TODO: remove etag
eTag string // TODO: remove etag
variableIdMap map[string]*Variable
variableKeyMap map[string]*Variable
variableIdToFeatureMap map[string]*ConfigFeature
Expand Down Expand Up @@ -95,7 +95,7 @@ func (c *configBody) compile(etag string) {
c.variableIdToFeatureMap = variableIdToFeatureMap
c.variableIdMap = variableIdMap
c.variableKeyMap = variableKeyMap
c.etag = etag
c.eTag = etag

// Sort the feature distributions by "_variation" attribute in descending alphabetical order
for _, feature := range c.Features {
Expand Down
12 changes: 12 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"strings"
"time"

"github.com/google/uuid"

"github.com/devcyclehq/go-server-sdk/v2/util"

"github.com/devcyclehq/go-server-sdk/v2/api"
Expand Down Expand Up @@ -55,6 +57,7 @@ type Client struct {
// Set to true when the client has been initialized, regardless of whether the config has loaded successfully.
isInitialized bool
internalOnInitializedChannel chan bool
clientUUID string
}

type LocalBucketing interface {
Expand Down Expand Up @@ -151,6 +154,8 @@ func (c *Client) IsLocalBucketing() bool {

func (c *Client) handleInitialization() {
c.isInitialized = true
c.clientUUID = uuid.NewString()
c.SetClientCustomData(map[string]interface{}{"clientUUID": c.clientUUID})

Check failure on line 158 in client.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `c.SetClientCustomData` is not checked (errcheck)

Check failure on line 158 in client.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `c.SetClientCustomData` is not checked (errcheck)
if c.DevCycleOptions.OnInitializedChannel != nil {
go func() {
c.DevCycleOptions.OnInitializedChannel <- true
Expand Down Expand Up @@ -467,8 +472,14 @@ func (c *Client) Track(user User, event Event) (bool, error) {
return false, errors.New("event type is required")
}

if(event.MetaData == nil) {
event.MetaData = make(map[string]interface{})
}
event.MetaData["clientUUID"] = c.clientUUID

if c.IsLocalBucketing() {
if c.hasConfig() {
event.MetaData["configEtag"] = c.configManager.GetETag()
err := c.eventQueue.QueueEvent(user, event)
if err != nil {
util.Errorf("Error queuing event: %v", err)
Expand Down Expand Up @@ -533,6 +544,7 @@ func (c *Client) FlushEvents() error {
}

func (c *Client) SetClientCustomData(customData map[string]interface{}) error {
customData["clientUUID"] = c.clientUUID
if c.IsLocalBucketing() {
if c.isInitialized {
return c.localBucketing.SetClientCustomData(customData)
Expand Down
10 changes: 6 additions & 4 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (n *NativeLocalBucketing) Variable(user User, variableKey string, variableT
}
clientCustomData := bucketing.GetClientCustomData(n.sdkKey)
populatedUser := user.GetPopulatedUserWithTime(n.platformData, DEFAULT_USER_TIME)

resultVariableType, resultValue, err := bucketing.VariableForUser(n.sdkKey, populatedUser, variableKey, variableType, n.eventQueue, clientCustomData)
if err != nil {
return defaultVar, nil
Expand All @@ -119,16 +119,18 @@ func (n *NativeLocalBucketing) QueueEvent(user User, event Event) error {
return n.eventQueue.QueueEvent(user, event)
}

func (n *NativeLocalBucketing) QueueVariableDefaulted(variableKey, defaultReason string) error {
return n.eventQueue.QueueVariableDefaultedEvent(variableKey, defaultReason)
func (n *NativeLocalBucketing) QueueVariableDefaulted(variableKey, defaultReason string, clientUUID string, configEtag string) error {
return n.eventQueue.QueueVariableDefaultedEvent(variableKey, defaultReason, clientUUID, configEtag)
}

func (n *NativeLocalBucketing) UserQueueLength() (int, error) {
return n.eventQueue.UserQueueLength(), nil
}

func (n *NativeLocalBucketing) FlushEventQueue(callback EventFlushCallback) error {
payloads, err := n.eventQueue.FlushEventQueue()
clientCustomData := bucketing.GetClientCustomData(n.sdkKey)
configEtag := bucketing.GetConfigEtag(n.sdkKey)
payloads, err := n.eventQueue.FlushEventQueue(clientCustomData["clientUUID"].(string), configEtag)
if err != nil {
return fmt.Errorf("Error flushing event queue: %w", err)
}
Expand Down
Loading

0 comments on commit 7953ea9

Please sign in to comment.