From c69a7977fc6e8414361b26fa03b3b2f11a6ed10c Mon Sep 17 00:00:00 2001 From: ty Date: Tue, 29 Oct 2024 15:23:51 -0400 Subject: [PATCH] Send RTMP messages to client handler Add optional callback `handle_rtmp_message/2` to ClientHandlerBehaviour --- .../rtmp_server/client_handler.ex | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 95780de..9dd6f9b 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -48,12 +48,19 @@ defmodule Membrane.RTMPServer.ClientHandler do """ @callback handle_connection_closed(state :: state()) :: state() + @doc """ + The optional callback invoked when the client handler receives a RTMP message. + """ + @callback handle_rtmp_message(msg :: term(), state()) :: state() + @doc """ The callback invoked when the client handler receives a message that is not recognized as an internal message of the client handler. """ @callback handle_info(msg :: term(), state()) :: state() + @optional_callbacks handle_rtmp_message: 2 + @doc """ Makes the client handler ask client for the desired number of buffers """ @@ -185,6 +192,26 @@ defmodule Membrane.RTMPServer.ClientHandler do state = Enum.reduce(events, state, &handle_event/2) + if state.handler && + state.handler_state && + Kernel.function_exported?(state.handler, :handle_rtmp_message, 2) do + new_handler_state = + Enum.reduce(messages, state.handler_state, fn + {%Membrane.RTMP.Header{}, message}, handler_state when is_map(message) -> + state.handler.handle_rtmp_message(message, handler_state) + + message, handler_state when is_map(message) -> + state.handler.handle_rtmp_message(message, handler_state) + + _message, handler_state -> + handler_state + end) + + %{state | handler_state: new_handler_state} + else + state + end + request_data(state) {:noreply,