Skip to content

Commit

Permalink
Perform refactor. Move parsing functions to message_parser. Separate …
Browse files Browse the repository at this point in the history
…the message_handler's internal state. Add actions handler in rtmp server
  • Loading branch information
varsill committed Feb 13, 2024
1 parent 5c0b665 commit fe10680
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 77 deletions.
82 changes: 32 additions & 50 deletions lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ defmodule Membrane.RTMP.MessageHandler do
# Appropriate responses are sent to the messages received during the initialization phase
# The data received in video and audio is forwarded to the outputs

require Membrane.Logger

alias Membrane.{Buffer, Logger}
require Membrane.Logger, as: Logger

alias Membrane.RTMP.{
Handshake,
Header,
Message,
MessageParser,
Messages,
MessageValidator,
Responses
Expand All @@ -28,10 +25,25 @@ defmodule Membrane.RTMP.MessageHandler do
# just to not waste time on chunking
@server_chunk_size 4096

def init(opts) do
%{
socket: opts.socket,
socket_module: if(opts.use_ssl?, do: :ssl, else: :gen_tcp),
header_sent?: false,
actions: [],
validator: %Membrane.RTMP.MessageValidator.Default{},
receiver_pid: nil,
# how many times the Source tries to get control of the socket
socket_retries: 3,
# epoch required for performing a handshake with the pipeline
epoch: 0
}
end

@spec handle_client_messages(list(), map()) :: map()
def handle_client_messages([], state) do
request_packet(state.socket)
state
{%{state | actions: []}, state.actions}
end

def handle_client_messages(messages, state) do
Expand All @@ -42,12 +54,11 @@ defmodule Membrane.RTMP.MessageHandler do
|> case do
{:error, :stream_validation, state} ->
state.socket_module.shutdown(state.socket, :read_write)

state
{state, state.actions}

state ->
request_packet(state.socket)
%{state | actions: Enum.reverse(state.actions)}
{%{state | actions: []}, Enum.reverse(state.actions)}
end
end

Expand Down Expand Up @@ -76,8 +87,7 @@ defmodule Membrane.RTMP.MessageHandler do
end

defp do_handle_client_message(%Messages.SetChunkSize{chunk_size: chunk_size}, _header, state) do
parser = %{state.message_parser | chunk_size: chunk_size}
{:cont, %{state | message_parser: parser}}
{:cont, %{state | actions: [{:set_chunk_size, chunk_size} | state.actions]}}
end

@stream_begin_type 0
Expand All @@ -101,7 +111,9 @@ defmodule Membrane.RTMP.MessageHandler do
Responses.on_bw_done()
|> send_rtmp_payload(state.socket, chunk_stream_id: 3)

{:cont, validation_action(state, @validation_stage, result)}
state = validation_action(state, @validation_stage, result)
state = %{state | actions: [{:connected, connect.app} | state.actions]}
{:cont, state}

{:error, _reason} = error ->
{:halt, {:error, :stream_validation, validation_action(state, @validation_stage, error)}}
Expand Down Expand Up @@ -135,7 +147,10 @@ defmodule Membrane.RTMP.MessageHandler do
Responses.publish_success(publish.stream_key)
|> send_rtmp_payload(state.socket, chunk_stream_id: 3, stream_id: header.stream_id)

{:cont, validation_action(state, @validation_stage, result)}
state = validation_action(state, @validation_stage, result)

state = %{state | actions: [{:published, publish.stream_key} | state.actions]}
{:cont, state}

{:error, _reason} = error ->
{:halt, {:error, :stream_validation, validation_action(state, @validation_stage, error)}}
Expand Down Expand Up @@ -214,7 +229,7 @@ defmodule Membrane.RTMP.MessageHandler do
end

defp do_handle_client_message(%Messages.DeleteStream{}, _header, state) do
{:halt, %{state | actions: [{:end_of_stream, :output} | state.actions]}}
{:halt, %{state | actions: [:end_of_stream | state.actions]}}
end

# Check bandwidth message
Expand Down Expand Up @@ -245,7 +260,7 @@ defmodule Membrane.RTMP.MessageHandler do
defp get_media_actions(rtmp_header, data, %{header_sent?: true} = state) do
payload = get_flv_tag(rtmp_header, data)

Map.update!(state, :actions, &[{:buffer, {:output, %Buffer{payload: payload}}} | &1])
Map.update!(state, :actions, &[{:output, payload} | &1])
end

defp get_media_actions(rtmp_header, data, state) do
Expand All @@ -254,7 +269,7 @@ defmodule Membrane.RTMP.MessageHandler do
%{
state
| header_sent?: true,
actions: [{:buffer, {:output, %Buffer{payload: payload}}} | state.actions]
actions: [{:output, payload} | state.actions]
}
end

Expand Down Expand Up @@ -307,46 +322,13 @@ defmodule Membrane.RTMP.MessageHandler do
defp validation_action(state, stage, result) do
notification =
case result do
{:ok, msg} -> {:notify_parent, {:stream_validation_success, stage, msg}}
{:error, reason} -> {:notify_parent, {:stream_validation_error, stage, reason}}
{:ok, msg} -> {:stream_validation_success, stage, msg}
{:error, reason} -> {:stream_validation_error, stage, reason}
end

Map.update!(state, :actions, &[notification | &1])
end

# The RTMP connection is based on TCP therefore we are operating on a continuous stream of bytes.
# In such case packets received on TCP sockets may contain a partial RTMP packet or several full packets.
#
# `MessageParser` is already able to request more data if packet is incomplete but it is not aware
# if its current buffer contains more than one message, therefore we need to call the `&MessageParser.handle_packet/2`
# as long as we decide to receive more messages (before starting to relay media packets).
#
# Once we hit `:need_more_data` the function returns the list of parsed messages and the message_parser then is ready
# to receive more data to continue with emitting new messages.
@spec parse_packet_messages(packet :: binary(), message_parser :: struct(), [{any(), any()}]) ::
{[Message.t()], message_parser :: struct()}
def parse_packet_messages(packet, message_parser, messages \\ [])

def parse_packet_messages(<<>>, %{buffer: <<>>} = message_parser, messages) do
{Enum.reverse(messages), message_parser}
end

def parse_packet_messages(packet, message_parser, messages) do
case MessageParser.handle_packet(packet, message_parser) do
{header, message, message_parser} ->
parse_packet_messages(<<>>, message_parser, [{header, message} | messages])

{:need_more_data, message_parser} ->
{Enum.reverse(messages), message_parser}

{:handshake_done, message_parser} ->
parse_packet_messages(<<>>, message_parser, messages)

{%Handshake.Step{} = step, message_parser} ->
parse_packet_messages(<<>>, message_parser, [{nil, step} | messages])
end
end

@compile {:inline, socket_module: 1}
defp socket_module({:sslsocket, _1, _2}), do: :ssl
defp socket_module(_other), do: :gen_tcp
Expand Down
33 changes: 33 additions & 0 deletions lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,39 @@ defmodule Membrane.RTMP.MessageParser do
}
end

# The RTMP connection is based on TCP therefore we are operating on a continuous stream of bytes.
# In such case packets received on TCP sockets may contain a partial RTMP packet or several full packets.
#
# `MessageParser` is already able to request more data if packet is incomplete but it is not aware
# if its current buffer contains more than one message, therefore we need to call the `&MessageParser.handle_packet/2`
# as long as we decide to receive more messages (before starting to relay media packets).
#
# Once we hit `:need_more_data` the function returns the list of parsed messages and the message_parser then is ready
# to receive more data to continue with emitting new messages.
@spec parse_packet_messages(packet :: binary(), message_parser :: struct(), [{any(), any()}]) ::
{[Message.t()], message_parser :: struct()}
def parse_packet_messages(packet, message_parser, messages \\ [])

def parse_packet_messages(<<>>, %{buffer: <<>>} = message_parser, messages) do
{Enum.reverse(messages), message_parser}
end

def parse_packet_messages(packet, message_parser, messages) do
case handle_packet(packet, message_parser) do
{header, message, message_parser} ->
parse_packet_messages(<<>>, message_parser, [{header, message} | messages])

{:need_more_data, message_parser} ->
{Enum.reverse(messages), message_parser}

{:handshake_done, message_parser} ->
parse_packet_messages(<<>>, message_parser, messages)

{%Handshake.Step{} = step, message_parser} ->
parse_packet_messages(<<>>, message_parser, [{nil, step} | messages])
end
end

@doc """
Generates a list of the following transaction tx_ids.
Expand Down
89 changes: 62 additions & 27 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule RTMP do
defmodule ClientHandler do
use GenServer

require Logger
alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser}

def init(opts) do
Expand All @@ -10,50 +11,84 @@ defmodule RTMP do
{:ok,
%{
socket: opts.socket,
socket_module: if(opts.use_ssl?, do: :ssl, else: :gen_tcp),
header_sent?: false,
message_parser: MessageParser.init(Handshake.init_server()),
actions: [],
validator: %Membrane.RTMP.MessageValidator.Default{},
receiver_pid: nil,
socket_ready?: true,
# how many times the Source tries to get control of the socket
socket_retries: 3,
# epoch required for performing a handshake with the pipeline
epoch: 0
use_ssl?: opts.use_ssl?,
message_parser_state: Handshake.init_server() |> MessageParser.init(),
message_handler_state: MessageHandler.init(opts)
}}
end

def handle_info({:tcp, socket, data}, %{socket_module: :gen_tcp} = state)
def handle_info({:tcp, socket, data}, %{use_ssl?: false} = state)
when state.socket == socket do
{messages, message_parser} =
MessageHandler.parse_packet_messages(data, state.message_parser)

state = MessageHandler.handle_client_messages(messages, state)
{:noreply, %{state | actions: [], message_parser: message_parser}}
handle_data(data, state)
end

def handle_info({:ssl, socket, data}, %{socket_module: :ssl} = state)
def handle_info({:ssl, socket, data}, %{use_ssl?: true} = state)
when state.socket == socket do
{messages, message_parser} =
MessageHandler.parse_packet_messages(data, state.message_parser)

state = MessageHandler.handle_client_messages(messages, state)
{:noreply, %{state | actions: [], message_parser: message_parser}}
handle_data(data, state)
end

def handle_info(:control_granted, state) do
case state.socket_module do
:gen_tcp -> :inet.setopts(state.socket, active: :once)
:ssl -> :ssl.setopts(state.socket, active: :once)
case state.use_ssl? do
false -> :inet.setopts(state.socket, active: :once)
true -> :ssl.setopts(state.socket, active: :once)
end

{:noreply, state}
end

def handle_info(_msg, state) do
def handle_info(msg, state) do
Logger.warning("Unknown message received: #{inspect(msg)}")
{:noreply, state}
end

defp handle_data(data, state) do
{messages, message_parser_state} =
MessageParser.parse_packet_messages(data, state.message_parser_state)

{message_handler_state, actions} =
MessageHandler.handle_client_messages(messages, state.message_handler_state)

state = handle_actions(actions, state)

{:noreply,
%{
state
| message_parser_state: message_parser_state,
message_handler_state: message_handler_state
}}
end

defp handle_actions([], state) do
state
end

defp handle_actions([action | rest], state) do
# call callbacks
case action do
:end_of_stream ->
nil

{:set_chunk_size, _size} ->
nil

{:output, payload} ->
nil

{:connected, app} ->
nil

{:published, stream_key} ->
nil

{:stream_validation_success, stage, msg} ->
nil

{:stream_validation_error, stage, reason} ->
nil
end

handle_actions(rest, state)
end
end

@moduledoc """
Expand Down

0 comments on commit fe10680

Please sign in to comment.