Skip to content

Commit

Permalink
Synchronize audio and video
Browse files Browse the repository at this point in the history
  • Loading branch information
philipgiuliani committed Oct 1, 2024
1 parent 263bef6 commit 46969c4
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 101 deletions.
84 changes: 38 additions & 46 deletions examples/rtmp.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,66 +17,32 @@ defmodule Pipeline do

structure = [
# Source
child(:source, %Membrane.RTMP.SourceBin{
url: "rtmp://127.0.0.1:1935/app/stream_key"
}),

child(:source, %Membrane.RTMP.Source{
url: "rtmp://0.0.0.0:1935/app/stream_key"
})
|> child(:transcoder, Membrane.FFmpeg.Transcoder),

# Sink
child(:sink, %Membrane.HLS.SinkBin{
manifest_uri: URI.new!("file://tmp/stream.m3u8"),
min_segment_duration: Membrane.Time.seconds(5),
target_segment_duration: Membrane.Time.seconds(10),
target_segment_duration: Membrane.Time.seconds(7),
storage: HLS.Storage.File.new()
}),

# Audio
get_child(:source)
|> via_out(:audio)
|> child(:aac_parser_post, %Membrane.AAC.Parser{
out_encapsulation: :none,
output_config: :esds
})
|> via_in(Pad.ref(:input, "audio_128k"),
options: [
encoding: :AAC,
build_stream: fn uri, %Membrane.CMAF.Track{} = format ->
%HLS.AlternativeRendition{
uri: uri,
name: "Audio (EN)",
type: :audio,
group_id: "program_audio",
language: "en",
channels: to_string(format.codecs.mp4a.channels),
autoselect: true,
default: true
}
end
]
)
|> get_child(:sink),

# Video
get_child(:source)
|> via_out(:video)
|> child({:parser, :pre}, %Membrane.H264.Parser{
output_stream_structure: :annexb
})
|> child(:transcoder, Membrane.FFmpeg.Transcoder),

# Video HD
get_child(:transcoder)
|> via_out(:output, options: [
|> via_out(:video, options: [
resolution: {-2, 720},
bitrate: 3_300_000,
profile: :high,
fps: 30,
gop_size: 60,
b_frames: 3,
crf: 23,
crf: 26,
preset: :veryfast,
tune: :zerolatency
])
|> child({:parser, :post, :hd}, %Membrane.H264.Parser{
|> child({:parser, :hd}, %Membrane.H264.Parser{
output_stream_structure: :avc1,
output_alignment: :au
})
Expand All @@ -98,18 +64,18 @@ defmodule Pipeline do

# Video SD
get_child(:transcoder)
|> via_out(:output, options: [
|> via_out(:video, options: [
resolution: {-2, 360},
bitrate: 1020800,
profile: :main,
fps: 15,
gop_size: 30,
b_frames: 3,
crf: 23,
crf: 26,
preset: :veryfast,
tune: :zerolatency
])
|> child({:parser, :post, :sd}, %Membrane.H264.Parser{
|> child({:parser, :sd}, %Membrane.H264.Parser{
output_stream_structure: :avc1,
output_alignment: :au
})
Expand All @@ -127,6 +93,32 @@ defmodule Pipeline do
end
]
)
|> get_child(:sink),

# Audio
get_child(:transcoder)
|> via_out(:audio, options: [bitrate: 128_000])
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none,
output_config: :esds
})
|> via_in(Pad.ref(:input, "audio_128k"),
options: [
encoding: :AAC,
build_stream: fn uri, %Membrane.CMAF.Track{} = format ->
%HLS.AlternativeRendition{
uri: uri,
name: "Audio (EN)",
type: :audio,
group_id: "program_audio",
language: "en",
channels: to_string(format.codecs.mp4a.channels),
autoselect: true,
default: true
}
end
]
)
|> get_child(:sink)
]

Expand Down
143 changes: 92 additions & 51 deletions lib/membrane/hls/sink_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@ defmodule Membrane.HLS.SinkBin do
Implementation of the storage.
"""
],
min_segment_duration: [
spec: Membrane.Time.t(),
description: """
Specificies the minimum duration of a CMAF segment.
In order to ensure that all segments are smaller than the `target_segment_duration`,
the keyframe interval of the H264 stream must be subtracted.
"""
],
target_segment_duration: [
spec: Membrane.Time.t(),
description: """
Expand Down Expand Up @@ -101,33 +93,67 @@ defmodule Membrane.HLS.SinkBin do
{[], state}
end

@impl true
def handle_pad_added(_pad, ctx, _state) when ctx.playback == :playing,
do:
raise(
"New pads can be added to #{inspect(__MODULE__)} only before playback transition to :playing"
)

@impl true
def handle_pad_added(
Pad.ref(:input, track_id) = pad,
%{pad_options: %{encoding: encoding} = pad_opts},
%{pad_options: %{encoding: :AAC} = pad_opts},
state
)
when encoding in [:H264, :AAC] do
{max_pts, _track_pts} = resume_info(state.packager_pid, track_id)
) do
{_max_pts, _track_pts} = resume_info(state.packager_pid, track_id)

spec =
bin_input(pad)
|> then(fn spec ->
# if encoding == :AAC do
# spec
# |> child({:filler, track_id}, %Membrane.HLS.AACFiller{duration: max_pts - track_pts})
# |> child(:fix_parser, %Membrane.AAC.Parser{
# out_encapsulation: :none,
# output_config: :esds
# })
# |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: track_pts})
# else
child(spec, {:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts})
# end
end)
|> child({:muxer, track_id}, %Membrane.MP4.Muxer.CMAF{
segment_min_duration: state.opts.min_segment_duration
# |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts})
|> via_in(Pad.ref(:input, track_id))
|> audio_muxer(state)
|> via_out(Pad.ref(:output), options: [tracks: [track_id]])
|> child({:sink, track_id}, %Membrane.HLS.CMAFSink{
packager_pid: state.packager_pid,
track_id: track_id,
target_segment_duration: state.opts.target_segment_duration,
build_stream: pad_opts.build_stream
})

{[spec: spec], state}
end

@impl true
def handle_pad_added(
Pad.ref(:input, track_id) = pad,
%{pad_options: %{encoding: :H264} = pad_opts} = ctx,
state
) do
{_max_pts, _track_pts} = resume_info(state.packager_pid, track_id)

had_video_input? =
Enum.any?(ctx.pads, fn {Pad.ref(:input, id), data} ->
id != track_id and data.options.encoding == :H264
end)

muxer = fn spec ->
if had_video_input? do
child(spec, {:muxer, track_id}, %Membrane.MP4.Muxer.CMAF{
segment_min_duration: segment_duration(state)
})
else
spec
|> via_in(Pad.ref(:input, track_id))
|> audio_muxer(state)
|> via_out(Pad.ref(:output), options: [tracks: [track_id]])
end
end

spec =
bin_input(pad)
# |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts})
|> muxer.()
|> child({:sink, track_id}, %Membrane.HLS.CMAFSink{
packager_pid: state.packager_pid,
track_id: track_id,
Expand All @@ -143,15 +169,15 @@ defmodule Membrane.HLS.SinkBin do
%{pad_options: %{encoding: :TEXT} = pad_opts},
state
) do
{max_pts, track_pts} = resume_info(state.packager_pid, track_id)
{_max_pts, _track_pts} = resume_info(state.packager_pid, track_id)

spec =
bin_input(pad)
|> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts})
|> child({:filler, track_id}, %Membrane.HLS.TextFiller{from: track_pts})
# |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts})
# |> child({:filler, track_id}, %Membrane.HLS.TextFiller{from: track_pts})
|> child({:cues, track_id}, Membrane.WebVTT.CueBuilderFilter)
|> child({:segments, track_id}, %Membrane.WebVTT.SegmentFilter{
segment_duration: state.opts.target_segment_duration,
segment_duration: segment_duration(state),
headers: [
%Subtitle.WebVTT.HeaderLine{key: :description, original: "WEBVTT"},
%Subtitle.WebVTT.HeaderLine{
Expand All @@ -170,26 +196,6 @@ defmodule Membrane.HLS.SinkBin do
{[spec: spec], state}
end

def resume_info(packager_pid, track_id) do
Agent.get(packager_pid, fn packager ->
max_pts =
Packager.max_track_duration(packager)
|> Ratio.new()
|> Membrane.Time.seconds()

track_pts =
if Packager.has_track?(packager, track_id) do
Packager.track_duration(packager, track_id)
|> Ratio.new()
|> Membrane.Time.seconds()
else
0
end

{max_pts, track_pts}
end)
end

@impl true
def handle_element_end_of_stream({:sink, _track_id} = sink, _pad, ctx, state) do
all_sinks =
Expand Down Expand Up @@ -233,6 +239,41 @@ defmodule Membrane.HLS.SinkBin do
{[], live_schedule_next_sync(state)}
end

defp audio_muxer(spec, state) do
child(
spec,
{:muxer, :audio},
%Membrane.MP4.Muxer.CMAF{
segment_min_duration: segment_duration(state) - Membrane.Time.millisecond()
},
get_if_exists: true
)
end

defp segment_duration(state) do
state.opts.target_segment_duration - Membrane.Time.second()
end

defp resume_info(packager_pid, track_id) do
Agent.get(packager_pid, fn packager ->
max_pts =
Packager.max_track_duration(packager)
|> Ratio.new()
|> Membrane.Time.seconds()

track_pts =
if Packager.has_track?(packager, track_id) do
Packager.track_duration(packager, track_id)
|> Ratio.new()
|> Membrane.Time.seconds()
else
0
end

{max_pts, track_pts}
end)
end

defp live_schedule_next_sync(state) do
state =
state
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"},
"kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "689723bdc39e09829004e4f32fd4c6e932abea5a", []},
"kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "58abab9d137b885b31ef6e72d07d38652a9dc3ca", []},
"kim_q": {:hex, :kim_q, "1.0.0", "17cfc45e9f7e65485f0f31bbf09893d6ff35cc2fbefc39aed146a3c29740584e", [:mix], [{:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7a8ee76a2c2e774c34345df3c7a234a8effeedc3f3aea845feb7c09030097278"},
"kim_subtitle": {:git, "https://github.com/kim-company/kim_subtitle.git", "8239e1bcea938167829a6b8bd2a9678c63c7bdd4", []},
"logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"},
Expand Down
5 changes: 2 additions & 3 deletions test/membrane/hls/sink_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ defmodule Membrane.HLS.SinkBinTest do
spec = [
child(:sink, %Membrane.HLS.SinkBin{
manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"),
min_segment_duration: Membrane.Time.seconds(5),
target_segment_duration: Membrane.Time.seconds(10),
target_segment_duration: Membrane.Time.seconds(7),
storage: HLS.Storage.File.new()
}),

Expand Down Expand Up @@ -107,7 +106,7 @@ defmodule Membrane.HLS.SinkBinTest do
bandwidth: 850_000,
resolution: format.resolution,
frame_rate: 25.0,
codecs: Membrane.HLS.serialize_codecs(format.codecs),
codecs: [],
audio: "audio",
subtitles: "subtitles"
}
Expand Down

0 comments on commit 46969c4

Please sign in to comment.