Skip to content

Commit

Permalink
Terminate engine on room termination. Move request handler managment …
Browse files Browse the repository at this point in the history
…to room (#105)
  • Loading branch information
Karolk99 authored Oct 12, 2023
1 parent 63783c1 commit 3c8c8bb
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
4 changes: 1 addition & 3 deletions lib/jellyfish/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Jellyfish.Component.HLS do

@behaviour Jellyfish.Endpoint.Config

alias Jellyfish.Component.HLS.{LLStorage, RequestHandler, Storage}
alias Jellyfish.Component.HLS.{LLStorage, Storage}
alias Jellyfish.Room

alias JellyfishWeb.ApiSpec.Component.HLS.Options
Expand Down Expand Up @@ -78,8 +78,6 @@ defmodule Jellyfish.Component.HLS do
end

defp setup_hls_storage(room_id, low_latency?: true) do
RequestHandler.start(room_id)

fn directory -> %LLStorage{directory: directory, room_id: room_id} end
end

Expand Down
23 changes: 22 additions & 1 deletion lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ defmodule Jellyfish.Room do
with :ok <- check_component_allowed(component_type, state),
{:ok, component} <- Component.new(component_type, options) do
state = put_in(state, [:components, component.id], component)

maybe_spawn_hls_processes(state.id, component.metadata)
:ok = Engine.add_endpoint(state.engine_pid, component.engine_endpoint, id: component.id)

Logger.info("Added component #{inspect(component.id)}")
Expand Down Expand Up @@ -366,6 +366,16 @@ defmodule Jellyfish.Room do
{:noreply, state}
end

@impl true
def terminate(_reason, %{engine_pid: engine_pid} = state) do
Engine.terminate(engine_pid, asynchronous?: true, timeout: 10_000)

hls_component = hls_component(state)
unless is_nil(hls_component), do: remove_hls_processes(state.id, hls_component.metadata)

:ok
end

defp new(id, max_peers, video_codec) do
rtc_engine_options = [
id: id
Expand Down Expand Up @@ -404,6 +414,17 @@ defmodule Jellyfish.Room do
}
end

defp hls_component(%{components: components}),
do:
Enum.find_value(components, fn {_id, component} ->
if component.type == Component.HLS, do: component
end)

defp maybe_spawn_hls_processes(room_id, %{low_latency: true}),
do: Component.HLS.RequestHandler.start(room_id)

defp maybe_spawn_hls_processes(_room_id, _metadata), do: nil

defp remove_hls_processes(room_id, %{low_latency: true}),
do: Component.HLS.RequestHandler.stop(room_id)

Expand Down
9 changes: 9 additions & 0 deletions test/jellyfish_web/controllers/component_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,19 @@ defmodule JellyfishWeb.ComponentControllerTest do
assert Process.alive?(request_handler)
Process.monitor(request_handler)

{:ok, %{engine_pid: engine_pid}} = Jellyfish.RoomService.get_room(room_id)
assert Process.alive?(request_handler)
Process.monitor(engine_pid)

room_conn = delete(conn, ~p"/room/#{room_id}")
assert response(room_conn, :no_content)

# Engine can terminate up to around 5 seconds
# Hls endpoint tries to process all streams to the end before termination
# It has 5 seconds for it
assert_receive {:DOWN, _ref, :process, ^engine_pid, :normal}, 10_000
assert_receive {:DOWN, _ref, :process, ^request_handler, :normal}

assert Registry.lookup(Jellyfish.RequestHandlerRegistry, room_id) |> Enum.empty?()
end

Expand Down
17 changes: 17 additions & 0 deletions test/jellyfish_web/controllers/room_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,21 @@ defmodule JellyfishWeb.RoomControllerTest do
setup [:create_room]

test "deletes chosen room", %{conn: conn, room_id: room_id} do
room_pid = RoomService.find_room!(room_id)
%{engine_pid: engine_pid} = :sys.get_state(room_pid)

assert Process.alive?(room_pid)
assert Process.alive?(engine_pid)

Process.monitor(room_pid)
Process.monitor(engine_pid)

conn = delete(conn, ~p"/room/#{room_id}")
assert response(conn, :no_content)

assert_receive({:DOWN, _ref, :process, ^room_pid, :normal})
assert_receive({:DOWN, _ref, :process, ^engine_pid, :normal})

conn = get(conn, ~p"/room/#{room_id}")
assert json_response(conn, :not_found) == %{"errors" => "Room #{room_id} does not exist"}
end
Expand All @@ -132,12 +144,17 @@ defmodule JellyfishWeb.RoomControllerTest do
%{room_id: room2_id} = create_room(state)

room_pid = RoomService.find_room!(room_id)
%{engine_pid: engine_pid} = :sys.get_state(room_pid)

assert Process.alive?(engine_pid)
Process.monitor(engine_pid)

:erlang.trace(Process.whereis(RoomService), true, [:receive])

assert true = Process.exit(room_pid, :error)

assert_receive({:trace, _pid, :receive, {:DOWN, _ref, :process, ^room_pid, :error}})
assert_receive({:DOWN, _ref, :process, ^engine_pid, :error})

# Shouldn't throw an error as in ets should be only living processes
rooms = RoomService.list_rooms()
Expand Down

0 comments on commit 3c8c8bb

Please sign in to comment.