diff --git a/src/moonlight-server/rtp/udp-ping.cpp b/src/moonlight-server/rtp/udp-ping.cpp index a316376b..4a5d4d88 100644 --- a/src/moonlight-server/rtp/udp-ping.cpp +++ b/src/moonlight-server/rtp/udp-ping.cpp @@ -36,8 +36,8 @@ class udp_server : public boost::enable_shared_from_this { logs::log(logs::trace, "[RTP] Received ping from {}:{}", client_ip, client_port); callback(client_port, client_ip); - - start_receive(); + // Once we get a ping there's no need to keep the socket up and running + // instead of calling start_receive() again we'll just let this thread die of a fast death } } @@ -51,14 +51,18 @@ void wait_for_ping( unsigned short port, const std::function &callback) { try { - boost::asio::io_context io_context; - udp_server server(io_context, port, callback); + auto io_context = std::make_shared(); + auto server = std::make_shared(*io_context, port, callback); logs::log(logs::info, "RTP server started on port: {}", port); - io_context.run(); + /* This thread will die after receiving a single ping TODO: timeout? */ + std::thread([io_context, server, port]() { + io_context->run(); + logs::log(logs::debug, "RTP server on port: {} stopped", port); + }).detach(); } catch (std::exception &e) { - logs::log(logs::warning, "[RTP] Unable to start RTP server: {}", e.what()); + logs::log(logs::warning, "[RTP] Unable to start RTP server on {}: {}", port, e.what()); } } diff --git a/src/moonlight-server/rtsp/commands.hpp b/src/moonlight-server/rtsp/commands.hpp index 594a9c70..62483814 100644 --- a/src/moonlight-server/rtsp/commands.hpp +++ b/src/moonlight-server/rtsp/commands.hpp @@ -8,6 +8,7 @@ #include #include #include +#include namespace rtsp::commands { @@ -66,7 +67,7 @@ describe(const RTSP_PACKET &req, const state::StreamSession &session) { return ok_msg(req.seq_number, {}, payloads); } -RTSP_PACKET setup(const RTSP_PACKET &req) { +RTSP_PACKET setup(const RTSP_PACKET &req, unsigned short number_of_sessions) { int service_port; auto type = req.request.stream.type; @@ -74,10 +75,10 @@ RTSP_PACKET setup(const RTSP_PACKET &req) { switch (utils::hash(type)) { case utils::hash("audio"): - service_port = state::AUDIO_PING_PORT; + service_port = state::AUDIO_PING_PORT + number_of_sessions; break; case utils::hash("video"): - service_port = state::VIDEO_PING_PORT; + service_port = state::VIDEO_PING_PORT + number_of_sessions; break; case utils::hash("control"): service_port = state::CONTROL_PORT; @@ -108,7 +109,10 @@ std::pair> parse_arg_line(const std::pair event_bus) { +announce(const RTSP_PACKET &req, + const state::StreamSession &session, + std::shared_ptr event_bus, + unsigned short number_of_sessions) { auto args = req.payloads // | views::filter([](const std::pair &line) { @@ -140,13 +144,22 @@ announce(const RTSP_PACKET &req, const state::StreamSession &session, std::share gst_pipeline = session.app->h264_gst_pipeline; } + unsigned short video_port = state::VIDEO_PING_PORT + number_of_sessions; + + // Video RTP Ping + rtp::wait_for_ping(video_port, [event_bus](unsigned short client_port, const std::string &client_ip) { + logs::log(logs::trace, "[PING] video from {}:{}", client_ip, client_port); + auto ev = state::RTPVideoPingEvent{.client_ip = client_ip, .client_port = client_port}; + event_bus->fire_event(immer::box(ev)); + }); + state::VideoSession video = { .display_mode = {.width = display.width, .height = display.height, .refreshRate = display.refreshRate}, .gst_pipeline = gst_pipeline, .session_id = session.session_id, - .port = state::VIDEO_PING_PORT, + .port = video_port, .timeout = std::chrono::milliseconds(args["x-nv-video[0].timeoutLengthMs"].value()), .packet_size = args["x-nv-video[0].packetSize"].value(), .frames_with_invalid_ref_threshold = args["x-nv-video[0].framesWithInvalidRefThreshold"].value(), @@ -161,6 +174,15 @@ announce(const RTSP_PACKET &req, const state::StreamSession &session, std::share .client_ip = session.ip}; event_bus->fire_event(immer::box(video)); + unsigned short audio_port = state::AUDIO_PING_PORT + number_of_sessions; + + // Audio RTP Ping + rtp::wait_for_ping(audio_port, [event_bus](unsigned short client_port, const std::string &client_ip) { + logs::log(logs::trace, "[PING] audio from {}:{}", client_ip, client_port); + auto ev = state::RTPAudioPingEvent{.client_ip = client_ip, .client_port = client_port}; + event_bus->fire_event(immer::box(ev)); + }); + // Audio session state::AudioSession audio = {.gst_pipeline = session.app->opus_gst_pipeline, @@ -170,7 +192,7 @@ announce(const RTSP_PACKET &req, const state::StreamSession &session, std::share .aes_key = session.aes_key, .aes_iv = session.aes_iv, - .port = state::AUDIO_PING_PORT, + .port = audio_port, .client_ip = session.ip, .packet_duration = args["x-nv-aqos.packetDuration"].value(), @@ -181,7 +203,10 @@ announce(const RTSP_PACKET &req, const state::StreamSession &session, std::share } RTSP_PACKET -message_handler(const RTSP_PACKET &req, const state::StreamSession &session, std::shared_ptr event_bus) { +message_handler(const RTSP_PACKET &req, + const state::StreamSession &session, + std::shared_ptr event_bus, + unsigned short number_of_sessions) { auto cmd = req.request.cmd; logs::log(logs::debug, "[RTSP] received command {}", cmd); @@ -191,9 +216,9 @@ message_handler(const RTSP_PACKET &req, const state::StreamSession &session, std case utils::hash("DESCRIBE"): return describe(req, session); case utils::hash("SETUP"): - return setup(req); + return setup(req, number_of_sessions); case utils::hash("ANNOUNCE"): - return announce(req, session, event_bus); + return announce(req, session, event_bus, number_of_sessions); case utils::hash("PLAY"): return ok_msg(req.seq_number); default: diff --git a/src/moonlight-server/rtsp/net.hpp b/src/moonlight-server/rtsp/net.hpp index f68f8a28..a7ddf0ad 100644 --- a/src/moonlight-server/rtsp/net.hpp +++ b/src/moonlight-server/rtsp/net.hpp @@ -70,7 +70,8 @@ class tcp_connection : public boost::enable_shared_from_this { auto user_ip = self->socket().remote_endpoint().address().to_string(); auto session = get_session_by_ip(self->stream_sessions->load(), user_ip); if (session) { - auto response = commands::message_handler(parsed_msg.value(), session.value(), self->event_bus); + auto session_idx = self->stream_sessions->load()->size() - 1; + auto response = commands::message_handler(parsed_msg.value(), session.value(), self->event_bus, session_idx); self->send_message(response, [self](auto bytes) { self->close(); }); } else { logs::log(logs::warning, "[RTSP] received packet from unrecognised client: {}", user_ip); diff --git a/src/moonlight-server/state/data-structures.hpp b/src/moonlight-server/state/data-structures.hpp index 3fa34ae6..15cafc90 100644 --- a/src/moonlight-server/state/data-structures.hpp +++ b/src/moonlight-server/state/data-structures.hpp @@ -43,8 +43,8 @@ enum STANDARD_PORTS_MAPPING { HTTPS_PORT = 47984, HTTP_PORT = 47989, CONTROL_PORT = 47999, - VIDEO_PING_PORT = 47998, - AUDIO_PING_PORT = 48000, + VIDEO_PING_PORT = 48100, + AUDIO_PING_PORT = 48200, RTSP_SETUP_PORT = 48010 }; diff --git a/src/moonlight-server/wolf.cpp b/src/moonlight-server/wolf.cpp index 7855af77..4b6782a3 100644 --- a/src/moonlight-server/wolf.cpp +++ b/src/moonlight-server/wolf.cpp @@ -472,24 +472,6 @@ int main(int argc, char *argv[]) { control::run_control(state::CONTROL_PORT, sessions, ev_bus); }).detach(); - // Video RTP Ping - std::thread([local_state]() { - rtp::wait_for_ping(state::VIDEO_PING_PORT, [=](unsigned short client_port, const std::string &client_ip) { - logs::log(logs::trace, "[PING] video from {}:{}", client_ip, client_port); - auto ev = state::RTPVideoPingEvent{.client_ip = client_ip, .client_port = client_port}; - local_state->event_bus->fire_event(immer::box(ev)); - }); - }).detach(); - - // Audio RTP Ping - std::thread([local_state]() { - rtp::wait_for_ping(state::AUDIO_PING_PORT, [=](unsigned short client_port, const std::string &client_ip) { - logs::log(logs::trace, "[PING] audio from {}:{}", client_ip, client_port); - auto ev = state::RTPAudioPingEvent{.client_ip = client_ip, .client_port = client_port}; - local_state->event_bus->fire_event(immer::box(ev)); - }); - }).detach(); - auto audio_server = setup_audio_server(runtime_dir); auto sess_handlers = setup_sessions_handlers(local_state, runtime_dir, audio_server);