Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Improve socket connection dropped and delete stream events #100

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Membrane.RTMP.MessageHandler do
{:set_chunk_size_required, non_neg_integer()}
| {:connected, Membrane.RTMP.Messages.Connect.t()}
| {:published, Membrane.RTMP.Messages.Publish.t()}
| :end_of_stream
| :delete_stream
| {:data_available, binary()}
@type t() :: %{
socket: :gen_tcp.socket() | :ssl.socket(),
Expand Down Expand Up @@ -218,7 +218,7 @@ defmodule Membrane.RTMP.MessageHandler do
end

defp do_handle_client_message(%Messages.DeleteStream{}, _header, state) do
{:halt, %{state | events: [:end_of_stream | state.events]}}
{:halt, %{state | events: [:delete_stream | state.events]}}
end

# Check bandwidth message
Expand Down
15 changes: 8 additions & 7 deletions lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ defmodule Membrane.RTMP.Source.ClientHandlerImpl do
end

@impl true
def handle_end_of_stream(state) do
if state.source_pid != nil, do: send_eos(state.source_pid)
def handle_connection_closed(state) do
if state.source_pid != nil, do: send(state.source_pid, :connection_closed)
state
end

defp send_data(pid, payload) do
send(pid, {:data, payload})
:ok
@impl true
def handle_delete_stream(state) do
if state.source_pid != nil, do: send(state.source_pid, :delete_stream)
state
end

defp send_eos(pid) do
send(pid, :end_of_stream)
defp send_data(pid, payload) do
send(pid, {:data, payload})
:ok
end
end
6 changes: 5 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,18 @@ defmodule Membrane.RTMP.Source do
end

@impl true
def handle_info(:end_of_stream, ctx, state) do
def handle_info(:connection_closed, ctx, state) do
if ctx.pads[:output].end_of_stream? do
{[], state}
else
{[end_of_stream: :output], state}
end
end

def handle_info(:delete_stream, _ctx, state) do
{[notify_parent: :stream_deleted], state}
end

@impl true
def handle_terminate_request(_ctx, state) do
{[terminate: :normal], state}
Expand Down
5 changes: 3 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ defmodule Membrane.RTMPServer do
port: :inet.port_number(),
use_ssl?: boolean(),
name: atom() | nil,
handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
client_behaviour_spec()),
handle_new_client:
(client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
client_behaviour_spec()),
client_timeout: Membrane.Time.t()
]

Expand Down
29 changes: 20 additions & 9 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,19 @@ defmodule Membrane.RTMPServer.ClientHandler do
@callback handle_data_available(payload :: binary(), state :: state()) :: state()

@doc """
The callback invoked when the client served by given client handler
stops sending data.
(for instance, when the remote client deletes the stream or
terminates the socket connection)
Callback invoked when the RMTP stream is finished.
"""
@callback handle_end_of_stream(state :: state()) :: state()
@callback handle_delete_stream(state :: state()) :: state()

@doc """
Callback invoked when the socket connection is terminated. In normal
conditions, delete_stream is called before this one. If delete_stream
dmorn marked this conversation as resolved.
Show resolved Hide resolved
is not called and connection_closed is, it might just mean that the
connection was lost.
dmorn marked this conversation as resolved.
Show resolved Hide resolved

It is up to the users to determined how to handle it in their case.
"""
@callback handle_connection_closed(state :: state()) :: state()

@doc """
The callback invoked when the client handler receives a message
Expand Down Expand Up @@ -89,7 +96,7 @@ defmodule Membrane.RTMPServer.ClientHandler do
@impl true
def handle_info({:tcp_closed, socket}, %{use_ssl?: false} = state)
when state.socket == socket do
events = [:end_of_stream]
events = [:connection_closed]
state = Enum.reduce(events, state, &handle_event/2)

{:noreply, state}
Expand All @@ -102,7 +109,7 @@ defmodule Membrane.RTMPServer.ClientHandler do

@impl true
def handle_info({:ssl_closed, socket}, %{use_ssl?: true} = state) when state.socket == socket do
events = [:end_of_stream]
events = [:connection_closed]
state = Enum.reduce(events, state, &handle_event/2)

{:noreply, state}
Expand Down Expand Up @@ -191,8 +198,12 @@ defmodule Membrane.RTMPServer.ClientHandler do
defp handle_event(event, state) do
# call callbacks
case event do
:end_of_stream ->
new_handler_state = state.handler.handle_end_of_stream(state.handler_state)
:connection_closed ->
new_handler_state = state.handler.handle_connection_closed(state.handler_state)
%{state | handler_state: new_handler_state}

:delete_stream ->
new_handler_state = state.handler.handle_delete_stream(state.handler_state)
%{state | handler_state: new_handler_state}

{:set_chunk_size_required, chunk_size} ->
Expand Down
Loading