diff --git a/.gitignore b/.gitignore index 9f79198..759c1d3 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,6 @@ membrane_hls_plugin-*.tar # Temporary files, for example, from tests. /tmp/ + +/output +validation_data.json diff --git a/.tool-versions b/.tool-versions index b8f509b..de9b652 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,5 +1,2 @@ -elixir 1.15.7-otp-26 -erlang 26.1.2 -nodejs 16.16.0 -golang 1.16.6 -rust 1.74.0 \ No newline at end of file +elixir 1.16.3-otp-26 +erlang 26.2.5 diff --git a/examples/rtmp.exs b/examples/rtmp.exs new file mode 100644 index 0000000..1c4b494 --- /dev/null +++ b/examples/rtmp.exs @@ -0,0 +1,143 @@ +Mix.install([ + :membrane_core, + :membrane_rtmp_plugin, + :membrane_aac_plugin, + :membrane_h26x_plugin, + :membrane_tee_plugin, + {:membrane_ffmpeg_transcoder_plugin, "~> 1.1"}, + {:membrane_hls_plugin, path: Path.expand(Path.join(__DIR__, "../"))} +]) + +defmodule Pipeline do + use Membrane.Pipeline + + @impl true + def handle_init(_ctx, _opts) do + File.rm_rf("tmp") + + structure = [ + # Source + child(:source, %Membrane.RTMP.Source{ + url: "rtmp://0.0.0.0:1935/app/stream_key" + }) + |> child(:transcoder, Membrane.FFmpeg.Transcoder), + + # Sink + child(:sink, %Membrane.HLS.SinkBin{ + manifest_uri: URI.new!("file://tmp/stream.m3u8"), + target_segment_duration: Membrane.Time.seconds(7), + storage: HLS.Storage.File.new() + }), + + # Audio + get_child(:transcoder) + |> via_out(:audio, options: [bitrate: 128_000]) + |> child(:audio_parser, %Membrane.AAC.Parser{ + out_encapsulation: :none, + output_config: :esds + }) + |> via_in(Pad.ref(:input, "audio_128k"), + options: [ + encoding: :AAC, + build_stream: fn uri, %Membrane.CMAF.Track{} = format -> + %HLS.AlternativeRendition{ + uri: uri, + name: "Audio (EN)", + type: :audio, + group_id: "program_audio", + language: "en", + channels: to_string(format.codecs.mp4a.channels), + autoselect: true, + default: true + } + end + ] + ) + |> get_child(:sink), + + # Video HD + get_child(:transcoder) + |> via_out(:video, options: [ + resolution: {-2, 720}, + bitrate: 3_300_000, + profile: :high, + fps: 30, + gop_size: 60, + b_frames: 3, + crf: 26, + preset: :veryfast, + tune: :zerolatency + ]) + |> child({:parser, :hd}, %Membrane.H264.Parser{ + output_stream_structure: :avc1, + output_alignment: :au + }) + |> via_in(Pad.ref(:input, "video_720p"), + options: [ + encoding: :H264, + build_stream: fn uri, %Membrane.CMAF.Track{} = format -> + %HLS.VariantStream{ + uri: uri, + bandwidth: 3951200, + resolution: format.resolution, + codecs: Membrane.HLS.serialize_codecs(format.codecs), + audio: "program_audio" + } + end + ] + ) + |> get_child(:sink), + + # Video SD + get_child(:transcoder) + |> via_out(:video, options: [ + resolution: {-2, 360}, + bitrate: 1020800, + profile: :main, + fps: 15, + gop_size: 30, + b_frames: 3, + crf: 26, + preset: :veryfast, + tune: :zerolatency + ]) + |> child({:parser, :sd}, %Membrane.H264.Parser{ + output_stream_structure: :avc1, + output_alignment: :au + }) + |> via_in(Pad.ref(:input, "video_360p"), + options: [ + encoding: :H264, + build_stream: fn uri, %Membrane.CMAF.Track{} = format -> + %HLS.VariantStream{ + uri: uri, + bandwidth: 1_200_000, + resolution: format.resolution, + codecs: Membrane.HLS.serialize_codecs(format.codecs), + audio: "program_audio" + } + end + ] + ) + |> get_child(:sink) + ] + + {[spec: structure], %{}} + end + + def handle_child_notification(:end_of_stream, :sink, _ctx, state) do + {[terminate: :normal], state} + end +end + +# Start a pipeline with `Membrane.RTMP.Source` that will spawn an RTMP server waiting for +# the client connection on given URL +{:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(Pipeline) + +# Wait for the pipeline to terminate itself +ref = Process.monitor(pipeline) + +:ok = + receive do + {:DOWN, ^ref, _process, ^pipeline, :normal} -> :ok + end diff --git a/lib/membrane/hls.ex b/lib/membrane/hls.ex new file mode 100644 index 0000000..03184c9 --- /dev/null +++ b/lib/membrane/hls.ex @@ -0,0 +1,22 @@ +defmodule Membrane.HLS do + def serialize_codecs(codecs) do + codecs + |> Enum.map(&serialize_codec(&1)) + |> Enum.reject(&is_nil/1) + end + + defp serialize_codec({:avc1, %{profile: profile, compatibility: compatibility, level: level}}) do + [profile, compatibility, level] + |> Enum.map(&Integer.to_string(&1, 16)) + |> Enum.map_join(&String.pad_leading(&1, 2, "0")) + |> then(&"avc1.#{&1}") + |> String.downcase() + end + + defp serialize_codec({:hvc1, %{profile: profile, level: level}}), + do: "hvc1.#{profile}.4.L#{level}.B0" + + defp serialize_codec({:mp4a, %{aot_id: aot_id}}), do: String.downcase("mp4a.40.#{aot_id}") + + defp serialize_codec(_other), do: nil +end diff --git a/lib/membrane/hls/cmaf_sink.ex b/lib/membrane/hls/cmaf_sink.ex new file mode 100644 index 0000000..9a8bfb2 --- /dev/null +++ b/lib/membrane/hls/cmaf_sink.ex @@ -0,0 +1,115 @@ +defmodule Membrane.HLS.CMAFSink do + use Membrane.Sink + alias HLS.Packager + + def_input_pad( + :input, + accepted_format: Membrane.CMAF.Track + ) + + def_options( + packager_pid: [ + spec: pid(), + description: "PID of the packager." + ], + track_id: [ + spec: String.t(), + description: "ID of the track." + ], + build_stream: [ + spec: + (URI.t(), Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()), + description: "Build the stream with the given stream format" + ], + target_segment_duration: [ + spec: Membrane.Time.t() + ] + ) + + @impl true + def handle_init(_context, opts) do + {[], %{opts: opts, upload_tasks: %{}}} + end + + def handle_stream_format(:input, format, _ctx, state) do + track_id = state.opts.track_id + + target_segment_duration = + Membrane.Time.as_seconds(state.opts.target_segment_duration, :exact) + |> Ratio.ceil() + + Agent.update(state.opts.packager_pid, fn packager -> + packager = + if Packager.has_track?(packager, track_id) do + # Packager.discontinue_track(packager, track_id) + packager + else + uri = Packager.new_variant_uri(packager, track_id) + + Packager.add_track( + packager, + track_id, + codecs: Membrane.HLS.serialize_codecs(format.codecs), + stream: state.opts.build_stream.(uri, format), + segment_extension: ".m4s", + target_segment_duration: target_segment_duration + ) + end + + Packager.put_init_section(packager, track_id, format.header) + end) + + {[], state} + end + + def handle_buffer(:input, buffer, _ctx, state) do + {job_ref, upload_fun} = + Agent.get_and_update(state.opts.packager_pid, fn packager -> + {packager, {ref, upload_fun}} = + Packager.put_segment_async( + packager, + state.opts.track_id, + buffer.payload, + Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float() + ) + + {{ref, upload_fun}, packager} + end) + + task = Task.async(upload_fun) + + {[], put_in(state, [:upload_tasks, task.ref], %{job_ref: job_ref, task: task})} + end + + def handle_info({task_ref, :ok}, _ctx, state) do + Process.demonitor(task_ref, [:flush]) + + {data, state} = pop_in(state, [:upload_tasks, task_ref]) + + Agent.update(state.opts.packager_pid, fn packager -> + Packager.ack_segment(packager, state.opts.track_id, data.job_ref) + end) + + {[], state} + end + + def handle_info({:DOWN, _ref, _, _, reason}, _ctx, state) do + raise "Cannot write segment of track #{state.track_id} with reason: #{inspect(reason)}." + {[], state} + end + + def handle_end_of_stream(:input, _ctx, state) do + state.upload_tasks + |> Map.values() + |> Enum.map(& &1.task) + |> Task.await_many(:infinity) + + Agent.update(state.opts.packager_pid, fn packager -> + Enum.reduce(state.upload_tasks, packager, fn {_task_ref, data}, packager -> + Packager.ack_segment(packager, state.opts.track_id, data.job_ref) + end) + end) + + {[], %{state | upload_tasks: %{}}} + end +end diff --git a/lib/membrane/hls/filler/aac_filler.ex b/lib/membrane/hls/filler/aac_filler.ex new file mode 100644 index 0000000..4a08d18 --- /dev/null +++ b/lib/membrane/hls/filler/aac_filler.ex @@ -0,0 +1,56 @@ +defmodule Membrane.HLS.AACFiller do + use Membrane.Filter + + def_input_pad(:input, + accepted_format: Membrane.AAC + ) + + def_output_pad(:output, + accepted_format: Membrane.RemoteStream + ) + + def_options( + duration: [ + spec: Membrane.Time.t() + ] + ) + + def handle_init(_ctx, opts) do + {[], %{duration: opts.duration, format: nil, filled: false}} + end + + def handle_stream_format(:input, format, _ctx, state) do + {[forward: %Membrane.RemoteStream{content_format: Membrane.AAC}], %{state | format: format}} + end + + def handle_buffer(:input, buffer, _ctx, state) do + buffer = %{buffer | pts: nil, dts: nil} + + if state.filled do + {[forward: buffer], state} + else + format = state.format + + silence_buffer = + if Membrane.Time.as_milliseconds(state.duration, :round) > 0 do + duration = + Membrane.Time.as_seconds(state.duration) + |> Ratio.to_float() + + {silence, 0} = + System.cmd( + "ffmpeg", + ~w(-f lavfi -i anullsrc=r=#{format.sample_rate} -ac #{format.channels} -t #{duration} -c:a aac -f adts -) + ) + + %Membrane.Buffer{ + payload: silence + } + else + nil + end + + {[buffer: {:output, List.wrap(silence_buffer) ++ [buffer]}], %{state | filled: true}} + end + end +end diff --git a/lib/membrane/hls/filler/text_filler.ex b/lib/membrane/hls/filler/text_filler.ex new file mode 100644 index 0000000..dae49d6 --- /dev/null +++ b/lib/membrane/hls/filler/text_filler.ex @@ -0,0 +1,39 @@ +defmodule Membrane.HLS.TextFiller do + use Membrane.Filter + + def_input_pad(:input, + accepted_format: Membrane.Text + ) + + def_output_pad(:output, + accepted_format: Membrane.Text + ) + + def_options( + from: [ + spec: Membrane.Time.t() + ] + ) + + def handle_init(_ctx, opts) do + {[], %{from: opts.from, filled: false}} + end + + def handle_buffer(:input, buffer, _ctx, state) do + if state.filled do + {[forward: buffer], state} + else + Membrane.Logger.debug( + "Generated empty text buffer with a duration of #{buffer.pts - state.from - Membrane.Time.millisecond()}" + ) + + silence_buffer = %Membrane.Buffer{ + payload: "", + pts: state.from, + metadata: %{to: buffer.pts - Membrane.Time.millisecond()} + } + + {[buffer: {:output, [silence_buffer, buffer]}], %{state | filled: true}} + end + end +end diff --git a/lib/membrane/hls/shifter.ex b/lib/membrane/hls/shifter.ex new file mode 100644 index 0000000..546fedd --- /dev/null +++ b/lib/membrane/hls/shifter.ex @@ -0,0 +1,35 @@ +defmodule Membrane.HLS.Shifter do + use Membrane.Filter + + def_input_pad(:input, + accepted_format: any_of(Membrane.H264, Membrane.AAC, Membrane.Text) + ) + + def_output_pad(:output, + accepted_format: any_of(Membrane.H264, Membrane.AAC, Membrane.Text) + ) + + def_options( + duration: [ + spec: Membrane.Time.t() + ] + ) + + def handle_init(_ctx, opts) do + {[], %{duration: opts.duration}} + end + + def handle_buffer(:input, buffer, _ctx, state) do + shifted_buffer = %{ + buffer + | pts: buffer.pts + state.duration, + dts: Membrane.Buffer.get_dts_or_pts(buffer) + state.duration, + metadata: update_metadata(buffer.metadata, state.duration) + } + + {[buffer: {:output, shifted_buffer}], state} + end + + def update_metadata(%{to: to} = metadata, duration), do: %{metadata | to: to + duration} + def update_metadata(metadata, _duration), do: metadata +end diff --git a/lib/membrane/hls/sink_bin.ex b/lib/membrane/hls/sink_bin.ex new file mode 100644 index 0000000..02d2f0d --- /dev/null +++ b/lib/membrane/hls/sink_bin.ex @@ -0,0 +1,317 @@ +defmodule Membrane.HLS.SinkBin do + @moduledoc """ + Bin responsible for receiving audio and video streams, performing payloading and CMAF + muxing to eventually store them using provided storage configuration. + """ + use Membrane.Bin + alias HLS.Packager + + require Membrane.Logger + + def_options( + manifest_uri: [ + spec: URI.t(), + description: """ + Destination URI of the manifest. + Example: file://output/stream.m3u8 + """ + ], + storage: [ + spec: HLS.Storage, + required: true, + description: """ + Implementation of the storage. + """ + ], + target_segment_duration: [ + spec: Membrane.Time.t(), + description: """ + Target duration for each HLS segment. + """ + ], + mode: [ + spec: {:live, Membrane.Time.t()} | :vod, + default: :vod, + description: """ + * {:live, safety_delay} -> This element will include the provided segments + in the media playlist each target_segment_duration. + * :vod -> At the end of the segment production, playlists are written down. + """ + ] + ) + + def_input_pad(:input, + accepted_format: any_of(Membrane.H264, Membrane.AAC, Membrane.Text), + availability: :on_request, + options: [ + encoding: [ + spec: :AAC | :H264 | :TEXT, + description: """ + Encoding type determining which parser will be used for the given stream. + """ + ], + build_stream: [ + spec: + (URI.t(), Membrane.CMAF.Track.t() -> + HLS.VariantStream.t() | HLS.AlternativeRendition.t()), + description: "Build either a `HLS.VariantStream` or a `HLS.AlternativeRendition`." + ] + ] + ) + + @impl true + def handle_init(_context, opts) do + {[], %{opts: opts, packager_pid: nil, ended_sinks: MapSet.new(), live_state: nil}} + end + + @impl true + def handle_setup(_context, state) do + {:ok, packager_pid} = + Agent.start_link(fn -> + Packager.new( + storage: state.opts.storage, + manifest_uri: state.opts.manifest_uri, + resume_finished_tracks: true + ) + end) + + {[], %{state | packager_pid: packager_pid}} + end + + @impl true + def handle_element_start_of_stream( + child = {:muxer, _}, + _pad, + _ctx, + state = %{live_state: nil, opts: %{mode: {:live, _}}} + ) do + Membrane.Logger.debug("Initializing live state: triggering child: #{inspect(child)}") + {[], live_init_state(state)} + end + + def handle_element_start_of_stream(_child, _pad, _ctx, state) do + {[], state} + end + + @impl true + def handle_pad_added(_pad, ctx, _state) when ctx.playback == :playing, + do: + raise( + "New pads can be added to #{inspect(__MODULE__)} only before playback transition to :playing" + ) + + @impl true + def handle_pad_added( + Pad.ref(:input, track_id) = pad, + %{pad_options: %{encoding: :AAC} = pad_opts}, + state + ) do + {_max_pts, _track_pts} = resume_info(state.packager_pid, track_id) + + spec = + bin_input(pad) + # |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts}) + |> via_in(Pad.ref(:input, track_id)) + |> audio_muxer(state) + |> via_out(Pad.ref(:output), options: [tracks: [track_id]]) + |> child({:sink, track_id}, %Membrane.HLS.CMAFSink{ + packager_pid: state.packager_pid, + track_id: track_id, + target_segment_duration: state.opts.target_segment_duration, + build_stream: pad_opts.build_stream + }) + + {[spec: spec], state} + end + + @impl true + def handle_pad_added( + Pad.ref(:input, track_id) = pad, + %{pad_options: %{encoding: :H264} = pad_opts} = ctx, + state + ) do + {_max_pts, _track_pts} = resume_info(state.packager_pid, track_id) + + had_video_input? = + Enum.any?(ctx.pads, fn {Pad.ref(:input, id), data} -> + id != track_id and data.options.encoding == :H264 + end) + + muxer = fn spec -> + if had_video_input? do + child(spec, {:muxer, track_id}, %Membrane.MP4.Muxer.CMAF{ + segment_min_duration: segment_min_duration(state) + }) + else + spec + |> via_in(Pad.ref(:input, track_id)) + |> audio_muxer(state) + |> via_out(Pad.ref(:output), options: [tracks: [track_id]]) + end + end + + spec = + bin_input(pad) + # |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts}) + |> muxer.() + |> child({:sink, track_id}, %Membrane.HLS.CMAFSink{ + packager_pid: state.packager_pid, + track_id: track_id, + target_segment_duration: state.opts.target_segment_duration, + build_stream: pad_opts.build_stream + }) + + {[spec: spec], state} + end + + def handle_pad_added( + Pad.ref(:input, track_id) = pad, + %{pad_options: %{encoding: :TEXT} = pad_opts}, + state + ) do + {_max_pts, _track_pts} = resume_info(state.packager_pid, track_id) + + spec = + bin_input(pad) + # |> child({:shifter, track_id}, %Membrane.HLS.Shifter{duration: max_pts}) + # |> child({:filler, track_id}, %Membrane.HLS.TextFiller{from: track_pts}) + |> child({:cues, track_id}, Membrane.WebVTT.CueBuilderFilter) + |> child({:segments, track_id}, %Membrane.WebVTT.SegmentFilter{ + segment_duration: state.opts.target_segment_duration - Membrane.Time.second(), + headers: [ + %Subtitle.WebVTT.HeaderLine{key: :description, original: "WEBVTT"}, + %Subtitle.WebVTT.HeaderLine{ + key: :x_timestamp_map, + original: "X-TIMESTAMP-MAP=LOCAL:00:00:00.000,MPEGTS:90000" + } + ] + }) + |> child({:sink, track_id}, %Membrane.HLS.WebVTTSink{ + packager_pid: state.packager_pid, + track_id: track_id, + target_segment_duration: state.opts.target_segment_duration, + build_stream: pad_opts.build_stream + }) + + {[spec: spec], state} + end + + @impl true + def handle_element_end_of_stream({:sink, _track_id} = sink, _pad, ctx, state) do + all_sinks = + ctx.children + |> Map.keys() + |> Enum.filter(&match?({:sink, _}, &1)) + |> MapSet.new() + + ended_sinks = MapSet.put(state.ended_sinks, sink) + + if MapSet.equal?(all_sinks, ended_sinks) do + Agent.update(state.packager_pid, &Packager.flush(&1)) + + state = + state + |> put_in([:live_state], %{stop: true}) + |> put_in([:ended_sinks], ended_sinks) + + {[notify_parent: :end_of_stream], state} + else + {[], %{state | ended_sinks: ended_sinks}} + end + end + + def handle_element_end_of_stream(_element, _pad, _ctx, state) do + {[], state} + end + + @impl true + def handle_info(:sync, _ctx, state = %{live_state: %{stop: true}}) do + {[], state} + end + + def handle_info(:sync, _ctx, state) do + Membrane.Logger.debug("Packager: syncing playlists") + + Agent.update(state.packager_pid, fn p -> + Packager.sync(p, state.live_state.next_sync_point) + end) + + {[], live_schedule_next_sync(state)} + end + + defp audio_muxer(spec, state) do + child( + spec, + {:muxer, :audio}, + %Membrane.MP4.Muxer.CMAF{ + segment_min_duration: segment_min_duration(state) + }, + get_if_exists: true + ) + end + + defp segment_min_duration(state) do + state.opts.target_segment_duration - Membrane.Time.seconds(2) + end + + defp resume_info(packager_pid, track_id) do + Agent.get(packager_pid, fn packager -> + max_pts = + Packager.max_track_duration(packager) + |> Ratio.new() + |> Membrane.Time.seconds() + + track_pts = + if Packager.has_track?(packager, track_id) do + Packager.track_duration(packager, track_id) + |> Ratio.new() + |> Membrane.Time.seconds() + else + 0 + end + + {max_pts, track_pts} + end) + end + + defp live_schedule_next_sync(state) do + state = + state + |> update_in([:live_state, :next_sync_point], fn x -> + x + state.opts.target_segment_duration + end) + |> update_in([:live_state, :next_deadline], fn x -> + x + Membrane.Time.as_milliseconds(state.opts.target_segment_duration, :round) + end) + + Process.send_after(self(), :sync, state.live_state.next_deadline, abs: true) + state + end + + defp live_init_state(state) do + # Tells where in the playlist we should start issuing segments. + next_sync_point = + Agent.get( + state.packager_pid, + &Packager.next_sync_point(&1, state.opts.target_segment_duration) + ) + + {:live, safety_delay} = state.opts.mode + now = :erlang.monotonic_time(:millisecond) + + # Tells when we should do it. + deadline = + now + Membrane.Time.as_milliseconds(state.opts.target_segment_duration, :round) + + Membrane.Time.as_milliseconds(safety_delay, :round) + + live_state = %{ + next_sync_point: next_sync_point, + next_deadline: deadline, + stop: false + } + + Process.send_after(self(), :sync, deadline, abs: true) + + %{state | live_state: live_state} + end +end diff --git a/lib/membrane/hls/webvtt_sink.ex b/lib/membrane/hls/webvtt_sink.ex new file mode 100644 index 0000000..be30315 --- /dev/null +++ b/lib/membrane/hls/webvtt_sink.ex @@ -0,0 +1,111 @@ +defmodule Membrane.HLS.WebVTTSink do + use Membrane.Sink + alias HLS.Packager + + def_input_pad( + :input, + accepted_format: Membrane.Text + ) + + def_options( + packager_pid: [ + spec: pid(), + description: "PID of the packager." + ], + track_id: [ + spec: String.t(), + description: "ID of the track." + ], + build_stream: [ + spec: + (URI.t(), Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()), + description: "Build the stream with the given stream format" + ], + target_segment_duration: [ + spec: Membrane.Time.t() + ] + ) + + @impl true + def handle_init(_context, opts) do + {[], %{opts: opts, upload_tasks: %{}}} + end + + def handle_stream_format(:input, format, _ctx, state) do + track_id = state.opts.track_id + + target_segment_duration = + Membrane.Time.as_seconds(state.opts.target_segment_duration, :exact) + |> Ratio.ceil() + + Agent.update(state.opts.packager_pid, fn packager -> + if Packager.has_track?(packager, track_id) do + # Packager.discontinue_track(packager, track_id) + packager + else + uri = Packager.new_variant_uri(packager, track_id) + + Packager.add_track( + packager, + track_id, + stream: state.opts.build_stream.(uri, format), + segment_extension: ".vtt", + target_segment_duration: target_segment_duration + ) + end + end) + + {[], state} + end + + def handle_buffer(:input, buffer, _ctx, state) do + {job_ref, upload_fun} = + Agent.get_and_update(state.opts.packager_pid, fn packager -> + {packager, {ref, upload_fun}} = + Packager.put_segment_async( + packager, + state.opts.track_id, + buffer.payload, + Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float() + ) + + {{ref, upload_fun}, packager} + end) + + task = Task.async(upload_fun) + + {[], put_in(state, [:upload_tasks, task.ref], %{job_ref: job_ref, task: task})} + end + + def handle_info({task_ref, :ok}, _ctx, state) do + Process.demonitor(task_ref, [:flush]) + + {data, state} = pop_in(state, [:upload_tasks, task_ref]) + + Agent.update(state.opts.packager_pid, fn packager -> + Packager.ack_segment(packager, state.opts.track_id, data.job_ref) + end) + + {[], state} + end + + def handle_info({:DOWN, _ref, _, _, reason}, _ctx, state) do + raise "Cannot write segment of track #{state.track_id} with reason: #{inspect(reason)}." + {[], state} + end + + def handle_end_of_stream(:input, _ctx, state) do + state.upload_tasks + |> Map.values() + |> Enum.map(& &1.task) + |> Task.await_many(:infinity) + + Agent.update(state.opts.packager_pid, fn packager -> + Enum.reduce(state.upload_tasks, packager, fn {_task_ref, data}, packager -> + Packager.ack_segment(packager, state.opts.track_id, data.job_ref) + end) + end) + + {[], %{state | upload_tasks: %{}}} + end +end diff --git a/mix.exs b/mix.exs index b42aa5f..d285c7d 100644 --- a/mix.exs +++ b/mix.exs @@ -24,9 +24,14 @@ defmodule Membrane.HLS.MixProject do defp deps do [ {:membrane_core, "~> 1.0"}, - {:membrane_file_plugin, "~> 0.16.0", only: :test}, + {:membrane_file_plugin, "~> 0.17"}, + {:membrane_mp4_plugin, "~> 0.35"}, + {:membrane_aac_plugin, "~> 0.18"}, + {:membrane_h26x_plugin, "~> 0.10"}, + {:kim_q, "~> 1.0"}, {:kim_hls, github: "kim-company/kim_hls"}, - {:kim_q, github: "kim-company/kim_q"} + {:membrane_text_format, github: "kim-company/membrane_text_format"}, + {:membrane_webvtt_plugin, github: "kim-company/membrane_webvtt_plugin"} ] end diff --git a/mix.lock b/mix.lock index 1cc636b..caeedc6 100644 --- a/mix.lock +++ b/mix.lock @@ -1,22 +1,28 @@ %{ + "bimap": {:hex, :bimap, "1.3.0", "3ea4832e58dc83a9b5b407c6731e7bae87458aa618e6d11d8e12114a17afa4b3", [:mix], [], "hexpm", "bf5a2b078528465aa705f405a5c638becd63e41d280ada41e0f77e6d255a10b4"}, "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, - "certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, - "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, - "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, - "kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "daab76ec6a56443204e7bbeb452102711e51a52d", []}, - "kim_q": {:git, "https://github.com/kim-company/kim_q.git", "4a4924efead4230e488375fe9e2aa817931cd05a", []}, - "membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"}, - "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.16.0", "7917f6682c22b9bcfc2ca20ed960eee0f7d03ad31fd5f59ed850f1fe3ddd545a", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b0727998f75a9b4dab8a2baefdfc13c3eac00a04e061ab1b0e61dc5566927acc"}, - "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, - "mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"}, - "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, + "kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "58abab9d137b885b31ef6e72d07d38652a9dc3ca", []}, + "kim_q": {:hex, :kim_q, "1.0.0", "17cfc45e9f7e65485f0f31bbf09893d6ff35cc2fbefc39aed146a3c29740584e", [:mix], [{:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7a8ee76a2c2e774c34345df3c7a234a8effeedc3f3aea845feb7c09030097278"}, + "kim_subtitle": {:git, "https://github.com/kim-company/kim_subtitle.git", "8239e1bcea938167829a6b8bd2a9678c63c7bdd4", []}, + "logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"}, + "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, + "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.2", "7da3f73e60ad12178623379fd62e0a90e127d65541ab9c46f35e996642db4786", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "24060500f83b0697cccbe8528ba48c0fbc225b88c87057c413aa3bf5d0dcb6a6"}, + "membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"}, + "membrane_core": {:hex, :membrane_core, "1.1.1", "4dcff6e9f3b2ecd4f437c20e201e53957731772c0f15b3005062c41f7f58f500", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3802f3fc071505c59d48792487d9927e803d4edb4039710ffa52cdb60bb0aecc"}, + "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"}, + "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, + "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, + "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"}, + "membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"}, + "membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.35.2", "cbedb5272ef1c8f7d9cd3c44f820a90306469b1dc84b8db30ff55bb6195b7cb2", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "8afd4e7779a742dd56c23f1f23053933d1b0b34d397ad368a2f56f995edb2fe0"}, + "membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"}, + "membrane_text_format": {:git, "https://github.com/kim-company/membrane_text_format.git", "e6dec9d0b50d766194826f6b7a739d167f10275f", []}, + "membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"}, + "membrane_webvtt_plugin": {:git, "https://github.com/kim-company/membrane_webvtt_plugin.git", "09cbd85fd18f45bd7f570d71eafdc1ca10b35bd0", []}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, - "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, - "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, - "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "tesla": {:hex, :tesla, "1.4.4", "bb89aa0c9745190930366f6a2ac612cdf2d0e4d7fff449861baa7875afd797b2", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.3", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, "~> 1.3", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "d5503a49f9dec1b287567ea8712d085947e247cb11b06bc54adb05bfde466457"}, - "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, + "ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, } diff --git a/test/fixtures/samples_big-buck-bunny_bun33s.aac b/test/fixtures/samples_big-buck-bunny_bun33s.aac new file mode 100644 index 0000000..0930400 Binary files /dev/null and b/test/fixtures/samples_big-buck-bunny_bun33s.aac differ diff --git a/test/fixtures/samples_big-buck-bunny_bun33s_720x480.h264 b/test/fixtures/samples_big-buck-bunny_bun33s_720x480.h264 new file mode 100644 index 0000000..8805388 Binary files /dev/null and b/test/fixtures/samples_big-buck-bunny_bun33s_720x480.h264 differ diff --git a/test/membrane/hls/sink_bin_test.exs b/test/membrane/hls/sink_bin_test.exs new file mode 100644 index 0000000..88b354e --- /dev/null +++ b/test/membrane/hls/sink_bin_test.exs @@ -0,0 +1,123 @@ +defmodule Membrane.HLS.SinkBinTest do + use ExUnit.Case + use Membrane.Pipeline + + import Membrane.Testing.Assertions + + @tag :tmp_dir + test "on a new stream", %{tmp_dir: tmp_dir} do + spec = [ + child(:sink, %Membrane.HLS.SinkBin{ + manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"), + target_segment_duration: Membrane.Time.seconds(7), + storage: HLS.Storage.File.new() + }), + + # Audio + child(:aac_source, %Membrane.File.Source{ + location: "test/fixtures/samples_big-buck-bunny_bun33s.aac" + }) + |> child(:aac_parser, %Membrane.AAC.Parser{ + out_encapsulation: :none, + output_config: :esds + }) + |> via_in(Pad.ref(:input, "audio_128k"), + options: [ + encoding: :AAC, + build_stream: fn uri, %Membrane.CMAF.Track{} = format -> + %HLS.AlternativeRendition{ + uri: uri, + name: "Audio (EN)", + type: :audio, + group_id: "audio", + language: "en", + channels: to_string(format.codecs.mp4a.channels), + default: true, + autoselect: true + } + end + ] + ) + |> get_child(:sink), + + # Subtitles + child(:text_source, %Membrane.Testing.Source{ + stream_format: %Membrane.Text{locale: "de"}, + output: [ + %Membrane.Buffer{ + payload: "", + pts: 0, + metadata: %{to: Membrane.Time.milliseconds(99)} + }, + %Membrane.Buffer{ + payload: "Subtitle from start to 5s", + pts: Membrane.Time.milliseconds(100), + metadata: %{to: Membrane.Time.seconds(5)} + }, + %Membrane.Buffer{ + payload: "", + pts: Membrane.Time.seconds(5) + Membrane.Time.millisecond(), + metadata: %{to: Membrane.Time.seconds(11)} + }, + %Membrane.Buffer{ + payload: "Subtitle from 11s to 15s", + pts: Membrane.Time.seconds(11) + Membrane.Time.millisecond(), + metadata: %{to: Membrane.Time.seconds(15)} + }, + %Membrane.Buffer{ + payload: "", + pts: Membrane.Time.seconds(15) + Membrane.Time.millisecond(), + metadata: %{to: Membrane.Time.seconds(32)} + } + ] + }) + |> via_in(Pad.ref(:input, "subtitles"), + options: [ + encoding: :TEXT, + build_stream: fn uri, %Membrane.Text{} = format -> + %HLS.AlternativeRendition{ + uri: uri, + name: "Subtitles (EN)", + type: :subtitles, + group_id: "subtitles", + language: format.locale, + default: true, + autoselect: true + } + end + ] + ) + |> get_child(:sink), + + # Video + child(:h264_source, %Membrane.File.Source{ + location: "test/fixtures/samples_big-buck-bunny_bun33s_720x480.h264" + }) + |> child(:h264_parser, %Membrane.H264.Parser{ + generate_best_effort_timestamps: %{framerate: {25, 1}}, + output_stream_structure: :avc1 + }) + |> via_in(Pad.ref(:input, "video_460x720"), + options: [ + encoding: :H264, + build_stream: fn uri, %Membrane.CMAF.Track{} = format -> + %HLS.VariantStream{ + uri: uri, + bandwidth: 850_000, + resolution: format.resolution, + frame_rate: 25.0, + codecs: [], + audio: "audio", + subtitles: "subtitles" + } + end + ] + ) + |> get_child(:sink) + ] + + pipeline = Membrane.Testing.Pipeline.start_link_supervised!(spec: spec) + assert_pipeline_notified(pipeline, :sink, :end_of_stream, 10_000) + :ok = Membrane.Pipeline.terminate(pipeline) + end +end