Skip to content

Commit

Permalink
Merge pull request #10 from kim-company/packager
Browse files Browse the repository at this point in the history
Implement new SinkBin based on HLS.Packager
  • Loading branch information
philipgiuliani authored Oct 1, 2024
2 parents bff2228 + dcbaf6c commit c22ef1c
Show file tree
Hide file tree
Showing 15 changed files with 995 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ membrane_hls_plugin-*.tar

# Temporary files, for example, from tests.
/tmp/

/output
validation_data.json
7 changes: 2 additions & 5 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
elixir 1.15.7-otp-26
erlang 26.1.2
nodejs 16.16.0
golang 1.16.6
rust 1.74.0
elixir 1.16.3-otp-26
erlang 26.2.5
143 changes: 143 additions & 0 deletions examples/rtmp.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
Mix.install([
:membrane_core,
:membrane_rtmp_plugin,
:membrane_aac_plugin,
:membrane_h26x_plugin,
:membrane_tee_plugin,
{:membrane_ffmpeg_transcoder_plugin, "~> 1.1"},
{:membrane_hls_plugin, path: Path.expand(Path.join(__DIR__, "../"))}
])

defmodule Pipeline do
use Membrane.Pipeline

@impl true
def handle_init(_ctx, _opts) do
File.rm_rf("tmp")

structure = [
# Source
child(:source, %Membrane.RTMP.Source{
url: "rtmp://0.0.0.0:1935/app/stream_key"
})
|> child(:transcoder, Membrane.FFmpeg.Transcoder),

# Sink
child(:sink, %Membrane.HLS.SinkBin{
manifest_uri: URI.new!("file://tmp/stream.m3u8"),
target_segment_duration: Membrane.Time.seconds(7),
storage: HLS.Storage.File.new()
}),

# Audio
get_child(:transcoder)
|> via_out(:audio, options: [bitrate: 128_000])
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none,
output_config: :esds
})
|> via_in(Pad.ref(:input, "audio_128k"),
options: [
encoding: :AAC,
build_stream: fn uri, %Membrane.CMAF.Track{} = format ->
%HLS.AlternativeRendition{
uri: uri,
name: "Audio (EN)",
type: :audio,
group_id: "program_audio",
language: "en",
channels: to_string(format.codecs.mp4a.channels),
autoselect: true,
default: true
}
end
]
)
|> get_child(:sink),

# Video HD
get_child(:transcoder)
|> via_out(:video, options: [
resolution: {-2, 720},
bitrate: 3_300_000,
profile: :high,
fps: 30,
gop_size: 60,
b_frames: 3,
crf: 26,
preset: :veryfast,
tune: :zerolatency
])
|> child({:parser, :hd}, %Membrane.H264.Parser{
output_stream_structure: :avc1,
output_alignment: :au
})
|> via_in(Pad.ref(:input, "video_720p"),
options: [
encoding: :H264,
build_stream: fn uri, %Membrane.CMAF.Track{} = format ->
%HLS.VariantStream{
uri: uri,
bandwidth: 3951200,
resolution: format.resolution,
codecs: Membrane.HLS.serialize_codecs(format.codecs),
audio: "program_audio"
}
end
]
)
|> get_child(:sink),

# Video SD
get_child(:transcoder)
|> via_out(:video, options: [
resolution: {-2, 360},
bitrate: 1020800,
profile: :main,
fps: 15,
gop_size: 30,
b_frames: 3,
crf: 26,
preset: :veryfast,
tune: :zerolatency
])
|> child({:parser, :sd}, %Membrane.H264.Parser{
output_stream_structure: :avc1,
output_alignment: :au
})
|> via_in(Pad.ref(:input, "video_360p"),
options: [
encoding: :H264,
build_stream: fn uri, %Membrane.CMAF.Track{} = format ->
%HLS.VariantStream{
uri: uri,
bandwidth: 1_200_000,
resolution: format.resolution,
codecs: Membrane.HLS.serialize_codecs(format.codecs),
audio: "program_audio"
}
end
]
)
|> get_child(:sink)
]

{[spec: structure], %{}}
end

def handle_child_notification(:end_of_stream, :sink, _ctx, state) do
{[terminate: :normal], state}
end
end

# Start a pipeline with `Membrane.RTMP.Source` that will spawn an RTMP server waiting for
# the client connection on given URL
{:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(Pipeline)

# Wait for the pipeline to terminate itself
ref = Process.monitor(pipeline)

:ok =
receive do
{:DOWN, ^ref, _process, ^pipeline, :normal} -> :ok
end
22 changes: 22 additions & 0 deletions lib/membrane/hls.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Membrane.HLS do
def serialize_codecs(codecs) do
codecs
|> Enum.map(&serialize_codec(&1))
|> Enum.reject(&is_nil/1)
end

defp serialize_codec({:avc1, %{profile: profile, compatibility: compatibility, level: level}}) do
[profile, compatibility, level]
|> Enum.map(&Integer.to_string(&1, 16))
|> Enum.map_join(&String.pad_leading(&1, 2, "0"))
|> then(&"avc1.#{&1}")
|> String.downcase()
end

defp serialize_codec({:hvc1, %{profile: profile, level: level}}),
do: "hvc1.#{profile}.4.L#{level}.B0"

defp serialize_codec({:mp4a, %{aot_id: aot_id}}), do: String.downcase("mp4a.40.#{aot_id}")

defp serialize_codec(_other), do: nil
end
115 changes: 115 additions & 0 deletions lib/membrane/hls/cmaf_sink.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
defmodule Membrane.HLS.CMAFSink do
use Membrane.Sink
alias HLS.Packager

def_input_pad(
:input,
accepted_format: Membrane.CMAF.Track
)

def_options(
packager_pid: [
spec: pid(),
description: "PID of the packager."
],
track_id: [
spec: String.t(),
description: "ID of the track."
],
build_stream: [
spec:
(URI.t(), Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()),
description: "Build the stream with the given stream format"
],
target_segment_duration: [
spec: Membrane.Time.t()
]
)

@impl true
def handle_init(_context, opts) do
{[], %{opts: opts, upload_tasks: %{}}}
end

def handle_stream_format(:input, format, _ctx, state) do
track_id = state.opts.track_id

target_segment_duration =
Membrane.Time.as_seconds(state.opts.target_segment_duration, :exact)
|> Ratio.ceil()

Agent.update(state.opts.packager_pid, fn packager ->
packager =
if Packager.has_track?(packager, track_id) do
# Packager.discontinue_track(packager, track_id)
packager
else
uri = Packager.new_variant_uri(packager, track_id)

Packager.add_track(
packager,
track_id,
codecs: Membrane.HLS.serialize_codecs(format.codecs),
stream: state.opts.build_stream.(uri, format),
segment_extension: ".m4s",
target_segment_duration: target_segment_duration
)
end

Packager.put_init_section(packager, track_id, format.header)
end)

{[], state}
end

def handle_buffer(:input, buffer, _ctx, state) do
{job_ref, upload_fun} =
Agent.get_and_update(state.opts.packager_pid, fn packager ->
{packager, {ref, upload_fun}} =
Packager.put_segment_async(
packager,
state.opts.track_id,
buffer.payload,
Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float()
)

{{ref, upload_fun}, packager}
end)

task = Task.async(upload_fun)

{[], put_in(state, [:upload_tasks, task.ref], %{job_ref: job_ref, task: task})}
end

def handle_info({task_ref, :ok}, _ctx, state) do
Process.demonitor(task_ref, [:flush])

{data, state} = pop_in(state, [:upload_tasks, task_ref])

Agent.update(state.opts.packager_pid, fn packager ->
Packager.ack_segment(packager, state.opts.track_id, data.job_ref)
end)

{[], state}
end

def handle_info({:DOWN, _ref, _, _, reason}, _ctx, state) do
raise "Cannot write segment of track #{state.track_id} with reason: #{inspect(reason)}."
{[], state}
end

def handle_end_of_stream(:input, _ctx, state) do
state.upload_tasks
|> Map.values()
|> Enum.map(& &1.task)
|> Task.await_many(:infinity)

Agent.update(state.opts.packager_pid, fn packager ->
Enum.reduce(state.upload_tasks, packager, fn {_task_ref, data}, packager ->
Packager.ack_segment(packager, state.opts.track_id, data.job_ref)
end)
end)

{[], %{state | upload_tasks: %{}}}
end
end
56 changes: 56 additions & 0 deletions lib/membrane/hls/filler/aac_filler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule Membrane.HLS.AACFiller do
use Membrane.Filter

def_input_pad(:input,
accepted_format: Membrane.AAC
)

def_output_pad(:output,
accepted_format: Membrane.RemoteStream
)

def_options(
duration: [
spec: Membrane.Time.t()
]
)

def handle_init(_ctx, opts) do
{[], %{duration: opts.duration, format: nil, filled: false}}
end

def handle_stream_format(:input, format, _ctx, state) do
{[forward: %Membrane.RemoteStream{content_format: Membrane.AAC}], %{state | format: format}}
end

def handle_buffer(:input, buffer, _ctx, state) do
buffer = %{buffer | pts: nil, dts: nil}

if state.filled do
{[forward: buffer], state}
else
format = state.format

silence_buffer =
if Membrane.Time.as_milliseconds(state.duration, :round) > 0 do
duration =
Membrane.Time.as_seconds(state.duration)
|> Ratio.to_float()

{silence, 0} =
System.cmd(
"ffmpeg",
~w(-f lavfi -i anullsrc=r=#{format.sample_rate} -ac #{format.channels} -t #{duration} -c:a aac -f adts -)
)

%Membrane.Buffer{
payload: silence
}
else
nil
end

{[buffer: {:output, List.wrap(silence_buffer) ++ [buffer]}], %{state | filled: true}}
end
end
end
39 changes: 39 additions & 0 deletions lib/membrane/hls/filler/text_filler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Membrane.HLS.TextFiller do
use Membrane.Filter

def_input_pad(:input,
accepted_format: Membrane.Text
)

def_output_pad(:output,
accepted_format: Membrane.Text
)

def_options(
from: [
spec: Membrane.Time.t()
]
)

def handle_init(_ctx, opts) do
{[], %{from: opts.from, filled: false}}
end

def handle_buffer(:input, buffer, _ctx, state) do
if state.filled do
{[forward: buffer], state}
else
Membrane.Logger.debug(
"Generated empty text buffer with a duration of #{buffer.pts - state.from - Membrane.Time.millisecond()}"
)

silence_buffer = %Membrane.Buffer{
payload: "",
pts: state.from,
metadata: %{to: buffer.pts - Membrane.Time.millisecond()}
}

{[buffer: {:output, [silence_buffer, buffer]}], %{state | filled: true}}
end
end
end
Loading

0 comments on commit c22ef1c

Please sign in to comment.