Skip to content

Commit

Permalink
Merge pull request #95 from membraneframework/source_bin_dynamic_pads
Browse files Browse the repository at this point in the history
SourceBin dynamic pads
  • Loading branch information
bartkrak authored Aug 22, 2024
2 parents ba8e03a + dd4bf67 commit 3258ba2
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 26 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de
```elixir
def deps do
[
{:membrane_rtmp_plugin, "~> 0.24.0"}
{:membrane_rtmp_plugin, "~> 0.25.0"}
]
end
```
Expand Down
110 changes: 89 additions & 21 deletions lib/membrane_rtmp_plugin/rtmp/source/source_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ defmodule Membrane.RTMP.SourceBin do

def_output_pad :video,
accepted_format: H264,
availability: :always
availability: :on_request

def_output_pad :audio,
accepted_format: AAC,
availability: :always
availability: :on_request

def_options client_ref: [
default: nil,
Expand All @@ -42,32 +42,55 @@ defmodule Membrane.RTMP.SourceBin do

@impl true
def handle_init(_ctx, %__MODULE__{} = opts) do
structure = [
spec =
child(:src, %RTMP.Source{
client_ref: opts.client_ref,
url: opts.url
})
|> child(:demuxer, Membrane.FLV.Demuxer),
child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none
}),
child(:video_parser, Membrane.H264.Parser),
#
get_child(:demuxer)
|> via_out(Pad.ref(:audio, 0))
|> get_child(:audio_parser)
|> bin_output(:audio),
#
get_child(:demuxer)
|> via_out(Pad.ref(:video, 0))
|> get_child(:video_parser)
|> bin_output(:video)
]

{[spec: structure], %{}}
|> child(:demuxer, Membrane.FLV.Demuxer)

state = %{
demuxer_audio_pad_ref: nil,
demuxer_video_pad_ref: nil
}

{[spec: spec], state}
end

@impl true
def handle_pad_added(Pad.ref(:audio, _ref) = pad, ctx, state) do
assert_pad_count!(:audio, ctx)

spec =
child(:funnel_audio, Membrane.Funnel, get_if_exists: true)
|> bin_output(pad)

{actions, state} = maybe_link_audio_pad(state)

{[spec: spec] ++ actions, state}
end

def handle_pad_added(Pad.ref(:video, _ref) = pad, ctx, state) do
assert_pad_count!(:video, ctx)

spec =
child(:funnel_video, Membrane.Funnel, get_if_exists: true)
|> bin_output(pad)

{actions, state} = maybe_link_video_pad(state)

{[spec: spec] ++ actions, state}
end

@impl true
def handle_child_notification({:new_stream, pad_ref, :AAC}, :demuxer, _ctx, state) do
maybe_link_audio_pad(%{state | demuxer_audio_pad_ref: pad_ref})
end

def handle_child_notification({:new_stream, pad_ref, :H264}, :demuxer, _ctx, state) do
maybe_link_video_pad(%{state | demuxer_video_pad_ref: pad_ref})
end

def handle_child_notification(
{type, _socket, _pid} = notification,
:src,
Expand Down Expand Up @@ -111,4 +134,49 @@ defmodule Membrane.RTMP.SourceBin do
def secure_pass_control(socket, source) do
:ssl.controlling_process(socket, source)
end

defp maybe_link_audio_pad(state) when state.demuxer_audio_pad_ref != nil do
{[
spec:
get_child(:demuxer)
|> via_out(state.demuxer_audio_pad_ref)
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none
})
|> child(:funnel_audio, Membrane.Funnel, get_if_exists: true)
], state}
end

defp maybe_link_audio_pad(state) do
{[], state}
end

defp maybe_link_video_pad(state) when state.demuxer_video_pad_ref != nil do
{[
spec:
get_child(:demuxer)
|> via_out(state.demuxer_video_pad_ref)
|> child(:video_parser, Membrane.H264.Parser)
|> child(:funnel_video, Membrane.Funnel, get_if_exists: true)
], state}
end

defp maybe_link_video_pad(state) do
{[], state}
end

defp assert_pad_count!(name, ctx) do
count =
ctx.pads
|> Map.keys()
|> Enum.count(fn pad_ref -> Pad.name_by_ref(pad_ref) == name end)

if count > 1 do
raise(
"Linking more than one #{inspect(name)} output pad to #{inspect(__MODULE__)} is not allowed"
)
end

:ok
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.RTMP.Mixfile do
use Mix.Project

@version "0.24.0"
@version "0.25.0"
@github_url "https://github.com/membraneframework/membrane_rtmp_plugin"

def project do
Expand Down Expand Up @@ -46,6 +46,7 @@ defmodule Membrane.RTMP.Mixfile do
{:membrane_aac_plugin, "~> 0.18.1"},
{:membrane_flv_plugin, "~> 0.12.0"},
{:membrane_file_plugin, "~> 0.17.0"},
{:membrane_funnel_plugin, "~> 0.9.0"},
# testing
{:membrane_hackney_plugin, "~> 0.11.0", only: :test},
{:ffmpex, "~> 0.10.0", only: :test},
Expand Down
7 changes: 4 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"ffmpex": {:hex, :ffmpex, "0.10.0", "ce29281eac60bf109c05acb4342eecf813a3cd3f08c1bce350423caad86128af", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:rambo, "~> 0.3.0", [hex: :rambo, repo: "hexpm", optional: false]}], "hexpm", "de8d81f8c51cc258dcee9a3e0b1568b0659c97be004557d9af47795206cff53b"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
"finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"},
"hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"},
"hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.3", "d3f984eeb96fe53b85d20e0b049f03e57d075b5acda3ac8d465c969a2536c17b", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "9a90e868927f7c777689baa16d86f4d0e086d968db5c05d917ccff6d443e58a3"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"},
"makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
Expand All @@ -29,6 +29,7 @@
"membrane_core": {:hex, :membrane_core, "1.1.1", "4dcff6e9f3b2ecd4f437c20e201e53957731772c0f15b3005062c41f7f58f500", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3802f3fc071505c59d48792487d9927e803d4edb4039710ffa52cdb60bb0aecc"},
"membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"},
"membrane_flv_plugin": {:hex, :membrane_flv_plugin, "0.12.0", "d715ad405af86dcaf4b2f479e34088e1f6738c7280366828e1066b39d2aa493a", [:mix], [{:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}], "hexpm", "a317872d6d394e550c7bfd8979f12a3a1cc1e89b547d75360321025b403d3279"},
"membrane_funnel_plugin": {:hex, :membrane_funnel_plugin, "0.9.0", "9cfe09e44d65751f7d9d8d3c42e14797f7be69e793ac112ea63cd224af70a7bf", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "988790aca59d453a6115109f050699f7f45a2eb6a7f8dc5c96392760cddead54"},
"membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"},
"membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"},
"membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"},
Expand All @@ -48,7 +49,7 @@
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
"rambo": {:hex, :rambo, "0.3.4", "8962ac3bd1a633ee9d0e8b44373c7913e3ce3d875b4151dcd060886092d2dce7", [:mix], [], "hexpm", "0cc54ed089fbbc84b65f4b8a774224ebfe60e5c80186fafc7910b3e379ad58f1"},
"ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"},
"req": {:hex, :req, "0.5.4", "e375e4812adf83ffcf787871d7a124d873e983e3b77466e6608b973582f7f837", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "a17998ffe2ef54f79bfdd782ef9f4cbf987d93851e89444cbc466a6a25eee494"},
"req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"},
"shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
Expand Down

0 comments on commit 3258ba2

Please sign in to comment.