Skip to content

Commit

Permalink
SmartPtr: Support RTC reconnect load test.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 19, 2024
1 parent 1829a07 commit 15a9427
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 29 deletions.
9 changes: 9 additions & 0 deletions trunk/3rdparty/srs-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ for ((i=0;;i++)); do
done
```

WebRTC重连测试:

```bash
for ((i=0;;i++)); do
./objs/srs_bench -sfu=rtc -pr=webrtc://localhost/live${i}/livestream -sn=1000 -cap=true;
sleep 10;
done
```

## Regression Test

回归测试需要先启动[SRS](https://github.com/ossrs/srs/issues/307),支持WebRTC推拉流:
Expand Down
21 changes: 18 additions & 3 deletions trunk/3rdparty/srs-bench/srs/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go
func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC bool) error {
func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC, closeAfterPublished bool) error {
ctx = logger.WithContext(ctx)

logger.Tf(ctx, "Run publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v",
Expand Down Expand Up @@ -77,10 +77,13 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
return nil, err
}

if sourceAudio != "" {
// For CAP, we always add audio track, because both audio and video are disabled for CAP, which will
// cause failed when exchange SDP.
if sourceAudio != "" || closeAfterPublished {
aIngester = newAudioIngester(sourceAudio)
registry.Add(&rtpInteceptorFactory{aIngester.audioLevelInterceptor})
}

if sourceVideo != "" {
vIngester = newVideoIngester(sourceVideo)
registry.Add(&rtpInteceptorFactory{vIngester.markerInterceptor})
Expand Down Expand Up @@ -178,6 +181,7 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i

// Wait for event from context or tracks.
var wg sync.WaitGroup
defer wg.Wait()

wg.Add(1)
go func() {
Expand All @@ -186,6 +190,18 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
doClose() // Interrupt the RTCP read.
}()

// If CAP, directly close the connection after published.
if closeAfterPublished {
select {
case <-ctx.Done():
case <-pcDoneCtx.Done():
}

logger.Tf(ctx, "Close connection after published")
cancel()
return nil
}

wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -295,6 +311,5 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
}
}()

wg.Wait()
return nil
}
13 changes: 9 additions & 4 deletions trunk/3rdparty/srs-bench/srs/srs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ var clients, streams, delay int

var statListen string

var closeAfterPublished bool

func Parse(ctx context.Context) {
fl := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)

Expand All @@ -71,6 +73,8 @@ func Parse(ctx context.Context) {

fl.StringVar(&statListen, "stat", "", "")

fl.BoolVar(&closeAfterPublished, "cap", false, "")

fl.Usage = func() {
fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0]))
fmt.Println(fmt.Sprintf("Options:"))
Expand All @@ -95,6 +99,7 @@ func Parse(ctx context.Context) {
fmt.Println(fmt.Sprintf(" -fps [Optional] The fps of .h264 source file."))
fmt.Println(fmt.Sprintf(" -sa [Optional] The file path to read audio, ignore if empty."))
fmt.Println(fmt.Sprintf(" -sv [Optional] The file path to read video, ignore if empty."))
fmt.Println(fmt.Sprintf(" -cap Whether to close connection after publish. Default: false"))
fmt.Println(fmt.Sprintf("\n例如,1个播放,1个推流:"))
fmt.Println(fmt.Sprintf(" %v -sr webrtc://localhost/live/livestream", os.Args[0]))
fmt.Println(fmt.Sprintf(" %v -pr webrtc://localhost/live/livestream -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
Expand All @@ -118,7 +123,7 @@ func Parse(ctx context.Context) {
if sr == "" && pr == "" {
showHelp = true
}
if pr != "" && (sourceAudio == "" && sourceVideo == "") {
if pr != "" && !closeAfterPublished && (sourceAudio == "" && sourceVideo == "") {
showHelp = true
}
if showHelp {
Expand All @@ -135,8 +140,8 @@ func Parse(ctx context.Context) {
summaryDesc = fmt.Sprintf("%v, play(url=%v, da=%v, dv=%v, pli=%v)", summaryDesc, sr, dumpAudio, dumpVideo, pli)
}
if pr != "" {
summaryDesc = fmt.Sprintf("%v, publish(url=%v, sa=%v, sv=%v, fps=%v)",
summaryDesc, pr, sourceAudio, sourceVideo, fps)
summaryDesc = fmt.Sprintf("%v, publish(url=%v, sa=%v, sv=%v, fps=%v, cap=%v)",
summaryDesc, pr, sourceAudio, sourceVideo, fps, closeAfterPublished)
}
logger.Tf(ctx, "Run benchmark with %v", summaryDesc)

Expand Down Expand Up @@ -271,7 +276,7 @@ func Run(ctx context.Context) error {
gStatRTC.Publishers.Alive--
}()

if err := startPublish(ctx, pr, sourceAudio, sourceVideo, fps, audioLevel, videoTWCC); err != nil {
if err := startPublish(ctx, pr, sourceAudio, sourceVideo, fps, audioLevel, videoTWCC, closeAfterPublished); err != nil {
if errors.Cause(err) != context.Canceled {
logger.Wf(ctx, "Run err %+v", err)
}
Expand Down
91 changes: 75 additions & 16 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ using namespace std;
const int kRtpMaxPayloadSize = kRtpPacketSize - 300;
#endif

// the time to cleanup source.
#define SRS_RTC_SOURCE_CLEANUP (3 * SRS_UTIME_SECONDS)

// TODO: Add this function into SrsRtpMux class.
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf)
{
Expand Down Expand Up @@ -244,11 +247,56 @@ void SrsRtcConsumer::on_stream_change(SrsRtcSourceDescription* desc)
SrsRtcSourceManager::SrsRtcSourceManager()
{
lock = srs_mutex_new();
timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS);
}

SrsRtcSourceManager::~SrsRtcSourceManager()
{
srs_mutex_destroy(lock);
srs_freep(timer_);
}

srs_error_t SrsRtcSourceManager::initialize()
{
return setup_ticks();
}

srs_error_t SrsRtcSourceManager::setup_ticks()
{
srs_error_t err = srs_success;

if ((err = timer_->tick(1, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}

if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}

return err;
}

srs_error_t SrsRtcSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;

std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSharedPtr<SrsRtcSource>& source = it->second;

// When source expired, remove it.
// @see https://github.com/ossrs/srs/issues/713
if (source->stream_is_dead()) {
SrsContextId cid = source->source_id();
if (cid.empty()) cid = source->pre_source_id();
srs_trace("RTC: cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
pool.erase(it++);
} else {
++it;
}
}

return err;
}

srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsRtcSource>& pps)
Expand Down Expand Up @@ -305,19 +353,6 @@ SrsSharedPtr<SrsRtcSource> SrsRtcSourceManager::fetch(SrsRequest* r)
return source;
}

void SrsRtcSourceManager::eliminate(SrsRequest* r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);

string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
pool.erase(it);
}
}

SrsRtcSourceManager* _srs_rtc_sources = NULL;

ISrsRtcPublishStream::ISrsRtcPublishStream()
Expand Down Expand Up @@ -351,6 +386,7 @@ SrsRtcSource::SrsRtcSource()
#endif

pli_for_rtmp_ = pli_elapsed_ = 0;
stream_die_at_ = 0;
}

SrsRtcSource::~SrsRtcSource()
Expand Down Expand Up @@ -384,6 +420,27 @@ srs_error_t SrsRtcSource::initialize(SrsRequest* r)
return err;
}

bool SrsRtcSource::stream_is_dead()
{
// still publishing?
if (is_created_) {
return false;
}

// has any consumers?
if (!consumers.empty()) {
return false;
}

// Delay cleanup source.
srs_utime_t now = srs_get_system_time();
if (now < stream_die_at_ + SRS_RTC_SOURCE_CLEANUP) {
return false;
}

return true;
}

void SrsRtcSource::init_for_play_before_publishing()
{
// If the stream description has already been setup by RTC publisher,
Expand Down Expand Up @@ -497,6 +554,8 @@ srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
consumer = new SrsRtcConsumer(this);
consumers.push_back(consumer);

stream_die_at_ = 0;

// TODO: FIXME: Implements edge cluster.

return err;
Expand Down Expand Up @@ -530,7 +589,7 @@ void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer)

// Destroy and cleanup source when no publishers and consumers.
if (!is_created_ && consumers.empty()) {
_srs_rtc_sources->eliminate(req);
stream_die_at_ = srs_get_system_time();
}
}

Expand Down Expand Up @@ -633,8 +692,8 @@ void SrsRtcSource::on_unpublish()
stat->on_stream_close(req);

// Destroy and cleanup source when no publishers and consumers.
if (!is_created_ && consumers.empty()) {
_srs_rtc_sources->eliminate(req);
if (consumers.empty()) {
stream_die_at_ = srs_get_system_time();
}
}

Expand Down
18 changes: 14 additions & 4 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,21 @@ class SrsRtcConsumer
void on_stream_change(SrsRtcSourceDescription* desc);
};

class SrsRtcSourceManager
class SrsRtcSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map< std::string, SrsSharedPtr<SrsRtcSource> > pool;
SrsHourGlass* timer_;
public:
SrsRtcSourceManager();
virtual ~SrsRtcSourceManager();
public:
virtual srs_error_t initialize();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
public:
// create source when fetch from cache failed.
// @param r the client request.
Expand All @@ -127,9 +134,6 @@ class SrsRtcSourceManager
public:
// Get the exists source, NULL when not exists.
virtual SrsSharedPtr<SrsRtcSource> fetch(SrsRequest* r);
public:
// Dispose and destroy the source.
virtual void eliminate(SrsRequest* r);
};

// Global singleton instance.
Expand Down Expand Up @@ -195,11 +199,17 @@ class SrsRtcSource : public ISrsFastTimer
// The PLI for RTC2RTMP.
srs_utime_t pli_for_rtmp_;
srs_utime_t pli_elapsed_;
private:
// The last die time, while die means neither publishers nor players.
srs_utime_t stream_die_at_;
public:
SrsRtcSource();
virtual ~SrsRtcSource();
public:
virtual srs_error_t initialize(SrsRequest* r);
public:
// Whether stream is dead, which is no publisher or player.
virtual bool stream_is_dead();
private:
void init_for_play_before_publishing();
public:
Expand Down
11 changes: 9 additions & 2 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ using namespace std;
#ifdef SRS_RTC
#include <srs_app_rtc_network.hpp>
#include <srs_app_rtc_server.hpp>
#include <srs_app_rtc_source.hpp>
#endif
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
Expand Down Expand Up @@ -809,12 +810,18 @@ srs_error_t SrsServer::start(SrsWaitGroup* wg)
srs_error_t err = srs_success;

if ((err = _srs_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "sources");
return srs_error_wrap(err, "live sources");
}

#ifdef SRS_SRT
if ((err = _srs_srt_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "sources");
return srs_error_wrap(err, "srt sources");
}
#endif

#ifdef SRS_RTC
if ((err = _srs_rtc_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc sources");
}
#endif

Expand Down

0 comments on commit 15a9427

Please sign in to comment.