Skip to content

Commit

Permalink
Add method for flushing manually
Browse files Browse the repository at this point in the history
  • Loading branch information
philipgiuliani committed Oct 1, 2024
1 parent c22ef1c commit 4825e81
Showing 1 changed file with 30 additions and 9 deletions.
39 changes: 30 additions & 9 deletions lib/membrane/hls/sink_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ defmodule Membrane.HLS.SinkBin do
in the media playlist each target_segment_duration.
* :vod -> At the end of the segment production, playlists are written down.
"""
],
flush_on_end: [
spec: boolean(),
default: true,
description: """
Automatically flush the packager when all streams ended.
Set to `false` if flushing manually (via `:flush` notification).
"""
]
)

Expand All @@ -61,7 +69,14 @@ defmodule Membrane.HLS.SinkBin do

@impl true
def handle_init(_context, opts) do
{[], %{opts: opts, packager_pid: nil, ended_sinks: MapSet.new(), live_state: nil}}
{[],
%{
opts: opts,
flush: opts.flush_on_end,
packager_pid: nil,
ended_sinks: MapSet.new(),
live_state: nil
}}
end

@impl true
Expand Down Expand Up @@ -198,16 +213,10 @@ defmodule Membrane.HLS.SinkBin do

@impl true
def handle_element_end_of_stream({:sink, _track_id} = sink, _pad, ctx, state) do
all_sinks =
ctx.children
|> Map.keys()
|> Enum.filter(&match?({:sink, _}, &1))
|> MapSet.new()

ended_sinks = MapSet.put(state.ended_sinks, sink)

if MapSet.equal?(all_sinks, ended_sinks) do
Agent.update(state.packager_pid, &Packager.flush(&1))
if all_streams_ended?(ctx, ended_sinks) do
if state.flush, do: Agent.update(state.packager_pid, &Packager.flush(&1))

state =
state
Expand All @@ -224,6 +233,10 @@ defmodule Membrane.HLS.SinkBin do
{[], state}
end

def handle_child_notification(:flush, _, _ctx, state) do
{[], %{state | flush: true}}
end

@impl true
def handle_info(:sync, _ctx, state = %{live_state: %{stop: true}}) do
{[], state}
Expand All @@ -239,6 +252,14 @@ defmodule Membrane.HLS.SinkBin do
{[], live_schedule_next_sync(state)}
end

defp all_streams_ended?(ctx, ended_sinks) do
ctx.children
|> Map.keys()
|> Enum.filter(&match?({:sink, _}, &1))
|> MapSet.new()
|> MapSet.equal?(ended_sinks)
end

defp audio_muxer(spec, state) do
child(
spec,
Expand Down

0 comments on commit 4825e81

Please sign in to comment.