From a277687cffd6bd58521826870af98202a09ec51d Mon Sep 17 00:00:00 2001 From: ty Date: Sat, 28 Sep 2024 00:27:30 -0400 Subject: [PATCH] Relay RTMP messages to pipeline --- .../rtmp/message_handler.ex | 18 ++++-- lib/membrane_rtmp_plugin/rtmp_server.ex | 8 ++- .../rtmp_server/client_handler.ex | 18 +++--- .../rtmp_source_bin_test.exs | 57 ++++++++++++++++++- 4 files changed, 86 insertions(+), 15 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_handler.ex b/lib/membrane_rtmp_plugin/rtmp/message_handler.ex index c0e447e..97a93d9 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_handler.ex @@ -161,20 +161,23 @@ defmodule Membrane.RTMP.MessageHandler do end # A message containing stream metadata - defp do_handle_client_message(%Messages.SetDataFrame{} = _data_frame, _header, state) do + defp do_handle_client_message(%Messages.SetDataFrame{} = set_data_frame, _header, state) do + if pid = state.receiver_pid, do: send(pid, set_data_frame) {:cont, state} end - defp do_handle_client_message(%Messages.OnMetaData{} = _on_meta_data, _header, state) do + defp do_handle_client_message(%Messages.OnMetaData{} = on_meta_data, _header, state) do + if pid = state.receiver_pid, do: send(pid, on_meta_data) {:cont, state} end # According to ffmpeg's documentation, this command should prepare the server to receive media streams # We are simply acknowledging the message - defp do_handle_client_message(%Messages.FCPublish{}, _header, state) do + defp do_handle_client_message(%Messages.FCPublish{} = fc_publish, _header, state) do %Messages.Anonymous{name: "onFCPublish", properties: []} |> send_rtmp_payload(state.socket, chunk_stream_id: 3) + if pid = state.receiver_pid, do: send(pid, fc_publish) {:cont, state} end @@ -186,14 +189,16 @@ defmodule Membrane.RTMP.MessageHandler do |> Responses.default_result([:null, stream_id]) |> send_rtmp_payload(state.socket, chunk_stream_id: 3) + if pid = state.receiver_pid, do: send(pid, create_stream) {:cont, state} end # we ignore acknowledgement messages, but they're rarely used anyways - defp do_handle_client_message(%module{}, _header, state) + defp do_handle_client_message(%module{} = msg, _header, state) when module in [Messages.Acknowledgement, Messages.WindowAcknowledgement] do Logger.debug("#{inspect(module)} received, ignoring as acknowledgements are not implemented") + if pid = state.receiver_pid, do: send(pid, msg) {:cont, state} end @@ -214,10 +219,12 @@ defmodule Membrane.RTMP.MessageHandler do defp do_handle_client_message(%Messages.UserControl{} = msg, _header, state) do Logger.warning("Received unsupported user control message of type #{inspect(msg.event_type)}") + if pid = state.receiver_pid, do: send(pid, msg) {:cont, state} end - defp do_handle_client_message(%Messages.DeleteStream{}, _header, state) do + defp do_handle_client_message(%Messages.DeleteStream{} = delete_stream, _header, state) do + if pid = state.receiver_pid, do: send(pid, delete_stream) {:halt, %{state | events: [:delete_stream | state.events]}} end @@ -234,6 +241,7 @@ defmodule Membrane.RTMP.MessageHandler do defp do_handle_client_message(%Messages.Anonymous{} = message, _header, state) do Logger.debug("Unknown message: #{inspect(message)}") + if pid = state.receiver_pid, do: send(pid, message) {:cont, state} end diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 6e87dba..c0c245a 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -45,7 +45,13 @@ defmodule Membrane.RTMPServer do an input argument of the `c:#{inspect(ClientHandler)}.handle_init/1`. Otherwise, an empty map is passed to the `c:#{inspect(ClientHandler)}.handle_init/1`. """ - @type client_behaviour_spec :: ClientHandler.t() | {ClientHandler.t(), opts :: any()} + + @type receiver_pid :: pid() | nil + + @type client_behaviour_spec :: + ClientHandler.t() + | {ClientHandler.t(), opts :: any()} + | {ClientHandler.t(), opts :: any(), receiver_pid} @type server_identifier :: pid() | atom() diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 95780de..3f3a7a6 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -161,10 +161,11 @@ defmodule Membrane.RTMPServer.ClientHandler do raise "handle_new_client is not a function" end - {handler_module, opts} = + {handler_module, opts, pid} = case state.handle_new_client.(self(), state.app, stream_key) do - {handler_module, opts} -> {handler_module, opts} - handler_module -> {handler_module, %{}} + {handler_module, opts, pid} when is_pid(pid) -> {handler_module, opts, pid} + {handler_module, opts} -> {handler_module, opts, nil} + handler_module -> {handler_module, %{}, nil} end Process.send_after( @@ -177,10 +178,14 @@ defmodule Membrane.RTMPServer.ClientHandler do state | notified_about_client?: true, handler: handler_module, - handler_state: handler_module.handle_init(opts) + handler_state: handler_module.handle_init(opts), + message_handler_state: %{message_handler_state | receiver_pid: pid} } else - state + %{ + state + | message_handler_state: message_handler_state + } end state = Enum.reduce(events, state, &handle_event/2) @@ -190,8 +195,7 @@ defmodule Membrane.RTMPServer.ClientHandler do {:noreply, %{ state - | message_parser_state: message_parser_state, - message_handler_state: message_handler_state + | message_parser_state: message_parser_state }} end diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs index 5729b23..d895142 100644 --- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -200,6 +200,53 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do assert :ok = Task.await(ffmpeg_task) end + test "Messages are sent to receiver_pid" do + self = self() + + pipeline_startup_task = + Task.async(fn -> + start_pipeline_with_external_rtmp_server(@app, @stream_key, self, 0, false, self) + end) + + port = + receive do + {:port, port} -> port + end + + ffmpeg_task = + Task.async(fn -> + "rtmp://localhost:#{port}/#{@app}/#{@stream_key}" |> start_ffmpeg() + end) + + pipeline = Task.await(pipeline_startup_task) + + assert_receive(%Membrane.RTMP.Messages.SetDataFrame{}) + + assert_buffers(%{ + pipeline: pipeline, + sink: :video_sink, + stream_length: @stream_length_ms, + buffers_expected: div(@stream_length_ms, @video_frame_duration_ms) + }) + + assert_buffers(%{ + pipeline: pipeline, + sink: :audio_sink, + stream_length: @stream_length_ms, + buffers_expected: div(@stream_length_ms, @audio_frame_duration_ms) + }) + + assert_end_of_stream(pipeline, :audio_sink, :input) + assert_end_of_stream(pipeline, :video_sink, :input) + + assert_received(%Membrane.RTMP.Messages.Anonymous{}) + assert_received(%Membrane.RTMP.Messages.DeleteStream{}) + + # Cleanup + Testing.Pipeline.terminate(pipeline) + assert :ok = Task.await(ffmpeg_task) + end + defp start_pipeline_with_builtin_rtmp_server(app, stream_key, use_ssl? \\ false) do options = [ module: Membrane.RTMP.Source.WithBuiltinServerTestPipeline, @@ -216,13 +263,19 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do stream_key, parent, port \\ 0, - use_ssl? \\ false + use_ssl? \\ false, + receiver_pid \\ nil ) do parent_process_pid = self() handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) - Membrane.RTMP.Source.ClientHandlerImpl + + if receiver_pid do + {Membrane.RTMP.Source.ClientHandlerImpl, %{}, receiver_pid} + else + Membrane.RTMP.Source.ClientHandlerImpl + end end {:ok, server_pid} =