generated from membraneframework/membrane_template_plugin
-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't accept streams until server is ready to read them #94
Merged
Merged
Changes from 23 commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
e358e26
wip, unifex version bumped
bartkrak a9166de
works but goes into infinite loop
bartkrak 85e49c2
infinite loop fixed
bartkrak 7018f7a
removed subscription mechanism, lambda on new client connection
bartkrak 44f1690
test tests on CI
bartkrak 6089407
quick test fix
bartkrak 52ff895
example
bartkrak db779f3
another test fix
bartkrak 2ddc8f3
lambda trigger moved to client handler
bartkrak 6b79c2a
standalone server works with new api
bartkrak 54b6e50
CI tests test
bartkrak b61aad8
typo
bartkrak fcffb0d
other app stream key test fix
bartkrak 8283bdc
test timeout fix
bartkrak 614ec22
maybe CI will work this time
bartkrak d068af2
formatting
bartkrak 0579cfb
review suggestions
bartkrak ff54122
some warnings added
bartkrak b3a0720
reject unused connections, tests fix
bartkrak 9bd3632
gitignore fix
bartkrak 859550b
format
bartkrak 29709ae
another format fix
bartkrak c3357ce
docs minor fix
bartkrak 9aa29b7
added default new_client_callback implementation
bartkrak 032b5e2
credo update, minor fixes
bartkrak 76d45d1
url parsing moved to utils.ex and made public
bartkrak ca181ea
parse_url doc
bartkrak b5f8263
Update config.yml
bartkrak 938fb06
bundlex bump
bartkrak ed16832
dialyzer fix
bartkrak 0fa80b1
mime fix
bartkrak b31bbd3
CI cache
bartkrak 822aa2e
ci fix
bartkrak 81d30d9
mix lock fix
bartkrak b2a21c6
cosmetic change
bartkrak File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,6 +1,14 @@ | ||||||
defmodule Membrane.RTMP.Server do | ||||||
@moduledoc """ | ||||||
A simple RTMP server, which handles each new incoming connection. | ||||||
A simple RTMP server, which handles each new incoming connection. When a new client connects, the `new_client_callback` is invoked. | ||||||
New connections remain in an incomplete RTMP handshake state, until another process makes demand for their data. | ||||||
If no data is demanded within the client_timeout period, TCP socket is closed. | ||||||
|
||||||
Options: | ||||||
- client_timeout: Time (ms) after which an unused client connection is automatically closed. | ||||||
- new_client_callback: An anonymous function called when a new client connects. | ||||||
It receives the client reference, `app` and `stream_key`, allowing custom processing, | ||||||
like sending the reference to another process. | ||||||
""" | ||||||
use GenServer | ||||||
|
||||||
|
@@ -15,7 +23,12 @@ defmodule Membrane.RTMP.Server do | |||||
handler: ClientHandlerBehaviour.t(), | ||||||
port: :inet.port_number(), | ||||||
use_ssl?: boolean(), | ||||||
name: atom() | nil | ||||||
name: atom() | nil, | ||||||
new_client_callback: (client_ref :: pid(), | ||||||
app :: String.t(), | ||||||
stream_key :: String.t() -> | ||||||
any()), | ||||||
client_timeout: non_neg_integer() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In most plugins we use
Suggested change
|
||||||
] | ||||||
|
||||||
@type server_identifier :: pid() | atom() | ||||||
|
@@ -30,33 +43,6 @@ defmodule Membrane.RTMP.Server do | |||||
GenServer.start_link(__MODULE__, server_options, gen_server_opts) | ||||||
end | ||||||
|
||||||
@doc """ | ||||||
Subscribes for the given app and stream key. | ||||||
When a client connects (or has already connected) to the server with given app and stream key, | ||||||
the subscriber will be informed. | ||||||
""" | ||||||
@spec subscribe(server_identifier(), String.t(), String.t()) :: :ok | ||||||
def subscribe(server_identifier, app, stream_key) do | ||||||
GenServer.cast(server_identifier, {:subscribe, app, stream_key, self()}) | ||||||
:ok | ||||||
end | ||||||
|
||||||
@doc """ | ||||||
Awaits for the client reference of the connection to which the user has previously subscribed. | ||||||
|
||||||
Note: this function call is blocking! | ||||||
Note: first you need to call `#{__MODULE__}.subscribe/3` to subscribe | ||||||
for a given `app` and `stream_key`. | ||||||
""" | ||||||
@spec await_subscription(String.t(), String.t(), non_neg_integer()) :: {:ok, pid()} | :error | ||||||
def await_subscription(app, stream_key, timeout \\ 5_000) do | ||||||
receive do | ||||||
{:client_ref, client_ref, ^app, ^stream_key} -> {:ok, client_ref} | ||||||
after | ||||||
timeout -> :error | ||||||
end | ||||||
end | ||||||
|
||||||
@doc """ | ||||||
Returns the port on which the server listens for connection. | ||||||
""" | ||||||
|
@@ -74,8 +60,6 @@ defmodule Membrane.RTMP.Server do | |||||
|
||||||
{:ok, | ||||||
%{ | ||||||
subscriptions: %{}, | ||||||
client_reference_mapping: %{}, | ||||||
listener: pid, | ||||||
port: nil, | ||||||
to_reply: [], | ||||||
|
@@ -92,33 +76,9 @@ defmodule Membrane.RTMP.Server do | |||||
end | ||||||
end | ||||||
|
||||||
@impl true | ||||||
def handle_cast({:subscribe, app, stream_key, subscriber_pid}, state) do | ||||||
state = put_in(state, [:subscriptions, {app, stream_key}], subscriber_pid) | ||||||
maybe_send_subscription(app, stream_key, state) | ||||||
{:noreply, state} | ||||||
end | ||||||
|
||||||
@impl true | ||||||
def handle_info({:register_client, app, stream_key, client_reference_pid}, state) do | ||||||
state = put_in(state, [:client_reference_mapping, {app, stream_key}], client_reference_pid) | ||||||
maybe_send_subscription(app, stream_key, state) | ||||||
{:noreply, state} | ||||||
end | ||||||
|
||||||
@impl true | ||||||
def handle_info({:port, port}, state) do | ||||||
Enum.each(state.to_reply, &GenServer.reply(&1, port)) | ||||||
{:noreply, %{state | port: port, to_reply: []}} | ||||||
end | ||||||
|
||||||
defp maybe_send_subscription(app, stream_key, state) do | ||||||
if state.subscriptions[{app, stream_key}] != nil and | ||||||
state.client_reference_mapping[{app, stream_key}] != nil do | ||||||
send( | ||||||
state.subscriptions[{app, stream_key}], | ||||||
{:client_ref, state.client_reference_mapping[{app, stream_key}], app, stream_key} | ||||||
) | ||||||
end | ||||||
end | ||||||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.