Skip to content

Commit

Permalink
Support redis LB for WebRTC.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Sep 5, 2024
1 parent c227165 commit aacfaab
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 16 deletions.
7 changes: 5 additions & 2 deletions proxy/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ module srs-proxy

go 1.18

require (
github.com/go-redis/redis/v8 v8.11.5
github.com/joho/godotenv v1.5.1
)

require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/joho/godotenv v1.5.1 // indirect
)
9 changes: 9 additions & 0 deletions proxy/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cb
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
12 changes: 1 addition & 11 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.Respons
type HLSPlayStream struct {
// The context for HLS streaming.
ctx context.Context
// The context ID for recovering the context.
ContextID string `json:"cid"`

// The spbhid, used to identify the backend server.
SRSProxyBackendHLSID string `json:"spbhid"`
Expand All @@ -291,15 +289,7 @@ func NewHLSPlayStream(opts ...func(*HLSPlayStream)) *HLSPlayStream {
}

func (v *HLSPlayStream) Initialize(ctx context.Context) *HLSPlayStream {
if v.ctx != nil && v.ContextID != "" {
return v
}

if v.ContextID == "" {
v.ContextID = logger.GenerateContextID()
}
v.ctx = logger.WithContextID(ctx, v.ContextID)

v.ctx = logger.WithContext(ctx)
return v
}

Expand Down
42 changes: 39 additions & 3 deletions proxy/srs.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,9 @@ func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL str
return nil, errors.Wrapf(err, "set key=%v HLS %v", key, value)
}

if err := v.rdb.Set(ctx, v.redisKeySPBHID(value.SRSProxyBackendHLSID), b, srsHLSAliveDuration).Err(); err != nil {
return nil, errors.Wrapf(err, "set key=%v HLS %v", v.redisKeySPBHID(value.SRSProxyBackendHLSID), value)
key2 := v.redisKeySPBHID(value.SRSProxyBackendHLSID)
if err := v.rdb.Set(ctx, key2, b, srsHLSAliveDuration).Err(); err != nil {
return nil, errors.Wrapf(err, "set key=%v HLS %v", key2, value)
}

// Query the HLS streaming from redis.
Expand All @@ -490,11 +491,46 @@ func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL str
}

func (v *srsRedisLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error {
b, err := json.Marshal(value)
if err != nil {
return errors.Wrapf(err, "marshal WebRTC %v", value)
}

key := v.redisKeyRTC(streamURL)
if err = v.rdb.Set(ctx, key, b, srsRTCAliveDuration).Err(); err != nil {
return errors.Wrapf(err, "set key=%v WebRTC %v", key, value)
}

key2 := v.redisKeyUfrag(value.Ufrag)
if err := v.rdb.Set(ctx, key2, b, srsRTCAliveDuration).Err(); err != nil {
return errors.Wrapf(err, "set key=%v WebRTC %v", key2, value)
}

return nil
}

func (v *srsRedisLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error) {
return nil, nil
key := v.redisKeyUfrag(ufrag)

b, err := v.rdb.Get(ctx, key).Bytes()
if err != nil {
return nil, errors.Wrapf(err, "get key=%v WebRTC", key)
}

var actual RTCConnection
if err := json.Unmarshal(b, &actual); err != nil {
return nil, errors.Wrapf(err, "unmarshal key=%v WebRTC %v", key, string(b))
}

return &actual, nil
}

func (v *srsRedisLoadBalancer) redisKeyUfrag(ufrag string) string {
return fmt.Sprintf("srs-proxy-ufrag:%v", ufrag)
}

func (v *srsRedisLoadBalancer) redisKeyRTC(streamURL string) string {
return fmt.Sprintf("srs-proxy-rtc:%v", streamURL)
}

func (v *srsRedisLoadBalancer) redisKeySPBHID(spbhid string) string {
Expand Down

0 comments on commit aacfaab

Please sign in to comment.