From 46969c4f1db25be123d9920bd03296aa878d86e5 Mon Sep 17 00:00:00 2001 From: Philip Giuliani Date: Tue, 1 Oct 2024 11:50:32 +0200 Subject: [PATCH] Synchronize audio and video --- examples/rtmp.exs | 84 ++++++++-------- lib/membrane/hls/sink_bin.ex | 143 ++++++++++++++++++---------- mix.lock | 2 +- test/membrane/hls/sink_bin_test.exs | 5 +- 4 files changed, 133 insertions(+), 101 deletions(-) diff --git a/examples/rtmp.exs b/examples/rtmp.exs index e77a0cd..653f676 100644 --- a/examples/rtmp.exs +++ b/examples/rtmp.exs @@ -17,66 +17,32 @@ defmodule Pipeline do structure = [ # Source - child(:source, %Membrane.RTMP.SourceBin{ - url: "rtmp://127.0.0.1:1935/app/stream_key" - }), - + child(:source, %Membrane.RTMP.Source{ + url: "rtmp://0.0.0.0:1935/app/stream_key" + }) + |> child(:transcoder, Membrane.FFmpeg.Transcoder), + # Sink child(:sink, %Membrane.HLS.SinkBin{ manifest_uri: URI.new!("file://tmp/stream.m3u8"), - min_segment_duration: Membrane.Time.seconds(5), - target_segment_duration: Membrane.Time.seconds(10), + target_segment_duration: Membrane.Time.seconds(7), storage: HLS.Storage.File.new() }), - # Audio - get_child(:source) - |> via_out(:audio) - |> child(:aac_parser_post, %Membrane.AAC.Parser{ - out_encapsulation: :none, - output_config: :esds - }) - |> via_in(Pad.ref(:input, "audio_128k"), - options: [ - encoding: :AAC, - build_stream: fn uri, %Membrane.CMAF.Track{} = format -> - %HLS.AlternativeRendition{ - uri: uri, - name: "Audio (EN)", - type: :audio, - group_id: "program_audio", - language: "en", - channels: to_string(format.codecs.mp4a.channels), - autoselect: true, - default: true - } - end - ] - ) - |> get_child(:sink), - - # Video - get_child(:source) - |> via_out(:video) - |> child({:parser, :pre}, %Membrane.H264.Parser{ - output_stream_structure: :annexb - }) - |> child(:transcoder, Membrane.FFmpeg.Transcoder), - # Video HD get_child(:transcoder) - |> via_out(:output, options: [ + |> via_out(:video, options: [ resolution: {-2, 720}, bitrate: 3_300_000, profile: :high, fps: 30, gop_size: 60, b_frames: 3, - crf: 23, + crf: 26, preset: :veryfast, tune: :zerolatency ]) - |> child({:parser, :post, :hd}, %Membrane.H264.Parser{ + |> child({:parser, :hd}, %Membrane.H264.Parser{ output_stream_structure: :avc1, output_alignment: :au }) @@ -98,18 +64,18 @@ defmodule Pipeline do # Video SD get_child(:transcoder) - |> via_out(:output, options: [ + |> via_out(:video, options: [ resolution: {-2, 360}, bitrate: 1020800, profile: :main, fps: 15, gop_size: 30, b_frames: 3, - crf: 23, + crf: 26, preset: :veryfast, tune: :zerolatency ]) - |> child({:parser, :post, :sd}, %Membrane.H264.Parser{ + |> child({:parser, :sd}, %Membrane.H264.Parser{ output_stream_structure: :avc1, output_alignment: :au }) @@ -127,6 +93,32 @@ defmodule Pipeline do end ] ) + |> get_child(:sink), + + # Audio + get_child(:transcoder) + |> via_out(:audio, options: [bitrate: 128_000]) + |> child(:audio_parser, %Membrane.AAC.Parser{ + out_encapsulation: :none, + output_config: :esds + }) + |> via_in(Pad.ref(:input, "audio_128k"), + options: [ + encoding: :AAC, + build_stream: fn uri, %Membrane.CMAF.Track{} = format -> + %HLS.AlternativeRendition{ + uri: uri, + name: "Audio (EN)", + type: :audio, + group_id: "program_audio", + language: "en", + channels: to_string(format.codecs.mp4a.channels), + autoselect: true, + default: true + } + end + ] + ) |> get_child(:sink) ] diff --git a/lib/membrane/hls/sink_bin.ex b/lib/membrane/hls/sink_bin.ex index d36e66d..e8820a5 100644 --- a/lib/membrane/hls/sink_bin.ex +++ b/lib/membrane/hls/sink_bin.ex @@ -23,14 +23,6 @@ defmodule Membrane.HLS.SinkBin do Implementation of the storage. """ ], - min_segment_duration: [ - spec: Membrane.Time.t(), - description: """ - Specificies the minimum duration of a CMAF segment. - In order to ensure that all segments are smaller than the `target_segment_duration`, - the keyframe interval of the H264 stream must be subtracted. - """ - ], target_segment_duration: [ spec: Membrane.Time.t(), description: """ @@ -101,33 +93,67 @@ defmodule Membrane.HLS.SinkBin do {[], state} end + @impl true + def handle_pad_added(_pad, ctx, _state) when ctx.playback == :playing, + do: + raise( + "New pads can be added to #{inspect(__MODULE__)} only before playback transition to :playing" + ) + @impl true def handle_pad_added( Pad.ref(:input, track_id) = pad, - %{pad_options: %{encoding: encoding} = pad_opts}, + %{pad_options: %{encoding: :AAC} = pad_opts}, state - ) - when encoding in [:H264, :AAC] do - {max_pts, _track_pts} = resume_info(state.packager_pid, track_id) + ) do + {_max_pts, _track_pts} = resume_info(state.packager_pid, track_id) spec = bin_input(pad) - |> then(fn spec -> - # if encoding == :AAC do - # spec - # |> child({:filler, track_id}, %Membrane.HLS.AACFiller{duration: max_pts - track_pts}) - # |> child(:fix_parser, %Membrane.AAC.Parser{ - # out_encapsulation: :none, - # output_config: :esds - # }) - # |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: track_pts}) - # else - child(spec, {:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts}) - # end - end) - |> child({:muxer, track_id}, %Membrane.MP4.Muxer.CMAF{ - segment_min_duration: state.opts.min_segment_duration + # |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts}) + |> via_in(Pad.ref(:input, track_id)) + |> audio_muxer(state) + |> via_out(Pad.ref(:output), options: [tracks: [track_id]]) + |> child({:sink, track_id}, %Membrane.HLS.CMAFSink{ + packager_pid: state.packager_pid, + track_id: track_id, + target_segment_duration: state.opts.target_segment_duration, + build_stream: pad_opts.build_stream }) + + {[spec: spec], state} + end + + @impl true + def handle_pad_added( + Pad.ref(:input, track_id) = pad, + %{pad_options: %{encoding: :H264} = pad_opts} = ctx, + state + ) do + {_max_pts, _track_pts} = resume_info(state.packager_pid, track_id) + + had_video_input? = + Enum.any?(ctx.pads, fn {Pad.ref(:input, id), data} -> + id != track_id and data.options.encoding == :H264 + end) + + muxer = fn spec -> + if had_video_input? do + child(spec, {:muxer, track_id}, %Membrane.MP4.Muxer.CMAF{ + segment_min_duration: segment_duration(state) + }) + else + spec + |> via_in(Pad.ref(:input, track_id)) + |> audio_muxer(state) + |> via_out(Pad.ref(:output), options: [tracks: [track_id]]) + end + end + + spec = + bin_input(pad) + # |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts}) + |> muxer.() |> child({:sink, track_id}, %Membrane.HLS.CMAFSink{ packager_pid: state.packager_pid, track_id: track_id, @@ -143,15 +169,15 @@ defmodule Membrane.HLS.SinkBin do %{pad_options: %{encoding: :TEXT} = pad_opts}, state ) do - {max_pts, track_pts} = resume_info(state.packager_pid, track_id) + {_max_pts, _track_pts} = resume_info(state.packager_pid, track_id) spec = bin_input(pad) - |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts}) - |> child({:filler, track_id}, %Membrane.HLS.TextFiller{from: track_pts}) + # |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts}) + # |> child({:filler, track_id}, %Membrane.HLS.TextFiller{from: track_pts}) |> child({:cues, track_id}, Membrane.WebVTT.CueBuilderFilter) |> child({:segments, track_id}, %Membrane.WebVTT.SegmentFilter{ - segment_duration: state.opts.target_segment_duration, + segment_duration: segment_duration(state), headers: [ %Subtitle.WebVTT.HeaderLine{key: :description, original: "WEBVTT"}, %Subtitle.WebVTT.HeaderLine{ @@ -170,26 +196,6 @@ defmodule Membrane.HLS.SinkBin do {[spec: spec], state} end - def resume_info(packager_pid, track_id) do - Agent.get(packager_pid, fn packager -> - max_pts = - Packager.max_track_duration(packager) - |> Ratio.new() - |> Membrane.Time.seconds() - - track_pts = - if Packager.has_track?(packager, track_id) do - Packager.track_duration(packager, track_id) - |> Ratio.new() - |> Membrane.Time.seconds() - else - 0 - end - - {max_pts, track_pts} - end) - end - @impl true def handle_element_end_of_stream({:sink, _track_id} = sink, _pad, ctx, state) do all_sinks = @@ -233,6 +239,41 @@ defmodule Membrane.HLS.SinkBin do {[], live_schedule_next_sync(state)} end + defp audio_muxer(spec, state) do + child( + spec, + {:muxer, :audio}, + %Membrane.MP4.Muxer.CMAF{ + segment_min_duration: segment_duration(state) - Membrane.Time.millisecond() + }, + get_if_exists: true + ) + end + + defp segment_duration(state) do + state.opts.target_segment_duration - Membrane.Time.second() + end + + defp resume_info(packager_pid, track_id) do + Agent.get(packager_pid, fn packager -> + max_pts = + Packager.max_track_duration(packager) + |> Ratio.new() + |> Membrane.Time.seconds() + + track_pts = + if Packager.has_track?(packager, track_id) do + Packager.track_duration(packager, track_id) + |> Ratio.new() + |> Membrane.Time.seconds() + else + 0 + end + + {max_pts, track_pts} + end) + end + defp live_schedule_next_sync(state) do state = state diff --git a/mix.lock b/mix.lock index c2fa666..caeedc6 100644 --- a/mix.lock +++ b/mix.lock @@ -3,7 +3,7 @@ "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, - "kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "689723bdc39e09829004e4f32fd4c6e932abea5a", []}, + "kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "58abab9d137b885b31ef6e72d07d38652a9dc3ca", []}, "kim_q": {:hex, :kim_q, "1.0.0", "17cfc45e9f7e65485f0f31bbf09893d6ff35cc2fbefc39aed146a3c29740584e", [:mix], [{:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7a8ee76a2c2e774c34345df3c7a234a8effeedc3f3aea845feb7c09030097278"}, "kim_subtitle": {:git, "https://github.com/kim-company/kim_subtitle.git", "8239e1bcea938167829a6b8bd2a9678c63c7bdd4", []}, "logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"}, diff --git a/test/membrane/hls/sink_bin_test.exs b/test/membrane/hls/sink_bin_test.exs index 7ca9228..88b354e 100644 --- a/test/membrane/hls/sink_bin_test.exs +++ b/test/membrane/hls/sink_bin_test.exs @@ -9,8 +9,7 @@ defmodule Membrane.HLS.SinkBinTest do spec = [ child(:sink, %Membrane.HLS.SinkBin{ manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"), - min_segment_duration: Membrane.Time.seconds(5), - target_segment_duration: Membrane.Time.seconds(10), + target_segment_duration: Membrane.Time.seconds(7), storage: HLS.Storage.File.new() }), @@ -107,7 +106,7 @@ defmodule Membrane.HLS.SinkBinTest do bandwidth: 850_000, resolution: format.resolution, frame_rate: 25.0, - codecs: Membrane.HLS.serialize_codecs(format.codecs), + codecs: [], audio: "audio", subtitles: "subtitles" }