Skip to content

Commit

Permalink
Support proxy SRT media server.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Sep 5, 2024
1 parent aacfaab commit c0889a0
Show file tree
Hide file tree
Showing 10 changed files with 692 additions and 26 deletions.
20 changes: 16 additions & 4 deletions proxy/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func buildDefaultEnvironmentVariables(ctx context.Context) {
setEnvDefault("PROXY_RTMP_SERVER", "11935")
// The WebRTC media server, via UDP protocol.
setEnvDefault("PROXY_WEBRTC_SERVER", "18000")
// The SRT media server, via UDP protocol.
setEnvDefault("PROXY_SRT_SERVER", "20080")
// The API server of proxy itself.
setEnvDefault("PROXY_SYSTEM_API", "12025")

Expand All @@ -70,30 +72,36 @@ func buildDefaultEnvironmentVariables(ctx context.Context) {
setEnvDefault("PROXY_DEFAULT_BACKEND_API", "1985")
// Default backend udp rtc port, for debugging.
setEnvDefault("PROXY_DEFAULT_BACKEND_RTC", "8000")
// Default backend udp srt port, for debugging.
setEnvDefault("PROXY_DEFAULT_BACKEND_SRT", "10080")

logger.Df(ctx, "load .env as GO_PPROF=%v, "+
"PROXY_FORCE_QUIT_TIMEOUT=%v, PROXY_GRACE_QUIT_TIMEOUT=%v, "+
"PROXY_HTTP_API=%v, PROXY_HTTP_SERVER=%v, PROXY_RTMP_SERVER=%v, "+
"PROXY_WEBRTC_SERVER=%v, "+
"PROXY_WEBRTC_SERVER=%v, PROXY_SRT_SERVER=%v, "+
"PROXY_SYSTEM_API=%v, PROXY_DEFAULT_BACKEND_ENABLED=%v, "+
"PROXY_DEFAULT_BACKEND_IP=%v, PROXY_DEFAULT_BACKEND_RTMP=%v, "+
"PROXY_DEFAULT_BACKEND_HTTP=%v, PROXY_DEFAULT_BACKEND_API=%v, "+
"PROXY_DEFAULT_BACKEND_RTC=%v, "+
"PROXY_DEFAULT_BACKEND_RTC=%v, PROXY_DEFAULT_BACKEND_SRT=%v, "+
"PROXY_LOAD_BALANCER_TYPE=%v, PROXY_REDIS_HOST=%v, PROXY_REDIS_PORT=%v, "+
"PROXY_REDIS_PASSWORD=%v, PROXY_REDIS_DB=%v",
envGoPprof(),
envForceQuitTimeout(), envGraceQuitTimeout(),
envHttpAPI(), envHttpServer(), envRtmpServer(),
envWebRTCServer(),
envWebRTCServer(), envSRTServer(),
envSystemAPI(), envDefaultBackendEnabled(),
envDefaultBackendIP(), envDefaultBackendRTMP(),
envDefaultBackendHttp(), envDefaultBackendAPI(),
envDefaultBackendRTC(),
envDefaultBackendRTC(), envDefaultBackendSRT(),
envLoadBalancerType(), envRedisHost(), envRedisPort(),
envRedisPassword(), envRedisDB(),
)
}

func envDefaultBackendSRT() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_SRT")
}

func envDefaultBackendRTC() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_RTC")
}
Expand All @@ -102,6 +110,10 @@ func envDefaultBackendAPI() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_API")
}

func envSRTServer() string {
return os.Getenv("PROXY_SRT_SERVER")
}

func envWebRTCServer() string {
return os.Getenv("PROXY_WEBRTC_SERVER")
}
Expand Down
9 changes: 5 additions & 4 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,9 @@ func (v *httpServer) Run(ctx context.Context) error {
stream, _ := srsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSPlayStream(func(s *HLSPlayStream) {
s.SRSProxyBackendHLSID = logger.GenerateContextID()
s.StreamURL, s.FullURL = streamURL, fullURL
s.Initialize(ctx)
}))

stream.ServeHTTP(w, r)
stream.Initialize(ctx).ServeHTTP(w, r)
return
}

Expand Down Expand Up @@ -262,7 +261,7 @@ func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.Respons
}

// HLSPlayStream is an HLS stream proxy, which represents the stream level object. This means multiple HLS
// clients will share this object, and they use the same ctx among proxy servers.
// clients will share this object, and they do not use the same ctx among proxy servers.
//
// Unlike the HTTP FLV or TS connection, HLS client may request the m3u8 or ts via different HTTP connections.
// Especially for requesting ts, we need to identify the stream URl or backend server for it. So we create
Expand All @@ -289,7 +288,9 @@ func NewHLSPlayStream(opts ...func(*HLSPlayStream)) *HLSPlayStream {
}

func (v *HLSPlayStream) Initialize(ctx context.Context) *HLSPlayStream {
v.ctx = logger.WithContext(ctx)
if v.ctx == nil {
v.ctx = logger.WithContext(ctx)
}
return v
}

Expand Down
7 changes: 7 additions & 0 deletions proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ func doMain(ctx context.Context) error {
return errors.Wrapf(err, "http api server")
}

// Start the SRT server.
srtServer := newSRTServer()
defer srtServer.Close()
if err := srtServer.Run(ctx); err != nil {
return errors.Wrapf(err, "srt server")
}

// Start the System API server.
systemAPI := NewSystemAPI(func(server *systemAPI) {
server.gracefulQuitTimeout = gracefulQuitTimeout
Expand Down
22 changes: 12 additions & 10 deletions proxy/rtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,17 @@ func (v *rtcServer) Run(ctx context.Context) error {
endpoint = fmt.Sprintf(":%v", endpoint)
}

addr, err := net.ResolveUDPAddr("udp", endpoint)
saddr, err := net.ResolveUDPAddr("udp", endpoint)
if err != nil {
return errors.Wrapf(err, "resolve udp addr %v", endpoint)
}

listener, err := net.ListenUDP("udp", addr)
listener, err := net.ListenUDP("udp", saddr)
if err != nil {
return errors.Wrapf(err, "listen udp %v", addr)
return errors.Wrapf(err, "listen udp %v", saddr)
}
v.listener = listener
logger.Df(ctx, "WebRTC server listen at %v", addr)
logger.Df(ctx, "WebRTC server listen at %v", saddr)

// Consume all messages from UDP media transport.
v.wg.Add(1)
Expand All @@ -247,15 +247,15 @@ func (v *rtcServer) Run(ctx context.Context) error {

for ctx.Err() == nil {
buf := make([]byte, 4096)
n, addr, err := listener.ReadFromUDP(buf)
n, caddr, err := listener.ReadFromUDP(buf)
if err != nil {
// TODO: If WebRTC server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "read from udp failed, err=%+v", err)
continue
}

if err := v.handleClientUDP(ctx, addr, buf[:n]); err != nil {
logger.Wf(ctx, "handle udp %vB failed, addr=%v, err=%+v", n, addr, err)
if err := v.handleClientUDP(ctx, caddr, buf[:n]); err != nil {
logger.Wf(ctx, "handle udp %vB failed, addr=%v, err=%+v", n, caddr, err)
}
}
}()
Expand All @@ -268,7 +268,7 @@ func (v *rtcServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data

// If STUN binding request, parse the ufrag and identify the connection.
if err := func() error {
if rtc_is_rtp_or_rtcp(data) || !rtc_is_stun(data) {
if rtcIsRTPOrRTCP(data) || !rtcIsSTUN(data) {
return nil
}

Expand Down Expand Up @@ -358,7 +358,9 @@ func NewRTCConnection(opts ...func(*RTCConnection)) *RTCConnection {
}

func (v *RTCConnection) Initialize(ctx context.Context, listener *net.UDPConn) *RTCConnection {
v.ctx = logger.WithContext(ctx)
if v.ctx == nil {
v.ctx = logger.WithContext(ctx)
}
if listener != nil {
v.listenerUDP = listener
}
Expand Down Expand Up @@ -431,7 +433,7 @@ func (v *RTCConnection) connectBackend(ctx context.Context) error {
}

// Connect to backend SRS server via UDP client.
// TODO: Support close the connection when timeout or DTLS alert.
// TODO: FIXME: Support close the connection when timeout or DTLS alert.
backendAddr := net.UDPAddr{IP: net.ParseIP(backend.IP), Port: udpPort}
if backendUDP, err := net.DialUDP("udp", nil, &backendAddr); err != nil {
return errors.Wrapf(err, "dial udp to %v", backendAddr)
Expand Down
3 changes: 3 additions & 0 deletions proxy/srs.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func NewDefaultSRSForDebugging() (*SRSServer, error) {
if envDefaultBackendRTC() != "" {
server.RTC = []string{envDefaultBackendRTC()}
}
if envDefaultBackendSRT() != "" {
server.SRT = []string{envDefaultBackendSRT()}
}
return server, nil
}

Expand Down
Loading

0 comments on commit c0889a0

Please sign in to comment.