Skip to content

Commit

Permalink
fix: use different UDP ports for different users.
Browse files Browse the repository at this point in the history
With the latest changes, when we bind the port with the Gstreamer udpsink that would also "eat up" all the additional ping packets from other clients.
Unfortunately, this requires opening up additional ports if someone wants to expose this.
  • Loading branch information
ABeltramo committed Nov 16, 2023
1 parent 83dd201 commit 3f52d8a
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 36 deletions.
16 changes: 10 additions & 6 deletions src/moonlight-server/rtp/udp-ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class udp_server : public boost::enable_shared_from_this<udp_server> {

logs::log(logs::trace, "[RTP] Received ping from {}:{}", client_ip, client_port);
callback(client_port, client_ip);

start_receive();
// Once we get a ping there's no need to keep the socket up and running
// instead of calling start_receive() again we'll just let this thread die of a fast death
}
}

Expand All @@ -51,14 +51,18 @@ void wait_for_ping(
unsigned short port,
const std::function<void(unsigned short /* client_port */, const std::string & /* client_ip */)> &callback) {
try {
boost::asio::io_context io_context;
udp_server server(io_context, port, callback);
auto io_context = std::make_shared<boost::asio::io_context>();
auto server = std::make_shared<udp_server>(*io_context, port, callback);

logs::log(logs::info, "RTP server started on port: {}", port);

io_context.run();
/* This thread will die after receiving a single ping TODO: timeout? */
std::thread([io_context, server, port]() {
io_context->run();
logs::log(logs::debug, "RTP server on port: {} stopped", port);
}).detach();
} catch (std::exception &e) {
logs::log(logs::warning, "[RTP] Unable to start RTP server: {}", e.what());
logs::log(logs::warning, "[RTP] Unable to start RTP server on {}: {}", port, e.what());
}
}

Expand Down
43 changes: 34 additions & 9 deletions src/moonlight-server/rtsp/commands.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <rtsp/parser.hpp>
#include <state/data-structures.hpp>
#include <string>
#include <rtp/udp-ping.hpp>

namespace rtsp::commands {

Expand Down Expand Up @@ -66,18 +67,18 @@ describe(const RTSP_PACKET &req, const state::StreamSession &session) {
return ok_msg(req.seq_number, {}, payloads);
}

RTSP_PACKET setup(const RTSP_PACKET &req) {
RTSP_PACKET setup(const RTSP_PACKET &req, unsigned short number_of_sessions) {

int service_port;
auto type = req.request.stream.type;
logs::log(logs::trace, "[RTSP] setup type: {}", type);

switch (utils::hash(type)) {
case utils::hash("audio"):
service_port = state::AUDIO_PING_PORT;
service_port = state::AUDIO_PING_PORT + number_of_sessions;
break;
case utils::hash("video"):
service_port = state::VIDEO_PING_PORT;
service_port = state::VIDEO_PING_PORT + number_of_sessions;
break;
case utils::hash("control"):
service_port = state::CONTROL_PORT;
Expand Down Expand Up @@ -108,7 +109,10 @@ std::pair<std::string, std::optional<int>> parse_arg_line(const std::pair<std::s
}

RTSP_PACKET
announce(const RTSP_PACKET &req, const state::StreamSession &session, std::shared_ptr<dp::event_bus> event_bus) {
announce(const RTSP_PACKET &req,
const state::StreamSession &session,
std::shared_ptr<dp::event_bus> event_bus,
unsigned short number_of_sessions) {

auto args = req.payloads //
| views::filter([](const std::pair<std::string, std::string> &line) {
Expand Down Expand Up @@ -140,13 +144,22 @@ announce(const RTSP_PACKET &req, const state::StreamSession &session, std::share
gst_pipeline = session.app->h264_gst_pipeline;
}

unsigned short video_port = state::VIDEO_PING_PORT + number_of_sessions;

// Video RTP Ping
rtp::wait_for_ping(video_port, [event_bus](unsigned short client_port, const std::string &client_ip) {
logs::log(logs::trace, "[PING] video from {}:{}", client_ip, client_port);
auto ev = state::RTPVideoPingEvent{.client_ip = client_ip, .client_port = client_port};
event_bus->fire_event(immer::box<state::RTPVideoPingEvent>(ev));
});

state::VideoSession video = {
.display_mode = {.width = display.width, .height = display.height, .refreshRate = display.refreshRate},
.gst_pipeline = gst_pipeline,

.session_id = session.session_id,

.port = state::VIDEO_PING_PORT,
.port = video_port,
.timeout = std::chrono::milliseconds(args["x-nv-video[0].timeoutLengthMs"].value()),
.packet_size = args["x-nv-video[0].packetSize"].value(),
.frames_with_invalid_ref_threshold = args["x-nv-video[0].framesWithInvalidRefThreshold"].value(),
Expand All @@ -161,6 +174,15 @@ announce(const RTSP_PACKET &req, const state::StreamSession &session, std::share
.client_ip = session.ip};
event_bus->fire_event(immer::box<state::VideoSession>(video));

unsigned short audio_port = state::AUDIO_PING_PORT + number_of_sessions;

// Audio RTP Ping
rtp::wait_for_ping(audio_port, [event_bus](unsigned short client_port, const std::string &client_ip) {
logs::log(logs::trace, "[PING] audio from {}:{}", client_ip, client_port);
auto ev = state::RTPAudioPingEvent{.client_ip = client_ip, .client_port = client_port};
event_bus->fire_event(immer::box<state::RTPAudioPingEvent>(ev));
});

// Audio session
state::AudioSession audio = {.gst_pipeline = session.app->opus_gst_pipeline,

Expand All @@ -170,7 +192,7 @@ announce(const RTSP_PACKET &req, const state::StreamSession &session, std::share
.aes_key = session.aes_key,
.aes_iv = session.aes_iv,

.port = state::AUDIO_PING_PORT,
.port = audio_port,
.client_ip = session.ip,

.packet_duration = args["x-nv-aqos.packetDuration"].value(),
Expand All @@ -181,7 +203,10 @@ announce(const RTSP_PACKET &req, const state::StreamSession &session, std::share
}

RTSP_PACKET
message_handler(const RTSP_PACKET &req, const state::StreamSession &session, std::shared_ptr<dp::event_bus> event_bus) {
message_handler(const RTSP_PACKET &req,
const state::StreamSession &session,
std::shared_ptr<dp::event_bus> event_bus,
unsigned short number_of_sessions) {
auto cmd = req.request.cmd;
logs::log(logs::debug, "[RTSP] received command {}", cmd);

Expand All @@ -191,9 +216,9 @@ message_handler(const RTSP_PACKET &req, const state::StreamSession &session, std
case utils::hash("DESCRIBE"):
return describe(req, session);
case utils::hash("SETUP"):
return setup(req);
return setup(req, number_of_sessions);
case utils::hash("ANNOUNCE"):
return announce(req, session, event_bus);
return announce(req, session, event_bus, number_of_sessions);
case utils::hash("PLAY"):
return ok_msg(req.seq_number);
default:
Expand Down
3 changes: 2 additions & 1 deletion src/moonlight-server/rtsp/net.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class tcp_connection : public boost::enable_shared_from_this<tcp_connection> {
auto user_ip = self->socket().remote_endpoint().address().to_string();
auto session = get_session_by_ip(self->stream_sessions->load(), user_ip);
if (session) {
auto response = commands::message_handler(parsed_msg.value(), session.value(), self->event_bus);
auto session_idx = self->stream_sessions->load()->size() - 1;
auto response = commands::message_handler(parsed_msg.value(), session.value(), self->event_bus, session_idx);
self->send_message(response, [self](auto bytes) { self->close(); });
} else {
logs::log(logs::warning, "[RTSP] received packet from unrecognised client: {}", user_ip);
Expand Down
4 changes: 2 additions & 2 deletions src/moonlight-server/state/data-structures.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ enum STANDARD_PORTS_MAPPING {
HTTPS_PORT = 47984,
HTTP_PORT = 47989,
CONTROL_PORT = 47999,
VIDEO_PING_PORT = 47998,
AUDIO_PING_PORT = 48000,
VIDEO_PING_PORT = 48100,
AUDIO_PING_PORT = 48200,
RTSP_SETUP_PORT = 48010
};

Expand Down
18 changes: 0 additions & 18 deletions src/moonlight-server/wolf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,24 +472,6 @@ int main(int argc, char *argv[]) {
control::run_control(state::CONTROL_PORT, sessions, ev_bus);
}).detach();

// Video RTP Ping
std::thread([local_state]() {
rtp::wait_for_ping(state::VIDEO_PING_PORT, [=](unsigned short client_port, const std::string &client_ip) {
logs::log(logs::trace, "[PING] video from {}:{}", client_ip, client_port);
auto ev = state::RTPVideoPingEvent{.client_ip = client_ip, .client_port = client_port};
local_state->event_bus->fire_event(immer::box<state::RTPVideoPingEvent>(ev));
});
}).detach();

// Audio RTP Ping
std::thread([local_state]() {
rtp::wait_for_ping(state::AUDIO_PING_PORT, [=](unsigned short client_port, const std::string &client_ip) {
logs::log(logs::trace, "[PING] audio from {}:{}", client_ip, client_port);
auto ev = state::RTPAudioPingEvent{.client_ip = client_ip, .client_port = client_port};
local_state->event_bus->fire_event(immer::box<state::RTPAudioPingEvent>(ev));
});
}).detach();

auto audio_server = setup_audio_server(runtime_dir);
auto sess_handlers = setup_sessions_handlers(local_state, runtime_dir, audio_server);

Expand Down

0 comments on commit 3f52d8a

Please sign in to comment.