From f150283ee2f05bb47553b9a57e3914e6bfb0ca91 Mon Sep 17 00:00:00 2001 From: ty Date: Tue, 29 Oct 2024 15:23:51 -0400 Subject: [PATCH] Send RTMP messages to client handler Add optional callback `handle_rtmp_message/2` to ClientHandlerBehaviour --- .../rtmp_server/client_handler.ex | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 95780de..1f69502 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -48,12 +48,19 @@ defmodule Membrane.RTMPServer.ClientHandler do """ @callback handle_connection_closed(state :: state()) :: state() + @doc """ + The optional callback invoked when the client handler receives a RTMP message. + """ + @callback handle_rtmp_message(msg :: term(), state()) :: state() + @doc """ The callback invoked when the client handler receives a message that is not recognized as an internal message of the client handler. """ @callback handle_info(msg :: term(), state()) :: state() + @optional_callbacks handle_rtmp_message: 2 + @doc """ Makes the client handler ask client for the desired number of buffers """ @@ -185,6 +192,25 @@ defmodule Membrane.RTMPServer.ClientHandler do state = Enum.reduce(events, state, &handle_event/2) + state = + if state.handler + && state.handler_state + && Kernel.function_exported?(state.handler, :handle_rtmp_message, 2) do + + new_handler_state = + Enum.reduce(messages, state.handler_state, fn + ({%Membrane.RTMP.Header{}, message}, handler_state) when is_map(message) -> + state.handler.handle_rtmp_message(message, handler_state) + (message, handler_state) when is_map(message) -> + state.handler.handle_rtmp_message(message, handler_state) + (_, handler_state) -> handler_state + end) + + %{state | handler_state: new_handler_state} + else + state + end + request_data(state) {:noreply,