Skip to content

Commit

Permalink
implement backoff for reconnect
Browse files Browse the repository at this point in the history
The current reconnect feature doesn't support backoff, so it's not
really useful since spamming reconnects is not good behaviour.
This introduces a backoff feature, taking inspiration from
https://hex.pm/packages/connection.

This was meant to be a small change, however with the existing
async and handle_initial_conn_failure options, the solution turned out
to be incoherent and also uncovered existing bugs with reconnect during
init that could indefinitely block the supervisor startup process.
Additionally, I couldn't fix a lot of the existing tests without
introducing a new api to wait for a connection to be established.

As a result, this change makes breaking changes to the api, and
introduces some new apis to recover some previous functionality.
In particular, async init is now the default and only behaviour, ie.
a network connection is only made after finalizing the special process
init.

If you want to block code execution until a connection has been
established, you can use the new `await_status/2` api.
  • Loading branch information
Brendan Ball committed Jun 10, 2022
1 parent 5ff38da commit 28caf5c
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 348 deletions.
314 changes: 187 additions & 127 deletions lib/websockex.ex

Large diffs are not rendered by default.

20 changes: 5 additions & 15 deletions lib/websockex/errors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ defmodule WebSockex.RequestError do
defexception [:code, :message]

def message(%__MODULE__{code: code, message: message}) do
"Didn't get a proper response from the server. The response was: #{inspect(code)} #{
inspect(message)
}"
"Didn't get a proper response from the server. The response was: #{inspect(code)} #{inspect(message)}"
end
end

Expand Down Expand Up @@ -56,9 +54,7 @@ defmodule WebSockex.BadResponseError do
defexception [:response, :module, :function, :args]

def message(%__MODULE__{} = error) do
"Bad Response: Got #{inspect(error.response)} from #{
inspect(Exception.format_mfa(error.module, error.function, error.args))
}"
"Bad Response: Got #{inspect(error.response)} from #{inspect(Exception.format_mfa(error.module, error.function, error.args))}"
end
end

Expand All @@ -70,19 +66,15 @@ defmodule WebSockex.FrameError do
end

def message(%__MODULE__{reason: :control_frame_too_large} = exception) do
"Control Frame Too Large: Control Frames Can't Be Larger Than 125 Bytes\nbuffer: #{
exception.buffer
}"
"Control Frame Too Large: Control Frames Can't Be Larger Than 125 Bytes\nbuffer: #{exception.buffer}"
end

def message(%__MODULE__{reason: :invalid_utf8} = exception) do
"Invalid UTF-8: Text and Close frames must have UTF-8 payloads.\nbuffer: #{exception.buffer}"
end

def message(%__MODULE__{reason: :invalid_close_code} = exception) do
"Invalid Close Code: Close Codes must be in range of 1000 through 4999\nbuffer: #{
exception.buffer
}"
"Invalid Close Code: Close Codes must be in range of 1000 through 4999\nbuffer: #{exception.buffer}"
end

def message(%__MODULE__{} = exception) do
Expand All @@ -103,9 +95,7 @@ defmodule WebSockex.FrameEncodeError do
def message(%__MODULE__{reason: :close_code_out_of_range} = error) do
"""
Close Code Out of Range: Close code must be between 1000-4999.
Frame: {#{inspect(error.frame_type)}, #{inspect(error.close_code)}, #{
inspect(error.frame_payload)
}}
Frame: {#{inspect(error.frame_type)}, #{inspect(error.close_code)}, #{inspect(error.frame_payload)}}
"""
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/websockex/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ defmodule WebSockex.Utils do
IO.puts(io_dev, "*DBG* #{inspect(name)} sending frame: #{inspect(frame)}")
end

defp print_event(io_dev, :reconnect, %{name: name}) do
defp print_event(io_dev, :backoff, %{name: name}) do
IO.puts(io_dev, "*DBG* #{inspect(name)} attempting to reconnect")
end

Expand Down
38 changes: 28 additions & 10 deletions test/support/test_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ defmodule WebSockex.TestClient do
end
end

def cast(pid, message) do
WebSockex.cast(pid, message)
end

def handle_connect(_conn, %{connect_badreply: true}), do: :lemons
def handle_connect(_conn, %{connect_error: true}), do: raise("Connect Error")
def handle_connect(_conn, %{connect_exit: true}), do: exit("Connect Exit")
Expand Down Expand Up @@ -176,11 +180,6 @@ defmodule WebSockex.TestClient do
def handle_disconnect(_, %{disconnect_error: true}), do: raise("Disconnect Error")
def handle_disconnect(_, %{disconnect_exit: true}), do: exit("Disconnect Exit")

def handle_disconnect(_, %{catch_init_connect_failure: pid} = state) do
send(pid, :caught_initial_conn_failure)
{:ok, state}
end

def handle_disconnect(%{attempt_number: 3} = failure_map, %{multiple_reconnect: pid} = state) do
send(pid, {:stopping_retry, failure_map})
{:ok, state}
Expand All @@ -192,30 +191,49 @@ defmodule WebSockex.TestClient do
) do
send(pid, {:retry_connect, failure_map})
send(pid, {:check_retry_state, %{attempt: attempt, state: state}})
{:reconnect, Map.put(state, :attempt, attempt)}
{:backoff, 0, Map.put(state, :attempt, attempt)}
end

def handle_disconnect(_, %{change_conn_reconnect: pid, good_url: url} = state) do
uri = URI.parse(url)
conn = WebSockex.Conn.new(uri)
send(pid, :retry_change_conn)
{:reconnect, conn, state}
{:backoff, 0, conn, state}
end

def handle_disconnect(%{reason: {:local, 4985, _}}, state) do
{:reconnect, state}
{:backoff, 0, state}
end

def handle_disconnect(
%{reason: {:remote, :closed}},
%{catch_disconnect: pid, reconnect: true} = state
) do
send(pid, {:caught_disconnect, :reconnecting})
{:reconnect, state}
{:backoff, 0, state}
end

def handle_disconnect(_, %{reconnect: true} = state) do
{:reconnect, state}
{:backoff, 0, state}
end

def handle_disconnect(%{attempt_number: attempt}, %{signal_reconnect: pid} = state) do
send(pid, {:check_retry_state, %{attempt: attempt}})

receive do
:continue_reconnect -> :ok
end

{:backoff, 0, state}
end

def handle_disconnect(_, %{backoff: backoff, catch_disconnect: pid} = state) do
send(pid, :caught_disconnect)
{:backoff, backoff, state}
end

def handle_disconnect(_, %{backoff: backoff} = state) do
{:backoff, backoff, state}
end

def handle_disconnect(
Expand Down
14 changes: 4 additions & 10 deletions test/support/test_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule WebSockex.TestServer do

case Plug.Adapters.Cowboy.http(__MODULE__, [], opts) do
{:ok, _} ->
{:ok, {ref, url}}
{:ok, {agent_pid, ref, url}}

{:error, :eaddrinuse} ->
start(pid)
Expand All @@ -48,7 +48,7 @@ defmodule WebSockex.TestServer do

case Plug.Adapters.Cowboy.https(__MODULE__, [], opts) do
{:ok, _} ->
{:ok, {ref, url}}
{:ok, {agent_pid, ref, url}}

{:error, :eaddrinuse} ->
require Logger
Expand All @@ -57,14 +57,8 @@ defmodule WebSockex.TestServer do
end
end

def new_conn_mode(socket_pid, mode) do
ref = make_ref()
send(socket_pid, {:mode, mode, ref, self()})

receive do
{^ref, :ok} ->
:ok
end
def new_conn_mode(server_pid, mode) do
Agent.update(server_pid, fn _ -> mode end)
end

def shutdown(ref) do
Expand Down
6 changes: 3 additions & 3 deletions test/websockex/conn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule WebSockex.ConnTest do
use ExUnit.Case, async: true

setup do
{:ok, {server_ref, url}} = WebSockex.TestServer.start(self())
{:ok, {_, server_ref, url}} = WebSockex.TestServer.start(self())

on_exit(fn -> WebSockex.TestServer.shutdown(server_ref) end)

Expand Down Expand Up @@ -137,7 +137,7 @@ defmodule WebSockex.ConnTest do

describe "secure connection" do
setup do
{:ok, {server_ref, url}} = WebSockex.TestServer.start_https(self())
{:ok, {_, server_ref, url}} = WebSockex.TestServer.start_https(self())

on_exit(fn -> WebSockex.TestServer.shutdown(server_ref) end)

Expand Down Expand Up @@ -214,7 +214,7 @@ defmodule WebSockex.ConnTest do
end

test "works on wss connections" do
{:ok, {server_ref, url}} = WebSockex.TestServer.start_https(self())
{:ok, {_, server_ref, url}} = WebSockex.TestServer.start_https(self())
on_exit(fn -> WebSockex.TestServer.shutdown(server_ref) end)
uri = URI.parse(url)
conn = WebSockex.Conn.new(uri)
Expand Down
3 changes: 2 additions & 1 deletion test/websockex_telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ defmodule WebSockexTelemetryTest do
%{}
)

{:ok, {server_ref, url}} = WebSockex.TestServer.start(self())
{:ok, {_, server_ref, url}} = WebSockex.TestServer.start(self())

{:ok, pid} = TestClient.start(url, %{catch_text: self()})
TestClient.await_status(pid, :connected)

on_exit(fn ->
:telemetry.detach(handler_id)
Expand Down
Loading

0 comments on commit 28caf5c

Please sign in to comment.