Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect diamonds #909

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
12 changes: 12 additions & 0 deletions lib/membrane/core/bin/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# defmodule Membrane.Core.Bin.DiamondDetectionController do
# @moduledoc false

# require Membrane.Core.Message, as: Message

# alias Membrane.Core.Bin.State

# @spec trigger_diamond_detection(State.t()) :: :ok
# def trigger_diamond_detection(state) do
# Message.send(state.parent_pid, :trigger_diamond_detection)
# end
# end
99 changes: 99 additions & 0 deletions lib/membrane/core/diamond_detector.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# defmodule Membrane.Core.DiamondDetector do
# use GenServer

# require Membrane.Core.Message, as: Message
# require Membrane.Core.Utils, as: Utils

# @timer_interval 100

# defmodule State do
# use Bunch.Access

# defstruct output_pads: %{},
# input_pads: %{},
# elements_effective_flow_control: %{},
# updated_since_last_run?: false,
# timer_ref: nil
# end

# @impl GenServer
# def init(_arg) do
# Utils.log_on_error do
# {:ok, %State{}}
# end
# end

# @impl GenServer
# def handle_info(Message.new(:effective_flow_control_change, {efc, pid}), %State{} = state) do
# Utils.log_on_error do
# state = set_effective_flow_control(pid, efc, state)
# {:noreply, state}
# end
# end

# @impl GenServer
# def handle_info(Message.new(:new_element, pid), %State{} = state) do
# Utils.log_on_error do
# Process.monitor(pid)
# state = set_effective_flow_control(pid, :push, state)
# {:noreply, state}
# end
# end

# @impl GenServer
# def handle_info(Message.new(:new_link, {from, _from_flow_control, to, to_flow_control}), %State{} = state) do
# Utils.log_on_error do
# # if :push not in [from_flow_control, to_flow_control]
# state =
# %{state | updated_since_last_run?: true}
# |> Map.update!(:output_pads, &add_new_output_pad(&1, from, to, to_flow_control))
# |> Map.update!(:input_pads, &add_new_input_pad(&1, from, to, to_flow_control))
# |> ensure_timer_running()

# {:noreply, state}
# end
# end

# @impl GenServer
# def handle_info(Message.new(:delete_link, {from, to, to_flow_control}), %State{} = state) do
# state
# |> Map.update!(:output_pads, )

# end

# @impl GenServer
# def handle_info(:do_algorithm, %State{} = state) do
# Utils.log_on_error do
# # do_algorithm
# {:noreply, %{state | timer_ref: nil, updated_since_last_run?: false}}
# end
# end

# defp set_effective_flow_control(pid, efc, %State{} = state) do
# %{state | updated_since_last_run?: true}
# |> Map.update!(:elements_effective_flow_control, &Map.put(&1, pid, efc))
# |> ensure_timer_running()
# end

# defp ensure_timer_running(%State{timer_ref: nil} = state) do
# timer_ref = self() |> Process.send_after(:do_algorithm, @timer_interval)
# %{state | timer_ref: timer_ref}
# end

# defp ensure_timer_running(state), do: state

# defp add_new_output_pad(output_pads, from, to, to_flow_control) do
# entry = {to, to_flow_control}

# output_pads
# |> Map.update(from, [entry], &[entry | &1])
# end

# defp add_new_input_pad(input_pads, from, to, to_flow_control) do
# entry = {from, to_flow_control}

# input_pads
# |> Map.update(to, [entry], &1[entry | &1])
# end

# end
40 changes: 40 additions & 0 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Membrane.Core.Element do
alias Membrane.Core.Element.{
BufferController,
DemandController,
DiamondDetectionController,
EffectiveFlowController,
EventController,
LifecycleController,
Expand Down Expand Up @@ -281,6 +282,45 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection), state) do
:ok = DiamondDetectionController.start_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(
Message.new(:diamond_detection, [diamond_detection_ref, diamond_detection_path]),
state
) do
state =
DiamondDetectionController.continue_diamond_detection(
diamond_detection_ref,
diamond_detection_path,
state
)

{:noreply, state}
end

defp do_handle_info(Message.new(:delete_diamond_detection_ref, diamond_detection_ref), state) do
state = DiamondDetectionController.delete_diamond_detection_ref(diamond_detection_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection_trigger, trigger_ref), state) do
state = DiamondDetectionController.start_diamond_detection_trigger(trigger_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:diamond_detection_trigger, trigger_ref), state) do
state = DiamondDetectionController.handle_diamond_detection_trigger(trigger_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:delete_diamond_detection_trigger_ref, trigger_ref), state) do
state = DiamondDetectionController.delete_diamond_detection_trigger_ref(trigger_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:terminate), state) do
state = LifecycleController.handle_terminate_request(state)
{:noreply, state}
Expand Down
156 changes: 156 additions & 0 deletions lib/membrane/core/element/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
defmodule Membrane.Core.Element.DiamondDetectionController do
@moduledoc false

require Membrane.Core.Message, as: Message
require Membrane.Pad, as: Pad

alias Membrane.Child
alias Membrane.Core.Element.State
alias Membrane.Element.PadData

# TODO: don't forward diamond detection and triggers in endpoints

@type diamond_detection_path :: [{pid(), Child.name(), Pad.ref()}]

@spec start_diamond_detection(State.t()) :: :ok
def start_diamond_detection(state) do
make_ref()
|> forward_diamond_detection([], state)
end

@spec continue_diamond_detection(reference(), diamond_detection_path(), State.t()) :: State.t()
def continue_diamond_detection(diamond_detection_ref, diamond_detecton_path, state) do
cond do
not is_map_key(state.diamond_detection_ref_to_path, diamond_detection_ref) ->
:ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state)

:ok =
Message.new(:delete_diamond_detection_ref, diamond_detection_ref)
|> send_after_to_self()

state
|> put_in(
[:diamond_detection_ref_to_path, diamond_detection_ref],
diamond_detecton_path
)

has_cycle?(diamond_detecton_path) ->
state

true ->
# todo: log diamond
state
end
end

@spec delete_diamond_detection_ref(reference(), State.t()) :: State.t()
def delete_diamond_detection_ref(diamond_detection_ref, state) do
state
|> Map.update!(
:diamond_detection_ref_to_path,
&Map.delete(&1, diamond_detection_ref)
)
end

@spec forward_diamond_detection(reference(), diamond_detection_path(), State.t()) :: :ok
defp forward_diamond_detection(diamond_detection_ref, diamond_detection_path, state) do
auto_pull_mode? = state.effective_flow_control == :pull

state.pads_data
|> Enum.each(fn {pad_ref, pad_data} ->
if is_output_pull_pad(pad_data, auto_pull_mode?) do
name = state.name
# name = Membrane.ComponentPath.get()
diamond_detection_path = [{self(), name, pad_ref} | diamond_detection_path]

Message.send(
pad_data.pid,
:diamond_detection,
[diamond_detection_ref, diamond_detection_path]
)
end
end)
end

defp forward_diamond_detection_trigger(trigger_ref, state) do
state.pads_data
|> Enum.each(fn {_pad_ref, %PadData{} = pad_data} ->
if pad_data.direction == :input and pad_data.flow_control != :push do
Message.send(pad_data.pid, :diamond_detection_trigger, trigger_ref)
end
end)
end

defp is_output_pull_pad(%PadData{} = pad_data, auto_pull_mode?) do
pad_data.direction == :output and
(pad_data.flow_control == :manual or
(pad_data.flow_control == :auto and auto_pull_mode?))
end

defp has_cycle?(diamond_detection_path) do
uniq_length = diamond_detection_path |> Enum.uniq() |> length()
uniq_length < length(diamond_detection_path)
end

def start_diamond_detection_trigger(spec_ref, state) do
if map_size(state.pads_data) < 2 or
MapSet.member?(state.diamond_detection_trigger_refs, spec_ref) do
state
else
do_handle_diamond_detection_trigger(spec_ref, state)
end
end

def handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
if state.type == :endpoint or
MapSet.member?(state.diamond_detection_trigger_refs, trigger_ref),
do: state,
else: do_handle_diamond_detection_trigger(trigger_ref, state)
end

defp do_handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
state =
state
|> Map.update!(:diamond_detection_trigger_refs, &MapSet.put(&1, trigger_ref))

:ok =
Message.new(:delete_diamond_detection_trigger_ref, trigger_ref)
|> send_after_to_self()

:ok = forward_diamond_detection_trigger(trigger_ref, state)

if output_pull_arity(state) >= 2,
do: postpone_diamond_detection(state),
else: state
end

defp postpone_diamond_detection(%State{} = state) when state.diamond_detection_postponed? do
state
end

defp postpone_diamond_detection(%State{} = state) do
:ok =
Message.new(:start_diamond_detection)
|> send_after_to_self(1)

%{state | diamond_detection_postponed?: true}
end

def delete_diamond_detection_trigger_ref(trigger_ref, state) do
state
|> Map.update!(:diamond_detection_trigger_refs, &MapSet.delete(&1, trigger_ref))
end

defp output_pull_arity(state) do
auto_pull_mode? = state.effective_flow_control == :pull

state.pads_data
|> Enum.count(fn {_pad_ref, pad_data} -> is_output_pull_pad(pad_data, auto_pull_mode?) end)
end

defp send_after_to_self(message, seconds \\ 10) do
send_after_time = Membrane.Time.seconds(seconds) |> Membrane.Time.as_milliseconds(:round)
self() |> Process.send_after(message, send_after_time)
:ok
end
end
12 changes: 10 additions & 2 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Membrane.Core.Element.State do

alias Membrane.{Clock, Element, Pad, Sync}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.EffectiveFlowController
alias Membrane.Core.Element.{DiamondDetectionController, EffectiveFlowController}
alias Membrane.Core.Timer

require Membrane.Pad
Expand Down Expand Up @@ -46,7 +46,12 @@ defmodule Membrane.Core.Element.State do
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t(),
awaiting_auto_input_pads: MapSet.t(),
resume_delayed_demands_loop_in_mailbox?: boolean()
resume_delayed_demands_loop_in_mailbox?: boolean(),
diamond_detection_ref_to_path: %{
optional(reference()) => DiamondDetectionController.diamond_detection_path()
},
diamond_detection_trigger_refs: MapSet.t(reference()),
diamond_detection_postponed?: boolean()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand Down Expand Up @@ -79,6 +84,9 @@ defmodule Membrane.Core.Element.State do
handle_demand_loop_counter: 0,
pads_to_snapshot: MapSet.new(),
playback_queue: [],
diamond_detection_ref_to_path: %{},
diamond_detection_trigger_refs: MapSet.new(),
diamond_detection_postponed?: false,
pads_data: %{},
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
Expand Down
10 changes: 9 additions & 1 deletion lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
ChildEntryParser,
ChildrenModel,
ClockHandler,
DiamondDetectionController,
Link,
SpecificationParser
}
Expand Down Expand Up @@ -425,7 +426,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end
end

defp do_proceed_spec_startup(_spec_ref, %{status: :ready} = spec_data, state) do
defp do_proceed_spec_startup(spec_ref, %{status: :ready} = spec_data, state) do
state =
Enum.reduce(spec_data.children_names, state, fn child, state ->
%{pid: pid, terminating?: terminating?} = state.children[child]
Expand All @@ -439,6 +440,13 @@ defmodule Membrane.Core.Parent.ChildLifeController do
put_in(state.children[child].ready?, true)
end)

spec_data.links_ids
|> Enum.map(&state.links[&1].from.child)
|> Enum.uniq()
|> Enum.each(fn child_name ->
DiamondDetectionController.start_diamond_detection_trigger(child_name, spec_ref, state)
end)

state =
with %{playback: :playing} <- state do
handle_children_playing(spec_data.children_names, state)
Expand Down
Loading
Loading