Skip to content

Commit

Permalink
Add tcp depayloader (#161)
Browse files Browse the repository at this point in the history
* Add skeleton and basic functionality

* Allow detecting split RTSP packets

* Add handling rtsp messages

---------

Co-authored-by: Łukasz Kita <[email protected]>
Co-authored-by: Jakub Pryc <[email protected]>
  • Loading branch information
3 people authored Jan 31, 2024
1 parent 8da1299 commit a70eee2
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_rtp_plugin` to your list of dep
```elixir
def deps do
[
{:membrane_rtp_plugin, "~> 0.24.1"},
{:membrane_rtp_plugin, "~> 0.25.0"},
{:ex_libsrtp, ">= 0.0.0"} # required only if SRTP/SRTCP support is needed
]
end
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/rtp/jitter_buffer/buffer_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule Membrane.RTP.JitterBuffer.BufferStore do
:next -> {:next, seq_num + (roc + 1) * @seq_number_limit}
end

if is_fresh_packet?(flush_index, index) do
if fresh_packet?(flush_index, index) do
record = Record.new(buffer, index)
{:ok, add_record(store, record, rollover)}
else
Expand Down Expand Up @@ -176,7 +176,7 @@ defmodule Membrane.RTP.JitterBuffer.BufferStore do
end
end

defp is_fresh_packet?(flush_index, index), do: index > flush_index
defp fresh_packet?(flush_index, index), do: index > flush_index

@spec flush_while(t, (t, Record.t() -> boolean), [Record.t() | nil]) ::
{[Record.t() | nil], t}
Expand Down
6 changes: 3 additions & 3 deletions lib/membrane/rtp/packet_payload_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ defmodule Membrane.RTP.Packet.PayloadType do
@doc """
Checks if numerical payload type should be assigned to format type dynamically.
"""
@spec is_dynamic(payload_type :: RTP.payload_type_t()) :: boolean()
def is_dynamic(payload_type) when payload_type in 96..127, do: true
def is_dynamic(_payload_type), do: false
@spec dynamic?(payload_type :: RTP.payload_type_t()) :: boolean()
def dynamic?(payload_type) when payload_type in 96..127, do: true
def dynamic?(_payload_type), do: false
end
122 changes: 122 additions & 0 deletions lib/membrane/rtp/tcp_depayloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
defmodule Membrane.RTP.TCP.Depayloader do
@moduledoc """
This element provides functionality of depayloading RTP Packets received by TCP and redirecting
RTSP messages received in the same stream. The encapsulation is described in RFC 7826 Section 14.
Encapsulated packets interleaved in the stream will have the following structure:
["$" = 36 :: 1 byte][Channel id :: 1 byte][Length :: 2 bytes][packet :: <Length> bytes]
RTSP Messages
"""
use Membrane.Filter

alias Membrane.{Buffer, RemoteStream, RTP, RTSP}

def_options rtp_channel_id: [
spec: non_neg_integer(),
default: 0,
description: """
Channel identifier which encapsulated RTP packets will have.
"""
],
rtsp_session: [
spec: pid() | nil,
default: nil,
description: """
PID of a RTSP Session (returned from Membrane.RTSP.start or Membrane.RTSP.start_link)
that received RTSP responses will be forwarded to. If nil the responses will be
discarded.
"""
]

def_input_pad :input, accepted_format: %RemoteStream{type: :bytestream}

def_output_pad :output, accepted_format: %RemoteStream{type: :packetized, content_format: RTP}

@impl true
def handle_init(_ctx, opts) do
state =
Map.from_struct(opts)
|> Map.merge(%{
unprocessed_data: <<>>
})

{[], state}
end

@impl true
def handle_playing(_ctx, state) do
stream_format = %RemoteStream{type: :packetized, content_format: RTP}
{[stream_format: {:output, stream_format}], state}
end

@impl true
def handle_stream_format(:input, _stream_format, _ctx, state) do
{[], state}
end

@impl true
def handle_buffer(:input, %Buffer{payload: payload, metadata: metadata}, _ctx, state) do
unprocessed_data =
if rtsp_response?(state.unprocessed_data, payload) do
if state.rtsp_session != nil do
{:ok, %RTSP.Response{status: 200}} =
RTSP.handle_response(state.rtsp_session, state.unprocessed_data)
end

<<>>
else
state.unprocessed_data
end

packets_binary = unprocessed_data <> payload

{unprocessed_data, complete_packets_binaries} =
get_complete_packets(packets_binary, state.rtp_channel_id)

packets_buffers =
Enum.map(complete_packets_binaries, &%Buffer{payload: &1, metadata: metadata})

{[buffer: {:output, packets_buffers}], %{state | unprocessed_data: unprocessed_data}}
end

@spec rtsp_response?(binary(), binary()) :: boolean()
defp rtsp_response?(maybe_rtsp_response, new_payload) do
String.starts_with?(new_payload, "$") and String.starts_with?(maybe_rtsp_response, "RTSP")
end

@spec get_complete_packets(binary(), non_neg_integer()) ::
{unprocessed_data :: binary(), complete_packets :: [binary()]}
defp get_complete_packets(packets_binary, channel_id, complete_packets \\ [])

defp get_complete_packets(packets_binary, _channel_id, complete_packets)
when byte_size(packets_binary) <= 4 do
{packets_binary, Enum.reverse(complete_packets)}
end

defp get_complete_packets(
<<"$", _rest::binary>> = packets_binary,
channel_id,
complete_packets
) do
<<"$", received_channel_id, payload_length::size(16), rest::binary>> = packets_binary

if payload_length > byte_size(rest) do
{packets_binary, Enum.reverse(complete_packets)}
else
<<complete_packet_binary::binary-size(payload_length)-unit(8), rest::binary>> = rest

complete_packets =
if channel_id != received_channel_id,
do: complete_packets,
else: [complete_packet_binary | complete_packets]

get_complete_packets(rest, channel_id, complete_packets)
end
end

defp get_complete_packets(rtsp_message, _channel_id, _complete_packets_binaries) do
# If the payload doesn't start with a "$" then it must be a RTSP message (or a part of it)
{rtsp_message, []}
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.RTP.Plugin.MixProject do
use Mix.Project

@version "0.24.1"
@version "0.25.0"
@github_url "https://github.com/membraneframework/membrane_rtp_plugin"

def project do
Expand Down Expand Up @@ -40,6 +40,7 @@ defmodule Membrane.RTP.Plugin.MixProject do
{:membrane_rtp_format, "~> 0.8.0"},
{:membrane_funnel_plugin, "~> 0.9.0"},
{:membrane_telemetry_metrics, "~> 0.1.0"},
{:membrane_rtsp, "~> 0.5.3"},
{:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", optional: true},
{:qex, "~> 0.5.1"},
{:bunch, "~> 1.5"},
Expand Down
Loading

0 comments on commit a70eee2

Please sign in to comment.