Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't accept streams until server is ready to read them #94

Merged
merged 35 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e358e26
wip, unifex version bumped
bartkrak Jun 13, 2024
a9166de
works but goes into infinite loop
bartkrak Jun 13, 2024
85e49c2
infinite loop fixed
bartkrak Jun 14, 2024
7018f7a
removed subscription mechanism, lambda on new client connection
bartkrak Jul 3, 2024
44f1690
test tests on CI
bartkrak Jul 5, 2024
6089407
quick test fix
bartkrak Jul 5, 2024
52ff895
example
bartkrak Jul 5, 2024
db779f3
another test fix
bartkrak Jul 5, 2024
2ddc8f3
lambda trigger moved to client handler
bartkrak Jul 8, 2024
6b79c2a
standalone server works with new api
bartkrak Jul 8, 2024
54b6e50
CI tests test
bartkrak Jul 8, 2024
b61aad8
typo
bartkrak Jul 8, 2024
fcffb0d
other app stream key test fix
bartkrak Jul 8, 2024
8283bdc
test timeout fix
bartkrak Jul 8, 2024
614ec22
maybe CI will work this time
bartkrak Jul 8, 2024
d068af2
formatting
bartkrak Jul 8, 2024
0579cfb
review suggestions
bartkrak Jul 8, 2024
ff54122
some warnings added
bartkrak Jul 9, 2024
b3a0720
reject unused connections, tests fix
bartkrak Jul 12, 2024
9bd3632
gitignore fix
bartkrak Jul 12, 2024
859550b
format
bartkrak Jul 12, 2024
29709ae
another format fix
bartkrak Jul 12, 2024
c3357ce
docs minor fix
bartkrak Jul 15, 2024
9aa29b7
added default new_client_callback implementation
bartkrak Jul 23, 2024
032b5e2
credo update, minor fixes
bartkrak Jul 23, 2024
76d45d1
url parsing moved to utils.ex and made public
bartkrak Jul 24, 2024
ca181ea
parse_url doc
bartkrak Jul 24, 2024
b5f8263
Update config.yml
bartkrak Jul 24, 2024
938fb06
bundlex bump
bartkrak Jul 24, 2024
ed16832
dialyzer fix
bartkrak Jul 24, 2024
0fa80b1
mime fix
bartkrak Jul 24, 2024
b31bbd3
CI cache
bartkrak Jul 24, 2024
822aa2e
ci fix
bartkrak Jul 24, 2024
81d30d9
mix lock fix
bartkrak Jul 24, 2024
b2a21c6
cosmetic change
bartkrak Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,4 @@ $RECYCLE.BIN/
*.lnk

# End of https://www.gitignore.io/api/c,vim,linux,macos,elixir,windows,visualstudiocode
received.flv
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really want this one in .gitignore?

21 changes: 15 additions & 6 deletions examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,34 @@ defmodule Pipeline do
end
end

Logger.configure(level: :error)

# The client will connect on `rtmp://localhost:1935/app/stream_key`
port = 1935
app = "app"
stream_key = "stream_key"

# example lambda function that upon launching will send a message back to parent server.
lambda = fn server, app, stream_key ->
send(server, {:lambda, "hello from the other side #{app} #{stream_key}"})
end

# Run the standalone server
{:ok, server} =
Membrane.RTMP.Server.start_link(
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
port: port,
use_ssl?: false
use_ssl?: false,
lambda: lambda
)

# Subscribe to receive client reference that connected to the
# server with given app id and stream key
:ok = Membrane.RTMP.Server.subscribe(server, app, stream_key)
# Subscribe any app stream key to be informed when new client connects,
# so you can await its ref and pull data from it.
:ok = Membrane.RTMP.Server.subscribe_any(server)

# Wait for the client reference for given app stream_key
{:ok, client_ref} = Membrane.RTMP.Server.await_client_ref(app, stream_key)

# Wait for the client reference
{:ok, client_ref} = Membrane.RTMP.Server.await_subscription(app, stream_key)
# Start the pipeline and provide it with the client_ref
{:ok, _supervisor, pipeline} =
Membrane.Pipeline.start_link(Pipeline, client_ref: client_ref)
Expand Down
21 changes: 17 additions & 4 deletions lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ defmodule Membrane.RTMP.MessageHandler do
header_sent?: false,
events: [],
receiver_pid: nil,
#
publish_msg: nil,
publish_header: nil,
# how many times the Source tries to get control of the socket
socket_retries: 3,
# epoch required for performing a handshake with the pipeline
Expand All @@ -72,6 +75,17 @@ defmodule Membrane.RTMP.MessageHandler do
end)
end

@spec send_publish_success(map()) :: {map(), list()}
def send_publish_success(state) do
Responses.publish_success(state.publish_msg.stream_key)
|> send_rtmp_payload(state.socket,
chunk_stream_id: 3,
stream_id: state.publish_header.stream_id
)

{%{state | events: []}, [{:published, state.publish_msg} | state.events]}
end

# Expected flow of messages:
# 1. [in] c0_c1 handshake -> [out] s0_s1_s2 handshake
# 2. [in] c2 handshake -> [out] empty
Expand Down Expand Up @@ -141,11 +155,10 @@ defmodule Membrane.RTMP.MessageHandler do
%Messages.UserControl{event_type: @stream_begin_type, data: <<0, 0, 0, 1>>}
|> send_rtmp_payload(state.socket, chunk_stream_id: 2)

Responses.publish_success(publish_msg.stream_key)
|> send_rtmp_payload(state.socket, chunk_stream_id: 3, stream_id: header.stream_id)
# at this point pause the unfinished handshake until pipeline demands data from this client
# (this mechanism prevents accepting streams with no listeners)

state = %{state | events: [{:published, publish_msg} | state.events]}
{:cont, state}
{:halt, %{state | publish_msg: publish_msg, publish_header: header}}
end

# A message containing stream metadata
Expand Down
5 changes: 3 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ defmodule Membrane.RTMP.Source do
Membrane.RTMP.Server.start_link(
handler: %SourceClientHandler{controlling_process: self()},
port: port,
use_ssl?: use_ssl?
use_ssl?: use_ssl?,
lambda: nil
)

state = %{state | app: app, stream_key: stream_key, server: server_pid}
Expand Down Expand Up @@ -157,7 +158,7 @@ defmodule Membrane.RTMP.Source do

@impl true
def handle_info({:client_connected, app, stream_key}, _ctx, %{mode: :builtin_server} = state) do
:ok = Membrane.RTMP.Server.subscribe(state.server, state.app, state.stream_key)
:ok = Membrane.RTMP.Server.subscribe_any(state.server)
state = %{state | app: app, stream_key: stream_key}
{[], state}
end
Expand Down
79 changes: 52 additions & 27 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
defmodule Membrane.RTMP.Server do
@moduledoc """
A simple RTMP server, which handles each new incoming connection.

When new client connects to the server, it goes into :client_waiting_queue and its RTMP handshake will remanin unfinished.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this part is valid anymore

Only when pipeline tries to pull data from client, its handshake will be finished, and client will be registered.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't mention the "pipeline" - the RTMPServer is meant to be used outside of Membrane pipeline as well and using it with the Membrane.RTMP.Source is just one of the options.


Also when new client connects, optional, annonymous function defined by user is triggered.
The lambda function is given PID of parent server, app and stream key.
"""
use GenServer

Expand All @@ -15,7 +21,8 @@ defmodule Membrane.RTMP.Server do
handler: ClientHandlerBehaviour.t(),
port: :inet.port_number(),
use_ssl?: boolean(),
name: atom() | nil
name: atom() | nil,
lambda: function() | nil
varsill marked this conversation as resolved.
Show resolved Hide resolved
]

@type server_identifier :: pid() | atom()
Expand All @@ -31,27 +38,26 @@ defmodule Membrane.RTMP.Server do
end

@doc """
Subscribes for the given app and stream key.
When a client connects (or has already connected) to the server with given app and stream key,
the subscriber will be informed.
Subscribes for any app/stream_key.
When a new client connects, subscriber will be informed, if currently is awaiting client_ref for a given app/stream_key
"""
@spec subscribe(server_identifier(), String.t(), String.t()) :: :ok
def subscribe(server_identifier, app, stream_key) do
GenServer.cast(server_identifier, {:subscribe, app, stream_key, self()})
@spec subscribe_any(server_identifier()) :: :ok
def subscribe_any(server_identifier) do
GenServer.cast(server_identifier, {:subscribe_any, self()})
:ok
end

@doc """
Awaits for the client reference of the connection to which the user has previously subscribed.
Awaits for the client reference of the connection to specified app/stream_key

Note: this function call is blocking!
Note: first you need to call `#{__MODULE__}.subscribe/3` to subscribe
for a given `app` and `stream_key`.
Note: first you need to call `#{__MODULE__}.subscribe_any/1`.
"""
@spec await_subscription(String.t(), String.t(), non_neg_integer()) :: {:ok, pid()} | :error
def await_subscription(app, stream_key, timeout \\ 5_000) do
@spec await_client_ref(String.t(), String.t(), non_neg_integer()) :: {:ok, pid()} | :error
def await_client_ref(app, stream_key, timeout \\ 5_000) do
receive do
{:client_ref, client_ref, ^app, ^stream_key} -> {:ok, client_ref}
{:client_ref, client_ref, ^app, ^stream_key} ->
{:ok, client_ref}
after
timeout -> :error
end
Expand All @@ -74,12 +80,14 @@ defmodule Membrane.RTMP.Server do

{:ok,
%{
subscriptions: %{},
subscriptions_any: [],
client_reference_mapping: %{},
client_waiting_queue: %{},
listener: pid,
port: nil,
to_reply: [],
use_ssl?: server_options.use_ssl?
use_ssl?: server_options.use_ssl?,
lambda: server_options.lambda
}}
end

Expand All @@ -93,16 +101,37 @@ defmodule Membrane.RTMP.Server do
end

@impl true
def handle_cast({:subscribe, app, stream_key, subscriber_pid}, state) do
state = put_in(state, [:subscriptions, {app, stream_key}], subscriber_pid)
maybe_send_subscription(app, stream_key, state)
def handle_cast({:subscribe_any, subscriber_pid}, state) do
subs = [state.subscriptions_any ++ subscriber_pid]
state = %{state | subscriptions_any: subs}

# try to send all client_refs from :client_waiting_queue to this subscriber, maybe is already awaiting one of /app/stream_keys
state.client_waiting_queue
|> Enum.each(fn {{app, stream_key}, client_ref} ->
send(subscriber_pid, {:client_ref, client_ref, app, stream_key})
end)

{:noreply, state}
end

@impl true
def handle_info({:register_client, app, stream_key, client_reference_pid}, state) do
state = put_in(state, [:client_reference_mapping, {app, stream_key}], client_reference_pid)
maybe_send_subscription(app, stream_key, state)
client_waiting_queue = state.client_waiting_queue |> Map.delete({app, stream_key})
{:noreply, %{state | client_waiting_queue: client_waiting_queue}}
end

@impl true
def handle_info({:register_client_in_queue, app, stream_key, client_ref}, state) do
if is_function(state.lambda) do
state.lambda.(self(), app, stream_key)
end

state = put_in(state, [:client_waiting_queue, {app, stream_key}], client_ref)
# send client ref_to anyone possibly awaiting it
state.subscriptions_any
|> Enum.each(fn subscriber -> send(subscriber, {:client_ref, client_ref, app, stream_key}) end)

{:noreply, state}
end

Expand All @@ -112,13 +141,9 @@ defmodule Membrane.RTMP.Server do
{:noreply, %{state | port: port, to_reply: []}}
end

defp maybe_send_subscription(app, stream_key, state) do
if state.subscriptions[{app, stream_key}] != nil and
state.client_reference_mapping[{app, stream_key}] != nil do
send(
state.subscriptions[{app, stream_key}],
{:client_ref, state.client_reference_mapping[{app, stream_key}], app, stream_key}
)
end
@impl true
def handle_info({:lambda, message}, state) do
IO.inspect(message, label: "new message from lambda")
varsill marked this conversation as resolved.
Show resolved Hide resolved
{:noreply, state}
end
end
26 changes: 24 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ defmodule Membrane.RTMP.Server.ClientHandler do
stream_key: nil,
server: opts.server,
buffers_demanded: 0,
published?: false
published?: false,
client_register_attempt_made?: false
}}
end

Expand Down Expand Up @@ -77,7 +78,7 @@ defmodule Membrane.RTMP.Server.ClientHandler do

@impl true
def handle_info({:demand_data, how_many_buffers_demanded}, state) do
state = %{state | buffers_demanded: how_many_buffers_demanded}
state = %{finish_handshake(state) | buffers_demanded: how_many_buffers_demanded}
varsill marked this conversation as resolved.
Show resolved Hide resolved
request_data(state)
{:noreply, state}
end
Expand All @@ -96,6 +97,17 @@ defmodule Membrane.RTMP.Server.ClientHandler do
{message_handler_state, events} =
MessageHandler.handle_client_messages(messages, state.message_handler_state)

state =
if message_handler_state.publish_msg != nil and not state.client_register_attempt_made? do
%{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} =
message_handler_state

send(state.server, {:register_client_in_queue, state.app, stream_key, self()})
%{state | client_register_attempt_made?: true}
else
state
end

state = Enum.reduce(events, state, &handle_event/2)

request_data(state)
Expand Down Expand Up @@ -159,4 +171,14 @@ defmodule Membrane.RTMP.Server.ClientHandler do
end
end
end

defp finish_handshake(state) when not state.published? do
{message_handler_state, events} =
MessageHandler.send_publish_success(state.message_handler_state)

state = Enum.reduce(events, state, &handle_event/2)
%{state | message_handler_state: message_handler_state}
end

defp finish_handshake(state), do: state
end
9 changes: 5 additions & 4 deletions test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
defp start_pipeline_with_builtin_rtmp_server(app, stream_key, use_ssl? \\ false) do
options = [
module: Membrane.RTMP.Source.WithBuiltinServerTestPipeline,
custom_args: %{app: app, stream_key: stream_key, port: @default_port, use_ssl?: use_ssl?},
custom_args: %{app: app, stream_key: stream_key, port: @default_port, use_ssl?: use_ssl?, lambda: nil},
test_process: self()
]

Expand All @@ -221,7 +221,8 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
controlling_process: self()
},
port: port,
use_ssl?: use_ssl?
use_ssl?: use_ssl?,
lambda: nil
)

{:ok, assigned_port} = Membrane.RTMP.Server.get_port(server_pid)
Expand All @@ -233,10 +234,10 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
{:client_connected, ^app, ^stream_key} -> :ok
end

:ok = Membrane.RTMP.Server.subscribe(server_pid, app, stream_key)
:ok = Membrane.RTMP.Server.subscribe_any(server_pid)

{:ok, client_reference} =
Membrane.RTMP.Server.await_subscription(app, stream_key)
Membrane.RTMP.Server.await_client_ref(app, stream_key)

options = [
module: Membrane.RTMP.Source.WithExternalServerTestPipeline,
Expand Down