Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader is not clear for making a simple HLS source #12

Open
grant-zukowski-xumo opened this issue Nov 7, 2024 · 9 comments
Open

Reader is not clear for making a simple HLS source #12

grant-zukowski-xumo opened this issue Nov 7, 2024 · 9 comments

Comments

@grant-zukowski-xumo
Copy link

I've tried a few times to make a reader for my HLS source. Each time I get an error about my reader. I'm happy to make a pull request for an hls source example, but I'm having a hard time getting the most basic thing to work. I've referenced your tests cases as well, but nothing seems to help.

16:31:43.793 [error] <0.3189.0>/:source Error occured in Membrane Element:
** (Protocol.UndefinedError) protocol Membrane.HLS.Reader not implemented for %Support.Reader{} of type Support.Reader (a struct). There are no implementations for this protocol.
Mix.install([
  :membrane_core,
  # :membrane_rtmp_plugin,
  # :membrane_aac_plugin,
  # :membrane_h26x_plugin,
  # :membrane_tee_plugin,
  # {:membrane_ffmpeg_transcoder_plugin, "~> 1.1"},
  {:membrane_hls_plugin, github: "kim-company/membrane_hls_plugin"}
])


defmodule My.Reader do
  defstruct []
end

defimpl Membrane.HLS.Reader, for: My.Reader do
  @impl true
  def read(_, %URI{path: path}, _), do: File.read(path)

  @impl true
  def exists?(_, %URI{path: path}) do
    case File.stat(path) do
      {:ok, _} -> true
      {:error, _} -> false
    end
  end
end

defmodule Membrane.Demo.HLSIn do
  @moduledoc """
  Pipeline that contains HLS input element.
  """

  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts) do
    spec =
      child(:source, %Membrane.HLS.Source{
        reader: %My.Reader{},
        master_playlist_uri: "<real hls stream on the internet>"
      })

    {[spec: spec], %{}}
  end
end

{:ok, _supervisor, _pipeline} = Membrane.Pipeline.start_link(Membrane.Demo.HLSIn, [])
@varsill
Copy link
Contributor

varsill commented Dec 13, 2024

Hi @grant-zukowski-xumo
The reason why you cannot run the script is because the protocols obtained with Mix.install are already "consolidated" (it's assumed that all its implementations are already defined, which allows for some optimizations, see: https://hexdocs.pm/elixir/1.14/Protocol.html#module-consolidation).

In order to turn off that optimization, which is required in your case, you need to pass consolidate_protocols: false flag to Mix.install.
Apart from that you also need to wait in your script for the pipeline's termination.
Your updated script can look as follows:

Mix.install([
  :membrane_core,
  :membrane_file_plugin,
  {:membrane_hls_plugin, github: "kim-company/membrane_hls_plugin"}
], consolidate_protocols: false)


defmodule My.Reader do
  defstruct []
end

defimpl Membrane.HLS.Reader, for: My.Reader do
  @impl true
  def read(_, %URI{path: path}, _), do: File.read(path)

  @impl true
  def exists?(_, %URI{path: path}) do
    case File.stat(path) do
      {:ok, _} -> true
      {:error, _} -> false
    end
  end
end

defmodule Membrane.Demo.HLSIn do
  @moduledoc """
  Pipeline that contains HLS input element.
  """

  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts) do
    spec =
      child(:source, %Membrane.HLS.Source{
        reader: %My.Reader{},
        master_playlist_uri: "<real hls stream on the internet>"
      })
      # ... rest of your processing goes here
     |> child(:sink, %Membrane.File.Sink{location: "<output>"})

    {[spec: spec], %{}}
  end

  @impl true
  def handle_element_end_of_stream(:sink, :input, _ctx, state) do
    {[terminate: :normal], state}
  end
end

{:ok, _supervisor, pipeline_pid} = Membrane.Pipeline.start_link(Membrane.Demo.HLSIn, [])
ref = Process.monitor(pipeline_pid)
:ok = receive do
  {:DOWN, ^ref, :process, ^pipeline_pid, _reason} -> :ok
end

Surely, you need to extend your spec with some further elements to do the desired processing, I've just added :sink so that to show you how to wait for the pipeline's termination.

@grant-zukowski-xumo
Copy link
Author

grant-zukowski-xumo commented Dec 13, 2024

thanks for your answer but I tried and got a new error. I came to this library so I could use Membrane with an HLS as an input source - initial question I asked, https://github.com/orgs/membraneframework/discussions/878. I was directed here from the main membrane library. This is the first time I've tried to use Membrane and I just need an hls input to work. Can you please post a working example (for hls input) in your examples directory or help me fix the block you shared.

We are not going to be doing much to the signal except for passing it through for now. We mainly need to change an HLS pull into an HLS push.

13:38:13.777 [error] <0.3079.0>/:source Error occured in Membrane Element:
** (FunctionClauseError) no function clause matching in Membrane.HLS.Reader.My.Reader.read/3
    iex:7: Membrane.HLS.Reader.My.Reader.read(%My.Reader{}, "<contribution hls stream>", [])
    (membrane_hls_plugin 0.1.0) lib/membrane/hls/source.ex:106: Membrane.HLS.Source.handle_info/3
    (membrane_core 1.1.2) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
    (membrane_core 1.1.2) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
    (membrane_core 1.1.2) lib/membrane/core/element.ex:299: Membrane.Core.Element.handle_info/2
    (stdlib 6.0) gen_server.erl:2173: :gen_server.try_handle_info/3
    (stdlib 6.0) gen_server.erl:2261: :gen_server.handle_msg/6
    (stdlib 6.0) proc_lib.erl:329: :proc_lib.init_p_do_apply/3

@grant-zukowski-xumo
Copy link
Author

I updated my reader so I think it is happy now. I'm still working on the :sink park, or some way to know this is "working". I may also try an hls output.

@varsill
Copy link
Contributor

varsill commented Dec 18, 2024

Hello,
it turns out that there is some issue with versioning. I think that membrane_hls_plugin is not compatible with the newest version of kim_hls, so it throws errors when we try to install it without specifying particular version of kim_hls.
I've needed to override kim_hls to a particular version:

Mix.install([
  {:membrane_core, "~> 1.1.1"},
  {:membrane_file_plugin, "~> 0.17.0"},
  {:membrane_hls_plugin, github: "kim-company/membrane_hls_plugin", ref: "a19ab7e12aa75c31c6b75b40f709c8f0ce9195a"},
  {:membrane_mpeg_ts_plugin, github: "kim-company/membrane_mpeg_ts_plugin"},
  {:kim_hls, github: "kim-company/kim_hls", ref: "4226bf8a6f75bb69376a9b04097d783fabd33451", override: true},
], consolidate_protocols: false)


defmodule My.Reader do
  defstruct []
end

defimpl Membrane.HLS.Reader, for: My.Reader do
  @impl true
  def read(_, %URI{path: path}, _), do: File.read(path)

  @impl true
  def exists?(_, %URI{path: path}) do
    case File.stat(path) do
      {:ok, _} -> true
      {:error, _} -> false
    end
  end
end

defmodule Membrane.Demo.HLSIn do
  use Membrane.Pipeline

  alias Membrane.HLS.Source
  alias HLS.Playlist.Master

  @master_playlist_uri URI.new!("./test/fixtures/mpeg-ts/stream.m3u8")

  @impl true
  def handle_init(_ctx, opts) do
    structure = child(:source, %Source{reader: %My.Reader{}, master_playlist_uri: @master_playlist_uri})

    {[spec: structure], opts}
  end

  @impl true
  def handle_child_notification({:hls_master_playlist, master}, :source, _ctx, state) do
    stream =
      master
      |> Master.variant_streams()
      |> Enum.at(0) # we always choose stream variant 0 here

    case stream do
      nil ->
        {[], state}

      stream ->
        structure = [
          get_child(:source)
          |> via_out(Pad.ref(:output, {:rendition, stream}))
          |> child(:demuxer, Membrane.MPEG.TS.Demuxer)
        ]

        {[{:spec, structure}], state}
    end
  end

  @impl true
  def handle_child_notification({:mpeg_ts_pmt, pmt}, :demuxer, _context, state) do
    %{streams: streams} = pmt

    audio_track_id =
      Enum.find_value(streams, fn {track_id, content} ->
        if content.stream_type == :AAC, do: track_id
      end)

    video_track_id =
      Enum.find_value(streams, fn {track_id, content} ->
        if content.stream_type == :H264, do: track_id
      end)

    {:ok, packager} =
      HLS.Packager.start_link(
        storage: HLS.Storage.File.new(),
        manifest_uri: URI.new!("file://tmp/stream.m3u8"),
        resume_finished_tracks: true,
        restore_pending_segments: false
      )

    structure = [
      child(:sink, %Membrane.HLS.SinkBin{
        packager: packager,
        target_segment_duration: Membrane.Time.seconds(7),
      }),
      get_child(:demuxer)
      |> via_out(Pad.ref(:output, {:stream_id, audio_track_id}))
      |> child(:audio_parser, %Membrane.AAC.Parser{
            out_encapsulation: :none,
            output_config: :esds
      })
      |> via_in(Pad.ref(:input, "audio_128k"),
        options: [
          encoding: :AAC,
          segment_duration: Membrane.Time.seconds(6),
          build_stream: fn %Membrane.CMAF.Track{} = format ->
            %HLS.AlternativeRendition{
              name: "Audio (EN)",
              type: :audio,
              group_id: "program_audio",
              language: "en",
              channels: to_string(format.codecs.mp4a.channels),
              autoselect: true,
              default: true
            }
          end
        ]
      )
      |> get_child(:sink),
      get_child(:demuxer)
      |> via_out(Pad.ref(:output, {:stream_id, video_track_id}))
       |> child({:parser, :hd}, %Membrane.H264.Parser{
            output_stream_structure: :avc1,
            output_alignment: :au
          })
      |> via_in(Pad.ref(:input, "video_720p"),
        options: [
          encoding: :H264,
          segment_duration: Membrane.Time.seconds(6),
          build_stream: fn %Membrane.CMAF.Track{} = format ->
            %HLS.VariantStream{
              uri: nil,
              bandwidth: 3951200,
              resolution: format.resolution,
              codecs: Membrane.HLS.serialize_codecs(format.codecs),
              audio: "program_audio"
            }
          end
        ]
      )
      |> get_child(:sink)
    ]

    {[spec: {structure, log_metadata: Logger.metadata()}], state}
  end

  @impl true
  def handle_child_notification(_msg, _child, _ctx, state) do
    {[], state}
  end

  @impl true
  def handle_element_end_of_stream(:sink, :input, _ctx, state) do
    {[terminate: :normal], state}
  end

  @impl true
  def handle_element_end_of_stream(_element, _pad, _ctx, state) do
    {[], state}
  end
end

{:ok, _supervisor, pipeline_pid} = Membrane.Pipeline.start_link(Membrane.Demo.HLSIn, [])
ref = Process.monitor(pipeline_pid)
:ok = receive do
  {:DOWN, ^ref, :process, ^pipeline_pid, _reason} -> :ok
end

Unfortunately, I believe that membrane_hls_plugin is now producing buffers which are incompatible with membrane_mpeg_ts_plugin's demuxer - @dmorn could you provide us with some insight on how the stream produced by Membrane.HLS.Source is now expected to be demuxed?

@dmorn
Copy link
Member

dmorn commented Dec 18, 2024

Hi people. We'll publish this library with everything fixed on hex soon. The hls sink side is also now battle tested. Stay tuned 😉✌️

@grant-zukowski-xumo
Copy link
Author

@varsill thank you for the updated version. I'll try this with my source and see if I can get it to work with the pinned version.

@grant-zukowski-xumo
Copy link
Author

Using a real hls stream, and reading it with Req (not sure that was the way to do it, but it gives me the master), the example you posted above runs, but it seems to stop. I've confirmed that my input stream is still running.


18:45:18.997 [debug] <0.340.0>/:audio_parser Sending stream format through pad :output
Stream format: %Membrane.AAC{sample_rate: 48000, channels: 2, profile: :LC, mpeg_version: 4, samples_per_frame: 1024, frames_per_buffer: 1, encapsulation: :none, config: {:esds, <<3, 128, 128, 128, 34, 0, 1, 0, 4, 128, 128, 128, 20, 64, 21, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 128, 128, 128, 2, 17, 144, 6, 128, 128, 128, 1, 2>>}}


18:45:19.001 [debug] <0.340.0>/{:parser, :hd} Sending stream format through pad :output
Stream format: %Membrane.H264{width: 256, height: 144, profile: :high, alignment: :au, nalu_in_metadata?: true, framerate: nil, stream_structure: {:avc1, <<1, 100, 0, 12, 255, 225, 0, 29, 103, 100, 0, 12, 172, 217, 65, 1, 58, 16, 0, 0, 62, 144, 0, 14, 166, 6, 102, 7, 208, 11, 186, 76, 48, 7, 138, 20, 203, 1, 0, 6, 104, ...>>}}


18:45:19.012 [debug] <0.340.0>/:sink/{:muxer, "video_720p"} Sending stream format through pad {Membrane.Pad, :output, #Reference<0.3131235685.3380609032.54114>}
Stream format: %Membrane.CMAF.Track{content_type: :video, header: <<0, 0, 0, 24, 102, 116, 121, 112, 105, 115, 111, 53, 0, 0, 2, 0, 105, 115, 111, 54, 109, 112, 52, 49, 0, 0, 2, 141, 109, 111, 111, 118, 0, 0, 0, 108, 109, 118, 104, 100, 0, 0, 0, 0, 0, 0, 0, 0, ...>>, resolution: {256, 144}, codecs: %{avc1: %{profile: 100, level: 12, compatibility: 0}}}


18:45:19.013 [debug] <0.340.0>/:sink/{:muxer, "audio_128k"} Sending stream format through pad {Membrane.Pad, :output, #Reference<0.3131235685.3380609031.53363>}
Stream format: %Membrane.CMAF.Track{content_type: :audio, header: <<0, 0, 0, 24, 102, 116, 121, 112, 105, 115, 111, 53, 0, 0, 2, 0, 105, 115, 111, 54, 109, 112, 52, 49, 0, 0, 2, 68, 109, 111, 111, 118, 0, 0, 0, 108, 109, 118, 104, 100, 0, 0, 0, 0, 0, 0, 0, 0, ...>>, resolution: nil, codecs: %{mp4a: %{channels: 2, aot_id: 2, frequency: 48000}}}


18:45:23.920 [warning] PES filtering enabled. Following streams [256, 257]

I do get some files, but none of them will play, and they don't keep populating.
Screenshot 2024-12-19 at 6 48 18 PM

I'd like to take the highest variant, then make a ladder from that. But for now just outputting in HLS or any format that I could see or that would work would be a great start for an example file.

As recommended in membrane_core, I want to use this output.

# hls egress
    {:membrane_http_adaptive_stream_plugin, "~> 0.18.6"},  

@varsill
Copy link
Contributor

varsill commented Dec 20, 2024

@grant-zukowski-xumo thanks for an update, could you show me how did you connect the output of the HLS.Source with input of the HLS Sink from the {:membrane_http_adaptive_stream_plugin, "~> 0.18.6"}?

The thing is that you need to somehow demux the content of input .ts segments and later mux the stream into CMAF container before attaching it to the HLS.Sink output.
I've encountered a problem with the demuxing phase as there is some mismatch between the format of buffers produced by the HLS.Source and required input format of the MPEG.TS.Demuxer.

@grant-zukowski-xumo
Copy link
Author


11:25:00.683 [warning] retry: got exception, will retry in 1000ms, 3 attempts left

11:25:00.684 [warning] ** (Req.TransportError) socket closed

11:25:06.692 [warning] retry: got exception, will retry in 2000ms, 2 attempts left

11:25:06.692 [warning] ** (Req.TransportError) timeout

11:25:16.413 [warning] retry: got exception, will retry in 4000ms, 1 attempt left

11:25:16.413 [warning] ** (Req.TransportError) timeout

11:25:25.446 [error] GenServer #PID<0.362.0> terminating
** (Req.TransportError) timeout
    (req 0.5.8) lib/req.ex:1108: Req.request!/2
    iex:8: Membrane.HLS.Reader.My.Reader.read/3
    (membrane_hls_plugin 0.1.0) lib/membrane/hls/source.ex:49: anonymous fn/2 in Membrane.HLS.Source.handle_pad_added/3
    (kim_hls 0.1.0) lib/hls/playlist/media/tracker.ex:58: HLS.Playlist.Media.Tracker.read_media_playlist/2
    (kim_hls 0.1.0) lib/hls/playlist/media/tracker.ex:64: HLS.Playlist.Media.Tracker.handle_refresh/2
    (stdlib 5.2.3) gen_server.erl:1095: :gen_server.try_handle_info/3
    (stdlib 5.2.3) gen_server.erl:1183: :gen_server.handle_msg/6
    (stdlib 5.2.3) proc_lib.erl:241: :proc_lib.init_p_do_apply/3

There was more output that I didn't realize was produced

@grant-zukowski-xumo thanks for an update, could you show me how did you connect the output of the HLS.Source with input of the HLS Sink from the {:membrane_http_adaptive_stream_plugin, "~> 0.18.6"}?

I'm using exactly the same code that you posted.

Also, I'm not sure if there is something wrong with my reader.

defimpl Membrane.HLS.Reader, for: My.Reader do
  @impl true
  def read(_, path, _) do
    %Req.Response{status: 200, body: body} = Req.get!(path)
    {:ok, body}
  end

  @impl true
  def exists?(_, %URI{path: path}) do
    case File.stat(path) do
      {:ok, _} -> true
      {:error, _} -> false
    end
  end
end

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants