Skip to content

Commit

Permalink
Merge pull request #57 from membraneframework/forward_timestamps
Browse files Browse the repository at this point in the history
Forward timestamps
  • Loading branch information
bartkrak authored Apr 3, 2024
2 parents dd8e90d + 1751941 commit 47f76bd
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ It is a part of [Membrane Multimedia Framework](https://membrane.stream).
Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`.

```elixir
{:membrane_ffmpeg_swresample_plugin, "~> 0.19.2"}
{:membrane_ffmpeg_swresample_plugin, "~> 0.20.0"}
```

The precompiled builds of the [ffmpeg](https://www.ffmpeg.org) will be pulled and linked automatically. However, should there be any problems, consider installing it manually.
Expand Down
44 changes: 38 additions & 6 deletions lib/membrane_ffmpeg_swresample_plugin/converter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ defmodule Membrane.FFmpeg.SWResample.Converter do
|> Map.merge(%{
native: nil,
queue: <<>>,
input_stream_format_provided?: options.input_stream_format != nil
input_stream_format_provided?: options.input_stream_format != nil,
pts_queue: []
})

{[], state}
Expand Down Expand Up @@ -122,16 +123,29 @@ defmodule Membrane.FFmpeg.SWResample.Converter do
end

@impl true
def handle_buffer(:input, %Buffer{payload: payload}, _ctx, state) do
def handle_buffer(:input, %Buffer{payload: payload, pts: input_pts}, _ctx, state) do
input_frame_size = RawAudio.frame_size(state.input_stream_format)
output_frame_size = RawAudio.frame_size(state.output_stream_format)

expected_output_frames_count =
(byte_size(payload) * output_frame_size / (input_frame_size * input_frame_size)) |> round()

state =
Map.update!(state, :pts_queue, fn pts_queue ->
pts_queue ++ [{input_pts, expected_output_frames_count}]
end)

conversion_result =
convert!(state.native, RawAudio.frame_size(state.input_stream_format), payload, state.queue)
convert!(state.native, input_frame_size, payload, state.queue)

case conversion_result do
{<<>>, queue} ->
{[], %{state | queue: queue}}

{converted, queue} ->
{[buffer: {:output, %Buffer{payload: converted}}], %{state | queue: queue}}
{state, out_pts} = update_pts_queue(state, byte_size(converted) / output_frame_size)

{[buffer: {:output, %Buffer{payload: converted, pts: out_pts}}], %{state | queue: queue}}
end
end

Expand All @@ -154,8 +168,15 @@ defmodule Membrane.FFmpeg.SWResample.Converter do
{[end_of_stream: :output], %{state | queue: <<>>}}

converted ->
{[buffer: {:output, %Buffer{payload: converted}}, end_of_stream: :output],
%{state | queue: <<>>}}
converted_frames_count =
byte_size(converted) / RawAudio.frame_size(state.output_stream_format)

{state, out_pts} = update_pts_queue(state, converted_frames_count)

{[
buffer: {:output, %Buffer{payload: converted, pts: out_pts}},
end_of_stream: :output
], %{state | queue: <<>>}}
end
end

Expand Down Expand Up @@ -212,4 +233,15 @@ defmodule Membrane.FFmpeg.SWResample.Converter do
{:error, reason} -> raise "Error while flushing converter: #{inspect(reason)}"
end
end

defp update_pts_queue(state, converted_frames_count) do
[{out_pts, expected_frames} | rest] = state.pts_queue

if converted_frames_count < expected_frames do
{%{state | pts_queue: [{out_pts, expected_frames - converted_frames_count}] ++ rest},
out_pts}
else
{%{state | pts_queue: rest}, out_pts}
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Membrane.FFmpeg.SWResample.Mixfile do
use Mix.Project

@github_url "https://github.com/membraneframework/membrane_ffmpeg_swresample_plugin"
@version "0.19.2"
@version "0.20.0"

def project do
[
Expand Down
1 change: 1 addition & 0 deletions test/fixtures/input_s16le_stereo_16khz.raw

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions test/membrane_ffmpeg_swresample_plugin/converter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do
output_stream_format: @u8_format,
frames_per_buffer: 2048,
native: nil,
queue: <<>>
queue: <<>>,
pts_queue: []
}
}
end
Expand Down Expand Up @@ -156,7 +157,7 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do

assert {[], new_state} = @module.handle_buffer(:input, buffer, nil, state)

assert new_state == %{state | queue: payload}
assert %{new_state | pts_queue: nil} == %{state | queue: payload, pts_queue: nil}
refute_called(@native, :convert)
end

Expand Down
66 changes: 66 additions & 0 deletions test/membrane_ffmpeg_swresample_plugin/pts_forward_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule Membrane.FFmpeg.SWResample.PtsForwardTest do
use ExUnit.Case

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.FFmpeg.SWResample.Converter
alias Membrane.{RawAudio, Testing}

@pts_multiplier 31_250_000

test "pts forward test" do
input_stream_format = %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 2}
output_stream_format = %RawAudio{sample_format: :s32le, sample_rate: 32_000, channels: 2}

# 32 frames * 2048 bytes
fixture_path = "test/fixtures/input_s16le_stereo_16khz.raw"

spec = [
child(:source, %Membrane.Testing.Source{output: buffers_from_file(fixture_path)})
|> child(:resampler, %Converter{
input_stream_format: input_stream_format,
output_stream_format: output_stream_format
})
|> child(:sink, Membrane.Testing.Sink)
]

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)
assert_sink_buffer(pipeline, :sink, _buffer)

Enum.each(0..30, fn index ->
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: out_pts})
assert out_pts == index * @pts_multiplier
end)

assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: last_out_pts})
assert last_out_pts == 30 * @pts_multiplier

assert_end_of_stream(pipeline, :sink)
Testing.Pipeline.terminate(pipeline)
end

defp buffers_from_file(path) do
binary = File.read!(path)

split_binary(binary)
|> Enum.with_index()
|> Enum.map(fn {payload, index} ->
%Membrane.Buffer{
payload: payload,
pts: index * @pts_multiplier
}
end)
end

@spec split_binary(binary(), list(binary())) :: list(binary())
def split_binary(binary, acc \\ [])

def split_binary(<<binary::binary-size(2048), rest::binary>>, acc) do
split_binary(rest, [binary] ++ acc)
end

def split_binary(rest, acc) when byte_size(rest) <= 2048 do
Enum.reverse(acc) ++ [rest]
end
end

0 comments on commit 47f76bd

Please sign in to comment.