Skip to content

Commit

Permalink
feat(realtime): Add realtime support
Browse files Browse the repository at this point in the history
  • Loading branch information
gagantrivedi committed Dec 11, 2024
1 parent dacb607 commit 89b3cea
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 4 deletions.
9 changes: 5 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,16 @@ func NewClient(apiKey string, options ...Option) *Client {
if !strings.HasPrefix(apiKey, "ser.") {
panic("In order to use local evaluation, please generate a server key in the environment settings page.")
}

go c.pollEnvironment(c.ctxLocalEval)
if c.config.useRealtime {
go c.startRealtimeUpdates(c.ctxLocalEval)
} else {
go c.pollEnvironment(c.ctxLocalEval)
}
}
// Initialize analytics processor
if c.config.enableAnalytics {
c.analyticsProcessor = NewAnalyticsProcessor(c.ctxAnalytics, c.client, c.config.baseURL, nil, c.log)
}

return c
}

Expand Down Expand Up @@ -331,7 +333,6 @@ func (c *Client) pollEnvironment(ctx context.Context) {
}
}
}

func (c *Client) UpdateEnvironment(ctx context.Context) error {
var env environments.EnvironmentModel
resp, err := c.client.NewRequest().
Expand Down
94 changes: 94 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func TestGetFlags(t *testing.T) {
assert.Equal(t, fixtures.Feature1Name, allFlags[0].FeatureName)
assert.Equal(t, fixtures.Feature1ID, allFlags[0].FeatureID)
assert.Equal(t, fixtures.Feature1Value, allFlags[0].Value)

}

func TestGetFlagsTransientIdentity(t *testing.T) {
Expand Down Expand Up @@ -861,3 +862,96 @@ func TestPollErrorHandlerIsUsedWhenPollFails(t *testing.T) {
assert.Equal(t, statusCode, 500)
assert.Equal(t, status, "500 Internal Server Error")
}

func TestRealtime(t *testing.T) {
// Given
requestCount := 0
mux := http.NewServeMux()
mux.HandleFunc("/api/v1/environment-document/", func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "GET", req.Method)
fmt.Println(req.URL.Path)
assert.Equal(t, fixtures.EnvironmentAPIKey, req.Header.Get("X-Environment-Key"))

requestCount += 1

rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
_, err := io.WriteString(rw, fixtures.EnvironmentJson)
if err != nil {
panic(err)
}
assert.NoError(t, err)
})
mux.HandleFunc(fmt.Sprintf("/sse/environments/%s/stream", fixtures.ClientAPIKey), func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "GET", req.Method)

// Set the necessary headers for SSE
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")

// Flush headers to the client
flusher, _ := rw.(http.Flusher)
flusher.Flush()

// Use an `updated_at` value that is older than the `updated_at` set on the environment document
// to ensure an older timestamp does not trigger an update.
sendUpdatedAtSSEEvent(rw, flusher, 1640995200.079725)
time.Sleep(10 * time.Millisecond)

// Update the `updated_at`(to trigger the environment update)
sendUpdatedAtSSEEvent(rw, flusher, 1733480514.079725)
time.Sleep(10 * time.Millisecond)
})

ctx := context.Background()

server := httptest.NewServer(mux)
defer server.Close()

// When
client := flagsmith.NewClient(fixtures.EnvironmentAPIKey,
flagsmith.WithBaseURL(server.URL+"/api/v1/"),
flagsmith.WithLocalEvaluation(ctx),
flagsmith.WithRealtime(),
flagsmith.WithRealtimeBaseURL(server.URL+"/"),
)
// Sleep to ensure that the server has time to update the environment
time.Sleep(10 * time.Millisecond)

flags, err := client.GetFlags(ctx, nil)

// Then
assert.NoError(t, err)

allFlags := flags.AllFlags()

assert.Equal(t, 1, len(allFlags))

assert.Equal(t, fixtures.Feature1Name, allFlags[0].FeatureName)
assert.Equal(t, fixtures.Feature1ID, allFlags[0].FeatureID)
assert.Equal(t, fixtures.Feature1Value, allFlags[0].Value)

// Sleep to ensure that the server has time to update the environment
// (After the second sse event)
time.Sleep(10 * time.Millisecond)

assert.Equal(t, 2, requestCount)
}
func sendUpdatedAtSSEEvent(rw http.ResponseWriter, flusher http.Flusher, updatedAt float64) {
// Format the SSE event with the provided updatedAt value
sseEvent := fmt.Sprintf(`event: environment_updated
data: {"updated_at": %f}
`, updatedAt)

// Write the SSE event to the response
_, err := io.WriteString(rw, sseEvent)
if err != nil {
http.Error(rw, "Failed to send SSE event", http.StatusInternalServerError)
return
}

// Flush the event to the client
flusher.Flush()
}
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
DefaultBaseURL = "https://edge.api.flagsmith.com/api/v1/"

bulkIdentifyMaxCount = 100
DefaultRealtimeBaseUrl = "https://realtime.flagsmith.com/"
)

// config contains all configurable Client settings.
Expand All @@ -23,6 +24,8 @@ type config struct {
envRefreshInterval time.Duration
enableAnalytics bool
offlineMode bool
realtimeBaseUrl string
useRealtime bool
}

// defaultConfig returns default configuration.
Expand All @@ -31,5 +34,6 @@ func defaultConfig() config {
baseURL: DefaultBaseURL,
timeout: DefaultTimeout,
envRefreshInterval: time.Second * 60,
realtimeBaseUrl: DefaultRealtimeBaseUrl,
}
}
2 changes: 2 additions & 0 deletions fixtures/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ const Feature1Name = "feature_1"
const Feature1ID = 1

const Feature1OverriddenValue = "some-overridden-value"
const ClientAPIKey = "B62qaMZNwfiqT76p38ggrQ"

const EnvironmentJson = `
{
"api_key": "B62qaMZNwfiqT76p38ggrQ",
"updated_at": "2023-12-06T10:21:54.079725Z",
"project": {
"name": "Test project",
"organisation": {
Expand Down
2 changes: 2 additions & 0 deletions flagengine/environments/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/features"
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/identities"
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/projects"
"time"
)

type EnvironmentModel struct {
Expand All @@ -12,4 +13,5 @@ type EnvironmentModel struct {
Project *projects.ProjectModel `json:"project"`
FeatureStates []*features.FeatureStateModel `json:"feature_states"`
IdentityOverrides []*identities.IdentityModel `json:"identity_overrides"`
UpdatedAt time.Time `json:"updated_at"`
}
22 changes: 22 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flagsmith

import (
"context"
"strings"
"time"
)

Expand All @@ -19,6 +20,8 @@ var _ = []Option{
WithCustomHeaders(nil),
WithDefaultHandler(nil),
WithProxy(""),
WithRealtime(),
WithRealtimeBaseURL(""),
}

func WithBaseURL(url string) Option {
Expand Down Expand Up @@ -124,3 +127,22 @@ func WithErrorHandler(handler func(handler *FlagsmithAPIError)) Option {
c.errorHandler = handler
}
}

// WithRealtime returns an Option function that enables real-time updates for the Client.
// NOTE: Before enabling real-time updates, ensure that local evaluation is enabled.
func WithRealtime() Option {
return func(c *Client) {
c.config.useRealtime = true
}
}

// WithRealtimeBaseURL returns an Option function for configuring the real-time base URL of the Client.
func WithRealtimeBaseURL(url string) Option {
return func(c *Client) {
// Ensure the URL ends with a trailing slash
if !strings.HasSuffix(url, "/") {
url += "/"
}
c.config.realtimeBaseUrl = url
}
}
80 changes: 80 additions & 0 deletions realtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package flagsmith

import (
"bufio"
"context"
"encoding/json"
"errors"
"net/http"
"strings"
"time"

"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/environments"
)

func (c *Client) startRealtimeUpdates(ctx context.Context) {
err := c.UpdateEnvironment(ctx)
if err != nil {
panic("Failed to fetch the environment while configuring real-time updates")
}
env, _ := c.environment.Load().(*environments.EnvironmentModel)
stream_url := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
envUpdatedAt := env.UpdatedAt
for {
select {
case <-ctx.Done():
return
default:
resp, err := http.Get(stream_url)
if err != nil {
c.log.Errorf("Error connecting to realtime server: %v", err)
}
defer resp.Body.Close()

scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data: ") {
parsedTime, err := parseUpdatedAtFromSSE(line)
if err != nil {
c.log.Errorf("Error reading realtime stream: %v", err)
}
if parsedTime.After(envUpdatedAt) {
err = c.UpdateEnvironment(ctx)
if err != nil {
c.log.Errorf("Failed to update the environment: %v", err)
}
env, _ := c.environment.Load().(*environments.EnvironmentModel)

envUpdatedAt = env.UpdatedAt
}
}
}
if err := scanner.Err(); err != nil {
c.log.Errorf("Error realtime stream: %v", err)
}
}
}
}
func parseUpdatedAtFromSSE(line string) (time.Time, error) {
var eventData struct {
UpdatedAt float64 `json:"updated_at"`
}

data := strings.TrimPrefix(line, "data: ")
err := json.Unmarshal([]byte(data), &eventData)
if err != nil {
return time.Time{}, errors.New("failed to parse event data: " + err.Error())
}

if eventData.UpdatedAt <= 0 {
return time.Time{}, errors.New("invalid 'updated_at' value in event data")
}

// Convert the float timestamp into seconds and nanoseconds
seconds := int64(eventData.UpdatedAt)
nanoseconds := int64((eventData.UpdatedAt - float64(seconds)) * 1e9)

// Return the parsed time
return time.Unix(seconds, nanoseconds), nil
}

0 comments on commit 89b3cea

Please sign in to comment.