Skip to content

Commit

Permalink
Merge pull request #64 from membraneframework/non-monotonic-pts-fix
Browse files Browse the repository at this point in the history
Opus encoder outputting non-monotonic PTS
  • Loading branch information
bartkrak authored Aug 29, 2024
2 parents 776facb + 637ac38 commit 4ad01f5
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 43 deletions.
4 changes: 4 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ workflows:
build:
jobs:
- elixir/build_test:
cache-version: 4
filters: &filters
tags:
only: /v.*/
- elixir/test:
cache-version: 4
filters:
<<: *filters
- elixir/lint:
cache-version: 4
filters:
<<: *filters
- elixir/hex_publish:
cache-version: 4
requires:
- elixir/build_test
- elixir/test
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_opus_plugin` to your list of de
```elixir
def deps do
[
{:membrane_opus_plugin, "~> 0.20.2"}
{:membrane_opus_plugin, "~> 0.20.3"}
]
end
```
Expand Down
2 changes: 1 addition & 1 deletion bundlex.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Membrane.Opus.BundlexProject do

def project() do
[
natives: natives(Bundlex.platform())
natives: natives(Bundlex.get_target())
]
end

Expand Down
27 changes: 23 additions & 4 deletions lib/membrane_opus/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ defmodule Membrane.Opus.Encoder do
require Membrane.Logger

alias __MODULE__.Native
alias Membrane.Buffer
alias Membrane.Opus
alias Membrane.Opus.Util
alias Membrane.RawAudio
alias Membrane.{Buffer, Opus, RawAudio, Time}

@allowed_channels [1, 2]
@allowed_applications [:voip, :audio, :low_delay]
Expand Down Expand Up @@ -173,7 +171,28 @@ defmodule Membrane.Opus.Encoder do
end

defp set_current_pts(%{queue: <<>>} = state, input_pts) do
%{state | current_pts: input_pts}
if state.current_pts != nil and input_pts != nil and state.current_pts > input_pts do
diff = state.current_pts - input_pts

cond do
diff > Time.milliseconds(100) ->
Membrane.Logger.warning(
"Expexted input buffer PTS to be #{state.current_pts}, got #{input_pts}, diff #{Time.pretty_duration(diff)}"
)

diff > Time.milliseconds(10) ->
Membrane.Logger.debug(
"Expexted input buffer PTS to be #{state.current_pts}, got #{input_pts}, diff #{Time.pretty_duration(diff)}"
)

true ->
:ok
end

state
else
%{state | current_pts: input_pts}
end
end

defp set_current_pts(state, _input_pts), do: state
Expand Down
7 changes: 4 additions & 3 deletions lib/membrane_opus/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Membrane.Opus.Parser do
alias Membrane.Opus.Util

@type delimitation_t :: :delimit | :undelimit | :keep
@dialyzer {:nowarn_function, maybe_parse: 5}

def_options delimitation: [
spec: delimitation_t(),
Expand Down Expand Up @@ -105,7 +106,9 @@ defmodule Membrane.Opus.Parser do
set_current_pts(state, input_pts)
)

if check_pts_integrity? do
packets_len = length(packets)

if check_pts_integrity? and packets_len > 0 do
Util.validate_pts_integrity(packets, input_pts)
end

Expand All @@ -114,8 +117,6 @@ defmodule Membrane.Opus.Parser do
channels: channels
}

packets_len = length(packets)

packet_actions =
cond do
packets_len > 0 and stream_format != ctx.pads.output.stream_format ->
Expand Down
35 changes: 26 additions & 9 deletions lib/membrane_opus/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Membrane.Opus.Util do
# Miscellaneous utility functions
import Membrane.Time
require Membrane.Logger
alias Membrane.Buffer

@spec parse_toc_byte(data :: binary) ::
{:ok, config_number :: 0..31, stereo_flag :: 0..1, frame_packing :: 0..3} | :error
Expand Down Expand Up @@ -69,17 +70,33 @@ defmodule Membrane.Opus.Util do

@spec validate_pts_integrity([Membrane.Buffer.t()], integer()) :: :ok
def validate_pts_integrity(packets, input_pts) do
cond do
length(packets) < 2 or Enum.at(packets, 1).pts == input_pts ->
:ok
%Buffer{pts: output_pts} = List.first(packets)

Enum.at(packets, 1).pts > input_pts ->
Membrane.Logger.warning("PTS values are overlapping")
:ok
if output_pts == nil or input_pts == nil do
:ok
else
# Opus encoder and parser output each frame with pts = previous_pts + frame duration. If the first two input frames have pts = 0,
# the first one will be outputted with pts = 0 and the next with pts = 20000000 (20 ms), making output pts "overtake" input pts by one frame length.
# Over time, as Opus receives more frames, each with arbitrary length, and tries to output exactly 20 ms long frames,
# it buffers some data before outputting it, allowing input pts to catch up. In effect, the difference between output and input pts oscillates
# between 2 ms and 22 ms. Warnings are emitted only if this difference exceeds 30 ms.

Enum.at(packets, 1).pts < input_pts ->
Membrane.Logger.warning("PTS values are not continous")
:ok
diff = output_pts - input_pts

epsilon = 30 |> milliseconds()

cond do
diff > epsilon ->
Membrane.Logger.warning(
"Input PTS value is lagging behind calculated output PTS more than expected"
)

diff < 0 ->
Membrane.Logger.warning("Input PTS value is overlapping output PTS")

true ->
:ok
end
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.Opus.Plugin.Mixfile do
use Mix.Project

@version "0.20.2"
@version "0.20.3"
@github_url "https://github.com/membraneframework/membrane_opus_plugin"

def project do
Expand Down
Loading

0 comments on commit 4ad01f5

Please sign in to comment.