From bdf60226aa0d080b77e0923c2cad7be780a4da74 Mon Sep 17 00:00:00 2001 From: Mateusz Front Date: Thu, 28 Nov 2024 15:14:41 +0100 Subject: [PATCH 1/3] add WHIP support --- README.md | 1 + lib/boombox.ex | 25 +++++++++++++++++++++++++ lib/boombox/webrtc.ex | 4 ++++ test/boombox_test.exs | 14 ++++++++++++++ 4 files changed, 44 insertions(+) diff --git a/README.md b/README.md index 57d5bc0..16957ea 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ For more examples, see [examples.livemd](examples.livemd). |---|---|---| | MP4 | `"*.mp4"` | `"*.mp4"` | | WebRTC | `{:webrtc, signaling}` | `{:webrtc, signaling}` | +| WHIP | `{:whip, port: 3721, token: "whip-token"}` | `{:whip, uri: "http://localhost:3721", token: "whip-token"}` | | RTMP | `"rtmp://*"` | _not supported_ | | RTSP | `"rtsp://*"` | _not supported_ | | HLS | _not supported_ | `"*.m3u8"` | diff --git a/lib/boombox.ex b/lib/boombox.ex index 80f0213..1e968fe 100644 --- a/lib/boombox.ex +++ b/lib/boombox.ex @@ -21,6 +21,7 @@ defmodule Boombox do (path_or_uri :: String.t()) | {:mp4, location :: String.t(), transport: :file | :http} | {:webrtc, webrtc_signaling()} + | {:whip, [{:uri, String.t()} | {:token, String.t()}]} | {:rtmp, (uri :: String.t()) | (client_handler :: pid)} | {:rtsp, url :: String.t()} | {:stream, in_stream_opts()} @@ -29,6 +30,13 @@ defmodule Boombox do (path_or_uri :: String.t()) | {:mp4, location :: String.t()} | {:webrtc, webrtc_signaling()} + | {:whip, + [ + {:ip, :inet.socket_address() | String.t()} + | {:port, :inet.port_number()} + | {:token, String.t()} + | {bandit_option :: atom(), term()} + ]} | {:hls, location :: String.t()} | {:stream, out_stream_opts()} @@ -140,6 +148,12 @@ defmodule Boombox do {:webrtc, uri} when is_binary(uri) -> value + {:whip, opts} when is_list(opts) -> + if Keyword.keyword?(opts) do + opts = parse_whip_opts(opts) + {:webrtc, {:whip, opts}} + end + {:rtmp, arg} when direction == :input and (is_binary(arg) or is_pid(arg)) -> value @@ -294,4 +308,15 @@ defmodule Boombox do raise ArgumentError, "Invalid transport: #{inspect(transport)}" end end + + defp parse_whip_opts(opts) do + case opts[:ip] do + ip when is_binary(ip) -> + {:ok, ip} = :inet.parse_address(~c"#{ip}") + Keyword.replace(opts, :ip, ip) + + _other -> + opts + end + end end diff --git a/lib/boombox/webrtc.ex b/lib/boombox/webrtc.ex index d9a9ba0..945c6c5 100644 --- a/lib/boombox/webrtc.ex +++ b/lib/boombox/webrtc.ex @@ -179,6 +179,10 @@ defmodule Boombox.WebRTC do signaling end + defp resolve_signaling({:whip, _opts} = signaling) do + signaling + end + defp resolve_signaling(uri) when is_binary(uri) do uri = URI.new!(uri) {:ok, ip} = :inet.getaddr(~c"#{uri.host}", :inet) diff --git a/test/boombox_test.exs b/test/boombox_test.exs index 3a5317f..406cc98 100644 --- a/test/boombox_test.exs +++ b/test/boombox_test.exs @@ -73,6 +73,20 @@ defmodule BoomboxTest do Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4") end + @tag :file_whip + async_test "mp4 file -> webrtc/whip -> mp4 file", %{tmp_dir: tmp} do + output = Path.join(tmp, "output.mp4") + + t = + Task.async(fn -> + Boombox.run(input: @bbb_mp4, output: {:whip, uri: "http://127.0.0.1:1234"}) + end) + + Boombox.run(input: {:whip, ip: "0.0.0.0", port: 1234}, output: output) + Task.await(t) + Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4") + end + @tag :http_webrtc async_test "http mp4 -> webrtc -> mp4 file", %{tmp_dir: tmp} do output = Path.join(tmp, "output.mp4") From 2d745018f274b00d23825a942a19a3aa6cc02a01 Mon Sep 17 00:00:00 2001 From: Mateusz Front Date: Tue, 10 Dec 2024 14:50:16 +0100 Subject: [PATCH 2/3] refactor WHIP API, make CLI work --- README.md | 6 +++--- examples.livemd | 4 ++-- lib/boombox.ex | 29 +++++++---------------------- lib/boombox/utils/cli.ex | 6 +++--- lib/boombox/webrtc.ex | 18 ++++++++++++------ test/boombox_test.exs | 4 ++-- 6 files changed, 29 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 16957ea..a6b20a9 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ For more examples, see [examples.livemd](examples.livemd). |---|---|---| | MP4 | `"*.mp4"` | `"*.mp4"` | | WebRTC | `{:webrtc, signaling}` | `{:webrtc, signaling}` | -| WHIP | `{:whip, port: 3721, token: "whip-token"}` | `{:whip, uri: "http://localhost:3721", token: "whip-token"}` | +| WHIP | `{:whip, "http://*", token: "token"}` | `{:whip, "http://*", token: "token"}` | | RTMP | `"rtmp://*"` | _not supported_ | | RTSP | `"rtsp://*"` | _not supported_ | | HLS | _not supported_ | `"*.m3u8"` | @@ -103,13 +103,13 @@ Make sure you have [Elixir](https://elixir-lang.org/) installed. The first call The CLI API is similar to the Elixir API, for example: ```elixir -Boombox.run(input: "file.mp4", output: {:webrtc, "ws://localhost:8830"}) +Boombox.run(input: "file.mp4", output: {:whip, "http://localhost:3721", token: "token"}) ``` is equivalent to: ```sh -./boombox -i file.mp4 -o --webrtc ws://localhost:8830 +./boombox -i file.mp4 -o --whip http://localhost:3721 --token token ``` It's also possible to pass an `.exs` script: diff --git a/examples.livemd b/examples.livemd index 8d4b59e..adc1e82 100644 --- a/examples.livemd +++ b/examples.livemd @@ -9,7 +9,7 @@ System.put_env("PATH", "/opt/homebrew/bin:#{System.get_env("PATH")}") # In case of problems installing Nx/EXLA/Bumblebee, # you can remove them and the Nx backend config below. # Examples that don't mention them should still work. -Mix.install([:boombox, :kino, :nx, :exla, :bumblebee, :websockex, :membrane_simple_rtsp_server]) +Mix.install([{:boombox, path: "/Users/matheksm/m/boombox"}, :kino, :nx, :exla, :bumblebee, :websockex]) Nx.global_default_backend(EXLA.Backend) ``` @@ -79,7 +79,7 @@ To send the stream, visit http://localhost:1234/webrtc_from_browser.html. Note: don't stop this cell to finish recording - click 'disconnect' or close the browser tab instead, so the recording is finalized properly. ```elixir -Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: "#{out_dir}/webrtc_to_mp4.mp4") +Boombox.run(input: {:whip, port: 8829, token: "whip_it!"}, output: "#{out_dir}/webrtc_to_mp4.mp4") ``` ```elixir diff --git a/lib/boombox.ex b/lib/boombox.ex index 1e968fe..becc0da 100644 --- a/lib/boombox.ex +++ b/lib/boombox.ex @@ -21,7 +21,7 @@ defmodule Boombox do (path_or_uri :: String.t()) | {:mp4, location :: String.t(), transport: :file | :http} | {:webrtc, webrtc_signaling()} - | {:whip, [{:uri, String.t()} | {:token, String.t()}]} + | {:whip, uri :: String.t(), token: String.t()} | {:rtmp, (uri :: String.t()) | (client_handler :: pid)} | {:rtsp, url :: String.t()} | {:stream, in_stream_opts()} @@ -30,13 +30,7 @@ defmodule Boombox do (path_or_uri :: String.t()) | {:mp4, location :: String.t()} | {:webrtc, webrtc_signaling()} - | {:whip, - [ - {:ip, :inet.socket_address() | String.t()} - | {:port, :inet.port_number()} - | {:token, String.t()} - | {bandit_option :: atom(), term()} - ]} + | {:whip, uri :: String.t(), [{:token, String.t()} | {bandit_option :: atom(), term()}]} | {:hls, location :: String.t()} | {:stream, out_stream_opts()} @@ -148,10 +142,12 @@ defmodule Boombox do {:webrtc, uri} when is_binary(uri) -> value - {:whip, opts} when is_list(opts) -> + {:whip, uri} when is_binary(uri) -> + parse_opt!(direction, {:whip, uri, []}) + + {:whip, uri, opts} when is_binary(uri) and is_list(opts) -> if Keyword.keyword?(opts) do - opts = parse_whip_opts(opts) - {:webrtc, {:whip, opts}} + {:webrtc, {:whip, uri, opts}} end {:rtmp, arg} when direction == :input and (is_binary(arg) or is_pid(arg)) -> @@ -308,15 +304,4 @@ defmodule Boombox do raise ArgumentError, "Invalid transport: #{inspect(transport)}" end end - - defp parse_whip_opts(opts) do - case opts[:ip] do - ip when is_binary(ip) -> - {:ok, ip} = :inet.parse_address(~c"#{ip}") - Keyword.replace(opts, :ip, ip) - - _other -> - opts - end - end end diff --git a/lib/boombox/utils/cli.ex b/lib/boombox/utils/cli.ex index 741f388..8eb2aa6 100644 --- a/lib/boombox/utils/cli.ex +++ b/lib/boombox/utils/cli.ex @@ -20,9 +20,9 @@ defmodule Boombox.Utils.CLI do i_type = [get_switch_type(argv, :input, aliases), :keep] o_type = [get_switch_type(argv, :output, aliases), :keep] - switches = - [input: i_type, output: o_type] ++ - Keyword.from_keys([:mp4, :webrtc, :rtmp, :hls, :transport], [:string, :keep]) + endpoints = Keyword.from_keys([:mp4, :webrtc, :port, :whip, :hls], [:string, :keep]) + options = Keyword.from_keys([:transport, :uri, :token, :rtmp], [:string, :keep]) + switches = [input: i_type, output: o_type] ++ endpoints ++ options {input, output} = OptionParser.parse(argv, strict: switches, aliases: aliases) diff --git a/lib/boombox/webrtc.ex b/lib/boombox/webrtc.ex index 945c6c5..fffb387 100644 --- a/lib/boombox/webrtc.ex +++ b/lib/boombox/webrtc.ex @@ -12,7 +12,7 @@ defmodule Boombox.WebRTC do @spec create_input(Boombox.webrtc_signaling(), Boombox.output(), State.t()) :: Wait.t() def create_input(signaling, output, state) do - signaling = resolve_signaling(signaling) + signaling = resolve_signaling(signaling, :input) keyframe_interval = case output do @@ -71,7 +71,7 @@ defmodule Boombox.WebRTC do @spec create_output(Boombox.webrtc_signaling(), State.t()) :: {Ready.t() | Wait.t(), State.t()} def create_output(signaling, state) do - signaling = resolve_signaling(signaling) + signaling = resolve_signaling(signaling, :output) startup_tracks = if webrtc_input?(state), do: [:audio, :video], else: [] spec = @@ -175,15 +175,21 @@ defmodule Boombox.WebRTC do %Ready{actions: [spec: spec], eos_info: Map.values(tracks)} end - defp resolve_signaling(%Membrane.WebRTC.SignalingChannel{} = signaling) do + defp resolve_signaling(%Membrane.WebRTC.SignalingChannel{} = signaling, _direction) do signaling end - defp resolve_signaling({:whip, _opts} = signaling) do - signaling + defp resolve_signaling({:whip, uri, opts}, :input) do + uri = URI.new!(uri) + {:ok, ip} = :inet.getaddr(~c"#{uri.host}", :inet) + {:whip, [ip: :any, port: uri.port] ++ opts} + end + + defp resolve_signaling({:whip, uri, opts}, :output) do + {:whip, [uri: uri] ++ opts} end - defp resolve_signaling(uri) when is_binary(uri) do + defp resolve_signaling(uri, _direction) when is_binary(uri) do uri = URI.new!(uri) {:ok, ip} = :inet.getaddr(~c"#{uri.host}", :inet) {:websocket, ip: ip, port: uri.port} diff --git a/test/boombox_test.exs b/test/boombox_test.exs index 406cc98..1828b8b 100644 --- a/test/boombox_test.exs +++ b/test/boombox_test.exs @@ -79,10 +79,10 @@ defmodule BoomboxTest do t = Task.async(fn -> - Boombox.run(input: @bbb_mp4, output: {:whip, uri: "http://127.0.0.1:1234"}) + Boombox.run(input: @bbb_mp4, output: {:whip, "http://127.0.0.1:3721"}) end) - Boombox.run(input: {:whip, ip: "0.0.0.0", port: 1234}, output: output) + Boombox.run(input: {:whip, "http://127.0.0.1:3721"}, output: output) Task.await(t) Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4") end From 601a64230c85dfb0b7df38de4741a37268bb7204 Mon Sep 17 00:00:00 2001 From: Mateusz Front Date: Fri, 3 Jan 2025 14:47:48 +0100 Subject: [PATCH 3/3] fix whip IP --- lib/boombox/webrtc.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/boombox/webrtc.ex b/lib/boombox/webrtc.ex index fffb387..bd35ed4 100644 --- a/lib/boombox/webrtc.ex +++ b/lib/boombox/webrtc.ex @@ -182,7 +182,7 @@ defmodule Boombox.WebRTC do defp resolve_signaling({:whip, uri, opts}, :input) do uri = URI.new!(uri) {:ok, ip} = :inet.getaddr(~c"#{uri.host}", :inet) - {:whip, [ip: :any, port: uri.port] ++ opts} + {:whip, [ip: ip, port: uri.port] ++ opts} end defp resolve_signaling({:whip, uri, opts}, :output) do