Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new rtmp api #12

Merged
merged 9 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ workflows:
build:
jobs:
- elixir/build_test:
cache-version: 3
filters: &filters
tags:
only: /v.*/
- elixir/test:
cache-version: 3
filters:
<<: *filters
- elixir/lint:
cache-version: 3
filters:
<<: *filters
- elixir/hex_publish:
cache-version: 3
requires:
- elixir/build_test
- elixir/test
Expand Down
19 changes: 3 additions & 16 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,6 @@ defmodule Boombox.Pipeline do
|> proceed_result(ctx, state)
end

@impl true
def handle_child_notification(
{:socket_control_needed, _socket, source_pid},
:rtmp_source,
ctx,
state
) do
Boombox.RTMP.handle_socket_control(source_pid, state.rtmp_input_state)
|> proceed_result(ctx, state)
end

@impl true
def handle_child_notification({:new_tracks, tracks}, :webrtc_output, ctx, state) do
%{status: :awaiting_output_link} = state
Expand Down Expand Up @@ -158,11 +147,9 @@ defmodule Boombox.Pipeline do
end

@impl true
def handle_info({:rtmp_tcp_server, server_pid, socket}, ctx, state) do
{result, rtmp_input_state} =
Boombox.RTMP.handle_connection(server_pid, socket, state.rtmp_input_state)

proceed_result(result, ctx, %{state | rtmp_input_state: rtmp_input_state})
def handle_info({:rtmp_client_ref, client_ref}, ctx, state) do
Boombox.RTMP.handle_connection(client_ref)
|> proceed_result(ctx, state)
end

@impl true
Expand Down
54 changes: 23 additions & 31 deletions lib/boombox/rtmp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,47 @@ defmodule Boombox.RTMP do
@moduledoc false

import Membrane.ChildrenSpec
require Membrane.Logger
alias Boombox.Pipeline.{Ready, Wait}
alias Membrane.RTMP.Utils
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name Utils is a bit ambiguous, I'd just alias Membrane.RTMP.


@type state :: %{server_pid: pid()} | nil

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@type state :: %{server_pid: pid()} | nil

@spec create_input(URI.t(), pid()) :: Wait.t()
@spec create_input(String.t(), pid()) :: Wait.t()
def create_input(uri, utility_supervisor) do
uri = URI.new!(uri)
{:ok, ip} = :inet.getaddr(~c"#{uri.host}", :inet)
{use_ssl?, port, target_app, target_stream_key} = Utils.parse_url(uri)

boombox = self()

server_options = %Membrane.RTMP.Source.TcpServer{
port: uri.port,
listen_options: [:binary, packet: :raw, active: false, ip: ip],
socket_handler: fn socket ->
send(boombox, {:rtmp_tcp_server, self(), socket})

receive do
{:rtmp_source_pid, pid} -> {:ok, pid}
:rtmp_already_connected -> {:error, :rtmp_already_connected}
end
new_client_callback = fn client_ref, app, stream_key ->
if app == target_app and stream_key == target_stream_key do
send(boombox, {:rtmp_client_ref, client_ref})
else
Membrane.Logger.warning("Unexpected client connected on /#{app}/#{stream_key}")
end
end

server_options = %{
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
port: port,
use_ssl?: use_ssl?,
new_client_callback: new_client_callback,
client_timeout: 1_000
}

{:ok, _pid} =
{:ok, _server} =
Membrane.UtilitySupervisor.start_link_child(
utility_supervisor,
{Membrane.RTMP.Source.TcpServer, server_options}
{Membrane.RTMP.Server, server_options}
)

%Wait{}
end

@spec handle_connection(pid(), :gen_tcp.socket() | :ssl.sslsocket(), state()) ::
{Ready.t(), state()}
def handle_connection(server_pid, socket, nil = _state) do
@spec handle_connection(pid()) :: Ready.t()
def handle_connection(client_ref) do
spec = [
child(:rtmp_source, %Membrane.RTMP.SourceBin{socket: socket})
child(:rtmp_source, %Membrane.RTMP.SourceBin{client_ref: client_ref})
varsill marked this conversation as resolved.
Show resolved Hide resolved
|> via_out(:audio)
|> child(Membrane.AAC.Parser)
|> child(:aac_decoder, Membrane.AAC.FDK.Decoder)
Expand All @@ -50,17 +53,6 @@ defmodule Boombox.RTMP do
video: get_child(:rtmp_source) |> via_out(:video)
}

{%Ready{spec_builder: spec, track_builders: track_builders}, %{server_pid: server_pid}}
end

def handle_connection(_server_pid, _socket, state) do
send(state.server_pid, :rtmp_already_connected)
{%Wait{}, state}
end

@spec handle_socket_control(pid(), state()) :: Wait.t()
def handle_socket_control(source_pid, state) do
send(state.server_pid, {:rtmp_source_pid, source_pid})
%Wait{}
%Ready{spec_builder: spec, track_builders: track_builders}
end
end
4 changes: 3 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ defmodule Boombox.Mixfile do
{:membrane_mp4_plugin,
github: "membraneframework/membrane_mp4_plugin", branch: "isom-avc3"},
{:membrane_realtimer_plugin, ">= 0.0.0"},
{:membrane_rtmp_plugin, ">= 0.0.0"},
# {:membrane_rtmp_plugin, ">= 0.0.0"},
{:membrane_rtmp_plugin,
github: "membraneframework/membrane_rtmp_plugin", branch: "handle_new_client"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just remember to change it once new rtmp plugin version is released ;)

{:membrane_ffmpeg_swresample_plugin, ">= 0.0.0"},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
Expand Down
Loading