diff --git a/proxy/go.mod b/proxy/go.mod index 673e1b1b3f6..2e2a17ab34c 100644 --- a/proxy/go.mod +++ b/proxy/go.mod @@ -2,9 +2,12 @@ module srs-proxy go 1.18 +require ( + github.com/go-redis/redis/v8 v8.11.5 + github.com/joho/godotenv v1.5.1 +) + require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/go-redis/redis/v8 v8.11.5 // indirect - github.com/joho/godotenv v1.5.1 // indirect ) diff --git a/proxy/go.sum b/proxy/go.sum index 084e8a8755b..1efc5318ed4 100644 --- a/proxy/go.sum +++ b/proxy/go.sum @@ -2,7 +2,16 @@ github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cb github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/proxy/http.go b/proxy/http.go index cc58fe80902..e46664f8ecd 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -271,8 +271,6 @@ func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.Respons type HLSPlayStream struct { // The context for HLS streaming. ctx context.Context - // The context ID for recovering the context. - ContextID string `json:"cid"` // The spbhid, used to identify the backend server. SRSProxyBackendHLSID string `json:"spbhid"` @@ -291,15 +289,7 @@ func NewHLSPlayStream(opts ...func(*HLSPlayStream)) *HLSPlayStream { } func (v *HLSPlayStream) Initialize(ctx context.Context) *HLSPlayStream { - if v.ctx != nil && v.ContextID != "" { - return v - } - - if v.ContextID == "" { - v.ContextID = logger.GenerateContextID() - } - v.ctx = logger.WithContextID(ctx, v.ContextID) - + v.ctx = logger.WithContext(ctx) return v } diff --git a/proxy/srs.go b/proxy/srs.go index 3bc69f1b2b0..46cf513d8d2 100644 --- a/proxy/srs.go +++ b/proxy/srs.go @@ -471,8 +471,9 @@ func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL str return nil, errors.Wrapf(err, "set key=%v HLS %v", key, value) } - if err := v.rdb.Set(ctx, v.redisKeySPBHID(value.SRSProxyBackendHLSID), b, srsHLSAliveDuration).Err(); err != nil { - return nil, errors.Wrapf(err, "set key=%v HLS %v", v.redisKeySPBHID(value.SRSProxyBackendHLSID), value) + key2 := v.redisKeySPBHID(value.SRSProxyBackendHLSID) + if err := v.rdb.Set(ctx, key2, b, srsHLSAliveDuration).Err(); err != nil { + return nil, errors.Wrapf(err, "set key=%v HLS %v", key2, value) } // Query the HLS streaming from redis. @@ -490,11 +491,46 @@ func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL str } func (v *srsRedisLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error { + b, err := json.Marshal(value) + if err != nil { + return errors.Wrapf(err, "marshal WebRTC %v", value) + } + + key := v.redisKeyRTC(streamURL) + if err = v.rdb.Set(ctx, key, b, srsRTCAliveDuration).Err(); err != nil { + return errors.Wrapf(err, "set key=%v WebRTC %v", key, value) + } + + key2 := v.redisKeyUfrag(value.Ufrag) + if err := v.rdb.Set(ctx, key2, b, srsRTCAliveDuration).Err(); err != nil { + return errors.Wrapf(err, "set key=%v WebRTC %v", key2, value) + } + return nil } func (v *srsRedisLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error) { - return nil, nil + key := v.redisKeyUfrag(ufrag) + + b, err := v.rdb.Get(ctx, key).Bytes() + if err != nil { + return nil, errors.Wrapf(err, "get key=%v WebRTC", key) + } + + var actual RTCConnection + if err := json.Unmarshal(b, &actual); err != nil { + return nil, errors.Wrapf(err, "unmarshal key=%v WebRTC %v", key, string(b)) + } + + return &actual, nil +} + +func (v *srsRedisLoadBalancer) redisKeyUfrag(ufrag string) string { + return fmt.Sprintf("srs-proxy-ufrag:%v", ufrag) +} + +func (v *srsRedisLoadBalancer) redisKeyRTC(streamURL string) string { + return fmt.Sprintf("srs-proxy-rtc:%v", streamURL) } func (v *srsRedisLoadBalancer) redisKeySPBHID(spbhid string) string {