Skip to content

Commit

Permalink
Require passing the packager from the outside
Browse files Browse the repository at this point in the history
  • Loading branch information
philipgiuliani committed Oct 8, 2024
1 parent 6c3dfe5 commit 17756f8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 42 deletions.
13 changes: 11 additions & 2 deletions examples/rtmp.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ defmodule Pipeline do
def handle_init(_ctx, _opts) do
File.rm_rf("tmp")

{:ok, packager_pid} =
Agent.start_link(fn ->
HLS.Packager.new(
storage: HLS.Storage.File.new(),
manifest_uri: URI.new!("file://tmp/stream.m3u8"),
resume_finished_tracks: true,
restore_pending_segments: false
)
end)

structure = [
# Source
child(:source, %Membrane.RTMP.Source{
Expand All @@ -24,9 +34,8 @@ defmodule Pipeline do

# Sink
child(:sink, %Membrane.HLS.SinkBin{
manifest_uri: URI.new!("file://tmp/stream.m3u8"),
packager_pid: packager_pid,
target_segment_duration: Membrane.Time.seconds(7),
storage: HLS.Storage.File.new()
}),

# Audio
Expand Down
50 changes: 13 additions & 37 deletions lib/membrane/hls/sink_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,10 @@ defmodule Membrane.HLS.SinkBin do
require Membrane.Logger

def_options(
manifest_uri: [
spec: URI.t(),
packager_pid: [
spec: pid(),
description: """
Destination URI of the manifest.
Example: file://output/stream.m3u8
"""
],
storage: [
spec: HLS.Storage,
required: true,
description: """
Implementation of the storage.
PID of a `HLS.Packager` which must be wrapped in an Agent (for now).
"""
],
target_segment_duration: [
Expand Down Expand Up @@ -79,27 +71,11 @@ defmodule Membrane.HLS.SinkBin do
%{
opts: opts,
flush: opts.flush_on_end,
packager_pid: nil,
ended_sinks: MapSet.new(),
live_state: nil
}}
end

@impl true
def handle_setup(_context, state) do
{:ok, packager_pid} =
Agent.start_link(fn ->
Packager.new(
storage: state.opts.storage,
manifest_uri: state.opts.manifest_uri,
resume_finished_tracks: true,
restore_pending_segments: false
)
end)

{[], %{state | packager_pid: packager_pid}}
end

@impl true
def handle_element_start_of_stream(
child = {:muxer, _},
Expand Down Expand Up @@ -128,7 +104,7 @@ defmodule Membrane.HLS.SinkBin do
%{pad_options: %{encoding: :AAC} = pad_opts},
state
) do
{_max_pts, track_pts} = resume_info(state.packager_pid, track_id)
{_max_pts, track_pts} = resume_info(state.opts.packager_pid, track_id)

spec =
bin_input(pad)
Expand All @@ -139,7 +115,7 @@ defmodule Membrane.HLS.SinkBin do
})
|> via_out(Pad.ref(:output), options: [tracks: [track_id]])
|> child({:sink, track_id}, %Membrane.HLS.CMAFSink{
packager_pid: state.packager_pid,
packager_pid: state.opts.packager_pid,
track_id: track_id,
target_segment_duration: state.opts.target_segment_duration,
build_stream: pad_opts.build_stream
Expand All @@ -154,7 +130,7 @@ defmodule Membrane.HLS.SinkBin do
%{pad_options: %{encoding: :H264} = pad_opts},
state
) do
{_max_pts, track_pts} = resume_info(state.packager_pid, track_id)
{_max_pts, track_pts} = resume_info(state.opts.packager_pid, track_id)

spec =
bin_input(pad)
Expand All @@ -163,7 +139,7 @@ defmodule Membrane.HLS.SinkBin do
segment_min_duration: pad_opts.segment_duration
})
|> child({:sink, track_id}, %Membrane.HLS.CMAFSink{
packager_pid: state.packager_pid,
packager_pid: state.opts.packager_pid,
track_id: track_id,
target_segment_duration: state.opts.target_segment_duration,
build_stream: pad_opts.build_stream
Expand All @@ -177,7 +153,7 @@ defmodule Membrane.HLS.SinkBin do
%{pad_options: %{encoding: :TEXT} = pad_opts},
state
) do
{_max_pts, track_pts} = resume_info(state.packager_pid, track_id)
{_max_pts, track_pts} = resume_info(state.opts.packager_pid, track_id)

spec =
bin_input(pad)
Expand All @@ -193,7 +169,7 @@ defmodule Membrane.HLS.SinkBin do
]
})
|> child({:sink, track_id}, %Membrane.HLS.WebVTTSink{
packager_pid: state.packager_pid,
packager_pid: state.opts.packager_pid,
track_id: track_id,
target_segment_duration: state.opts.target_segment_duration,
build_stream: pad_opts.build_stream
Expand All @@ -212,7 +188,7 @@ defmodule Membrane.HLS.SinkBin do
|> put_in([:live_state], %{stop: true})
|> put_in([:ended_sinks], ended_sinks)

if state.flush, do: Agent.update(state.packager_pid, &Packager.flush/1, :infinity)
if state.flush, do: Agent.update(state.opts.packager_pid, &Packager.flush/1, :infinity)

{[notify_parent: :end_of_stream], state}
else
Expand All @@ -227,7 +203,7 @@ defmodule Membrane.HLS.SinkBin do
@impl true
def handle_parent_notification(:flush, ctx, state) do
if not state.flush and all_streams_ended?(ctx, state.ended_sinks) do
Agent.update(state.packager_pid, &Packager.flush/1, :infinity)
Agent.update(state.opts.packager_pid, &Packager.flush/1, :infinity)
{[notify_parent: :end_of_stream], %{state | flush: true}}
else
{[], %{state | flush: true}}
Expand All @@ -245,7 +221,7 @@ defmodule Membrane.HLS.SinkBin do
)

Agent.update(
state.packager_pid,
state.opts.packager_pid,
fn p ->
Packager.sync(p, state.live_state.next_sync_point)
end,
Expand Down Expand Up @@ -305,7 +281,7 @@ defmodule Membrane.HLS.SinkBin do
# Tells where in the playlist we should start issuing segments.
next_sync_point =
Agent.get(
state.packager_pid,
state.opts.packager_pid,
&Packager.next_sync_point(
&1,
Membrane.Time.as_seconds(state.opts.target_segment_duration, :round)
Expand Down
15 changes: 12 additions & 3 deletions test/membrane/hls/sink_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@ defmodule Membrane.HLS.SinkBinTest do

@tag :tmp_dir
test "on a new stream", %{tmp_dir: tmp_dir} do
{:ok, packager_pid} =
Agent.start_link(fn ->
HLS.Packager.new(
manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"),
storage: HLS.Storage.File.new(),
resume_finished_tracks: true,
restore_pending_segments: false
)
end)

spec = [
child(:sink, %Membrane.HLS.SinkBin{
manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"),
target_segment_duration: Membrane.Time.seconds(7),
storage: HLS.Storage.File.new()
packager_pid: packager_pid,
target_segment_duration: Membrane.Time.seconds(7)
}),

# Source
Expand Down

0 comments on commit 17756f8

Please sign in to comment.