Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RTMP namespace #96

Merged
merged 8 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de
```elixir
def deps do
[
{:membrane_rtmp_plugin, "~> 0.23.4"}
{:membrane_rtmp_plugin, "~> 0.24.0"}
]
end
```
Expand Down Expand Up @@ -83,7 +83,7 @@ When the script terminates, the `testsrc` content should be available in the `re

### RTMP receive with standalone RTMP server

If you want to see how you could setup the `Membrane.RTMP.Server` on your own and use it
If you want to see how you could setup the `Membrane.RTMPServer` on your own and use it
with cooperation with the `Membane.RTMP.SourceBin`, take a look at [`examples/source_with_standalone_server.exs`](examples/source_with_standalone_server.exs)
Run it with:

Expand Down
8 changes: 4 additions & 4 deletions examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ port = 1935
# example lambda function that upon launching will send client reference back to parent process.
parent_process_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

# Run the standalone server
{:ok, server} =
Membrane.RTMP.Server.start_link(
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
Membrane.RTMPServer.start_link(
handler: %Membrane.RTMP.Source.ClientHandlerForSource{controlling_process: self()},
port: port,
use_ssl?: false,
new_client_callback: new_client_callback,
handle_new_client: handle_new_client,
client_timeout: 5_000
)

Expand Down
9 changes: 0 additions & 9 deletions lib/membrane_rtmp_plugin/rtmp/avc/utils.ex

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
defmodule Membrane.RTMP.Source.ClientHandler do
defmodule Membrane.RTMP.Source.ClientHandlerForSource do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand it's to avoid confusion with the behaviour, but I'd leave it as is, just never alias it. Alternatively let's call it ClientHandlerImpl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm up to the second option ;)

@moduledoc """
An implementation of `Membrane.RTMP.Server.ClienHandlerBehaviour` compatible with the
An implementation of `Membrane.RTMPServer.ClienHandlerBehaviour` compatible with the
`Membrane.RTMP.Source` element.
"""

@behaviour Membrane.RTMP.Server.ClientHandlerBehaviour
@behaviour Membrane.RTMPServer.ClientHandler

defstruct [:controlling_process]

Expand Down Expand Up @@ -67,10 +67,4 @@ defmodule Membrane.RTMP.Source.ClientHandler do
send(pid, :end_of_stream)
:ok
end

@spec request_for_data(pid()) :: :ok
def request_for_data(client_reference) do
send(client_reference, {:send_me_data, self()})
:ok
end
end
22 changes: 10 additions & 12 deletions lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ defmodule Membrane.RTMP.Source do
connect on this URL, the source won't complete its setup. Note that all attempted connections to
other `app` or `stream_key` than specified ones will be rejected.

* by spawning `Membrane.RTMP.Server`, receiving a client reference and passing it to the `#{inspect(__MODULE__)}`.
* by spawning `Membrane.RTMPServer`, receiving a client reference and passing it to the `#{inspect(__MODULE__)}`.
"""
use Membrane.Source
require Membrane.Logger
require Logger
alias __MODULE__.ClientHandler, as: SourceClientHandler
alias Membrane.RTMP.Server.ClientHandler
alias Membrane.RTMP.Utils
alias Membrane.RTMPServer.ClientHandler

def_output_pad :output,
availability: :always,
Expand All @@ -28,7 +26,7 @@ defmodule Membrane.RTMP.Source do
spec: pid(),
description: """
A pid of a process acting as a client reference.
Can be gained with the use of `Membrane.RTMP.Server`.
Can be gained with the use of `Membrane.RTMPServer`.
"""
],
url: [
Expand Down Expand Up @@ -82,20 +80,20 @@ defmodule Membrane.RTMP.Source do

@impl true
def handle_setup(_ctx, %{mode: :builtin_server} = state) do
{use_ssl?, port, app, stream_key} = Utils.parse_url(state.url)
{use_ssl?, port, app, stream_key} = Membrane.RTMPServer.parse_url(state.url)

parent_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
handle_new_client = fn client_ref, app, stream_key ->
send(parent_pid, {:client_ref, client_ref, app, stream_key})
end

{:ok, server_pid} =
Membrane.RTMP.Server.start_link(
handler: %SourceClientHandler{controlling_process: self()},
Membrane.RTMPServer.start_link(
handler: %__MODULE__.ClientHandlerForSource{controlling_process: self()},
port: port,
use_ssl?: use_ssl?,
new_client_callback: new_client_callback,
handle_new_client: handle_new_client,
client_timeout: 100
)

Expand All @@ -115,7 +113,7 @@ defmodule Membrane.RTMP.Source do
{:output, %Membrane.RemoteStream{content_format: Membrane.FLV, type: :bytestream}}
]

:ok = SourceClientHandler.request_for_data(state.client_ref)
send(state.client_ref, {:send_me_data, self()})

{stream_format, state}
end
Expand Down Expand Up @@ -150,7 +148,7 @@ defmodule Membrane.RTMP.Source do
%{client_ref: client_ref, mode: :builtin_server} = state
) do
:ok = ClientHandler.demand_data(client_ref, size)
:ok = SourceClientHandler.request_for_data(client_ref)
send(client_ref, {:send_me_data, self()})
{[], state}
end

Expand Down
4 changes: 2 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp/source/source_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.RTMP.SourceBin do
The bin can be used in the following two scenarios:
* by providing the URL on which the client is expected to connect - note, that if the client doesn't
connect on this URL, the bin won't complete its setup
* by spawning `Membrane.RTMP.Server`, receiving client reference after client connects on a given `app` and `stream_key`
* by spawning `Membrane.RTMPServer`, receiving client reference after client connects on a given `app` and `stream_key`
and passing the client reference to the `#{inspect(__MODULE__)}`.
"""
use Membrane.Bin
Expand All @@ -28,7 +28,7 @@ defmodule Membrane.RTMP.SourceBin do
spec: pid(),
description: """
A pid of a process acting as a client reference.
Can be gained with the use of `Membrane.RTMP.Server`.
Can be gained with the use of `Membrane.RTMPServer`.
"""
],
url: [
Expand Down
46 changes: 36 additions & 10 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
defmodule Membrane.RTMP.Server do
defmodule Membrane.RTMPServer do
@moduledoc """
A simple RTMP server, which handles each new incoming connection. When a new client connects, the `new_client_callback` is invoked.
A simple RTMP server, which handles each new incoming connection. When a new client connects, the `handle_new_client` is invoked.
New connections remain in an incomplete RTMP handshake state, until another process makes demand for their data.
If no data is demanded within the client_timeout period, TCP socket is closed.

Options:
- client_timeout: Time (ms) after which an unused client connection is automatically closed.
- new_client_callback: An anonymous function called when a new client connects.
- handle_new_client: An anonymous function called when a new client connects.
It receives the client reference, `app` and `stream_key`, allowing custom processing,
like sending the reference to another process. If it's not provided, default implementation is used:
{:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMP.Server.start_link().
{:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMPServer.start_link().
"""
use GenServer

require Logger

alias Membrane.RTMP.Server.ClientHandlerBehaviour
alias Membrane.RTMPServer.ClientHandler

@typedoc """
Defines options for the RTMP server.
"""
@type t :: [
handler: ClientHandlerBehaviour.t(),
handler: ClientHandler.t(),
port: :inet.port_number(),
use_ssl?: boolean(),
name: atom() | nil,
new_client_callback:
handle_new_client:
(client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
any())
| nil,
Expand All @@ -44,14 +44,14 @@ defmodule Membrane.RTMP.Server do
server_options = Enum.into(server_options, %{})

server_options =
if server_options[:new_client_callback] == nil do
if server_options[:handle_new_client] == nil do
parent_process_pid = self()

callback = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

Map.put(server_options, :new_client_callback, callback)
Map.put(server_options, :handle_new_client, callback)
else
server_options
end
Expand All @@ -70,7 +70,7 @@ defmodule Membrane.RTMP.Server do
@impl true
def init(server_options) do
pid =
Task.start_link(Membrane.RTMP.Server.Listener, :run, [
Task.start_link(Membrane.RTMPServer.Listener, :run, [
Map.merge(server_options, %{server: self()})
])

Expand All @@ -97,4 +97,30 @@ defmodule Membrane.RTMP.Server do
Enum.each(state.to_reply, &GenServer.reply(&1, port))
{:noreply, %{state | port: port, to_reply: []}}
end

@doc """
Extracts ssl, port, app and stream_key from url.
"""
@spec parse_url(url :: String.t()) :: {boolean(), integer(), String.t(), String.t()}
def parse_url(url) do
uri = URI.parse(url)
port = uri.port

{app, stream_key} =
case (uri.path || "")
|> String.trim_leading("/")
|> String.trim_trailing("/")
|> String.split("/") do
[app, stream_key] -> {app, stream_key}
[app] -> {app, ""}
end

use_ssl? =
case uri.scheme do
"rtmp" -> false
"rtmps" -> true
end

{use_ssl?, port, app, stream_key}
end
end
63 changes: 56 additions & 7 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
@@ -1,14 +1,63 @@
defmodule Membrane.RTMP.Server.ClientHandler do
@moduledoc false
defmodule Membrane.RTMPServer.ClientHandler do
@moduledoc """
A behaviour describing the actions that might be taken by the client
handler in response to different events.
"""

# Module responsible for maintaining the lifecycle of the
# It also containts functions responsible for maintaining the lifecycle of the
# client connection.

use GenServer

require Logger
alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser}

@typedoc """
Type representing the user defined state of the client handler.
"""
@type t :: term()

@doc """
The callback invoked once the client handler is created.
It should return the initial state of the client handler.
"""
@callback handle_init(any()) :: t()

@doc """
The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()`
message.
"""
@callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) ::
t()

@doc """
The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()`
message.
"""
@callback handle_stream_published(
publish_msg :: Membrane.RTMP.Messages.Publish.t(),
state :: t()
) :: t()

@doc """
The callback invoked when new piece of data is received from a given client.
"""
@callback handle_data_available(payload :: binary(), state :: t()) :: t()

@doc """
The callback invoked when the client served by given client handler
stops sending data.
(for instance, when the remote client deletes the stream or
terminates the socket connection)
"""
@callback handle_end_of_stream(state :: t()) :: t()

@doc """
The callback invoked when the client handler receives a message
that is not recognized as an internal message of the client handler.
"""
@callback handle_info(msg :: term(), t()) :: t()

@doc """
Makes the client handler ask client for the desired number of buffers
"""
Expand Down Expand Up @@ -40,7 +89,7 @@ defmodule Membrane.RTMP.Server.ClientHandler do
buffers_demanded: 0,
published?: false,
notified_about_client?: false,
new_client_callback: opts.new_client_callback,
handle_new_client: opts.handle_new_client,
client_timeout: opts.client_timeout
}}
end
Expand Down Expand Up @@ -114,10 +163,10 @@ defmodule Membrane.RTMP.Server.ClientHandler do
%{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} =
message_handler_state

if is_function(state.new_client_callback) do
state.new_client_callback.(self(), state.app, stream_key)
if is_function(state.handle_new_client) do
state.handle_new_client.(self(), state.app, stream_key)
else
raise "new_client_callback is not a function"
raise "handle_new_client is not a function"
end

Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout)
Expand Down
Loading