diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 95780de..9dd6f9b 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -48,12 +48,19 @@ defmodule Membrane.RTMPServer.ClientHandler do """ @callback handle_connection_closed(state :: state()) :: state() + @doc """ + The optional callback invoked when the client handler receives a RTMP message. + """ + @callback handle_rtmp_message(msg :: term(), state()) :: state() + @doc """ The callback invoked when the client handler receives a message that is not recognized as an internal message of the client handler. """ @callback handle_info(msg :: term(), state()) :: state() + @optional_callbacks handle_rtmp_message: 2 + @doc """ Makes the client handler ask client for the desired number of buffers """ @@ -185,6 +192,26 @@ defmodule Membrane.RTMPServer.ClientHandler do state = Enum.reduce(events, state, &handle_event/2) + if state.handler && + state.handler_state && + Kernel.function_exported?(state.handler, :handle_rtmp_message, 2) do + new_handler_state = + Enum.reduce(messages, state.handler_state, fn + {%Membrane.RTMP.Header{}, message}, handler_state when is_map(message) -> + state.handler.handle_rtmp_message(message, handler_state) + + message, handler_state when is_map(message) -> + state.handler.handle_rtmp_message(message, handler_state) + + _message, handler_state -> + handler_state + end) + + %{state | handler_state: new_handler_state} + else + state + end + request_data(state) {:noreply,