Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't accept streams until server is ready to read them #94

Merged
merged 35 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e358e26
wip, unifex version bumped
bartkrak Jun 13, 2024
a9166de
works but goes into infinite loop
bartkrak Jun 13, 2024
85e49c2
infinite loop fixed
bartkrak Jun 14, 2024
7018f7a
removed subscription mechanism, lambda on new client connection
bartkrak Jul 3, 2024
44f1690
test tests on CI
bartkrak Jul 5, 2024
6089407
quick test fix
bartkrak Jul 5, 2024
52ff895
example
bartkrak Jul 5, 2024
db779f3
another test fix
bartkrak Jul 5, 2024
2ddc8f3
lambda trigger moved to client handler
bartkrak Jul 8, 2024
6b79c2a
standalone server works with new api
bartkrak Jul 8, 2024
54b6e50
CI tests test
bartkrak Jul 8, 2024
b61aad8
typo
bartkrak Jul 8, 2024
fcffb0d
other app stream key test fix
bartkrak Jul 8, 2024
8283bdc
test timeout fix
bartkrak Jul 8, 2024
614ec22
maybe CI will work this time
bartkrak Jul 8, 2024
d068af2
formatting
bartkrak Jul 8, 2024
0579cfb
review suggestions
bartkrak Jul 8, 2024
ff54122
some warnings added
bartkrak Jul 9, 2024
b3a0720
reject unused connections, tests fix
bartkrak Jul 12, 2024
9bd3632
gitignore fix
bartkrak Jul 12, 2024
859550b
format
bartkrak Jul 12, 2024
29709ae
another format fix
bartkrak Jul 12, 2024
c3357ce
docs minor fix
bartkrak Jul 15, 2024
9aa29b7
added default new_client_callback implementation
bartkrak Jul 23, 2024
032b5e2
credo update, minor fixes
bartkrak Jul 23, 2024
76d45d1
url parsing moved to utils.ex and made public
bartkrak Jul 24, 2024
ca181ea
parse_url doc
bartkrak Jul 24, 2024
b5f8263
Update config.yml
bartkrak Jul 24, 2024
938fb06
bundlex bump
bartkrak Jul 24, 2024
ed16832
dialyzer fix
bartkrak Jul 24, 2024
0fa80b1
mime fix
bartkrak Jul 24, 2024
b31bbd3
CI cache
bartkrak Jul 24, 2024
822aa2e
ci fix
bartkrak Jul 24, 2024
81d30d9
mix lock fix
bartkrak Jul 24, 2024
b2a21c6
cosmetic change
bartkrak Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,4 @@ $RECYCLE.BIN/
*.lnk

# End of https://www.gitignore.io/api/c,vim,linux,macos,elixir,windows,visualstudiocode
received.flv
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really want this one in .gitignore?

2 changes: 2 additions & 0 deletions examples/source.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ defmodule Pipeline do
end
end

Logger.configure(level: :error)
varsill marked this conversation as resolved.
Show resolved Hide resolved

# Start a pipeline with `Membrane.RTMP.Source` that will spawn an RTMP server waiting for
# the client connection on given URL
{:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(Pipeline)
Expand Down
29 changes: 21 additions & 8 deletions examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,38 @@ defmodule Pipeline do
end
end

Logger.configure(level: :error)

# The client will connect on `rtmp://localhost:1935/app/stream_key`
port = 1935
app = "app"
stream_key = "stream_key"

# example lambda function that upon launching will send a message back to parent server.
parent_process_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

# Run the standalone server
{:ok, server} =
Membrane.RTMP.Server.start_link(
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
port: port,
use_ssl?: false
use_ssl?: false,
new_client_callback: new_client_callback
)

# Subscribe to receive client reference that connected to the
# server with given app id and stream key
:ok = Membrane.RTMP.Server.subscribe(server, app, stream_key)
app = "app"
stream_key = "stream_key"

{:ok, client_ref} =
receive do
{:client_ref, client_ref, ^app, ^stream_key} ->
{:ok, client_ref}
after
5000 -> :timeout
end

# Wait for the client reference
{:ok, client_ref} = Membrane.RTMP.Server.await_subscription(app, stream_key)
# Start the pipeline and provide it with the client_ref
{:ok, _supervisor, pipeline} =
Membrane.Pipeline.start_link(Pipeline, client_ref: client_ref)
Expand Down
8 changes: 1 addition & 7 deletions lib/membrane_rtmp_plugin/rtmp/source/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@ defmodule Membrane.RTMP.Source.ClientHandler do

@impl true
def handle_stream_published(publish_msg, state) do
state = %{state | stream_key: publish_msg.stream_key}

if state.controlling_process do
send(state.controlling_process, {:client_connected, state.app, state.stream_key})
end

state
%{state | stream_key: publish_msg.stream_key}
end

@impl true
Expand Down
21 changes: 17 additions & 4 deletions lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ defmodule Membrane.RTMP.MessageHandler do
header_sent?: false,
events: [],
receiver_pid: nil,
#
publish_msg: nil,
publish_header: nil,
# how many times the Source tries to get control of the socket
socket_retries: 3,
# epoch required for performing a handshake with the pipeline
Expand All @@ -72,6 +75,17 @@ defmodule Membrane.RTMP.MessageHandler do
end)
end

@spec send_publish_success(map()) :: {map(), list()}
def send_publish_success(state) do
Responses.publish_success(state.publish_msg.stream_key)
|> send_rtmp_payload(state.socket,
chunk_stream_id: 3,
stream_id: state.publish_header.stream_id
)

{%{state | events: []}, [{:published, state.publish_msg} | state.events]}
end

# Expected flow of messages:
# 1. [in] c0_c1 handshake -> [out] s0_s1_s2 handshake
# 2. [in] c2 handshake -> [out] empty
Expand Down Expand Up @@ -141,11 +155,10 @@ defmodule Membrane.RTMP.MessageHandler do
%Messages.UserControl{event_type: @stream_begin_type, data: <<0, 0, 0, 1>>}
|> send_rtmp_payload(state.socket, chunk_stream_id: 2)

Responses.publish_success(publish_msg.stream_key)
|> send_rtmp_payload(state.socket, chunk_stream_id: 3, stream_id: header.stream_id)
# at this point pause the unfinished handshake until pipeline demands data from this client
# (this mechanism prevents accepting streams with no listeners)

state = %{state | events: [{:published, publish_msg} | state.events]}
{:cont, state}
{:halt, %{state | publish_msg: publish_msg, publish_header: header}}
end

# A message containing stream metadata
Expand Down
26 changes: 18 additions & 8 deletions lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,22 @@ defmodule Membrane.RTMP.Source do
def handle_setup(_ctx, %{mode: :builtin_server} = state) do
{use_ssl?, port, app, stream_key} = parse_url(state.url)

parent_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
send(parent_pid, {:client_ref, client_ref, app, stream_key})
end

{:ok, server_pid} =
Membrane.RTMP.Server.start_link(
handler: %SourceClientHandler{controlling_process: self()},
port: port,
use_ssl?: use_ssl?
use_ssl?: use_ssl?,
new_client_callback: new_client_callback
)

state = %{state | app: app, stream_key: stream_key, server: server_pid}
{[], state}
{[setup: :incomplete], state}
end

@impl true
Expand Down Expand Up @@ -156,19 +163,22 @@ defmodule Membrane.RTMP.Source do
end

@impl true
def handle_info({:client_connected, app, stream_key}, _ctx, %{mode: :builtin_server} = state) do
:ok = Membrane.RTMP.Server.subscribe(state.server, state.app, state.stream_key)
state = %{state | app: app, stream_key: stream_key}
{[], state}
def handle_info(
{:client_ref, client_ref, app, stream_key},
_ctx,
%{mode: :builtin_server} = state
)
when app == state.app and stream_key == state.stream_key do
{[setup: :complete], %{state | client_ref: client_ref}}
end

@impl true
def handle_info(
{:client_ref, client_ref_pid, _app, _stream_key},
{:client_ref, _client_ref, _app, _stream_key},
_ctx,
%{mode: :builtin_server} = state
) do
{[redemand: :output], %{state | client_ref: client_ref_pid}}
{[], state}
varsill marked this conversation as resolved.
Show resolved Hide resolved
end

@impl true
Expand Down
66 changes: 11 additions & 55 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
defmodule Membrane.RTMP.Server do
@moduledoc """
A simple RTMP server, which handles each new incoming connection.

When new client connects to the server, it goes into :client_waiting_queue and its RTMP handshake will remanin unfinished.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this part is valid anymore

Only when pipeline tries to pull data from client, its handshake will be finished, and client will be registered.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't mention the "pipeline" - the RTMPServer is meant to be used outside of Membrane pipeline as well and using it with the Membrane.RTMP.Source is just one of the options.


Also when new client connects, optional, annonymous function defined by user is triggered.
The lambda function is given PID of parent server, app and stream key.
"""
use GenServer

Expand All @@ -15,7 +21,9 @@ defmodule Membrane.RTMP.Server do
handler: ClientHandlerBehaviour.t(),
port: :inet.port_number(),
use_ssl?: boolean(),
name: atom() | nil
name: atom() | nil,
new_client_callback:
(client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> any()) | nil
]

@type server_identifier :: pid() | atom()
Expand All @@ -30,33 +38,6 @@ defmodule Membrane.RTMP.Server do
GenServer.start_link(__MODULE__, server_options, gen_server_opts)
end

@doc """
Subscribes for the given app and stream key.
When a client connects (or has already connected) to the server with given app and stream key,
the subscriber will be informed.
"""
@spec subscribe(server_identifier(), String.t(), String.t()) :: :ok
def subscribe(server_identifier, app, stream_key) do
GenServer.cast(server_identifier, {:subscribe, app, stream_key, self()})
:ok
end

@doc """
Awaits for the client reference of the connection to which the user has previously subscribed.

Note: this function call is blocking!
Note: first you need to call `#{__MODULE__}.subscribe/3` to subscribe
for a given `app` and `stream_key`.
"""
@spec await_subscription(String.t(), String.t(), non_neg_integer()) :: {:ok, pid()} | :error
def await_subscription(app, stream_key, timeout \\ 5_000) do
receive do
{:client_ref, client_ref, ^app, ^stream_key} -> {:ok, client_ref}
after
timeout -> :error
end
end

@doc """
Returns the port on which the server listens for connection.
"""
Expand All @@ -74,12 +55,11 @@ defmodule Membrane.RTMP.Server do

{:ok,
%{
subscriptions: %{},
client_reference_mapping: %{},
listener: pid,
port: nil,
to_reply: [],
use_ssl?: server_options.use_ssl?
use_ssl?: server_options.use_ssl?,
new_client_callback: server_options.new_client_callback
}}
end

Expand All @@ -92,33 +72,9 @@ defmodule Membrane.RTMP.Server do
end
end

@impl true
def handle_cast({:subscribe, app, stream_key, subscriber_pid}, state) do
state = put_in(state, [:subscriptions, {app, stream_key}], subscriber_pid)
maybe_send_subscription(app, stream_key, state)
{:noreply, state}
end

@impl true
def handle_info({:register_client, app, stream_key, client_reference_pid}, state) do
state = put_in(state, [:client_reference_mapping, {app, stream_key}], client_reference_pid)
maybe_send_subscription(app, stream_key, state)
{:noreply, state}
end

@impl true
def handle_info({:port, port}, state) do
Enum.each(state.to_reply, &GenServer.reply(&1, port))
{:noreply, %{state | port: port, to_reply: []}}
end

defp maybe_send_subscription(app, stream_key, state) do
if state.subscriptions[{app, stream_key}] != nil and
state.client_reference_mapping[{app, stream_key}] != nil do
send(
state.subscriptions[{app, stream_key}],
{:client_ref, state.client_reference_mapping[{app, stream_key}], app, stream_key}
)
end
end
end
32 changes: 28 additions & 4 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ defmodule Membrane.RTMP.Server.ClientHandler do
stream_key: nil,
server: opts.server,
buffers_demanded: 0,
published?: false
published?: false,
client_register_attempt_made?: false,
varsill marked this conversation as resolved.
Show resolved Hide resolved
new_client_callback: opts.new_client_callback
}}
end

Expand Down Expand Up @@ -77,7 +79,7 @@ defmodule Membrane.RTMP.Server.ClientHandler do

@impl true
def handle_info({:demand_data, how_many_buffers_demanded}, state) do
state = %{state | buffers_demanded: how_many_buffers_demanded}
state = finish_handshake(state) |> Map.replace!(:buffers_demanded, how_many_buffers_demanded)
request_data(state)
{:noreply, state}
end
Expand All @@ -96,6 +98,20 @@ defmodule Membrane.RTMP.Server.ClientHandler do
{message_handler_state, events} =
MessageHandler.handle_client_messages(messages, state.message_handler_state)

state =
if message_handler_state.publish_msg != nil and not state.client_register_attempt_made? do
%{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} =
message_handler_state

if is_function(state.new_client_callback) do
varsill marked this conversation as resolved.
Show resolved Hide resolved
state.new_client_callback.(self(), state.app, stream_key)
end

%{state | client_register_attempt_made?: true}
else
state
end

state = Enum.reduce(events, state, &handle_event/2)

request_data(state)
Expand Down Expand Up @@ -136,8 +152,6 @@ defmodule Membrane.RTMP.Server.ClientHandler do
%{state | handler_state: new_handler_state, app: connected_msg.app}

{:published, publish_msg} ->
send(state.server, {:register_client, state.app, publish_msg.stream_key, self()})

new_handler_state =
state.handler.handle_stream_published(publish_msg, state.handler_state)

Expand All @@ -159,4 +173,14 @@ defmodule Membrane.RTMP.Server.ClientHandler do
end
end
end

defp finish_handshake(state) when not state.published? do
{message_handler_state, events} =
MessageHandler.send_publish_success(state.message_handler_state)

state = Enum.reduce(events, state, &handle_event/2)
%{state | message_handler_state: message_handler_state}
end

defp finish_handshake(state), do: state
end
3 changes: 2 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp_server/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ defmodule Membrane.RTMP.Server.Listener do
socket: client,
use_ssl?: options.use_ssl?,
handler: options.handler,
server: options.server
server: options.server,
new_client_callback: options.new_client_callback
)

case :gen_tcp.controlling_process(client, client_reference) do
Expand Down
Loading