Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't accept streams until server is ready to read them #94

Merged
merged 35 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e358e26
wip, unifex version bumped
bartkrak Jun 13, 2024
a9166de
works but goes into infinite loop
bartkrak Jun 13, 2024
85e49c2
infinite loop fixed
bartkrak Jun 14, 2024
7018f7a
removed subscription mechanism, lambda on new client connection
bartkrak Jul 3, 2024
44f1690
test tests on CI
bartkrak Jul 5, 2024
6089407
quick test fix
bartkrak Jul 5, 2024
52ff895
example
bartkrak Jul 5, 2024
db779f3
another test fix
bartkrak Jul 5, 2024
2ddc8f3
lambda trigger moved to client handler
bartkrak Jul 8, 2024
6b79c2a
standalone server works with new api
bartkrak Jul 8, 2024
54b6e50
CI tests test
bartkrak Jul 8, 2024
b61aad8
typo
bartkrak Jul 8, 2024
fcffb0d
other app stream key test fix
bartkrak Jul 8, 2024
8283bdc
test timeout fix
bartkrak Jul 8, 2024
614ec22
maybe CI will work this time
bartkrak Jul 8, 2024
d068af2
formatting
bartkrak Jul 8, 2024
0579cfb
review suggestions
bartkrak Jul 8, 2024
ff54122
some warnings added
bartkrak Jul 9, 2024
b3a0720
reject unused connections, tests fix
bartkrak Jul 12, 2024
9bd3632
gitignore fix
bartkrak Jul 12, 2024
859550b
format
bartkrak Jul 12, 2024
29709ae
another format fix
bartkrak Jul 12, 2024
c3357ce
docs minor fix
bartkrak Jul 15, 2024
9aa29b7
added default new_client_callback implementation
bartkrak Jul 23, 2024
032b5e2
credo update, minor fixes
bartkrak Jul 23, 2024
76d45d1
url parsing moved to utils.ex and made public
bartkrak Jul 24, 2024
ca181ea
parse_url doc
bartkrak Jul 24, 2024
b5f8263
Update config.yml
bartkrak Jul 24, 2024
938fb06
bundlex bump
bartkrak Jul 24, 2024
ed16832
dialyzer fix
bartkrak Jul 24, 2024
0fa80b1
mime fix
bartkrak Jul 24, 2024
b31bbd3
CI cache
bartkrak Jul 24, 2024
822aa2e
ci fix
bartkrak Jul 24, 2024
81d30d9
mix lock fix
bartkrak Jul 24, 2024
b2a21c6
cosmetic change
bartkrak Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ workflows:
build:
jobs:
- elixir/build_test:
cache-version: 3
filters: &filters
tags:
only: /v.*/
- elixir/test:
cache-version: 3
filters:
<<: *filters
- elixir/lint:
cache-version: 3
filters:
<<: *filters
- elixir/hex_publish:
cache-version: 3
requires:
- elixir/build_test
- elixir/test
Expand Down
29 changes: 21 additions & 8 deletions examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,36 @@ end

# The client will connect on `rtmp://localhost:1935/app/stream_key`
port = 1935
app = "app"
stream_key = "stream_key"

# example lambda function that upon launching will send client reference back to parent process.
parent_process_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

# Run the standalone server
{:ok, server} =
Membrane.RTMP.Server.start_link(
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
port: port,
use_ssl?: false
use_ssl?: false,
new_client_callback: new_client_callback,
client_timeout: 5_000
)

# Subscribe to receive client reference that connected to the
# server with given app id and stream key
:ok = Membrane.RTMP.Server.subscribe(server, app, stream_key)
app = "app"
stream_key = "stream_key"

# Wait max 10s for client to connect on /app/stream_key
{:ok, client_ref} =
receive do
{:client_ref, client_ref, ^app, ^stream_key} ->
{:ok, client_ref}
after
10_000 -> :timeout
end

# Wait for the client reference
{:ok, client_ref} = Membrane.RTMP.Server.await_subscription(app, stream_key)
# Start the pipeline and provide it with the client_ref
{:ok, _supervisor, pipeline} =
Membrane.Pipeline.start_link(Pipeline, client_ref: client_ref)
Expand Down
8 changes: 1 addition & 7 deletions lib/membrane_rtmp_plugin/rtmp/source/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@ defmodule Membrane.RTMP.Source.ClientHandler do

@impl true
def handle_stream_published(publish_msg, state) do
state = %{state | stream_key: publish_msg.stream_key}

if state.controlling_process do
send(state.controlling_process, {:client_connected, state.app, state.stream_key})
end

state
%{state | stream_key: publish_msg.stream_key}
end

@impl true
Expand Down
28 changes: 20 additions & 8 deletions lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,26 @@ defmodule Membrane.RTMP.MessageHandler do
events: [event()],
receiver_pid: pid() | nil,
socket_retries: pos_integer(),
epoch: non_neg_integer()
epoch: non_neg_integer(),
publish_msg: Messages.Publish.t() | nil,
publish_header: Header.t() | nil
}

@spec init(opts :: %{socket: :gen_tcp.socket() | :ssl.socket(), use_ssl?: boolean()}) :: t()
def init(opts) do
state = %{
%{
socket: opts.socket,
socket_module: if(opts.use_ssl?, do: :ssl, else: :gen_tcp),
header_sent?: false,
events: [],
receiver_pid: nil,
publish_msg: nil,
publish_header: nil,
# how many times the Source tries to get control of the socket
socket_retries: 3,
# epoch required for performing a handshake with the pipeline
epoch: 0
}

state
end

@spec handle_client_messages(list(), map()) :: {map(), list()}
Expand All @@ -72,6 +74,17 @@ defmodule Membrane.RTMP.MessageHandler do
end)
end

@spec send_publish_success(map()) :: {map(), list()}
def send_publish_success(state) do
Responses.publish_success(state.publish_msg.stream_key)
|> send_rtmp_payload(state.socket,
chunk_stream_id: 3,
stream_id: state.publish_header.stream_id
)

{%{state | events: []}, [{:published, state.publish_msg} | state.events]}
end

# Expected flow of messages:
# 1. [in] c0_c1 handshake -> [out] s0_s1_s2 handshake
# 2. [in] c2 handshake -> [out] empty
Expand Down Expand Up @@ -141,11 +154,10 @@ defmodule Membrane.RTMP.MessageHandler do
%Messages.UserControl{event_type: @stream_begin_type, data: <<0, 0, 0, 1>>}
|> send_rtmp_payload(state.socket, chunk_stream_id: 2)

Responses.publish_success(publish_msg.stream_key)
|> send_rtmp_payload(state.socket, chunk_stream_id: 3, stream_id: header.stream_id)
# at this point pause the unfinished handshake until pipeline demands data from this client
# (this mechanism prevents accepting streams with no listeners)

state = %{state | events: [{:published, publish_msg} | state.events]}
{:cont, state}
{:halt, %{state | publish_msg: publish_msg, publish_header: header}}
end

# A message containing stream metadata
Expand Down
58 changes: 27 additions & 31 deletions lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ defmodule Membrane.RTMP.Source do

The source can be used in the following two scenarios:
* by providing the URL on which the client is expected to connect - note, that if the client doesn't
connect on this URL, the source won't complete its setup
* by spawning `Membrane.RTMP.Server`, subscribing for a given app and stream key on which the client
will connect, waiting for a client reference and passing the client reference to the `#{inspect(__MODULE__)}`.
connect on this URL, the source won't complete its setup. Note that all attempted connections to
other `app` or `stream_key` than specified ones will be rejected.

* by spawning `Membrane.RTMP.Server`, receiving a client reference and passing it to the `#{inspect(__MODULE__)}`.
"""
use Membrane.Source
require Membrane.Logger
require Logger
alias __MODULE__.ClientHandler, as: SourceClientHandler
alias Membrane.RTMP.Server.ClientHandler
alias Membrane.RTMP.Utils

def_output_pad :output,
availability: :always,
Expand Down Expand Up @@ -79,17 +82,25 @@ defmodule Membrane.RTMP.Source do

@impl true
def handle_setup(_ctx, %{mode: :builtin_server} = state) do
{use_ssl?, port, app, stream_key} = parse_url(state.url)
{use_ssl?, port, app, stream_key} = Utils.parse_url(state.url)

parent_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
send(parent_pid, {:client_ref, client_ref, app, stream_key})
end

{:ok, server_pid} =
Membrane.RTMP.Server.start_link(
handler: %SourceClientHandler{controlling_process: self()},
port: port,
use_ssl?: use_ssl?
use_ssl?: use_ssl?,
new_client_callback: new_client_callback,
client_timeout: 100
)

state = %{state | app: app, stream_key: stream_key, server: server_pid}
{[], state}
{[setup: :incomplete], state}
end

@impl true
Expand Down Expand Up @@ -156,19 +167,23 @@ defmodule Membrane.RTMP.Source do
end

@impl true
def handle_info({:client_connected, app, stream_key}, _ctx, %{mode: :builtin_server} = state) do
:ok = Membrane.RTMP.Server.subscribe(state.server, state.app, state.stream_key)
state = %{state | app: app, stream_key: stream_key}
{[], state}
def handle_info(
{:client_ref, client_ref, app, stream_key},
_ctx,
%{mode: :builtin_server} = state
)
when app == state.app and stream_key == state.stream_key do
{[setup: :complete], %{state | client_ref: client_ref}}
end

@impl true
def handle_info(
{:client_ref, client_ref_pid, _app, _stream_key},
{:client_ref, _client_ref, app, stream_key},
_ctx,
%{mode: :builtin_server} = state
) do
{[redemand: :output], %{state | client_ref: client_ref_pid}}
Logger.warning("Unexpected client connected on /#{app}/#{stream_key}")
{[], state}
varsill marked this conversation as resolved.
Show resolved Hide resolved
end

@impl true
Expand All @@ -189,23 +204,4 @@ defmodule Membrane.RTMP.Source do
def handle_terminate_request(_ctx, state) do
{[terminate: :normal], state}
end

defp parse_url(url) do
uri = URI.parse(url)
port = uri.port

{app, stream_key} =
case String.trim_leading(uri.path, "/") |> String.trim_trailing("/") |> String.split("/") do
[app, stream_key] -> {app, stream_key}
[app] -> {app, ""}
end

use_ssl? =
case uri.scheme do
"rtmp" -> false
"rtmps" -> true
end

{use_ssl?, port, app, stream_key}
end
end
4 changes: 2 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp/source/source_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ defmodule Membrane.RTMP.SourceBin do
The bin can be used in the following two scenarios:
* by providing the URL on which the client is expected to connect - note, that if the client doesn't
connect on this URL, the bin won't complete its setup
* by spawning `Membrane.RTMP.Server`, subscribing for a given app and stream key on which the client
will connect, waiting for a client reference and passing the client reference to the `#{inspect(__MODULE__)}`.
* by spawning `Membrane.RTMP.Server`, receiving client reference after client connects on a given `app` and `stream_key`
and passing the client reference to the `#{inspect(__MODULE__)}`.
"""
use Membrane.Bin

Expand Down
82 changes: 29 additions & 53 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
defmodule Membrane.RTMP.Server do
@moduledoc """
A simple RTMP server, which handles each new incoming connection.
A simple RTMP server, which handles each new incoming connection. When a new client connects, the `new_client_callback` is invoked.
New connections remain in an incomplete RTMP handshake state, until another process makes demand for their data.
If no data is demanded within the client_timeout period, TCP socket is closed.

Options:
- client_timeout: Time (ms) after which an unused client connection is automatically closed.
- new_client_callback: An anonymous function called when a new client connects.
It receives the client reference, `app` and `stream_key`, allowing custom processing,
like sending the reference to another process. If it's not provided, default implementation is used:
{:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMP.Server.start_link().
"""
use GenServer

Expand All @@ -15,7 +24,12 @@ defmodule Membrane.RTMP.Server do
handler: ClientHandlerBehaviour.t(),
port: :inet.port_number(),
use_ssl?: boolean(),
name: atom() | nil
name: atom() | nil,
new_client_callback:
(client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
any())
| nil,
client_timeout: Membrane.Time.t()
]

@type server_identifier :: pid() | atom()
Expand All @@ -26,35 +40,23 @@ defmodule Membrane.RTMP.Server do
@spec start_link(server_options :: t()) :: GenServer.on_start()
def start_link(server_options) do
gen_server_opts = if server_options[:name] == nil, do: [], else: [name: server_options[:name]]

server_options = Enum.into(server_options, %{})
GenServer.start_link(__MODULE__, server_options, gen_server_opts)
end

@doc """
Subscribes for the given app and stream key.
When a client connects (or has already connected) to the server with given app and stream key,
the subscriber will be informed.
"""
@spec subscribe(server_identifier(), String.t(), String.t()) :: :ok
def subscribe(server_identifier, app, stream_key) do
GenServer.cast(server_identifier, {:subscribe, app, stream_key, self()})
:ok
end
server_options =
if Map.get(server_options, :new_client_callback, nil) == nil do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] I believe you could do as well:

Suggested change
if Map.get(server_options, :new_client_callback, nil) == nil do
if server_options[:new_client_callback] == nil do

parent_process_pid = self()

@doc """
Awaits for the client reference of the connection to which the user has previously subscribed.
callback = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

Note: this function call is blocking!
Note: first you need to call `#{__MODULE__}.subscribe/3` to subscribe
for a given `app` and `stream_key`.
"""
@spec await_subscription(String.t(), String.t(), non_neg_integer()) :: {:ok, pid()} | :error
def await_subscription(app, stream_key, timeout \\ 5_000) do
receive do
{:client_ref, client_ref, ^app, ^stream_key} -> {:ok, client_ref}
after
timeout -> :error
end
Map.put(server_options, :new_client_callback, callback)
else
server_options
end

GenServer.start_link(__MODULE__, server_options, gen_server_opts)
end

@doc """
Expand All @@ -74,8 +76,6 @@ defmodule Membrane.RTMP.Server do

{:ok,
%{
subscriptions: %{},
client_reference_mapping: %{},
listener: pid,
port: nil,
to_reply: [],
Expand All @@ -92,33 +92,9 @@ defmodule Membrane.RTMP.Server do
end
end

@impl true
def handle_cast({:subscribe, app, stream_key, subscriber_pid}, state) do
state = put_in(state, [:subscriptions, {app, stream_key}], subscriber_pid)
maybe_send_subscription(app, stream_key, state)
{:noreply, state}
end

@impl true
def handle_info({:register_client, app, stream_key, client_reference_pid}, state) do
state = put_in(state, [:client_reference_mapping, {app, stream_key}], client_reference_pid)
maybe_send_subscription(app, stream_key, state)
{:noreply, state}
end

@impl true
def handle_info({:port, port}, state) do
Enum.each(state.to_reply, &GenServer.reply(&1, port))
{:noreply, %{state | port: port, to_reply: []}}
end

defp maybe_send_subscription(app, stream_key, state) do
if state.subscriptions[{app, stream_key}] != nil and
state.client_reference_mapping[{app, stream_key}] != nil do
send(
state.subscriptions[{app, stream_key}],
{:client_ref, state.client_reference_mapping[{app, stream_key}], app, stream_key}
)
end
end
end
Loading