Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't accept streams until server is ready to read them #94

Merged
merged 35 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e358e26
wip, unifex version bumped
bartkrak Jun 13, 2024
a9166de
works but goes into infinite loop
bartkrak Jun 13, 2024
85e49c2
infinite loop fixed
bartkrak Jun 14, 2024
7018f7a
removed subscription mechanism, lambda on new client connection
bartkrak Jul 3, 2024
44f1690
test tests on CI
bartkrak Jul 5, 2024
6089407
quick test fix
bartkrak Jul 5, 2024
52ff895
example
bartkrak Jul 5, 2024
db779f3
another test fix
bartkrak Jul 5, 2024
2ddc8f3
lambda trigger moved to client handler
bartkrak Jul 8, 2024
6b79c2a
standalone server works with new api
bartkrak Jul 8, 2024
54b6e50
CI tests test
bartkrak Jul 8, 2024
b61aad8
typo
bartkrak Jul 8, 2024
fcffb0d
other app stream key test fix
bartkrak Jul 8, 2024
8283bdc
test timeout fix
bartkrak Jul 8, 2024
614ec22
maybe CI will work this time
bartkrak Jul 8, 2024
d068af2
formatting
bartkrak Jul 8, 2024
0579cfb
review suggestions
bartkrak Jul 8, 2024
ff54122
some warnings added
bartkrak Jul 9, 2024
b3a0720
reject unused connections, tests fix
bartkrak Jul 12, 2024
9bd3632
gitignore fix
bartkrak Jul 12, 2024
859550b
format
bartkrak Jul 12, 2024
29709ae
another format fix
bartkrak Jul 12, 2024
c3357ce
docs minor fix
bartkrak Jul 15, 2024
9aa29b7
added default new_client_callback implementation
bartkrak Jul 23, 2024
032b5e2
credo update, minor fixes
bartkrak Jul 23, 2024
76d45d1
url parsing moved to utils.ex and made public
bartkrak Jul 24, 2024
ca181ea
parse_url doc
bartkrak Jul 24, 2024
b5f8263
Update config.yml
bartkrak Jul 24, 2024
938fb06
bundlex bump
bartkrak Jul 24, 2024
ed16832
dialyzer fix
bartkrak Jul 24, 2024
0fa80b1
mime fix
bartkrak Jul 24, 2024
b31bbd3
CI cache
bartkrak Jul 24, 2024
822aa2e
ci fix
bartkrak Jul 24, 2024
81d30d9
mix lock fix
bartkrak Jul 24, 2024
b2a21c6
cosmetic change
bartkrak Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,36 @@ end

# The client will connect on `rtmp://localhost:1935/app/stream_key`
port = 1935
app = "app"
stream_key = "stream_key"

# example lambda function that upon launching will send client reference back to parent process.
parent_process_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

# Run the standalone server
{:ok, server} =
Membrane.RTMP.Server.start_link(
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
port: port,
use_ssl?: false
use_ssl?: false,
new_client_callback: new_client_callback,
client_timeout: 5_000
)

# Subscribe to receive client reference that connected to the
# server with given app id and stream key
:ok = Membrane.RTMP.Server.subscribe(server, app, stream_key)
app = "app"
stream_key = "stream_key"

# Wait max 10s for client to connect on /app/stream_key
{:ok, client_ref} =
receive do
{:client_ref, client_ref, ^app, ^stream_key} ->
{:ok, client_ref}
after
10_000 -> :timeout
end

# Wait for the client reference
{:ok, client_ref} = Membrane.RTMP.Server.await_subscription(app, stream_key)
# Start the pipeline and provide it with the client_ref
{:ok, _supervisor, pipeline} =
Membrane.Pipeline.start_link(Pipeline, client_ref: client_ref)
Expand Down
8 changes: 1 addition & 7 deletions lib/membrane_rtmp_plugin/rtmp/source/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@ defmodule Membrane.RTMP.Source.ClientHandler do

@impl true
def handle_stream_published(publish_msg, state) do
state = %{state | stream_key: publish_msg.stream_key}

if state.controlling_process do
send(state.controlling_process, {:client_connected, state.app, state.stream_key})
end

state
%{state | stream_key: publish_msg.stream_key}
end

@impl true
Expand Down
21 changes: 17 additions & 4 deletions lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ defmodule Membrane.RTMP.MessageHandler do
header_sent?: false,
events: [],
receiver_pid: nil,
#
publish_msg: nil,
publish_header: nil,
# how many times the Source tries to get control of the socket
socket_retries: 3,
# epoch required for performing a handshake with the pipeline
Expand All @@ -72,6 +75,17 @@ defmodule Membrane.RTMP.MessageHandler do
end)
end

@spec send_publish_success(map()) :: {map(), list()}
def send_publish_success(state) do
Responses.publish_success(state.publish_msg.stream_key)
|> send_rtmp_payload(state.socket,
chunk_stream_id: 3,
stream_id: state.publish_header.stream_id
)

{%{state | events: []}, [{:published, state.publish_msg} | state.events]}
end

# Expected flow of messages:
# 1. [in] c0_c1 handshake -> [out] s0_s1_s2 handshake
# 2. [in] c2 handshake -> [out] empty
Expand Down Expand Up @@ -141,11 +155,10 @@ defmodule Membrane.RTMP.MessageHandler do
%Messages.UserControl{event_type: @stream_begin_type, data: <<0, 0, 0, 1>>}
|> send_rtmp_payload(state.socket, chunk_stream_id: 2)

Responses.publish_success(publish_msg.stream_key)
|> send_rtmp_payload(state.socket, chunk_stream_id: 3, stream_id: header.stream_id)
# at this point pause the unfinished handshake until pipeline demands data from this client
# (this mechanism prevents accepting streams with no listeners)

state = %{state | events: [{:published, publish_msg} | state.events]}
{:cont, state}
{:halt, %{state | publish_msg: publish_msg, publish_header: header}}
end

# A message containing stream metadata
Expand Down
36 changes: 25 additions & 11 deletions lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ defmodule Membrane.RTMP.Source do

The source can be used in the following two scenarios:
* by providing the URL on which the client is expected to connect - note, that if the client doesn't
connect on this URL, the source won't complete its setup
* by spawning `Membrane.RTMP.Server`, subscribing for a given app and stream key on which the client
will connect, waiting for a client reference and passing the client reference to the `#{inspect(__MODULE__)}`.
connect on this URL, the source won't complete its setup. Note that all attepted connections to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
connect on this URL, the source won't complete its setup. Note that all attepted connections to
connect on this URL, the source won't complete its setup. Note that all attempted connections to

other app stream_key than specified will be rejected.
varsill marked this conversation as resolved.
Show resolved Hide resolved

* by spawning `Membrane.RTMP.Server`, receiving a client reference and passing it to the `#{inspect(__MODULE__)}`.
"""
use Membrane.Source
require Membrane.Logger
require Logger
alias __MODULE__.ClientHandler, as: SourceClientHandler
alias Membrane.RTMP.Server.ClientHandler

Expand Down Expand Up @@ -81,15 +83,23 @@ defmodule Membrane.RTMP.Source do
def handle_setup(_ctx, %{mode: :builtin_server} = state) do
{use_ssl?, port, app, stream_key} = parse_url(state.url)

parent_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
send(parent_pid, {:client_ref, client_ref, app, stream_key})
end

{:ok, server_pid} =
Membrane.RTMP.Server.start_link(
handler: %SourceClientHandler{controlling_process: self()},
port: port,
use_ssl?: use_ssl?
use_ssl?: use_ssl?,
new_client_callback: new_client_callback,
client_timeout: 100
)

state = %{state | app: app, stream_key: stream_key, server: server_pid}
{[], state}
{[setup: :incomplete], state}
end

@impl true
Expand Down Expand Up @@ -156,19 +166,23 @@ defmodule Membrane.RTMP.Source do
end

@impl true
def handle_info({:client_connected, app, stream_key}, _ctx, %{mode: :builtin_server} = state) do
:ok = Membrane.RTMP.Server.subscribe(state.server, state.app, state.stream_key)
state = %{state | app: app, stream_key: stream_key}
{[], state}
def handle_info(
{:client_ref, client_ref, app, stream_key},
_ctx,
%{mode: :builtin_server} = state
)
when app == state.app and stream_key == state.stream_key do
{[setup: :complete], %{state | client_ref: client_ref}}
end

@impl true
def handle_info(
{:client_ref, client_ref_pid, _app, _stream_key},
{:client_ref, _client_ref, app, stream_key},
_ctx,
%{mode: :builtin_server} = state
) do
{[redemand: :output], %{state | client_ref: client_ref_pid}}
Logger.warning("Unexpected client connected on /#{app}/#{stream_key}")
{[], state}
varsill marked this conversation as resolved.
Show resolved Hide resolved
end

@impl true
Expand Down
70 changes: 15 additions & 55 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
defmodule Membrane.RTMP.Server do
@moduledoc """
A simple RTMP server, which handles each new incoming connection.
A simple RTMP server, which handles each new incoming connection. When a new client connects, the new_client_callback is invoked.
varsill marked this conversation as resolved.
Show resolved Hide resolved
New connections remain in an incomplete RTMP handshake state until another process makes demand for dara data.
varsill marked this conversation as resolved.
Show resolved Hide resolved
If no data is demanded within the client_timeout period, the connection is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it's good to emphasise that it's a client TCP socket that is closed under such circumstances?


Options:
- client_timeout: Time (ms) after which an unused connection is automatically closed.
varsill marked this conversation as resolved.
Show resolved Hide resolved
- new_client_callback: An anonymous function called when a new client connects.
It receives the client reference, app and stream_key, allowing custom processing,
like sending the reference to another process.
"""
use GenServer

Expand All @@ -15,7 +23,12 @@ defmodule Membrane.RTMP.Server do
handler: ClientHandlerBehaviour.t(),
port: :inet.port_number(),
use_ssl?: boolean(),
name: atom() | nil
name: atom() | nil,
new_client_callback: (client_ref :: pid(),
app :: String.t(),
stream_key :: String.t() ->
any()),
client_timeout: non_neg_integer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In most plugins we use Membrane.Time for time handling, so it might be good to be consistent with that here as well:

Suggested change
client_timeout: non_neg_integer()
client_timeout: Membrane.Time.t()

]

@type server_identifier :: pid() | atom()
Expand All @@ -30,33 +43,6 @@ defmodule Membrane.RTMP.Server do
GenServer.start_link(__MODULE__, server_options, gen_server_opts)
end

@doc """
Subscribes for the given app and stream key.
When a client connects (or has already connected) to the server with given app and stream key,
the subscriber will be informed.
"""
@spec subscribe(server_identifier(), String.t(), String.t()) :: :ok
def subscribe(server_identifier, app, stream_key) do
GenServer.cast(server_identifier, {:subscribe, app, stream_key, self()})
:ok
end

@doc """
Awaits for the client reference of the connection to which the user has previously subscribed.

Note: this function call is blocking!
Note: first you need to call `#{__MODULE__}.subscribe/3` to subscribe
for a given `app` and `stream_key`.
"""
@spec await_subscription(String.t(), String.t(), non_neg_integer()) :: {:ok, pid()} | :error
def await_subscription(app, stream_key, timeout \\ 5_000) do
receive do
{:client_ref, client_ref, ^app, ^stream_key} -> {:ok, client_ref}
after
timeout -> :error
end
end

@doc """
Returns the port on which the server listens for connection.
"""
Expand All @@ -74,8 +60,6 @@ defmodule Membrane.RTMP.Server do

{:ok,
%{
subscriptions: %{},
client_reference_mapping: %{},
listener: pid,
port: nil,
to_reply: [],
Expand All @@ -92,33 +76,9 @@ defmodule Membrane.RTMP.Server do
end
end

@impl true
def handle_cast({:subscribe, app, stream_key, subscriber_pid}, state) do
state = put_in(state, [:subscriptions, {app, stream_key}], subscriber_pid)
maybe_send_subscription(app, stream_key, state)
{:noreply, state}
end

@impl true
def handle_info({:register_client, app, stream_key, client_reference_pid}, state) do
state = put_in(state, [:client_reference_mapping, {app, stream_key}], client_reference_pid)
maybe_send_subscription(app, stream_key, state)
{:noreply, state}
end

@impl true
def handle_info({:port, port}, state) do
Enum.each(state.to_reply, &GenServer.reply(&1, port))
{:noreply, %{state | port: port, to_reply: []}}
end

defp maybe_send_subscription(app, stream_key, state) do
if state.subscriptions[{app, stream_key}] != nil and
state.client_reference_mapping[{app, stream_key}] != nil do
send(
state.subscriptions[{app, stream_key}],
{:client_ref, state.client_reference_mapping[{app, stream_key}], app, stream_key}
)
end
end
end
54 changes: 50 additions & 4 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ defmodule Membrane.RTMP.Server.ClientHandler do
stream_key: nil,
server: opts.server,
buffers_demanded: 0,
published?: false
published?: false,
notified_about_client?: false,
new_client_callback: opts.new_client_callback,
client_timeout: opts.client_timeout
}}
end

Expand Down Expand Up @@ -77,11 +80,21 @@ defmodule Membrane.RTMP.Server.ClientHandler do

@impl true
def handle_info({:demand_data, how_many_buffers_demanded}, state) do
state = %{state | buffers_demanded: how_many_buffers_demanded}
state = finish_handshake(state) |> Map.replace!(:buffers_demanded, how_many_buffers_demanded)
request_data(state)
{:noreply, state}
end

@impl true
def handle_info({:client_timeout, app, stream_key}, state) do
if not state.published? do
Logger.warning("No demand made for client /#{app}/#{stream_key}, terminating connection.")
:gen_tcp.close(state.socket)
end

{:noreply, state}
end

@impl true
def handle_info(other_msg, state) do
handler_state = state.handler.handle_info(other_msg, state.handler_state)
Expand All @@ -96,6 +109,31 @@ defmodule Membrane.RTMP.Server.ClientHandler do
{message_handler_state, events} =
MessageHandler.handle_client_messages(messages, state.message_handler_state)

state =
if message_handler_state.publish_msg != nil and not state.notified_about_client? do
%{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} =
message_handler_state

if is_function(state.new_client_callback) do
varsill marked this conversation as resolved.
Show resolved Hide resolved
state.new_client_callback.(self(), state.app, stream_key)
else
raise "new_client_callback is not a function"
end

with {:error, reason} <-
:timer.send_after(
varsill marked this conversation as resolved.
Show resolved Hide resolved
state.client_timeout,
self(),
{:client_timeout, state.app, stream_key}
) do
raise "Client timeout timer failed: #{reason}"
end

%{state | notified_about_client?: true}
else
state
end

state = Enum.reduce(events, state, &handle_event/2)

request_data(state)
Expand Down Expand Up @@ -136,8 +174,6 @@ defmodule Membrane.RTMP.Server.ClientHandler do
%{state | handler_state: new_handler_state, app: connected_msg.app}

{:published, publish_msg} ->
send(state.server, {:register_client, state.app, publish_msg.stream_key, self()})

new_handler_state =
state.handler.handle_stream_published(publish_msg, state.handler_state)

Expand All @@ -159,4 +195,14 @@ defmodule Membrane.RTMP.Server.ClientHandler do
end
end
end

defp finish_handshake(state) when not state.published? do
{message_handler_state, events} =
MessageHandler.send_publish_success(state.message_handler_state)

state = Enum.reduce(events, state, &handle_event/2)
%{state | message_handler_state: message_handler_state}
end

defp finish_handshake(state), do: state
end
4 changes: 3 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp_server/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ defmodule Membrane.RTMP.Server.Listener do
socket: client,
use_ssl?: options.use_ssl?,
handler: options.handler,
server: options.server
server: options.server,
new_client_callback: options.new_client_callback,
client_timeout: options.client_timeout
)

case :gen_tcp.controlling_process(client, client_reference) do
Expand Down
Loading