From 89b3cea1ab3dd01d8c4d1cf88e15700e6da6caa3 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Wed, 11 Dec 2024 17:30:56 +0530 Subject: [PATCH] feat(realtime): Add realtime support --- client.go | 9 +-- client_test.go | 94 +++++++++++++++++++++++++++++++ config.go | 4 ++ fixtures/fixture.go | 2 + flagengine/environments/models.go | 2 + options.go | 22 ++++++++ realtime.go | 80 ++++++++++++++++++++++++++ 7 files changed, 209 insertions(+), 4 deletions(-) create mode 100644 realtime.go diff --git a/client.go b/client.go index 9bd1e55..613026e 100644 --- a/client.go +++ b/client.go @@ -89,14 +89,16 @@ func NewClient(apiKey string, options ...Option) *Client { if !strings.HasPrefix(apiKey, "ser.") { panic("In order to use local evaluation, please generate a server key in the environment settings page.") } - - go c.pollEnvironment(c.ctxLocalEval) + if c.config.useRealtime { + go c.startRealtimeUpdates(c.ctxLocalEval) + } else { + go c.pollEnvironment(c.ctxLocalEval) + } } // Initialize analytics processor if c.config.enableAnalytics { c.analyticsProcessor = NewAnalyticsProcessor(c.ctxAnalytics, c.client, c.config.baseURL, nil, c.log) } - return c } @@ -331,7 +333,6 @@ func (c *Client) pollEnvironment(ctx context.Context) { } } } - func (c *Client) UpdateEnvironment(ctx context.Context) error { var env environments.EnvironmentModel resp, err := c.client.NewRequest(). diff --git a/client_test.go b/client_test.go index e449031..a1c3b80 100644 --- a/client_test.go +++ b/client_test.go @@ -200,6 +200,7 @@ func TestGetFlags(t *testing.T) { assert.Equal(t, fixtures.Feature1Name, allFlags[0].FeatureName) assert.Equal(t, fixtures.Feature1ID, allFlags[0].FeatureID) assert.Equal(t, fixtures.Feature1Value, allFlags[0].Value) + } func TestGetFlagsTransientIdentity(t *testing.T) { @@ -861,3 +862,96 @@ func TestPollErrorHandlerIsUsedWhenPollFails(t *testing.T) { assert.Equal(t, statusCode, 500) assert.Equal(t, status, "500 Internal Server Error") } + +func TestRealtime(t *testing.T) { + // Given + requestCount := 0 + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/environment-document/", func(rw http.ResponseWriter, req *http.Request) { + assert.Equal(t, "GET", req.Method) + fmt.Println(req.URL.Path) + assert.Equal(t, fixtures.EnvironmentAPIKey, req.Header.Get("X-Environment-Key")) + + requestCount += 1 + + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + _, err := io.WriteString(rw, fixtures.EnvironmentJson) + if err != nil { + panic(err) + } + assert.NoError(t, err) + }) + mux.HandleFunc(fmt.Sprintf("/sse/environments/%s/stream", fixtures.ClientAPIKey), func(rw http.ResponseWriter, req *http.Request) { + assert.Equal(t, "GET", req.Method) + + // Set the necessary headers for SSE + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Set("Connection", "keep-alive") + + // Flush headers to the client + flusher, _ := rw.(http.Flusher) + flusher.Flush() + + // Use an `updated_at` value that is older than the `updated_at` set on the environment document + // to ensure an older timestamp does not trigger an update. + sendUpdatedAtSSEEvent(rw, flusher, 1640995200.079725) + time.Sleep(10 * time.Millisecond) + + // Update the `updated_at`(to trigger the environment update) + sendUpdatedAtSSEEvent(rw, flusher, 1733480514.079725) + time.Sleep(10 * time.Millisecond) + }) + + ctx := context.Background() + + server := httptest.NewServer(mux) + defer server.Close() + + // When + client := flagsmith.NewClient(fixtures.EnvironmentAPIKey, + flagsmith.WithBaseURL(server.URL+"/api/v1/"), + flagsmith.WithLocalEvaluation(ctx), + flagsmith.WithRealtime(), + flagsmith.WithRealtimeBaseURL(server.URL+"/"), + ) + // Sleep to ensure that the server has time to update the environment + time.Sleep(10 * time.Millisecond) + + flags, err := client.GetFlags(ctx, nil) + + // Then + assert.NoError(t, err) + + allFlags := flags.AllFlags() + + assert.Equal(t, 1, len(allFlags)) + + assert.Equal(t, fixtures.Feature1Name, allFlags[0].FeatureName) + assert.Equal(t, fixtures.Feature1ID, allFlags[0].FeatureID) + assert.Equal(t, fixtures.Feature1Value, allFlags[0].Value) + + // Sleep to ensure that the server has time to update the environment + // (After the second sse event) + time.Sleep(10 * time.Millisecond) + + assert.Equal(t, 2, requestCount) +} +func sendUpdatedAtSSEEvent(rw http.ResponseWriter, flusher http.Flusher, updatedAt float64) { + // Format the SSE event with the provided updatedAt value + sseEvent := fmt.Sprintf(`event: environment_updated +data: {"updated_at": %f} + +`, updatedAt) + + // Write the SSE event to the response + _, err := io.WriteString(rw, sseEvent) + if err != nil { + http.Error(rw, "Failed to send SSE event", http.StatusInternalServerError) + return + } + + // Flush the event to the client + flusher.Flush() +} diff --git a/config.go b/config.go index 27bbb16..4dbc2e4 100644 --- a/config.go +++ b/config.go @@ -13,6 +13,7 @@ const ( DefaultBaseURL = "https://edge.api.flagsmith.com/api/v1/" bulkIdentifyMaxCount = 100 + DefaultRealtimeBaseUrl = "https://realtime.flagsmith.com/" ) // config contains all configurable Client settings. @@ -23,6 +24,8 @@ type config struct { envRefreshInterval time.Duration enableAnalytics bool offlineMode bool + realtimeBaseUrl string + useRealtime bool } // defaultConfig returns default configuration. @@ -31,5 +34,6 @@ func defaultConfig() config { baseURL: DefaultBaseURL, timeout: DefaultTimeout, envRefreshInterval: time.Second * 60, + realtimeBaseUrl: DefaultRealtimeBaseUrl, } } diff --git a/fixtures/fixture.go b/fixtures/fixture.go index cea45ad..f4bca72 100644 --- a/fixtures/fixture.go +++ b/fixtures/fixture.go @@ -12,10 +12,12 @@ const Feature1Name = "feature_1" const Feature1ID = 1 const Feature1OverriddenValue = "some-overridden-value" +const ClientAPIKey = "B62qaMZNwfiqT76p38ggrQ" const EnvironmentJson = ` { "api_key": "B62qaMZNwfiqT76p38ggrQ", + "updated_at": "2023-12-06T10:21:54.079725Z", "project": { "name": "Test project", "organisation": { diff --git a/flagengine/environments/models.go b/flagengine/environments/models.go index 578d999..ab9e3f0 100644 --- a/flagengine/environments/models.go +++ b/flagengine/environments/models.go @@ -4,6 +4,7 @@ import ( "github.com/Flagsmith/flagsmith-go-client/v4/flagengine/features" "github.com/Flagsmith/flagsmith-go-client/v4/flagengine/identities" "github.com/Flagsmith/flagsmith-go-client/v4/flagengine/projects" + "time" ) type EnvironmentModel struct { @@ -12,4 +13,5 @@ type EnvironmentModel struct { Project *projects.ProjectModel `json:"project"` FeatureStates []*features.FeatureStateModel `json:"feature_states"` IdentityOverrides []*identities.IdentityModel `json:"identity_overrides"` + UpdatedAt time.Time `json:"updated_at"` } diff --git a/options.go b/options.go index c49084b..af919f9 100644 --- a/options.go +++ b/options.go @@ -2,6 +2,7 @@ package flagsmith import ( "context" + "strings" "time" ) @@ -19,6 +20,8 @@ var _ = []Option{ WithCustomHeaders(nil), WithDefaultHandler(nil), WithProxy(""), + WithRealtime(), + WithRealtimeBaseURL(""), } func WithBaseURL(url string) Option { @@ -124,3 +127,22 @@ func WithErrorHandler(handler func(handler *FlagsmithAPIError)) Option { c.errorHandler = handler } } + +// WithRealtime returns an Option function that enables real-time updates for the Client. +// NOTE: Before enabling real-time updates, ensure that local evaluation is enabled. +func WithRealtime() Option { + return func(c *Client) { + c.config.useRealtime = true + } +} + +// WithRealtimeBaseURL returns an Option function for configuring the real-time base URL of the Client. +func WithRealtimeBaseURL(url string) Option { + return func(c *Client) { + // Ensure the URL ends with a trailing slash + if !strings.HasSuffix(url, "/") { + url += "/" + } + c.config.realtimeBaseUrl = url + } +} diff --git a/realtime.go b/realtime.go new file mode 100644 index 0000000..3e8bcc3 --- /dev/null +++ b/realtime.go @@ -0,0 +1,80 @@ +package flagsmith + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "net/http" + "strings" + "time" + + "github.com/Flagsmith/flagsmith-go-client/v4/flagengine/environments" +) + +func (c *Client) startRealtimeUpdates(ctx context.Context) { + err := c.UpdateEnvironment(ctx) + if err != nil { + panic("Failed to fetch the environment while configuring real-time updates") + } + env, _ := c.environment.Load().(*environments.EnvironmentModel) + stream_url := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream" + envUpdatedAt := env.UpdatedAt + for { + select { + case <-ctx.Done(): + return + default: + resp, err := http.Get(stream_url) + if err != nil { + c.log.Errorf("Error connecting to realtime server: %v", err) + } + defer resp.Body.Close() + + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "data: ") { + parsedTime, err := parseUpdatedAtFromSSE(line) + if err != nil { + c.log.Errorf("Error reading realtime stream: %v", err) + } + if parsedTime.After(envUpdatedAt) { + err = c.UpdateEnvironment(ctx) + if err != nil { + c.log.Errorf("Failed to update the environment: %v", err) + } + env, _ := c.environment.Load().(*environments.EnvironmentModel) + + envUpdatedAt = env.UpdatedAt + } + } + } + if err := scanner.Err(); err != nil { + c.log.Errorf("Error realtime stream: %v", err) + } + } + } +} +func parseUpdatedAtFromSSE(line string) (time.Time, error) { + var eventData struct { + UpdatedAt float64 `json:"updated_at"` + } + + data := strings.TrimPrefix(line, "data: ") + err := json.Unmarshal([]byte(data), &eventData) + if err != nil { + return time.Time{}, errors.New("failed to parse event data: " + err.Error()) + } + + if eventData.UpdatedAt <= 0 { + return time.Time{}, errors.New("invalid 'updated_at' value in event data") + } + + // Convert the float timestamp into seconds and nanoseconds + seconds := int64(eventData.UpdatedAt) + nanoseconds := int64((eventData.UpdatedAt - float64(seconds)) * 1e9) + + // Return the parsed time + return time.Unix(seconds, nanoseconds), nil +}