Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add general use RTMP server #85

Merged
merged 47 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
5c0b665
Add POC of RTMP server capable of serving multiple RTMP clients
varsill Feb 13, 2024
fe10680
Perform refactor. Move parsing functions to message_parser. Separate …
varsill Feb 13, 2024
02f1f9d
Move client handler to a separate module
varsill Feb 13, 2024
2495cc6
Remove validator. Add RTMP.Server.Behaviour
varsill Feb 16, 2024
6c56ca0
Add ability to specify initial client handler state. Remove handle_in…
varsill Feb 19, 2024
982bc0d
Rewrite Source and SourceBin to support new architecture. Adjust test…
varsill Feb 20, 2024
a821766
Perform formatting
varsill Feb 20, 2024
6262d90
Fix compilation warnings
varsill Feb 20, 2024
7888b2c
Fix formatting
varsill Feb 20, 2024
2eaead3
Refactor
varsill Feb 20, 2024
0f4a5cd
Merge branch 'master' of https://github.com/membraneframework/membran…
varsill Feb 21, 2024
6747ddd
Add moduledocs and typespecs
varsill Feb 21, 2024
348f525
Remove nonexisting reference in docs
varsill Feb 21, 2024
1d55c5e
fix dialyzer error
varsill Feb 21, 2024
af6a05e
Allow for spawning RTMP server under desired name
varsill Feb 26, 2024
4e6358e
Fix a misspelling of a Source behaviour module
varsill Feb 26, 2024
855c71d
Add typespec for `RTMP.Server.start_link/1`
varsill Feb 26, 2024
6c6a4b4
perform formatting
varsill Feb 26, 2024
e9df383
Implement reviewers suggestions. Add backpressure mechanism.
varsill May 21, 2024
baa1014
Fix tests running
varsill May 21, 2024
3c3b351
Add hansling of elements demands
varsill May 21, 2024
6a68408
Fix credo warnings
varsill May 22, 2024
27a5d20
Merge branch 'master' of https://github.com/membraneframework/membran…
varsill May 22, 2024
cf98cfd
Fix options name
varsill May 22, 2024
3518d97
simplify handling of demands
varsill May 22, 2024
bc80001
Add helper function to send demands from the client handler
varsill May 22, 2024
e288276
Move all the sockets data reading to the client handle
varsill May 22, 2024
aa77ff4
make credo happy
varsill May 22, 2024
406e061
make dialyzer happy
varsill May 22, 2024
dd28fcd
Finish setup immediately in source
varsill May 22, 2024
692c664
Make source properly handle playing mode enterance in different modes
varsill May 22, 2024
385bce1
Make the rtmp source test server bind to port 0
varsill May 23, 2024
a5a521d
remove debugging leftover
varsill May 23, 2024
8a53668
Add ability not to pass controlling process to default source behaviour
varsill May 27, 2024
7752fd8
Update RTMPServer.subscribe() spec
varsill May 27, 2024
b833ff9
Change handle_info into handle_cast for subscribe handling in RTMP se…
varsill May 28, 2024
2620570
Add await_subscription/2 function in rtmp server
varsill May 28, 2024
6e15040
Fix a spec in rtmp server
varsill May 28, 2024
bd74a5a
Add struct to define options in default source handler behaviour impl…
varsill May 28, 2024
83adaf3
Fix credo warnings
varsill May 28, 2024
ecb4c3f
Implement reviewers suggestions.
varsill Jun 11, 2024
9dd8317
Update source description in the README. Add more comprehensive docs …
varsill Jun 11, 2024
5d11bfd
Update source.exs example
varsill Jun 11, 2024
8eee65c
Add an example with standalone server. Use mix run instead of elixir …
varsill Jun 11, 2024
cdc36d7
Revoke usage of elixir <script.exs> for sink examples.
varsill Jun 11, 2024
bdbd699
Implement reviewers suggestions. Rename client behaviour into client …
varsill Jun 14, 2024
d976959
Fix credo warning
varsill Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ sudo apt-get install ffmpeg
pacman -S ffmpeg
```

## RTMP Server
An simple RTMP server that accepts clients connecting on a given port and allows to distinguish between them
based on app ID and stream key. Each client that has connected is asigned a dedicated client handler, which
behaviour can be provided by RTMP server user.

## SourceBin

Requires a socket, which has been connected to the client. It receives RTMP stream, demuxes it and outputs H264 video and AAC audio.
Requires a client handler, which has been connected to the client, or an URL on which the client is supposed to connect. It receives RTMP stream, demuxes it and outputs H264 video and AAC audio.

## Client

Expand All @@ -53,10 +58,6 @@ Currently only the following codecs are supported:
- H264 for video
- AAC for audio

## TCP Server

It's a simple implementation of tcp server. It opens a tcp port and listens for incoming connections. For each new connection, a user-provided function is executed.

### Prerequisites

In order to successfully build and install the plugin, you need to have **ffmpeg == 4.4** installed on your system
Expand All @@ -65,16 +66,32 @@ In order to successfully build and install the plugin, you need to have **ffmpeg

### RTMP receiver

Server-side example, in which Membrane will act as an RTMP server and receive the stream, can be found under [`examples/source.exs`](examples/source.exs). Run it with:
Server-side example, in which Membrane element will act as an RTMP server and receive the stream, can be found under [`examples/source.exs`](examples/source.exs). Run it with:
varsill marked this conversation as resolved.
Show resolved Hide resolved

```bash
mix run examples/source.exs
```

When the server is ready you can connect to it with RTMP. If you just want to test it, you can use FFmpeg:

```bash
ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:1935/app/stream_key
```

### RTMP receive with standalone RTMP server

If you want to see how you could setup the `Membrane.RTMP.Server` on your own and use it
with cooperation with the `Membane.RTMP.SourceBin`, take a look at [`examples/source_with_standalone_server.exs`](examples/source_with_standalone_server.exs)
Run it with:

```bash
elixir examples/source.exs
mix run examples/source.exs
```

When the server is ready you can connect to it with RTMP. If you just want to test it, you can use FFmpeg:

```bash
ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:5000
ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:1935/app/stream_key
```

### Streaming with RTMP
Expand Down
98 changes: 16 additions & 82 deletions examples/source.exs
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
# After running this script, you can access the server at rtmp://localhost:5000
# After running this script, you can access the server at rtmp://localhost:1935
# You can use FFmpeg to stream to it
# ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:5000

Mix.install([
:membrane_aac_plugin,
:membrane_h264_plugin,
:membrane_flv_plugin,
:membrane_file_plugin,
{:membrane_rtmp_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()}
])
# ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:1935/app/stream_key

defmodule Pipeline do
use Membrane.Pipeline

@output_file "received.flv"

@impl true
def handle_init(_ctx, socket: socket) do
def handle_init(_ctx, opts) do
structure = [
child(:source, %Membrane.RTMP.SourceBin{
socket: socket
url: "rtmp://127.0.0.1:1935/app/stream_key"
})
|> via_out(:audio)
|> child(:audio_parser, %Membrane.AAC.Parser{
Expand All @@ -38,43 +30,14 @@ defmodule Pipeline do
|> get_child(:muxer)
]

{[spec: structure], %{}}
end

# Once the source initializes, we grant it the control over the tcp socket
@impl true
def handle_child_notification(
{:socket_control_needed, _socket, _source} = notification,
:source,
_ctx,
state
) do
send(self(), notification)

{[], state}
end

def handle_child_notification(_notification, _child, _ctx, state) do
{[], state}
end

@impl true
def handle_info({:socket_control_needed, socket, source} = notification, _ctx, state) do
case Membrane.RTMP.SourceBin.pass_control(socket, source) do
:ok ->
:ok

{:error, :not_owner} ->
Process.send_after(self(), notification, 200)
end

{[], state}
{[spec: structure], %{controller_pid: opts[:controller_pid]}}
end

# The rest of the module is used for self-termination of the pipeline after processing finishes
@impl true
def handle_element_end_of_stream(:sink, _pad, _ctx, state) do
{[terminate: :shutdown], state}
send(state.controller_pid, :eos)
{[], state}
varsill marked this conversation as resolved.
Show resolved Hide resolved
end

@impl true
Expand All @@ -83,44 +46,15 @@ defmodule Pipeline do
end
end

defmodule Example do
@server_ip {127, 0, 0, 1}
@server_port 5000

def run() do
parent = self()
# Start a pipeline with `Membrane.RTMP.Source` that will spawn an RTMP server waiting for
# the client connection on given URL
{:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(Pipeline, controller_pid: self())

server_options = %Membrane.RTMP.Source.TcpServer{
port: @server_port,
listen_options: [
:binary,
packet: :raw,
active: false,
ip: @server_ip
],
socket_handler: fn socket ->
# On new connection a pipeline is started
{:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(Pipeline, socket: socket)
send(parent, {:pipeline_spawned, pipeline})
{:ok, pipeline}
end
}

Membrane.RTMP.Source.TcpServer.start_link(server_options)

pipeline =
receive do
{:pipeline_spawned, pid} ->
pid
end

ref = Process.monitor(pipeline)

receive do
{:DOWN, ^ref, :process, _obj, _reason} ->
:ok
end
# Wait for end of stream
:ok =
receive do
:eos -> :ok
end
end

Example.run()
# Terminate the pipeline
:ok = Membrane.Pipeline.terminate(pipeline)
83 changes: 83 additions & 0 deletions examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# After running this script, you can access the server at rtmp://localhost:1935
# You can use FFmpeg to stream to it
# ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:1935/app/stream_key

defmodule Pipeline do
use Membrane.Pipeline

@output_file "received.flv"

@impl true
def handle_init(_ctx, opts) do
structure = [
child(:source, %Membrane.RTMP.SourceBin{
client_handler: opts[:client_handler]
})
|> via_out(:audio)
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none,
output_config: :audio_specific_config
})
|> via_in(Pad.ref(:audio, 0))
|> child(:muxer, Membrane.FLV.Muxer)
|> child(:sink, %Membrane.File.Sink{location: @output_file}),
get_child(:source)
|> via_out(:video)
|> child(:video_parser, %Membrane.H264.Parser{
output_stream_structure: :avc1
})
|> via_in(Pad.ref(:video, 0))
|> get_child(:muxer)
]

{[spec: structure], %{controller_pid: opts[:controller_pid]}}
end

# The rest of the module is used for self-termination of the pipeline after processing finishes
@impl true
def handle_element_end_of_stream(:sink, _pad, _ctx, state) do
send(state.controller_pid, :eos)
{[terminate: :normal], state}
end

@impl true
def handle_element_end_of_stream(_child, _pad, _ctx, state) do
{[], state}
end
end

# Run the standalone server
{:ok, server} =
Membrane.RTMP.Server.start_link(
behaviour: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
varsill marked this conversation as resolved.
Show resolved Hide resolved
port: 1935,
use_ssl?: false
)

# Subscribe to receive client handler that connected to the
# server with given app id and stream key
:ok = Membrane.RTMP.Server.subscribe(server, "app", "stream_key")

# Wait for the client handler
client_handler =
receive do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could have a very simple GenServer that would start a pipeline each time it gets the client handler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this one would require adding an option to subscribe to "any" app and any stream_key. It could be quite useful to have such an option, but perhaps let' add it in a separate PR, WDYT?

Copy link
Member

@mat-hek mat-hek Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varsill Ok, please create an issue, put it in TODO and assign @bartkrak ;) We should also have documentation for the message with client_ref, and the way to get the app and the stream key from there. Maybe we should allow awaiting for any app and stream key too.

{:client_handler, client_handler} ->
varsill marked this conversation as resolved.
Show resolved Hide resolved
client_handler
end

# Start the pipeline and provide it with the client_handler

{:ok, _supervisor, pipeline} =
Membrane.Pipeline.start_link(Pipeline, client_handler: client_handler, controller_pid: self())

# Wait for end of stream
:ok =
receive do
:eos -> :ok
end

# Terminate the server
Process.exit(server, :normal)

# Terminate the pipeline
:ok = Membrane.Pipeline.terminate(pipeline)
82 changes: 82 additions & 0 deletions lib/membrane_rtmp_plugin/rtmp/source/client_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
defmodule Membrane.RTMP.Source.ClientHandler do
@moduledoc """
An implementation of `Membrane.RTMP.Server.ClienHandlerBehaviour` compatible with the
`Membrane.RTMP.Source` element.
"""

@behaviour Membrane.RTMP.Server.ClientHandlerBehaviour

defstruct [:controlling_process]

@impl true
def handle_init(opts) do
%{
source_pid: nil,
buffered: [],
app: nil,
stream_key: nil,
controlling_process: opts.controlling_process
}
end

@impl true
def handle_connected(connected_msg, state) do
%{state | app: connected_msg.app}
end

@impl true
def handle_stream_published(publish_msg, state) do
state = %{state | stream_key: publish_msg.stream_key}

if state.controlling_process do
send(state.controlling_process, {:client_connected, state.app, state.stream_key})
end

state
end

@impl true
def handle_info({:send_me_data, source_pid}, state) do
buffers_to_send = Enum.reverse(state.buffered)
state = %{state | source_pid: source_pid, buffered: []}
Enum.each(buffers_to_send, fn buffer -> send_data(state.source_pid, buffer) end)
state
end

@impl true
def handle_info(_other, state) do
state
end

@impl true
def handle_data_available(payload, state) do
if state.source_pid do
:ok = send_data(state.source_pid, payload)
state
else
%{state | buffered: [payload | state.buffered]}
end
end

@impl true
def handle_end_of_stream(state) do
if state.source_pid != nil, do: send_eos(state.source_pid)
state
end

defp send_data(pid, payload) do
send(pid, {:data, payload})
:ok
end

defp send_eos(pid) do
send(pid, :end_of_stream)
:ok
end

@spec request_for_data(pid()) :: :ok
def request_for_data(client_handler_pid) do
send(client_handler_pid, {:send_me_data, self()})
:ok
end
end
Loading