From fe106803233c6a6bf61e2fb0303b3f5a7e0fa2c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Tue, 13 Feb 2024 16:21:01 +0100 Subject: [PATCH] Perform refactor. Move parsing functions to message_parser. Separate the message_handler's internal state. Add actions handler in rtmp server --- .../rtmp/source/message_handler.ex | 82 +++++++---------- .../rtmp/source/message_parser.ex | 33 +++++++ lib/membrane_rtmp_plugin/rtmp_server.ex | 89 +++++++++++++------ 3 files changed, 127 insertions(+), 77 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex index e6a029ef..3e25447e 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex @@ -5,15 +5,12 @@ defmodule Membrane.RTMP.MessageHandler do # Appropriate responses are sent to the messages received during the initialization phase # The data received in video and audio is forwarded to the outputs - require Membrane.Logger - - alias Membrane.{Buffer, Logger} + require Membrane.Logger, as: Logger alias Membrane.RTMP.{ Handshake, Header, Message, - MessageParser, Messages, MessageValidator, Responses @@ -28,10 +25,25 @@ defmodule Membrane.RTMP.MessageHandler do # just to not waste time on chunking @server_chunk_size 4096 + def init(opts) do + %{ + socket: opts.socket, + socket_module: if(opts.use_ssl?, do: :ssl, else: :gen_tcp), + header_sent?: false, + actions: [], + validator: %Membrane.RTMP.MessageValidator.Default{}, + receiver_pid: nil, + # how many times the Source tries to get control of the socket + socket_retries: 3, + # epoch required for performing a handshake with the pipeline + epoch: 0 + } + end + @spec handle_client_messages(list(), map()) :: map() def handle_client_messages([], state) do request_packet(state.socket) - state + {%{state | actions: []}, state.actions} end def handle_client_messages(messages, state) do @@ -42,12 +54,11 @@ defmodule Membrane.RTMP.MessageHandler do |> case do {:error, :stream_validation, state} -> state.socket_module.shutdown(state.socket, :read_write) - - state + {state, state.actions} state -> request_packet(state.socket) - %{state | actions: Enum.reverse(state.actions)} + {%{state | actions: []}, Enum.reverse(state.actions)} end end @@ -76,8 +87,7 @@ defmodule Membrane.RTMP.MessageHandler do end defp do_handle_client_message(%Messages.SetChunkSize{chunk_size: chunk_size}, _header, state) do - parser = %{state.message_parser | chunk_size: chunk_size} - {:cont, %{state | message_parser: parser}} + {:cont, %{state | actions: [{:set_chunk_size, chunk_size} | state.actions]}} end @stream_begin_type 0 @@ -101,7 +111,9 @@ defmodule Membrane.RTMP.MessageHandler do Responses.on_bw_done() |> send_rtmp_payload(state.socket, chunk_stream_id: 3) - {:cont, validation_action(state, @validation_stage, result)} + state = validation_action(state, @validation_stage, result) + state = %{state | actions: [{:connected, connect.app} | state.actions]} + {:cont, state} {:error, _reason} = error -> {:halt, {:error, :stream_validation, validation_action(state, @validation_stage, error)}} @@ -135,7 +147,10 @@ defmodule Membrane.RTMP.MessageHandler do Responses.publish_success(publish.stream_key) |> send_rtmp_payload(state.socket, chunk_stream_id: 3, stream_id: header.stream_id) - {:cont, validation_action(state, @validation_stage, result)} + state = validation_action(state, @validation_stage, result) + + state = %{state | actions: [{:published, publish.stream_key} | state.actions]} + {:cont, state} {:error, _reason} = error -> {:halt, {:error, :stream_validation, validation_action(state, @validation_stage, error)}} @@ -214,7 +229,7 @@ defmodule Membrane.RTMP.MessageHandler do end defp do_handle_client_message(%Messages.DeleteStream{}, _header, state) do - {:halt, %{state | actions: [{:end_of_stream, :output} | state.actions]}} + {:halt, %{state | actions: [:end_of_stream | state.actions]}} end # Check bandwidth message @@ -245,7 +260,7 @@ defmodule Membrane.RTMP.MessageHandler do defp get_media_actions(rtmp_header, data, %{header_sent?: true} = state) do payload = get_flv_tag(rtmp_header, data) - Map.update!(state, :actions, &[{:buffer, {:output, %Buffer{payload: payload}}} | &1]) + Map.update!(state, :actions, &[{:output, payload} | &1]) end defp get_media_actions(rtmp_header, data, state) do @@ -254,7 +269,7 @@ defmodule Membrane.RTMP.MessageHandler do %{ state | header_sent?: true, - actions: [{:buffer, {:output, %Buffer{payload: payload}}} | state.actions] + actions: [{:output, payload} | state.actions] } end @@ -307,46 +322,13 @@ defmodule Membrane.RTMP.MessageHandler do defp validation_action(state, stage, result) do notification = case result do - {:ok, msg} -> {:notify_parent, {:stream_validation_success, stage, msg}} - {:error, reason} -> {:notify_parent, {:stream_validation_error, stage, reason}} + {:ok, msg} -> {:stream_validation_success, stage, msg} + {:error, reason} -> {:stream_validation_error, stage, reason} end Map.update!(state, :actions, &[notification | &1]) end - # The RTMP connection is based on TCP therefore we are operating on a continuous stream of bytes. - # In such case packets received on TCP sockets may contain a partial RTMP packet or several full packets. - # - # `MessageParser` is already able to request more data if packet is incomplete but it is not aware - # if its current buffer contains more than one message, therefore we need to call the `&MessageParser.handle_packet/2` - # as long as we decide to receive more messages (before starting to relay media packets). - # - # Once we hit `:need_more_data` the function returns the list of parsed messages and the message_parser then is ready - # to receive more data to continue with emitting new messages. - @spec parse_packet_messages(packet :: binary(), message_parser :: struct(), [{any(), any()}]) :: - {[Message.t()], message_parser :: struct()} - def parse_packet_messages(packet, message_parser, messages \\ []) - - def parse_packet_messages(<<>>, %{buffer: <<>>} = message_parser, messages) do - {Enum.reverse(messages), message_parser} - end - - def parse_packet_messages(packet, message_parser, messages) do - case MessageParser.handle_packet(packet, message_parser) do - {header, message, message_parser} -> - parse_packet_messages(<<>>, message_parser, [{header, message} | messages]) - - {:need_more_data, message_parser} -> - {Enum.reverse(messages), message_parser} - - {:handshake_done, message_parser} -> - parse_packet_messages(<<>>, message_parser, messages) - - {%Handshake.Step{} = step, message_parser} -> - parse_packet_messages(<<>>, message_parser, [{nil, step} | messages]) - end - end - @compile {:inline, socket_module: 1} defp socket_module({:sslsocket, _1, _2}), do: :ssl defp socket_module(_other), do: :gen_tcp diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex index 646ade10..55d83cd7 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex @@ -43,6 +43,39 @@ defmodule Membrane.RTMP.MessageParser do } end + # The RTMP connection is based on TCP therefore we are operating on a continuous stream of bytes. + # In such case packets received on TCP sockets may contain a partial RTMP packet or several full packets. + # + # `MessageParser` is already able to request more data if packet is incomplete but it is not aware + # if its current buffer contains more than one message, therefore we need to call the `&MessageParser.handle_packet/2` + # as long as we decide to receive more messages (before starting to relay media packets). + # + # Once we hit `:need_more_data` the function returns the list of parsed messages and the message_parser then is ready + # to receive more data to continue with emitting new messages. + @spec parse_packet_messages(packet :: binary(), message_parser :: struct(), [{any(), any()}]) :: + {[Message.t()], message_parser :: struct()} + def parse_packet_messages(packet, message_parser, messages \\ []) + + def parse_packet_messages(<<>>, %{buffer: <<>>} = message_parser, messages) do + {Enum.reverse(messages), message_parser} + end + + def parse_packet_messages(packet, message_parser, messages) do + case handle_packet(packet, message_parser) do + {header, message, message_parser} -> + parse_packet_messages(<<>>, message_parser, [{header, message} | messages]) + + {:need_more_data, message_parser} -> + {Enum.reverse(messages), message_parser} + + {:handshake_done, message_parser} -> + parse_packet_messages(<<>>, message_parser, messages) + + {%Handshake.Step{} = step, message_parser} -> + parse_packet_messages(<<>>, message_parser, [{nil, step} | messages]) + end + end + @doc """ Generates a list of the following transaction tx_ids. diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 8f7823c7..55b78b42 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -2,6 +2,7 @@ defmodule RTMP do defmodule ClientHandler do use GenServer + require Logger alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser} def init(opts) do @@ -10,50 +11,84 @@ defmodule RTMP do {:ok, %{ socket: opts.socket, - socket_module: if(opts.use_ssl?, do: :ssl, else: :gen_tcp), - header_sent?: false, - message_parser: MessageParser.init(Handshake.init_server()), - actions: [], - validator: %Membrane.RTMP.MessageValidator.Default{}, - receiver_pid: nil, - socket_ready?: true, - # how many times the Source tries to get control of the socket - socket_retries: 3, - # epoch required for performing a handshake with the pipeline - epoch: 0 + use_ssl?: opts.use_ssl?, + message_parser_state: Handshake.init_server() |> MessageParser.init(), + message_handler_state: MessageHandler.init(opts) }} end - def handle_info({:tcp, socket, data}, %{socket_module: :gen_tcp} = state) + def handle_info({:tcp, socket, data}, %{use_ssl?: false} = state) when state.socket == socket do - {messages, message_parser} = - MessageHandler.parse_packet_messages(data, state.message_parser) - - state = MessageHandler.handle_client_messages(messages, state) - {:noreply, %{state | actions: [], message_parser: message_parser}} + handle_data(data, state) end - def handle_info({:ssl, socket, data}, %{socket_module: :ssl} = state) + def handle_info({:ssl, socket, data}, %{use_ssl?: true} = state) when state.socket == socket do - {messages, message_parser} = - MessageHandler.parse_packet_messages(data, state.message_parser) - - state = MessageHandler.handle_client_messages(messages, state) - {:noreply, %{state | actions: [], message_parser: message_parser}} + handle_data(data, state) end def handle_info(:control_granted, state) do - case state.socket_module do - :gen_tcp -> :inet.setopts(state.socket, active: :once) - :ssl -> :ssl.setopts(state.socket, active: :once) + case state.use_ssl? do + false -> :inet.setopts(state.socket, active: :once) + true -> :ssl.setopts(state.socket, active: :once) end {:noreply, state} end - def handle_info(_msg, state) do + def handle_info(msg, state) do + Logger.warning("Unknown message received: #{inspect(msg)}") {:noreply, state} end + + defp handle_data(data, state) do + {messages, message_parser_state} = + MessageParser.parse_packet_messages(data, state.message_parser_state) + + {message_handler_state, actions} = + MessageHandler.handle_client_messages(messages, state.message_handler_state) + + state = handle_actions(actions, state) + + {:noreply, + %{ + state + | message_parser_state: message_parser_state, + message_handler_state: message_handler_state + }} + end + + defp handle_actions([], state) do + state + end + + defp handle_actions([action | rest], state) do + # call callbacks + case action do + :end_of_stream -> + nil + + {:set_chunk_size, _size} -> + nil + + {:output, payload} -> + nil + + {:connected, app} -> + nil + + {:published, stream_key} -> + nil + + {:stream_validation_success, stage, msg} -> + nil + + {:stream_validation_error, stage, reason} -> + nil + end + + handle_actions(rest, state) + end end @moduledoc """