Skip to content

Commit

Permalink
FFM-11569 Periodically send the most recent stream status to Replicas (
Browse files Browse the repository at this point in the history
…#311)

* FFM-11569 Periodically send the most recent stream status to Replicas

**What**

- Starts a thread in the Primary Proxy that sends the most recent stream
  status to replicas every 20 seconds

**Why**

- To mitigate the risk of a replica missing a stream connect/disconnect
  event from the primary due to a network issue with redis

**Testing**

- Tested locally by putting a nginx proxy in between my local Proxy &
  redis and a local http proxy between my primary and Saas. When I kill
the nginx proxy and the http proxy I can see the Primary the primary fails to
send connect/disconnect events to the replica and the replica has the
incorrect state. When I bring nginx back up to simulate the redis
connection coming back up I can see the replica now gets the correct
stream state rather than being stuck
  • Loading branch information
jcox250 authored May 28, 2024
1 parent edb66ff commit 84b82fd
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 20 deletions.
5 changes: 5 additions & 0 deletions cmd/ff-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ func main() {
stream.WithBackoff(backoff.NewConstantBackOff(1*time.Minute)),
)

if !readReplica {
s := stream.NewStatusWorker(streamHealth, primaryToReplicaControlStream, logger)
go s.Start(ctx)
}

// Create repos
targetRepo := repository.NewTargetRepo(sdkCache, logger)
flagRepo := repository.NewFeatureFlagRepo(hashCache)
Expand Down
11 changes: 5 additions & 6 deletions domain/message_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,18 @@ func (r ReadReplicaMessageHandler) HandleMessage(ctx context.Context, msg SSEMes

// handleStreamAction sets the internal StreamHealth in the read replica based on the type of message we get
func (r ReadReplicaMessageHandler) handleStreamAction(ctx context.Context, msg SSEMessage) error {
if msg.Domain == "disconnect" {
r.log.Info("received stream disconnect event from primary proxy")
if msg.Domain == StreamStateDisconnected.String() {
r.log.Info("received stream disconnected event from primary proxy")

if err := r.streamStatus.SetUnhealthy(ctx); err != nil {
r.log.Error("failed to set unhealthy stream status", "err", err)
}

// Return EOF to indicate the stream was closed
return io.EOF
return nil
}

if msg.Domain == "connect" {
r.log.Info("received stream connect event from primary proxy")
if msg.Domain == StreamStateConnected.String() {
r.log.Info("received stream connected event from primary proxy")

if err := r.streamStatus.SetHealthy(ctx); err != nil {
r.log.Error("failed to set healthy stream status", "err", err)
Expand Down
9 changes: 4 additions & 5 deletions domain/message_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package domain

import (
"context"
"io"
"sync"
"testing"

Expand Down Expand Up @@ -62,7 +61,7 @@ func TestReadReplicaMessageHandler_HandleMessage(t *testing.T) {
args: args{
msg: SSEMessage{
Event: "stream_action",
Domain: "disconnect",
Domain: StreamStateDisconnected.String(),
},
},
mocks: mocks{
Expand All @@ -73,15 +72,15 @@ func TestReadReplicaMessageHandler_HandleMessage(t *testing.T) {
},
expected: expected{
health: false,
err: io.EOF,
err: nil,
},
shouldErr: true,
shouldErr: false,
},
"Given I have a unhealthy status and get a stream connect event": {
args: args{
msg: SSEMessage{
Event: "stream_action",
Domain: "connect",
Domain: StreamStateConnected.String(),
},
},
mocks: mocks{
Expand Down
42 changes: 42 additions & 0 deletions stream/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stream
import (
"context"
"errors"
"fmt"
"os"
"time"

Expand Down Expand Up @@ -330,3 +331,44 @@ func (p PollingStatusMetric) Polling() {
func (p PollingStatusMetric) NotPolling() {
p.gauge.WithLabelValues(p.hostName).Set(0)
}

type StatusWorker struct {
health Health
pub Stream
log log.Logger
}

func NewStatusWorker(health Health, pub Stream, logger log.Logger) *StatusWorker {
l := logger.With("component", "StreamStatusWorker")
return &StatusWorker{
health: health,
pub: pub,
log: l,
}
}

func (s *StatusWorker) Start(ctx context.Context) {
ticker := time.NewTicker(20 * time.Second)

for {
select {
case <-ctx.Done():
s.log.Info("exiting StreamStatusWorker.Start", "reason", ctx.Err())
return
case <-ticker.C:

status, err := s.health.Status(ctx)
if err != nil {
s.log.Error("failed to retrieve health status", "err", err)
continue
}

s.log.Info(fmt.Sprintf("publishing %s message for replicas", status.State.String()))
if err := s.pub.Publish(ctx, domain.SSEMessage{Event: "stream_action", Domain: status.State.String()}); err != nil {
s.log.Error(fmt.Sprintf("failed to publish stream %s message to redis", status.State.String()), "err", err)
continue
}
s.log.Info(fmt.Sprintf("successfully published %s message for replicas", status.State.String()))
}
}
}
17 changes: 11 additions & 6 deletions stream/on_connect_disconnect_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ func SaasStreamOnDisconnect(l log.Logger, streamHealth Health, pp Pushpin, redis

// Publish an event to the redis stream that the read replica proxy's are listening on to let them
// know we've disconnected from SaaS.
l.Info("publishing disconnect message for replicas")
if err := redisSSEStream.Publish(ctx, domain.SSEMessage{Event: "stream_action", Domain: "disconnect"}); err != nil {
l.Error("failed to publish stream disconnect message to redis", "err", err)
} else {
l.Info("successfully published disconnect message for replicas")
l.Info("publishing disconnected message for replicas")
if err := redisSSEStream.Publish(ctx, domain.SSEMessage{Event: "stream_action", Domain: domain.StreamStateDisconnected.String()}); err != nil {
l.Error("failed to publish stream disconnected message to redis", "err", err)
return
}

l.Info("successfully published disconnected message for replicas")
}
}

Expand Down Expand Up @@ -94,9 +95,13 @@ func SaasStreamOnConnect(l log.Logger, streamHealth Health, reloadConfig func()

// Publish an event to the redis stream that the read replica proxy's are listening on to let them
// know we've connected to SaaS.
if err := redisSSEStream.Publish(ctx, domain.SSEMessage{Event: "stream_action", Domain: "connect"}); err != nil {
l.Info("publishing stream connected message for replicas")
if err := redisSSEStream.Publish(ctx, domain.SSEMessage{Event: "stream_action", Domain: domain.StreamStateConnected.String()}); err != nil {
l.Error("failed to publish stream connect message to redis", "err", err)
return
}

l.Info("successfully published stream connected message for replicas")
}
}

Expand Down
4 changes: 2 additions & 2 deletions stream/on_connect_disconnect_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestSaasStreamOnDisconnect(t *testing.T) {
},
expected: expected{
events: []interface{}{
domain.SSEMessage{Event: "stream_action", Domain: "disconnect", Identifier: "", Version: 0, Environment: "", Environments: []string(nil), APIKey: ""},
domain.SSEMessage{Event: "stream_action", Domain: domain.StreamStateDisconnected.String(), Identifier: "", Version: 0, Environment: "", Environments: []string(nil), APIKey: ""},
},
streamHealth: false,
},
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestSaasStreamOnConnect(t *testing.T) {
},
expected: expected{
events: []interface{}{
domain.SSEMessage{Event: "stream_action", Domain: "connect", Identifier: "", Version: 0, Environment: "", Environments: []string(nil), APIKey: ""},
domain.SSEMessage{Event: "stream_action", Domain: domain.StreamStateConnected.String(), Identifier: "", Version: 0, Environment: "", Environments: []string(nil), APIKey: ""},
},
streamHealth: true,
},
Expand Down
2 changes: 1 addition & 1 deletion stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s Stream) subscribe(ctx context.Context) {
}

backoffDuration := s.backoff.NextBackOff()
s.log.Warn("disconnected from stream, backing off and retrying", "backoff_duration", backoffDuration, "err", err)
s.log.Warn("disconnected from stream, backing off and retrying", "backoff_duration", backoffDuration, "err", err, "msgID", msgID)
time.Sleep(backoffDuration)
}

Expand Down

0 comments on commit 84b82fd

Please sign in to comment.