Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add event fields #238

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord {
return records
}

func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData) api.UserEventsBatchRecord {
func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID string, configEtag string) api.UserEventsBatchRecord {
var aggregateEvents []api.Event
userId, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -82,6 +82,11 @@ func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData
"_feature": feature,
}
}

metaData["clientUUID"] = clientUUID
if configEtag != "" {
metaData["configEtag"] = configEtag
}

event := api.Event{
Type_: _type,
Expand Down Expand Up @@ -240,13 +245,13 @@ func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason str
return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason)
}

func (eq *EventQueue) FlushEventQueue() (map[string]api.FlushPayload, error) {
func (eq *EventQueue) FlushEventQueue(clientUUID string, configEtag string) (map[string]api.FlushPayload, error) {
suthar26 marked this conversation as resolved.
Show resolved Hide resolved
eq.stateMutex.Lock()
defer eq.stateMutex.Unlock()

var records []api.UserEventsBatchRecord

records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData))
records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData, clientUUID, configEtag))
records = append(records, eq.userEventQueue.BuildBatchRecords()...)
eq.aggEventQueue = make(AggregateEventQueue)
eq.userEventQueue = make(UserEventQueue)
Expand Down
2 changes: 1 addition & 1 deletion bucketing/event_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestEventQueue_QueueAndFlush(t *testing.T) {
require.Equal(t, 2, len(eq.userEventQueue))
require.Equal(t, 0, len(eq.userEventQueueRaw))

payloads, err := eq.FlushEventQueue()
payloads, err := eq.FlushEventQueue("","" )
require.NoError(t, err)
require.Equal(t, 2, len(payloads))
require.Equal(t, 0, len(eq.userEventQueue))
Expand Down
2 changes: 1 addition & 1 deletion bucketing/model_config_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type configBody struct {
Environment api.Environment `json:"environment" validate:"required"`
Features []*ConfigFeature `json:"features" validate:"required"`
Variables []*Variable `json:"variables" validate:"required,dive"`
etag string // TODO: remove etag
etag string
variableIdMap map[string]*Variable
variableKeyMap map[string]*Variable
variableIdToFeatureMap map[string]*ConfigFeature
Expand Down
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type LocalBucketing interface {
InternalEventQueue
GenerateBucketedConfigForUser(user User) (ret *BucketedUserConfig, err error)
SetClientCustomData(map[string]interface{}) error
GetClientUUID() string
Variable(user User, key string, variableType string) (variable Variable, err error)
Close()
}
Expand Down Expand Up @@ -151,6 +152,10 @@ func (c *Client) IsLocalBucketing() bool {

func (c *Client) handleInitialization() {
c.isInitialized = true

if(c.IsLocalBucketing()){
util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetClientUUID())
}
if c.DevCycleOptions.OnInitializedChannel != nil {
go func() {
c.DevCycleOptions.OnInitializedChannel <- true
Expand Down
20 changes: 17 additions & 3 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/devcyclehq/go-server-sdk/v2/util"
"github.com/google/uuid"

"github.com/devcyclehq/go-server-sdk/v2/api"

Expand Down Expand Up @@ -33,9 +34,12 @@ type NativeLocalBucketing struct {
configMutex sync.RWMutex
platformData *api.PlatformData
eventQueue *bucketing.EventQueue
clientUUID string
}

func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options) (*NativeLocalBucketing, error) {
clientUUID := uuid.New().String()

eq, err := bucketing.NewEventQueue(sdkKey, options.eventQueueOptions(), platformData)
if err != nil {
return nil, err
Expand All @@ -45,11 +49,17 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti
options: options,
platformData: platformData,
eventQueue: eq,
clientUUID: clientUUID,
}, err
}

func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag string) error {
err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, n.eventQueue)
oldETag := bucketing.GetEtag(n.sdkKey)
_,err := n.eventQueue.FlushEventQueue(n.clientUUID, oldETag)
if err != nil {
return fmt.Errorf("Error flushing events for %s: %w", oldETag, err)
}
err = bucketing.SetConfig(configJSON, n.sdkKey, eTag, n.eventQueue)
if err != nil {
return fmt.Errorf("Error parsing config: %w", err)
}
Expand All @@ -68,6 +78,10 @@ func (n *NativeLocalBucketing) HasConfig() bool {
return bucketing.HasConfig(n.sdkKey)
}

func (n *NativeLocalBucketing) GetClientUUID() (string) {
return n.clientUUID
}

func (n *NativeLocalBucketing) GenerateBucketedConfigForUser(user User) (ret *BucketedUserConfig, err error) {
populatedUser := user.GetPopulatedUserWithTime(n.platformData, DEFAULT_USER_TIME)
clientCustomData := bucketing.GetClientCustomData(n.sdkKey)
Expand All @@ -92,7 +106,6 @@ func (n *NativeLocalBucketing) Variable(user User, variableKey string, variableT
}
clientCustomData := bucketing.GetClientCustomData(n.sdkKey)
populatedUser := user.GetPopulatedUserWithTime(n.platformData, DEFAULT_USER_TIME)

resultVariableType, resultValue, err := bucketing.VariableForUser(n.sdkKey, populatedUser, variableKey, variableType, n.eventQueue, clientCustomData)
if err != nil {
return defaultVar, nil
Expand Down Expand Up @@ -128,7 +141,8 @@ func (n *NativeLocalBucketing) UserQueueLength() (int, error) {
}

func (n *NativeLocalBucketing) FlushEventQueue(callback EventFlushCallback) error {
payloads, err := n.eventQueue.FlushEventQueue()
configEtag := bucketing.GetEtag(n.sdkKey)
payloads, err := n.eventQueue.FlushEventQueue(n.clientUUID, configEtag)
if err != nil {
return fmt.Errorf("Error flushing event queue: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion configmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (e *EnvironmentConfigManager) fetchConfig(numRetriesRemaining int) (err err

func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response) error {
config, err := io.ReadAll(response.Body)

if err != nil {
return err
}
Expand All @@ -146,7 +147,7 @@ func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response
return fmt.Errorf("invalid JSON data received for config")
}

err = e.setConfig(config, response.Header.Get("ETag"))
err = e.setConfig(config, response.Header.Get("Etag"))

if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (e *EventManager) FlushEvents() (err error) {
defer e.flushMutex.Unlock()

util.Debugf("Started flushing events")

defer func() {
if r := recover(); r != nil {
// get the stack trace and potentially log it here
Expand Down
2 changes: 1 addition & 1 deletion example/cloud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func main() {
if variableKey == "" {
log.Fatal("DEVCYCLE_VARIABLE_KEY env var not set: set it to a variable key")
}
user := devcycle.User{UserId: "test"}
user := devcycle.User{UserId: "test-user"}
dvcOptions := devcycle.Options{
EnableEdgeDB: false,
EnableCloudBucketing: true,
Expand Down
18 changes: 15 additions & 3 deletions example/local/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@ func main() {

client, err := devcycle.NewClient(sdkKey, &dvcOptions)
time.Sleep(10 * time.Second)
fmt.Println("Error? ", err)
if(err != nil) {
log.Fatalf("Error initializing client: %v", err)
}
fmt.Println(client.GetRawConfig())
log.Printf("client initialized")

features, _ := client.AllFeatures(user)
for key, feature := range features {
log.Printf("Key:%s, feature:%#v", key, feature)
log.Printf("features Key:%s, feature:%#v", key, feature)
}

variables, _ := client.AllVariables(user)
for key, variable := range variables {
log.Printf("Key:%s, variable:%#v", key, variable)
log.Printf("variables Key:%s, variable:%#v", key, variable)
}

existingVariable, err := client.Variable(user, variableKey, "DEFAULT")
Expand Down Expand Up @@ -78,4 +80,14 @@ func main() {
if err != nil {
log.Fatalf("Error tracking event: %v", err)
}

err = client.FlushEvents()
if(err != nil) {
log.Fatalf("Error flushing events: %v", err)
}

time.Sleep(2 * time.Second)

client.Close()

}
Loading