Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send RTMP messages to receiver_pid #102

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions lib/membrane_rtmp_plugin/rtmp/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,23 @@ defmodule Membrane.RTMP.MessageHandler do
end

# A message containing stream metadata
defp do_handle_client_message(%Messages.SetDataFrame{} = _data_frame, _header, state) do
defp do_handle_client_message(%Messages.SetDataFrame{} = set_data_frame, _header, state) do
if pid = state.receiver_pid, do: send(pid, set_data_frame)
{:cont, state}
end

defp do_handle_client_message(%Messages.OnMetaData{} = _on_meta_data, _header, state) do
defp do_handle_client_message(%Messages.OnMetaData{} = on_meta_data, _header, state) do
if pid = state.receiver_pid, do: send(pid, on_meta_data)
{:cont, state}
end

# According to ffmpeg's documentation, this command should prepare the server to receive media streams
# We are simply acknowledging the message
defp do_handle_client_message(%Messages.FCPublish{}, _header, state) do
defp do_handle_client_message(%Messages.FCPublish{} = fc_publish, _header, state) do
%Messages.Anonymous{name: "onFCPublish", properties: []}
|> send_rtmp_payload(state.socket, chunk_stream_id: 3)

if pid = state.receiver_pid, do: send(pid, fc_publish)
{:cont, state}
end

Expand All @@ -186,14 +189,16 @@ defmodule Membrane.RTMP.MessageHandler do
|> Responses.default_result([:null, stream_id])
|> send_rtmp_payload(state.socket, chunk_stream_id: 3)

if pid = state.receiver_pid, do: send(pid, create_stream)
{:cont, state}
end

# we ignore acknowledgement messages, but they're rarely used anyways
defp do_handle_client_message(%module{}, _header, state)
defp do_handle_client_message(%module{} = msg, _header, state)
when module in [Messages.Acknowledgement, Messages.WindowAcknowledgement] do
Logger.debug("#{inspect(module)} received, ignoring as acknowledgements are not implemented")

if pid = state.receiver_pid, do: send(pid, msg)
{:cont, state}
end

Expand All @@ -214,10 +219,12 @@ defmodule Membrane.RTMP.MessageHandler do
defp do_handle_client_message(%Messages.UserControl{} = msg, _header, state) do
Logger.warning("Received unsupported user control message of type #{inspect(msg.event_type)}")

if pid = state.receiver_pid, do: send(pid, msg)
{:cont, state}
end

defp do_handle_client_message(%Messages.DeleteStream{}, _header, state) do
defp do_handle_client_message(%Messages.DeleteStream{} = delete_stream, _header, state) do
if pid = state.receiver_pid, do: send(pid, delete_stream)
{:halt, %{state | events: [:delete_stream | state.events]}}
end

Expand All @@ -234,6 +241,7 @@ defmodule Membrane.RTMP.MessageHandler do

defp do_handle_client_message(%Messages.Anonymous{} = message, _header, state) do
Logger.debug("Unknown message: #{inspect(message)}")
if pid = state.receiver_pid, do: send(pid, message)

{:cont, state}
end
Expand Down
8 changes: 7 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ defmodule Membrane.RTMPServer do
an input argument of the `c:#{inspect(ClientHandler)}.handle_init/1`. Otherwise, an empty
map is passed to the `c:#{inspect(ClientHandler)}.handle_init/1`.
"""
@type client_behaviour_spec :: ClientHandler.t() | {ClientHandler.t(), opts :: any()}

@type receiver_pid :: pid() | nil

@type client_behaviour_spec ::
ClientHandler.t()
| {ClientHandler.t(), opts :: any()}
| {ClientHandler.t(), opts :: any(), receiver_pid}

@type server_identifier :: pid() | atom()

Expand Down
18 changes: 11 additions & 7 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ defmodule Membrane.RTMPServer.ClientHandler do
raise "handle_new_client is not a function"
end

{handler_module, opts} =
{handler_module, opts, pid} =
case state.handle_new_client.(self(), state.app, stream_key) do
{handler_module, opts} -> {handler_module, opts}
handler_module -> {handler_module, %{}}
{handler_module, opts, pid} when is_pid(pid) -> {handler_module, opts, pid}
{handler_module, opts} -> {handler_module, opts, nil}
handler_module -> {handler_module, %{}, nil}
end

Process.send_after(
Expand All @@ -177,10 +178,14 @@ defmodule Membrane.RTMPServer.ClientHandler do
state
| notified_about_client?: true,
handler: handler_module,
handler_state: handler_module.handle_init(opts)
handler_state: handler_module.handle_init(opts),
message_handler_state: %{message_handler_state | receiver_pid: pid}
}
else
state
%{
state
| message_handler_state: message_handler_state
}
end

state = Enum.reduce(events, state, &handle_event/2)
Expand All @@ -190,8 +195,7 @@ defmodule Membrane.RTMPServer.ClientHandler do
{:noreply,
%{
state
| message_parser_state: message_parser_state,
message_handler_state: message_handler_state
| message_parser_state: message_parser_state
}}
end

Expand Down
57 changes: 55 additions & 2 deletions test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,53 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
assert :ok = Task.await(ffmpeg_task)
end

test "Messages are sent to receiver_pid" do
self = self()

pipeline_startup_task =
Task.async(fn ->
start_pipeline_with_external_rtmp_server(@app, @stream_key, self, 0, false, self)
end)

port =
receive do
{:port, port} -> port
end

ffmpeg_task =
Task.async(fn ->
"rtmp://localhost:#{port}/#{@app}/#{@stream_key}" |> start_ffmpeg()
end)

pipeline = Task.await(pipeline_startup_task)

assert_receive(%Membrane.RTMP.Messages.SetDataFrame{})

assert_buffers(%{
pipeline: pipeline,
sink: :video_sink,
stream_length: @stream_length_ms,
buffers_expected: div(@stream_length_ms, @video_frame_duration_ms)
})

assert_buffers(%{
pipeline: pipeline,
sink: :audio_sink,
stream_length: @stream_length_ms,
buffers_expected: div(@stream_length_ms, @audio_frame_duration_ms)
})

assert_end_of_stream(pipeline, :audio_sink, :input)
assert_end_of_stream(pipeline, :video_sink, :input)

assert_received(%Membrane.RTMP.Messages.Anonymous{})
assert_received(%Membrane.RTMP.Messages.DeleteStream{})

# Cleanup
Testing.Pipeline.terminate(pipeline)
assert :ok = Task.await(ffmpeg_task)
end

defp start_pipeline_with_builtin_rtmp_server(app, stream_key, use_ssl? \\ false) do
options = [
module: Membrane.RTMP.Source.WithBuiltinServerTestPipeline,
Expand All @@ -216,13 +263,19 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
stream_key,
parent,
port \\ 0,
use_ssl? \\ false
use_ssl? \\ false,
receiver_pid \\ nil
) do
parent_process_pid = self()

handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
Membrane.RTMP.Source.ClientHandlerImpl

if receiver_pid do
{Membrane.RTMP.Source.ClientHandlerImpl, %{}, receiver_pid}
else
Membrane.RTMP.Source.ClientHandlerImpl
end
end

{:ok, server_pid} =
Expand Down