Skip to content

Commit

Permalink
SmartPtr: Support shared ptr for live source.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 14, 2024
1 parent 9dba99a commit 699b5bf
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 78 deletions.
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_coworkers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ SrsRequest* SrsCoWorkers::find_stream_info(string vhost, string app, string stre
return it->second;
}

srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsCoWorkers::on_publish(SrsRequest* r)
{
srs_error_t err = srs_success;

Expand All @@ -140,7 +140,7 @@ srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r)
return err;
}

void SrsCoWorkers::on_unpublish(SrsLiveSource* s, SrsRequest* r)
void SrsCoWorkers::on_unpublish(SrsRequest* r)
{
string url = r->get_stream_url();

Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_coworkers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class SrsCoWorkers
private:
virtual SrsRequest* find_stream_info(std::string vhost, std::string app, std::string stream);
public:
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t on_publish(SrsRequest* r);
virtual void on_unpublish(SrsRequest* r);
};

#endif
8 changes: 4 additions & 4 deletions trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,13 +547,13 @@ srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return http_static->mux.serve_http(w, r);
}

srs_error_t SrsHttpServer::http_mount(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsHttpServer::http_mount(SrsRequest* r)
{
return http_stream->http_mount(s, r);
return http_stream->http_mount(r);
}

void SrsHttpServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
void SrsHttpServer::http_unmount(SrsRequest* r)
{
http_stream->http_unmount(s, r);
http_stream->http_unmount(r);
}

4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_http_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ class SrsHttpServer : public ISrsHttpServeMux
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
public:
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t http_mount(SrsRequest* r);
virtual void http_unmount(SrsRequest* r);
};

#endif
Expand Down
46 changes: 24 additions & 22 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ using namespace std;
#include <srs_app_recv_thread.hpp>
#include <srs_app_http_hooks.hpp>

SrsBufferCache::SrsBufferCache(SrsLiveSource* s, SrsRequest* r)
SrsBufferCache::SrsBufferCache(SrsRequest* r)
{
req = r->copy()->as_http();
source = s;
queue = new SrsMessageQueue(true);
trd = new SrsSTCoroutine("http-stream", this);

Expand All @@ -59,12 +58,11 @@ SrsBufferCache::~SrsBufferCache()
srs_freep(req);
}

srs_error_t SrsBufferCache::update_auth(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsBufferCache::update_auth(SrsRequest* r)
{
srs_freep(req);
req = r->copy();
source = s;


return srs_success;
}

Expand Down Expand Up @@ -107,6 +105,11 @@ srs_error_t SrsBufferCache::cycle()
srs_usleep(SRS_STREAM_CACHE_CYCLE);
return err;
}

SrsLiveSource* source = _srs_sources->fetch(req);
if (!source) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}

// the stream cache will create consumer to cache stream,
// which will trigger to fetch stream from origin for edge.
Expand Down Expand Up @@ -553,9 +556,8 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri
return writer->writev(iov, iovcnt, pnwrite);
}

SrsLiveStream::SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c)
SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c)
{
source = s;
cache = c;
req = r->copy()->as_http();
security_ = new SrsSecurity();
Expand All @@ -567,10 +569,8 @@ SrsLiveStream::~SrsLiveStream()
srs_freep(security_);
}

srs_error_t SrsLiveStream::update_auth(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsLiveStream::update_auth(SrsRequest* r)
{
source = s;

srs_freep(req);
req = r->copy()->as_http();

Expand Down Expand Up @@ -660,6 +660,11 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess

// Enter chunked mode, because we didn't set the content-length.
w->write_header(SRS_CONSTS_HTTP_OK);

SrsLiveSource* source = _srs_sources->fetch(req);
if (!source) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}

// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer = NULL;
Expand Down Expand Up @@ -876,7 +881,6 @@ SrsLiveEntry::SrsLiveEntry(std::string m)
cache = NULL;

req = NULL;
source = NULL;

std::string ext = srs_path_filext(m);
_is_flv = (ext == ".flv");
Expand Down Expand Up @@ -954,7 +958,7 @@ srs_error_t SrsHttpStreamServer::initialize()
}

// TODO: FIXME: rename for HTTP FLV mount.
srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -982,10 +986,9 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)

entry = new SrsLiveEntry(mount);

entry->source = s;
entry->req = r->copy()->as_http();
entry->cache = new SrsBufferCache(s, r);
entry->stream = new SrsLiveStream(s, r, entry->cache);
entry->cache = new SrsBufferCache(r);
entry->stream = new SrsLiveStream(r, entry->cache);

// TODO: FIXME: maybe refine the logic of http remux service.
// if user push streams followed:
Expand All @@ -994,8 +997,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
// and they will using the same template, such as: [vhost]/[app]/[stream].flv
// so, need to free last request object, otherwise, it will cause memory leak.
srs_freep(tmpl->req);

tmpl->source = s;

tmpl->req = r->copy()->as_http();

sflvs[sid] = entry;
Expand All @@ -1015,8 +1017,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
} else {
// The entry exists, we reuse it and update the request of stream and cache.
entry = sflvs[sid];
entry->stream->update_auth(s, r);
entry->cache->update_auth(s, r);
entry->stream->update_auth(r);
entry->cache->update_auth(r);
}

if (entry->stream) {
Expand All @@ -1027,7 +1029,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
return err;
}

void SrsHttpStreamServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
void SrsHttpStreamServer::http_unmount(SrsRequest* r)
{
std::string sid = r->get_stream_url();

Expand Down Expand Up @@ -1133,7 +1135,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
}
}
}

SrsLiveSource* s = NULL;
if ((err = _srs_sources->fetch_or_create(r, server, &s)) != srs_success) {
return srs_error_wrap(err, "source create");
Expand All @@ -1146,7 +1148,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
s->set_gop_cache_max_frames(gcmf);

// create http streaming handler.
if ((err = http_mount(s, r)) != srs_success) {
if ((err = http_mount(r)) != srs_success) {
return srs_error_wrap(err, "http mount");
}

Expand Down
16 changes: 6 additions & 10 deletions trunk/src/app/srs_app_http_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ class SrsBufferCache : public ISrsCoroutineHandler
srs_utime_t fast_cache;
private:
SrsMessageQueue* queue;
SrsLiveSource* source;
SrsRequest* req;
SrsCoroutine* trd;
public:
SrsBufferCache(SrsLiveSource* s, SrsRequest* r);
SrsBufferCache(SrsRequest* r);
virtual ~SrsBufferCache();
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t update_auth(SrsRequest* r);
public:
virtual srs_error_t start();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
Expand Down Expand Up @@ -178,13 +177,12 @@ class SrsLiveStream : public ISrsHttpHandler
{
private:
SrsRequest* req;
SrsLiveSource* source;
SrsBufferCache* cache;
SrsSecurity* security_;
public:
SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c);
SrsLiveStream(SrsRequest* r, SrsBufferCache* c);
virtual ~SrsLiveStream();
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t update_auth(SrsRequest* r);
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
Expand All @@ -205,8 +203,6 @@ struct SrsLiveEntry
public:
// We will free the request.
SrsRequest* req;
// Shared source.
SrsLiveSource* source;
public:
// For template, the mount contains variables.
// For concrete stream, the mount is url to access.
Expand Down Expand Up @@ -244,8 +240,8 @@ class SrsHttpStreamServer : public ISrsReloadHandler
virtual srs_error_t initialize();
public:
// HTTP flv/ts/mp3/aac stream
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t http_mount(SrsRequest* r);
virtual void http_unmount(SrsRequest* r);
// Interface ISrsHttpMatchHijacker
public:
virtual srs_error_t hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph);
Expand Down
12 changes: 6 additions & 6 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1302,28 +1302,28 @@ srs_error_t SrsServer::on_reload_listen()
return err;
}

srs_error_t SrsServer::on_publish(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsServer::on_publish(SrsRequest* r)
{
srs_error_t err = srs_success;

if ((err = http_server->http_mount(s, r)) != srs_success) {
if ((err = http_server->http_mount(r)) != srs_success) {
return srs_error_wrap(err, "http mount");
}

SrsCoWorkers* coworkers = SrsCoWorkers::instance();
if ((err = coworkers->on_publish(s, r)) != srs_success) {
if ((err = coworkers->on_publish(r)) != srs_success) {
return srs_error_wrap(err, "coworkers");
}

return err;
}

void SrsServer::on_unpublish(SrsLiveSource* s, SrsRequest* r)
void SrsServer::on_unpublish(SrsRequest* r)
{
http_server->http_unmount(s, r);
http_server->http_unmount(r);

SrsCoWorkers* coworkers = SrsCoWorkers::instance();
coworkers->on_unpublish(s, r);
coworkers->on_unpublish(r);
}

SrsServerAdapter::SrsServerAdapter()
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler, public
virtual srs_error_t on_reload_listen();
// Interface ISrsLiveSourceHandler
public:
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t on_publish(SrsRequest* r);
virtual void on_unpublish(SrsRequest* r);
};

// The SRS server adapter, the master server.
Expand Down
46 changes: 22 additions & 24 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1765,11 +1765,15 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH

// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller lock.
// TODO: FIXME: Use smaller scope lock.
SrsLocker(lock);

SrsLiveSource* source = NULL;
if ((source = fetch(r)) != NULL) {

string stream_url = r->get_stream_url();
std::map<std::string, SrsLiveSource*>::iterator it = pool.find(stream_url);

if (it != pool.end()) {
SrsLiveSource* source = it->second;

// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
Expand All @@ -1778,40 +1782,34 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH
return err;
}

string stream_url = r->get_stream_url();
string vhost = r->vhost;

// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());

SrsLiveSource* source = new SrsLiveSource();
srs_trace("new live source, stream_url=%s", stream_url.c_str());

source = new SrsLiveSource();
if ((err = source->initialize(r, h)) != srs_success) {
err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
goto failed;
srs_freep(source);
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
}

pool[stream_url] = source;
*pps = source;
return err;

failed:
srs_freep(source);
return err;
}

SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r)
{
SrsLiveSource* source = NULL;
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller scope lock.
SrsLocker(lock);

string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
std::map<std::string, SrsLiveSource*>::iterator it = pool.find(stream_url);

if (it == pool.end()) {
return NULL;
}

source = pool[stream_url];


SrsLiveSource* source = it->second;
return source;
}

Expand Down Expand Up @@ -2600,7 +2598,7 @@ srs_error_t SrsLiveSource::on_publish()

// notify the handler.
srs_assert(handler);
if ((err = handler->on_publish(this, req)) != srs_success) {
if ((err = handler->on_publish(req)) != srs_success) {
return srs_error_wrap(err, "handle publish");
}

Expand Down Expand Up @@ -2652,7 +2650,7 @@ void SrsLiveSource::on_unpublish()
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_close(req);

handler->on_unpublish(this, req);
handler->on_unpublish(req);

if (bridge_) {
bridge_->on_unpublish();
Expand Down
Loading

0 comments on commit 699b5bf

Please sign in to comment.