From b4b41e6bf0b330c151d2e4810eccc316c1ffb47d Mon Sep 17 00:00:00 2001 From: Daniel Morandini <danielmorandini@me.com> Date: Thu, 3 Oct 2024 17:51:48 +0200 Subject: [PATCH 1/4] Improve delete-stream, eos handling --- .../rtmp/source/client_handler_impl.ex | 15 +++++----- .../rtmp/source/source.ex | 6 +++- lib/membrane_rtmp_plugin/rtmp_server.ex | 5 ++-- .../rtmp_server/client_handler.ex | 29 +++++++++++++------ 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex index ad5164b..4a8b756 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex @@ -40,18 +40,19 @@ defmodule Membrane.RTMP.Source.ClientHandlerImpl do end @impl true - def handle_end_of_stream(state) do - if state.source_pid != nil, do: send_eos(state.source_pid) + def handle_connection_closed(state) do + if state.source_pid != nil, do: send(state.source_pid, :connection_closed) state end - defp send_data(pid, payload) do - send(pid, {:data, payload}) - :ok + @impl true + def handle_delete_stream(state) do + if state.source_pid != nil, do: send(state.source_pid, :delete_stream) + state end - defp send_eos(pid) do - send(pid, :end_of_stream) + defp send_data(pid, payload) do + send(pid, {:data, payload}) :ok end end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index c265137..3109a10 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -190,7 +190,7 @@ defmodule Membrane.RTMP.Source do end @impl true - def handle_info(:end_of_stream, ctx, state) do + def handle_info(:connection_closed, ctx, state) do if ctx.pads[:output].end_of_stream? do {[], state} else @@ -198,6 +198,10 @@ defmodule Membrane.RTMP.Source do end end + def handle_info(:delete_stream, _ctx, state) do + {[notify_parent: :stream_deleted], state} + end + @impl true def handle_terminate_request(_ctx, state) do {[terminate: :normal], state} diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 6e87dba..19ce194 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -27,8 +27,9 @@ defmodule Membrane.RTMPServer do port: :inet.port_number(), use_ssl?: boolean(), name: atom() | nil, - handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> - client_behaviour_spec()), + handle_new_client: + (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> + client_behaviour_spec()), client_timeout: Membrane.Time.t() ] diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index c7a2847..e2c5dfe 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -34,12 +34,19 @@ defmodule Membrane.RTMPServer.ClientHandler do @callback handle_data_available(payload :: binary(), state :: state()) :: state() @doc """ - The callback invoked when the client served by given client handler - stops sending data. - (for instance, when the remote client deletes the stream or - terminates the socket connection) + Callback invoked when the RMTP stream is finished. """ - @callback handle_end_of_stream(state :: state()) :: state() + @callback handle_delete_stream(state :: state()) :: state() + + @doc """ + Callback invoked when the socket connection is terminated. In normal + conditions, delete_stream is called before this one. If delete_stream + is not called and connection_closed is, it might just mean that the + connection was lost. + + It is up to the users to determined how to handle it in their case. + """ + @callback handle_connection_closed(state :: state()) :: state() @doc """ The callback invoked when the client handler receives a message @@ -89,7 +96,7 @@ defmodule Membrane.RTMPServer.ClientHandler do @impl true def handle_info({:tcp_closed, socket}, %{use_ssl?: false} = state) when state.socket == socket do - events = [:end_of_stream] + events = [:connection_closed] state = Enum.reduce(events, state, &handle_event/2) {:noreply, state} @@ -102,7 +109,7 @@ defmodule Membrane.RTMPServer.ClientHandler do @impl true def handle_info({:ssl_closed, socket}, %{use_ssl?: true} = state) when state.socket == socket do - events = [:end_of_stream] + events = [:connection_closed] state = Enum.reduce(events, state, &handle_event/2) {:noreply, state} @@ -191,8 +198,12 @@ defmodule Membrane.RTMPServer.ClientHandler do defp handle_event(event, state) do # call callbacks case event do - :end_of_stream -> - new_handler_state = state.handler.handle_end_of_stream(state.handler_state) + :connection_closed -> + new_handler_state = state.handler.handle_connection_closed(state.handler_state) + %{state | handler_state: new_handler_state} + + :delete_stream -> + new_handler_state = state.handler.handle_delete_stream(state.handler_state) %{state | handler_state: new_handler_state} {:set_chunk_size_required, chunk_size} -> From 147556788c5e0061f7a9a42f70d84bd50e264e58 Mon Sep 17 00:00:00 2001 From: Daniel Morandini <danielmorandini@me.com> Date: Thu, 3 Oct 2024 17:56:21 +0200 Subject: [PATCH 2/4] Add missing bit --- lib/membrane_rtmp_plugin/rtmp/message_handler.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_handler.ex b/lib/membrane_rtmp_plugin/rtmp/message_handler.ex index 686ee20..c0e447e 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_handler.ex @@ -28,7 +28,7 @@ defmodule Membrane.RTMP.MessageHandler do {:set_chunk_size_required, non_neg_integer()} | {:connected, Membrane.RTMP.Messages.Connect.t()} | {:published, Membrane.RTMP.Messages.Publish.t()} - | :end_of_stream + | :delete_stream | {:data_available, binary()} @type t() :: %{ socket: :gen_tcp.socket() | :ssl.socket(), @@ -218,7 +218,7 @@ defmodule Membrane.RTMP.MessageHandler do end defp do_handle_client_message(%Messages.DeleteStream{}, _header, state) do - {:halt, %{state | events: [:end_of_stream | state.events]}} + {:halt, %{state | events: [:delete_stream | state.events]}} end # Check bandwidth message From 0e47326f9145b9c3511227dd320e144505d165aa Mon Sep 17 00:00:00 2001 From: Daniel Morandini <danielmorandini@me.com> Date: Fri, 4 Oct 2024 13:52:24 +0200 Subject: [PATCH 3/4] Update lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Łukasz Kita <lukasz.kita0@gmail.com> --- lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index e2c5dfe..91c11ba 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -40,7 +40,7 @@ defmodule Membrane.RTMPServer.ClientHandler do @doc """ Callback invoked when the socket connection is terminated. In normal - conditions, delete_stream is called before this one. If delete_stream + conditions, `handle_delete_stream` is called before this one. If delete_stream is not called and connection_closed is, it might just mean that the connection was lost. From afddf6a40b36d704d0cd9ebdf34c7ef2b88f8255 Mon Sep 17 00:00:00 2001 From: Daniel Morandini <danielmorandini@me.com> Date: Fri, 4 Oct 2024 13:52:32 +0200 Subject: [PATCH 4/4] Update lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Łukasz Kita <lukasz.kita0@gmail.com> --- lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 91c11ba..95780de 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -42,7 +42,7 @@ defmodule Membrane.RTMPServer.ClientHandler do Callback invoked when the socket connection is terminated. In normal conditions, `handle_delete_stream` is called before this one. If delete_stream is not called and connection_closed is, it might just mean that the - connection was lost. + connection was lost (for instance when TCP socket is closed unexpectedly). It is up to the users to determined how to handle it in their case. """