-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't accept streams until server is ready to read them #94
Conversation
Please make sure tests and linter passes. |
4cc62b6
to
85e49c2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could make use of the the lambda
to serve as a "subscription mechanism" and remove the built-in subscription mechanism. It would mean that lambda
should receive client_ref
as one of it's arguments as well.
The, the general flow would be following:
- Client connects and the
{app, stream_key}
pair is resolved - The
lambda
is called and a user custom action is invoked - in most cases the user would simply sendclient_ref
to some process that waits for it - Then the user makes use of the
client_ref
for instance by passing it to theRTMP.Source
(which makes a:demand
) or making a custom:demand
on it - Once the
client_ref
receives the:demand
message for the first time, it finishes the handshake
I believe such an approach would simplify the usage of the server
.gitignore
Outdated
@@ -177,3 +177,4 @@ $RECYCLE.BIN/ | |||
*.lnk | |||
|
|||
# End of https://www.gitignore.io/api/c,vim,linux,macos,elixir,windows,visualstudiocode | |||
received.flv |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really want this one in .gitignore?
@@ -1,6 +1,12 @@ | |||
defmodule Membrane.RTMP.Server do | |||
@moduledoc """ | |||
A simple RTMP server, which handles each new incoming connection. | |||
|
|||
When new client connects to the server, it goes into :client_waiting_queue and its RTMP handshake will remanin unfinished. | |||
Only when pipeline tries to pull data from client, its handshake will be finished, and client will be registered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't mention the "pipeline" - the RTMPServer
is meant to be used outside of Membrane pipeline as well and using it with the Membrane.RTMP.Source
is just one of the options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job! Now it's much more simple.
Concerning the "lambda" (now called new_client_callback
) - what do you think about defining that callback in the ClientHandlerBehaviour?
@@ -1,6 +1,12 @@ | |||
defmodule Membrane.RTMP.Server do | |||
@moduledoc """ | |||
A simple RTMP server, which handles each new incoming connection. | |||
|
|||
When new client connects to the server, it goes into :client_waiting_queue and its RTMP handshake will remanin unfinished. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this part is valid anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are almost there!
I've left some minor comments mostly concerning docs and one suggestion (about default new_client_callback
implementation) to think about
parent_process_pid = self() | ||
|
||
new_client_callback = fn client_ref, app, stream_key -> | ||
send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one seems to look as a kind of a pattern we repeat in most use cases - what do you think about making it a default new_client_callback
implementation?
A simple RTMP server, which handles each new incoming connection. | ||
A simple RTMP server, which handles each new incoming connection. When a new client connects, the new_client_callback is invoked. | ||
New connections remain in an incomplete RTMP handshake state until another process makes demand for dara data. | ||
If no data is demanded within the client_timeout period, the connection is closed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps it's good to emphasise that it's a client TCP socket that is closed under such circumstances?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left two minor comments. I also believe you need to update credo
to v1.7 to make the CI pass ;) (mix deps.update credo
)
connect on this URL, the source won't complete its setup | ||
* by spawning `Membrane.RTMP.Server`, subscribing for a given app and stream key on which the client | ||
will connect, waiting for a client reference and passing the client reference to the `#{inspect(__MODULE__)}`. | ||
connect on this URL, the source won't complete its setup. Note that all attepted connections to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
connect on this URL, the source won't complete its setup. Note that all attepted connections to | |
connect on this URL, the source won't complete its setup. Note that all attempted connections to |
app :: String.t(), | ||
stream_key :: String.t() -> | ||
any()), | ||
client_timeout: non_neg_integer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In most plugins we use Membrane.Time
for time handling, so it might be good to be consistent with that here as well:
client_timeout: non_neg_integer() | |
client_timeout: Membrane.Time.t() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🥇
server_options = Enum.into(server_options, %{}) | ||
|
||
server_options = | ||
if Map.get(server_options, :new_client_callback, nil) == nil do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NIT] I believe you could do as well:
if Map.get(server_options, :new_client_callback, nil) == nil do | |
if server_options[:new_client_callback] == nil do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will approve this but @varsill can we move the code owner to someone else? I won't be working on the repository anytime soon.
solves: membraneframework/membrane_core#812