diff --git a/examples/source_with_standalone_server.exs b/examples/source_with_standalone_server.exs index 1f22a78e..3e1129a8 100644 --- a/examples/source_with_standalone_server.exs +++ b/examples/source_with_standalone_server.exs @@ -53,16 +53,14 @@ parent_process_pid = self() handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) + Membrane.RTMP.Source.ClientHandlerImpl end # Run the standalone server {:ok, server} = Membrane.RTMPServer.start_link( - handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()}, port: port, - use_ssl?: false, - handle_new_client: handle_new_client, - client_timeout: 5_000 + handle_new_client: handle_new_client ) app = "app" diff --git a/lib/membrane_rtmp_plugin/rtmp/source/amf0/encoder.ex b/lib/membrane_rtmp_plugin/rtmp/amf0/encoder.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/amf0/encoder.ex rename to lib/membrane_rtmp_plugin/rtmp/amf0/encoder.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/amf0/parser.ex b/lib/membrane_rtmp_plugin/rtmp/amf0/parser.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/amf0/parser.ex rename to lib/membrane_rtmp_plugin/rtmp/amf0/parser.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/amf3/parser.ex b/lib/membrane_rtmp_plugin/rtmp/amf3/parser.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/amf3/parser.ex rename to lib/membrane_rtmp_plugin/rtmp/amf3/parser.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/handshake.ex b/lib/membrane_rtmp_plugin/rtmp/handshake.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/handshake.ex rename to lib/membrane_rtmp_plugin/rtmp/handshake.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/handshake/step.ex b/lib/membrane_rtmp_plugin/rtmp/handshake/step.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/handshake/step.ex rename to lib/membrane_rtmp_plugin/rtmp/handshake/step.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/header.ex rename to lib/membrane_rtmp_plugin/rtmp/header.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message.ex b/lib/membrane_rtmp_plugin/rtmp/message.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/message.ex rename to lib/membrane_rtmp_plugin/rtmp/message.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex b/lib/membrane_rtmp_plugin/rtmp/message_handler.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex rename to lib/membrane_rtmp_plugin/rtmp/message_handler.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex rename to lib/membrane_rtmp_plugin/rtmp/message_parser.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/acknowledgement.ex b/lib/membrane_rtmp_plugin/rtmp/messages/acknowledgement.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/acknowledgement.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/acknowledgement.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/additional_media.ex b/lib/membrane_rtmp_plugin/rtmp/messages/additional_media.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/additional_media.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/additional_media.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/anonymous.ex b/lib/membrane_rtmp_plugin/rtmp/messages/anonymous.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/anonymous.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/anonymous.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/audio.ex b/lib/membrane_rtmp_plugin/rtmp/messages/audio.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/audio.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/audio.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/connect.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/connect.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/connect.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/connect.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/create_stream.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/create_stream.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/create_stream.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/create_stream.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/delete_stream.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/delete_stream.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/delete_stream.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/delete_stream.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/fc_publish.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/fc_publish.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/fc_publish.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/fc_publish.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/publish.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/publish.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/publish.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/publish.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/release_stream.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/release_stream.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/release_stream.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/release_stream.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/on_expect_additional_media.ex b/lib/membrane_rtmp_plugin/rtmp/messages/on_expect_additional_media.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/on_expect_additional_media.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/on_expect_additional_media.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/on_meta_data.ex b/lib/membrane_rtmp_plugin/rtmp/messages/on_meta_data.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/on_meta_data.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/on_meta_data.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/serializer.ex b/lib/membrane_rtmp_plugin/rtmp/messages/serializer.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/serializer.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/serializer.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_chunk_size.ex b/lib/membrane_rtmp_plugin/rtmp/messages/set_chunk_size.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/set_chunk_size.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/set_chunk_size.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_data_frame.ex b/lib/membrane_rtmp_plugin/rtmp/messages/set_data_frame.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/set_data_frame.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/set_data_frame.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_peer_bandwidth.ex b/lib/membrane_rtmp_plugin/rtmp/messages/set_peer_bandwidth.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/set_peer_bandwidth.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/set_peer_bandwidth.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/user_control.ex b/lib/membrane_rtmp_plugin/rtmp/messages/user_control.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/user_control.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/user_control.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/video.ex b/lib/membrane_rtmp_plugin/rtmp/messages/video.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/video.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/video.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/window_acknowledgement.ex b/lib/membrane_rtmp_plugin/rtmp/messages/window_acknowledgement.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/window_acknowledgement.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/window_acknowledgement.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/responses.ex b/lib/membrane_rtmp_plugin/rtmp/responses.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/responses.ex rename to lib/membrane_rtmp_plugin/rtmp/responses.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex similarity index 74% rename from lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex rename to lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex index 6039e115..ad5164bd 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex @@ -6,29 +6,16 @@ defmodule Membrane.RTMP.Source.ClientHandlerImpl do @behaviour Membrane.RTMPServer.ClientHandler - defstruct [:controlling_process] + defstruct [] @impl true - def handle_init(opts) do + def handle_init(_opts) do %{ source_pid: nil, - buffered: [], - app: nil, - stream_key: nil, - controlling_process: opts.controlling_process + buffered: [] } end - @impl true - def handle_connected(connected_msg, state) do - %{state | app: connected_msg.app} - end - - @impl true - def handle_stream_published(publish_msg, state) do - %{state | stream_key: publish_msg.stream_key} - end - @impl true def handle_info({:send_me_data, source_pid}, state) do buffers_to_send = Enum.reverse(state.buffered) diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index 41881fd8..c2651378 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -86,15 +86,15 @@ defmodule Membrane.RTMP.Source do handle_new_client = fn client_ref, app, stream_key -> send(parent_pid, {:client_ref, client_ref, app, stream_key}) + __MODULE__.ClientHandlerImpl end {:ok, server_pid} = Membrane.RTMPServer.start_link( - handler: %__MODULE__.ClientHandlerImpl{controlling_process: self()}, port: port, use_ssl?: use_ssl?, handle_new_client: handle_new_client, - client_timeout: 100 + client_timeout: Membrane.Time.milliseconds(100) ) state = %{state | app: app, stream_key: stream_key, server: server_pid} diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 4e76dfe7..6e87dba1 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -5,11 +5,14 @@ defmodule Membrane.RTMPServer do If no data is demanded within the client_timeout period, TCP socket is closed. Options: - - client_timeout: Time (ms) after which an unused client connection is automatically closed. - - handle_new_client: An anonymous function called when a new client connects. + - handle_new_client: An anonymous function called when a new client connects. It receives the client reference, `app` and `stream_key`, allowing custom processing, - like sending the reference to another process. If it's not provided, default implementation is used: - {:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMPServer.start_link(). + like sending the reference to another process. The function should return a `t:#{inspect(__MODULE__)}.client_behaviour_spec/0` + which defines how the client should behave. + - port: Port on which RTMP server will listen. Defaults to 1935. + - use_ssl?: If true, SSL socket (for RTMPS) will be used. Othwerwise, TCP socket (for RTMP) will be used. Defaults to false. + - client_timeout: Time after which an unused client connection is automatically closed. Defaults to 5 seconds. + - name: If not nil, value of this field will be used as a name under which the server's process will be registered. Defaults to nil. """ use GenServer @@ -21,17 +24,29 @@ defmodule Membrane.RTMPServer do Defines options for the RTMP server. """ @type t :: [ - handler: ClientHandler.t(), port: :inet.port_number(), use_ssl?: boolean(), name: atom() | nil, - handle_new_client: - (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> - any()) - | nil, + handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> + client_behaviour_spec()), client_timeout: Membrane.Time.t() ] + @default_options %{ + port: 1935, + use_ssl?: false, + name: nil, + client_timeout: Membrane.Time.seconds(5) + } + + @typedoc """ + A type representing how a client handler should behave. + If just a tuple is passed, the second element of that tuple is used as + an input argument of the `c:#{inspect(ClientHandler)}.handle_init/1`. Otherwise, an empty + map is passed to the `c:#{inspect(ClientHandler)}.handle_init/1`. + """ + @type client_behaviour_spec :: ClientHandler.t() | {ClientHandler.t(), opts :: any()} + @type server_identifier :: pid() | atom() @doc """ @@ -40,23 +55,10 @@ defmodule Membrane.RTMPServer do @spec start_link(server_options :: t()) :: GenServer.on_start() def start_link(server_options) do gen_server_opts = if server_options[:name] == nil, do: [], else: [name: server_options[:name]] + server_options_map = Enum.into(server_options, %{}) + server_options_map = Map.merge(@default_options, server_options_map) - server_options = Enum.into(server_options, %{}) - - server_options = - if server_options[:handle_new_client] == nil do - parent_process_pid = self() - - callback = fn client_ref, app, stream_key -> - send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) - end - - Map.put(server_options, :handle_new_client, callback) - else - server_options - end - - GenServer.start_link(__MODULE__, server_options, gen_server_opts) + GenServer.start_link(__MODULE__, server_options_map, gen_server_opts) end @doc """ diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 5a08a66e..c7a28478 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -12,37 +12,26 @@ defmodule Membrane.RTMPServer.ClientHandler do require Logger alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser} + @typedoc """ + A type representing a module which implements `#{inspect(__MODULE__)}` behaviour. + """ + @type t :: module() + @typedoc """ Type representing the user defined state of the client handler. """ - @type t :: term() + @type state :: any() @doc """ The callback invoked once the client handler is created. It should return the initial state of the client handler. """ - @callback handle_init(any()) :: t() - - @doc """ - The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()` - message. - """ - @callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) :: - t() - - @doc """ - The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()` - message. - """ - @callback handle_stream_published( - publish_msg :: Membrane.RTMP.Messages.Publish.t(), - state :: t() - ) :: t() + @callback handle_init(any()) :: state() @doc """ The callback invoked when new piece of data is received from a given client. """ - @callback handle_data_available(payload :: binary(), state :: t()) :: t() + @callback handle_data_available(payload :: binary(), state :: state()) :: state() @doc """ The callback invoked when the client served by given client handler @@ -50,13 +39,13 @@ defmodule Membrane.RTMPServer.ClientHandler do (for instance, when the remote client deletes the stream or terminates the socket connection) """ - @callback handle_end_of_stream(state :: t()) :: t() + @callback handle_end_of_stream(state :: state()) :: state() @doc """ The callback invoked when the client handler receives a message that is not recognized as an internal message of the client handler. """ - @callback handle_info(msg :: term(), t()) :: t() + @callback handle_info(msg :: term(), state()) :: state() @doc """ Makes the client handler ask client for the desired number of buffers @@ -73,16 +62,14 @@ defmodule Membrane.RTMPServer.ClientHandler do message_parser_state = Handshake.init_server() |> MessageParser.init() message_handler_state = MessageHandler.init(%{socket: opts.socket, use_ssl?: opts.use_ssl?}) - %handler_module{} = opts.handler - {:ok, %{ socket: opts.socket, use_ssl?: opts.use_ssl?, message_parser_state: message_parser_state, message_handler_state: message_handler_state, - handler: handler_module, - handler_state: handler_module.handle_init(opts.handler), + handler: nil, + handler_state: nil, app: nil, stream_key: nil, server: opts.server, @@ -163,15 +150,28 @@ defmodule Membrane.RTMPServer.ClientHandler do %{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} = message_handler_state - if is_function(state.handle_new_client) do - state.handle_new_client.(self(), state.app, stream_key) - else + if not is_function(state.handle_new_client) do raise "handle_new_client is not a function" end - Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout) + {handler_module, opts} = + case state.handle_new_client.(self(), state.app, stream_key) do + {handler_module, opts} -> {handler_module, opts} + handler_module -> {handler_module, %{}} + end - %{state | notified_about_client?: true} + Process.send_after( + self(), + {:client_timeout, state.app, stream_key}, + Membrane.Time.as_milliseconds(state.client_timeout, :round) + ) + + %{ + state + | notified_about_client?: true, + handler: handler_module, + handler_state: handler_module.handle_init(opts) + } else state end @@ -210,19 +210,12 @@ defmodule Membrane.RTMPServer.ClientHandler do } {:connected, connected_msg} -> - new_handler_state = - state.handler.handle_connected(connected_msg, state.handler_state) - - %{state | handler_state: new_handler_state, app: connected_msg.app} + %{state | app: connected_msg.app} {:published, publish_msg} -> - new_handler_state = - state.handler.handle_stream_published(publish_msg, state.handler_state) - %{ state - | handler_state: new_handler_state, - stream_key: publish_msg.stream_key, + | stream_key: publish_msg.stream_key, published?: true } end diff --git a/lib/membrane_rtmp_plugin/rtmp_server/listener.ex b/lib/membrane_rtmp_plugin/rtmp_server/listener.ex index 6cf28654..a24312e1 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/listener.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/listener.ex @@ -51,7 +51,6 @@ defmodule Membrane.RTMPServer.Listener do GenServer.start_link(ClientHandler, socket: client, use_ssl?: options.use_ssl?, - handler: options.handler, server: options.server, handle_new_client: options.handle_new_client, client_timeout: options.client_timeout diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs index 9d5d4896..5729b234 100644 --- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -222,17 +222,15 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) + Membrane.RTMP.Source.ClientHandlerImpl end {:ok, server_pid} = Membrane.RTMPServer.start_link( - handler: %Membrane.RTMP.Source.ClientHandlerImpl{ - controlling_process: self() - }, port: port, use_ssl?: use_ssl?, handle_new_client: handle_new_client, - client_timeout: 3_000 + client_timeout: Membrane.Time.seconds(3) ) {:ok, assigned_port} = Membrane.RTMPServer.get_port(server_pid)