Skip to content

Commit

Permalink
chore: add client date
Browse files Browse the repository at this point in the history
  • Loading branch information
suthar26 committed Apr 19, 2024
1 parent 6d84704 commit 316a719
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 39 deletions.
1 change: 1 addition & 0 deletions api/model_bucketed_user_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type BucketedUserConfig struct {
VariableVariationMap map[string]FeatureVariation `json:"variableVariationMap"`
Variables map[string]ReadOnlyVariable `json:"variables"`
KnownVariableKeys []float64 `json:"knownVariableKeys"`
ETag string `json:"eTag"`

User *User `json:"-"`
}
26 changes: 14 additions & 12 deletions bucketing/bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ func GenerateBucketedConfig(sdkKey string, user api.PopulatedUser, clientCustomD
FeatureVariationMap: featureVariationMap,
VariableVariationMap: variableVariationMap,
Variables: variableMap,
ETag: config.eTag,
}, nil
}

func VariableForUser(sdkKey string, user api.PopulatedUser, variableKey string, expectedVariableType string, eventQueue *EventQueue, clientCustomData map[string]interface{}) (variableType string, variableValue any, err error) {
variableType, variableValue, featureId, variationId, err := generateBucketedVariableForUser(sdkKey, user, variableKey, clientCustomData)
variableType, variableValue, featureId, variationId, clientUUID, configEtag, err := generateBucketedVariableForUser(sdkKey, user, variableKey, clientCustomData)
if err != nil {
eventErr := eventQueue.QueueVariableDefaultedEvent(variableKey, BucketResultErrorToDefaultReason(err))
eventErr := eventQueue.QueueVariableDefaultedEvent(variableKey, BucketResultErrorToDefaultReason(err), clientUUID, configEtag)
if eventErr != nil {
util.Warnf("Failed to queue variable defaulted event: %s", eventErr)
}
Expand All @@ -224,16 +225,17 @@ func VariableForUser(sdkKey string, user api.PopulatedUser, variableKey string,

if !isVariableTypeValid(variableType, expectedVariableType) && expectedVariableType != "" {
err = ErrInvalidVariableType
eventErr := eventQueue.QueueVariableDefaultedEvent(variableKey, BucketResultErrorToDefaultReason(err))
eventErr := eventQueue.QueueVariableDefaultedEvent(variableKey, BucketResultErrorToDefaultReason(err), clientUUID, configEtag)
if eventErr != nil {
util.Warnf("Failed to queue variable defaulted event: %s", eventErr)
}
return "", nil, err
}

eventErr := eventQueue.QueueVariableEvaluatedEvent(variableKey, featureId, variationId)
eventErr := eventQueue.QueueVariableEvaluatedEvent(variableKey, featureId, variationId, clientUUID, configEtag)
if eventErr != nil {
util.Warnf("Failed to queue variable evaluated event: %s", eventErr)

}

return
Expand All @@ -252,37 +254,37 @@ func isVariableTypeValid(variableType string, expectedVariableType string) bool
return true
}

func generateBucketedVariableForUser(sdkKey string, user api.PopulatedUser, key string, clientCustomData map[string]interface{}) (variableType string, variableValue any, featureId string, variationId string, err error) {
func generateBucketedVariableForUser(sdkKey string, user api.PopulatedUser, key string, clientCustomData map[string]interface{}) (variableType string, variableValue any, featureId string, variationId string, clientUUID string, configEtag string, err error) {
config, err := getConfig(sdkKey)
if err != nil {
util.Warnf("Variable called before client initialized, returning default value")
return "", nil, "", "", ErrConfigMissing
return "", nil, "", "",clientCustomData["clientUUID"].(string), "", ErrConfigMissing
}
variable := config.GetVariableForKey(key)
if variable == nil {
err = ErrMissingVariable
return "", nil, "", "", err
return "", nil, "", "",clientCustomData["clientUUID"].(string), config.eTag, err
}
featForVariable := config.GetFeatureForVariableId(variable.Id)
if featForVariable == nil {
err = ErrMissingFeature
return "", nil, "", "", err
return "", nil, "", "", clientCustomData["clientUUID"].(string), config.eTag,err
}

th, err := doesUserQualifyForFeature(config, featForVariable, user, clientCustomData)
if err != nil {
return "", nil, "", "", err
return "", nil, "", "", clientCustomData["clientUUID"].(string), config.eTag,err
}
variation, err := bucketUserForVariation(featForVariable, th)
if err != nil {
return "", nil, "", "", err
return "", nil, "", "", clientCustomData["clientUUID"].(string), config.eTag, err
}
variationVariable := variation.GetVariableById(variable.Id)
if variationVariable == nil {
err = ErrMissingVariableForVariation
return "", nil, "", "", err
return "", nil, "", "",clientCustomData["clientUUID"].(string), config.eTag, err
}
return variable.Type, variationVariable.Value, featForVariable.Id, variation.Id, nil
return variable.Type, variationVariable.Value, featForVariable.Id, variation.Id, clientCustomData["clientUUID"].(string), config.eTag, nil
}

func BucketResultErrorToDefaultReason(err error) (defaultReason string) {
Expand Down
8 changes: 8 additions & 0 deletions bucketing/datamanager_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ func GetClientCustomData(sdkKey string) map[string]interface{} {
func SetClientCustomData(sdkKey string, data map[string]interface{}) {
clientCustomData[sdkKey] = data
}

func GetConfigEtag(sdkKey string) string {
config, err := getConfig(sdkKey)
if err != nil {
return ""
}
return config.eTag
}
32 changes: 24 additions & 8 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type aggEventData struct {
featureId string
variationId string
defaultReason string
metaData map[string]interface{}

}

type userEventData struct {
Expand Down Expand Up @@ -53,7 +55,7 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord {
return records
}

func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData) api.UserEventsBatchRecord {
func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID string, configEtag string) api.UserEventsBatchRecord {
var aggregateEvents []api.Event
userId, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -82,6 +84,8 @@ func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData
"_feature": feature,
}
}
metaData["clientUUID"] = clientUUID
metaData["configEtag"] = configEtag

event := api.Event{
Type_: _type,
Expand Down Expand Up @@ -184,7 +188,8 @@ func (eq *EventQueue) MergeAggEventQueueKeys(config *configBody) {
}
}

func (eq *EventQueue) queueAggregateEventInternal(variableKey, featureId, variationId, eventType string, defaultReason string) error {

func (eq *EventQueue) queueAggregateEventInternal(variableKey, featureId, variationId, eventType string, defaultReason string, clientUUID string, configEtag string) error {
if eq.options != nil && eq.options.IsEventLoggingDisabled(eventType) {
return nil
}
Expand All @@ -200,6 +205,10 @@ func (eq *EventQueue) queueAggregateEventInternal(variableKey, featureId, variat
featureId: featureId,
variationId: variationId,
defaultReason: defaultReason,
metaData: map[string]interface{}{
"clientUUID": clientUUID,
"configEtag": configEtag,
},
}:
default:
eq.eventsDropped.Add(1)
Expand All @@ -224,30 +233,32 @@ func (eq *EventQueue) QueueEvent(user api.User, event api.Event) error {
return nil
}

func (eq *EventQueue) QueueVariableEvaluatedEvent(variableKey, featureId, variationId string) error {
func (eq *EventQueue) QueueVariableEvaluatedEvent(variableKey, featureId, variationId string, clientUUID string, configEtag string) error {

if eq.options.DisableAutomaticEventLogging {
return nil
}

return eq.queueAggregateEventInternal(variableKey, featureId, variationId, api.EventType_AggVariableEvaluated, "")
return eq.queueAggregateEventInternal(variableKey, featureId, variationId, api.EventType_AggVariableEvaluated, "", clientUUID , configEtag )
}

func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason string) error {
func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason string, clientUUID string, configEtag string) error {
if eq.options.DisableAutomaticEventLogging {
return nil
}

return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason)
return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason, clientUUID, configEtag)
}

func (eq *EventQueue) FlushEventQueue() (map[string]api.FlushPayload, error) {
func (eq *EventQueue) FlushEventQueue(clientUUID string, configEtag string) (map[string]api.FlushPayload, error) {
eq.stateMutex.Lock()
defer eq.stateMutex.Unlock()

var records []api.UserEventsBatchRecord

records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData))
records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData, clientUUID, configEtag))
records = append(records, eq.userEventQueue.BuildBatchRecords()...)
fmt.Println("****Records", records)
eq.aggEventQueue = make(AggregateEventQueue)
eq.userEventQueue = make(UserEventQueue)
eq.userEventQueueCount = 0
Expand All @@ -269,6 +280,7 @@ func (eq *EventQueue) FlushEventQueue() (map[string]api.FlushPayload, error) {
}
payload.AddBatchRecordForUser(record, eq.options.EventRequestChunkSize)
payload.EventCount = len(payload.Records)
fmt.Println("Payload", payload)
if payload.EventCount == 0 {
continue
}
Expand Down Expand Up @@ -377,6 +389,7 @@ func (eq *EventQueue) processEvents(ctx context.Context) {
return
}
case aggEvent := <-eq.aggEventQueueRaw:
fmt.Println("Processing aggregate event", aggEvent)
err := eq.processAggregateEvent(aggEvent)
if err != nil {
return
Expand Down Expand Up @@ -408,6 +421,8 @@ func (eq *EventQueue) processUserEvent(event userEventData) (err error) {
}
event.event.FeatureVars = bucketedConfig.FeatureVariationMap

event.event.MetaData["configEtag"] = bucketedConfig.ETag

switch event.event.Type_ {
case api.EventType_AggVariableDefaulted, api.EventType_VariableDefaulted, api.EventType_AggVariableEvaluated, api.EventType_VariableEvaluated:
break
Expand Down Expand Up @@ -478,5 +493,6 @@ func (eq *EventQueue) processAggregateEvent(event aggEventData) (err error) {
}
variableFeatureVariationAggregationMap[eTarget] = featureVariationAggregationMap
eq.aggEventQueue[eType] = variableFeatureVariationAggregationMap
fmt.Println("AggEventQueue", eq.aggEventQueue)
return nil
}
4 changes: 2 additions & 2 deletions bucketing/model_config_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type configBody struct {
Environment api.Environment `json:"environment" validate:"required"`
Features []*ConfigFeature `json:"features" validate:"required"`
Variables []*Variable `json:"variables" validate:"required,dive"`
etag string // TODO: remove etag
eTag string // TODO: remove etag
variableIdMap map[string]*Variable
variableKeyMap map[string]*Variable
variableIdToFeatureMap map[string]*ConfigFeature
Expand Down Expand Up @@ -95,7 +95,7 @@ func (c *configBody) compile(etag string) {
c.variableIdToFeatureMap = variableIdToFeatureMap
c.variableIdMap = variableIdMap
c.variableKeyMap = variableKeyMap
c.etag = etag
c.eTag = etag

// Sort the feature distributions by "_variation" attribute in descending alphabetical order
for _, feature := range c.Features {
Expand Down
12 changes: 12 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"strings"
"time"

"github.com/google/uuid"

"github.com/devcyclehq/go-server-sdk/v2/util"

"github.com/devcyclehq/go-server-sdk/v2/api"
Expand Down Expand Up @@ -55,6 +57,7 @@ type Client struct {
// Set to true when the client has been initialized, regardless of whether the config has loaded successfully.
isInitialized bool
internalOnInitializedChannel chan bool
clientUUID string
}

type LocalBucketing interface {
Expand Down Expand Up @@ -151,6 +154,8 @@ func (c *Client) IsLocalBucketing() bool {

func (c *Client) handleInitialization() {
c.isInitialized = true
c.clientUUID = uuid.NewString()
c.SetClientCustomData(map[string]interface{}{"clientUUID": c.clientUUID})
if c.DevCycleOptions.OnInitializedChannel != nil {
go func() {
c.DevCycleOptions.OnInitializedChannel <- true
Expand Down Expand Up @@ -467,8 +472,14 @@ func (c *Client) Track(user User, event Event) (bool, error) {
return false, errors.New("event type is required")
}

if(event.MetaData == nil) {
event.MetaData = make(map[string]interface{})
}
event.MetaData["clientUUID"] = c.clientUUID

if c.IsLocalBucketing() {
if c.hasConfig() {
event.MetaData["configEtag"] = c.configManager.GetETag()
err := c.eventQueue.QueueEvent(user, event)
if err != nil {
util.Errorf("Error queuing event: %v", err)
Expand Down Expand Up @@ -533,6 +544,7 @@ func (c *Client) FlushEvents() error {
}

func (c *Client) SetClientCustomData(customData map[string]interface{}) error {
customData["ClientId"] = c.clientUUID
if c.IsLocalBucketing() {
if c.isInitialized {
return c.localBucketing.SetClientCustomData(customData)
Expand Down
10 changes: 6 additions & 4 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (n *NativeLocalBucketing) Variable(user User, variableKey string, variableT
}
clientCustomData := bucketing.GetClientCustomData(n.sdkKey)
populatedUser := user.GetPopulatedUserWithTime(n.platformData, DEFAULT_USER_TIME)

resultVariableType, resultValue, err := bucketing.VariableForUser(n.sdkKey, populatedUser, variableKey, variableType, n.eventQueue, clientCustomData)
if err != nil {
return defaultVar, nil
Expand All @@ -119,16 +119,18 @@ func (n *NativeLocalBucketing) QueueEvent(user User, event Event) error {
return n.eventQueue.QueueEvent(user, event)
}

func (n *NativeLocalBucketing) QueueVariableDefaulted(variableKey, defaultReason string) error {
return n.eventQueue.QueueVariableDefaultedEvent(variableKey, defaultReason)
func (n *NativeLocalBucketing) QueueVariableDefaulted(variableKey, defaultReason string, clientUUID string, configEtag string) error {
return n.eventQueue.QueueVariableDefaultedEvent(variableKey, defaultReason, clientUUID, configEtag)
}

func (n *NativeLocalBucketing) UserQueueLength() (int, error) {
return n.eventQueue.UserQueueLength(), nil
}

func (n *NativeLocalBucketing) FlushEventQueue(callback EventFlushCallback) error {
payloads, err := n.eventQueue.FlushEventQueue()
clientCustomData := bucketing.GetClientCustomData(n.sdkKey)
configEtag := bucketing.GetConfigEtag(n.sdkKey)
payloads, err := n.eventQueue.FlushEventQueue(clientCustomData["clientUUID"].(string), configEtag)
if err != nil {
return fmt.Errorf("Error flushing event queue: %w", err)
}
Expand Down
17 changes: 15 additions & 2 deletions configmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,29 @@ func (e *EnvironmentConfigManager) fetchConfig(numRetriesRemaining int) (err err

func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response) error {
config, err := io.ReadAll(response.Body)

if err != nil {
return err
}

configMap := make(map[string]interface{})
err = json.Unmarshal(config, &configMap)
if err != nil {
return err
}

configMap["eTag"] = response.Header.Get("Etag")
configWithTag, err := json.Marshal(configMap)
if err != nil {
return err
}
// Check
valid := json.Valid(config)
valid := json.Valid(configWithTag)
if !valid {
return fmt.Errorf("invalid JSON data received for config")
}

err = e.setConfig(config, response.Header.Get("ETag"))
err = e.setConfig(configWithTag, response.Header.Get("Etag"))

if err != nil {
return err
Expand Down
9 changes: 5 additions & 4 deletions event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type EventFlushCallback func(payloads map[string]FlushPayload) (*FlushResult, er

type InternalEventQueue interface {
QueueEvent(user User, event Event) error
QueueVariableDefaulted(variableKey, defaultReason string) error
QueueVariableDefaulted(variableKey, defaultReason string, clientUUID string,etag string) error
FlushEventQueue(EventFlushCallback) error
UserQueueLength() (int, error)
Metrics() (int32, int32, int32)
Expand Down Expand Up @@ -102,22 +102,23 @@ func (e *EventManager) QueueEvent(user User, event Event) error {
default:
}
}
fmt.Println("QueueEvent")
fmt.Println(event)
err = e.internalQueue.QueueEvent(user, event)
if err != nil && errors.Is(err, ErrQueueFull) {
return fmt.Errorf("event queue is full, dropping event: %+v", event)
}
return err
}

func (e *EventManager) QueueVariableDefaultedEvent(variableKey string, defaultReason string) error {
return e.internalQueue.QueueVariableDefaulted(variableKey, defaultReason)
func (e *EventManager) QueueVariableDefaultedEvent(variableKey string, defaultReason string, clientUUID string, configEtag string) error {
return e.internalQueue.QueueVariableDefaulted(variableKey, defaultReason, clientUUID, configEtag)
}

func (e *EventManager) FlushEvents() (err error) {
e.flushMutex.Lock()
defer e.flushMutex.Unlock()

util.Debugf("Started flushing events")

defer func() {
if r := recover(); r != nil {
Expand Down
2 changes: 1 addition & 1 deletion example/cloud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func main() {
if variableKey == "" {
log.Fatal("DEVCYCLE_VARIABLE_KEY env var not set: set it to a variable key")
}
user := devcycle.User{UserId: "test"}
user := devcycle.User{UserId: "suthar-test-user"}
dvcOptions := devcycle.Options{
EnableEdgeDB: false,
EnableCloudBucketing: true,
Expand Down
Loading

0 comments on commit 316a719

Please sign in to comment.