Skip to content

Commit

Permalink
feat: Add last-modified to metadata for events. (#253)
Browse files Browse the repository at this point in the history
Add last-modified to metadata for events.
  • Loading branch information
JamieSinn authored Apr 29, 2024
1 parent a62bb31 commit 73f0ce8
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
9 changes: 6 additions & 3 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord {
return records
}

func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID string, configEtag string, rayId string) api.UserEventsBatchRecord {
func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID, configEtag, rayId, lastModified string) api.UserEventsBatchRecord {
var aggregateEvents []api.Event
userId, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -90,6 +90,9 @@ func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData
if rayId != "" {
metaData["configRayId"] = rayId
}
if lastModified != "" {
metaData["lastModified"] = lastModified
}

event := api.Event{
Type_: _type,
Expand Down Expand Up @@ -248,13 +251,13 @@ func (eq *EventQueue) QueueVariableDefaultedEvent(variableKey, defaultReason str
return eq.queueAggregateEventInternal(variableKey, "", "", api.EventType_AggVariableDefaulted, defaultReason)
}

func (eq *EventQueue) FlushEventQueue(clientUUID string, configEtag string, rayId string) (map[string]api.FlushPayload, error) {
func (eq *EventQueue) FlushEventQueue(clientUUID, configEtag, rayId, lastModified string) (map[string]api.FlushPayload, error) {
eq.stateMutex.Lock()
defer eq.stateMutex.Unlock()

var records []api.UserEventsBatchRecord

records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData, clientUUID, configEtag, rayId))
records = append(records, eq.aggEventQueue.BuildBatchRecords(eq.platformData, clientUUID, configEtag, rayId, lastModified))
records = append(records, eq.userEventQueue.BuildBatchRecords()...)
eq.aggEventQueue = make(AggregateEventQueue)
eq.userEventQueue = make(UserEventQueue)
Expand Down
2 changes: 1 addition & 1 deletion bucketing/event_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestEventQueue_QueueAndFlush(t *testing.T) {
require.Equal(t, 2, len(eq.userEventQueue))
require.Equal(t, 0, len(eq.userEventQueueRaw))

payloads, err := eq.FlushEventQueue("", "", "")
payloads, err := eq.FlushEventQueue("", "", "", "")
require.NoError(t, err)
require.Equal(t, 2, len(payloads))
require.Equal(t, 0, len(eq.userEventQueue))
Expand Down
6 changes: 2 additions & 4 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti

func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag, rayId, lastModified string) error {
oldETag := bucketing.GetEtag(n.sdkKey)
_, err := n.eventQueue.FlushEventQueue(n.clientUUID, oldETag, n.GetRayId())
_, err := n.eventQueue.FlushEventQueue(n.clientUUID, oldETag, n.GetRayId(), n.GetLastModified())
if err != nil {
return fmt.Errorf("Error flushing events for %s: %w", oldETag, err)
}
Expand Down Expand Up @@ -149,9 +149,7 @@ func (n *NativeLocalBucketing) UserQueueLength() (int, error) {
}

func (n *NativeLocalBucketing) FlushEventQueue(callback EventFlushCallback) error {
configEtag := bucketing.GetEtag(n.sdkKey)
rayId := bucketing.GetRayId(n.sdkKey)
payloads, err := n.eventQueue.FlushEventQueue(n.clientUUID, configEtag, rayId)
payloads, err := n.eventQueue.FlushEventQueue(n.clientUUID, n.GetETag(), n.GetRayId(), n.GetLastModified())
if err != nil {
return fmt.Errorf("Error flushing event queue: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions example/local/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {

client, err := devcycle.NewClient(sdkKey, &dvcOptions)
time.Sleep(10 * time.Second)
if(err != nil) {
if err != nil {
log.Fatalf("Error initializing client: %v", err)
}
fmt.Println(client.GetRawConfig())
Expand Down Expand Up @@ -82,7 +82,7 @@ func main() {
}

err = client.FlushEvents()
if(err != nil) {
if err != nil {
log.Fatalf("Error flushing events: %v", err)
}

Expand Down

0 comments on commit 73f0ce8

Please sign in to comment.