diff --git a/lib/websockex.ex b/lib/websockex.ex index 1467a8f..fe06d86 100644 --- a/lib/websockex.ex +++ b/lib/websockex.ex @@ -98,10 +98,8 @@ defmodule WebSockex do """ @type option :: WebSockex.Conn.connection_option() - | {:async, boolean} | {:debug, debug_opts} | {:name, atom | {:global, term} | {:via, module, term}} - | {:handle_initial_conn_failure, boolean} @typedoc """ The reason a connection was closed. @@ -195,14 +193,15 @@ defmodule WebSockex do The possible returns for this callback are: - `{:ok, state}` will continue the process termination. - - `{:reconnect, state}` will attempt to reconnect instead of terminating. - - `{:reconnect, conn, state}` will attempt to reconnect with the connection + - `{:backoff, timeout, state}` will attempt to reconnect after timeout instead of terminating. + - `{:backoff, timeout, conn, state}` will attempt to reconnect after timeout with the connection data in `conn`. `conn` is expected to be a `t:WebSockex.Conn.t/0`. + """ @callback handle_disconnect(connection_status_map, state :: term) :: {:ok, new_state} - | {:reconnect, new_state} - | {:reconnect, new_conn :: WebSockex.Conn.t(), new_state} + | {:backoff, timeout(), new_state} + | {:backoff, timeout(), new_conn :: WebSockex.Conn.t(), new_state} when new_state: term @doc """ @@ -281,6 +280,11 @@ defmodule WebSockex do {:ok, state} end + @doc false + def await_status(conn, status, timeout \\ 5_000) do + WebSockex.await_status(conn, status, timeout) + end + @doc false def handle_frame(frame, _state) do raise "No handle_frame/2 clause in #{__MODULE__} provided for #{inspect(frame)}" @@ -424,11 +428,28 @@ defmodule WebSockex do def send_frame(client, frame) do try do - {:ok, res} = :gen.call(client, :"$websockex_send", frame) - res + :gen.call(client, :"$websockex_send", frame) catch - _, reason -> + :exit, reason -> exit({reason, {__MODULE__, :call, [client, frame]}}) + else + {:ok, res} -> res + end + end + + @doc """ + Waits for connection to reach specified status. + This is useful if you want to make sure that the websocket is connected before attempting to send messages + """ + @spec await_status(WebSockex.client(), :connected, timeout) :: :ok | {:error, term} + def await_status(client, status, timeout) do + try do + :gen.call(client, :"$websockex_await_status", status, timeout) + catch + :exit, reason -> + exit({reason, {__MODULE__, :await_status, [client, status]}}) + else + {:ok, res} -> res end end @@ -464,6 +485,10 @@ defmodule WebSockex do close_loop(reason, parent, debug, Map.delete(state, :connection_status)) end + def system_continue(parent, debug, %{connection_status: :backoff} = state) do + backoff_loop(parent, debug, Map.delete(state, :connection_status)) + end + @doc false @spec system_terminate(term, pid, any, any) :: no_return def system_terminate(reason, parent, debug, state) do @@ -543,38 +568,27 @@ defmodule WebSockex do # OTP stuffs debug = Utils.parse_debug_options(self(), opts) - reply_fun = - case Keyword.get(opts, :async, false) do - true -> - :proc_lib.init_ack(parent, {:ok, self()}) - &async_init_fun/1 - - false -> - &sync_init_fun(parent, &1) - end + :proc_lib.init_ack(parent, {:ok, self()}) state = %{ conn: conn, module: module, module_state: module_state, name: name, - reply_fun: reply_fun, buffer: <<>>, - fragment: nil + fragment: nil, + backoff: nil, + await_status_reqs: %{}, + attempt_number: 1 } - handle_conn_failure = Keyword.get(opts, :handle_initial_conn_failure, false) - case open_connection(parent, debug, state) do {:ok, new_state} -> debug = Utils.sys_debug(debug, :connected, state) - module_init(parent, debug, new_state) + connect(parent, debug, new_state) - {:error, error, new_state} when handle_conn_failure == true -> - init_conn_failure(error, parent, debug, new_state) - - {:error, error, _} -> - state.reply_fun.({:error, error}) + {:error, error, new_state} -> + on_disconnect(error, parent, debug, update_in(new_state.attempt_number, &(&1 + 1))) end end @@ -588,19 +602,16 @@ defmodule WebSockex do state = Map.put(state, :connection_status, :connecting) :sys.handle_system_msg(req, from, parent, __MODULE__, debug, state) + {:"$websockex_await_status", from, status} -> + state = await_status(status, from, parent, debug, state) + open_loop(parent, debug, state) + {:"$websockex_send", from, _frame} -> :gen.reply(from, {:error, %WebSockex.NotConnectedError{connection_state: :opening}}) open_loop(parent, debug, state) {:EXIT, ^parent, reason} -> - case state do - %{reply_fun: reply_fun} -> - reply_fun.(reason) - exit(reason) - - _ -> - terminate(reason, parent, debug, state) - end + terminate(reason, parent, debug, state) {^ref, {:ok, new_conn}} -> Process.demonitor(ref, [:flush]) @@ -618,6 +629,43 @@ defmodule WebSockex do end end + defp backoff_loop(parent, debug, state) do + %{backoff: backoff} = state + + receive do + {:system, from, req} -> + state = Map.put(state, :connection_status, :backoff) + :sys.handle_system_msg(req, from, parent, __MODULE__, debug, state) + + {:"$websockex_send", from, _frame} -> + :gen.reply(from, {:error, %WebSockex.NotConnectedError{connection_state: :backoff}}) + backoff_loop(parent, debug, state) + + {:"$websockex_cast", msg} -> + debug = Utils.sys_debug(debug, {:in, :cast, msg}, state) + common_handle({:handle_cast, msg}, parent, debug, state) + + {:"$websockex_await_status", from, status} -> + state = await_status(status, from, parent, debug, state) + backoff_loop(parent, debug, state) + + {:EXIT, ^parent, reason} -> + terminate(reason, parent, debug, state) + + {:timeout, ^backoff, :backoff} -> + state = %{state | backoff: nil} + + case open_connection(parent, debug, state) do + {:ok, new_state} -> + debug = Utils.sys_debug(debug, :connected, state) + connect(parent, debug, new_state) + + {:error, error, new_state} -> + on_disconnect(error, parent, debug, update_in(new_state.attempt_number, &(&1 + 1))) + end + end + end + defp websocket_loop(parent, debug, state) do case WebSockex.Frame.parse_frame(state.buffer) do {:ok, frame, buffer} -> @@ -640,6 +688,10 @@ defmodule WebSockex do debug = Utils.sys_debug(debug, {:in, :cast, msg}, state) common_handle({:handle_cast, msg}, parent, debug, state) + {:"$websockex_await_status", from, status} -> + state = await_status(status, from, parent, debug, state) + websocket_loop(parent, debug, state) + {:"$websockex_send", from, frame} -> sync_send(frame, from, parent, debug, state) @@ -666,6 +718,7 @@ defmodule WebSockex do defp close_loop(reason, parent, debug, %{conn: conn, timer_ref: timer_ref} = state) do transport = state.conn.transport socket = state.conn.socket + state = cancel_backoff(state) receive do {:system, from, req} -> @@ -678,6 +731,10 @@ defmodule WebSockex do {^transport, ^socket, _} -> close_loop(reason, parent, debug, state) + {:"$websockex_await_status", from, status} -> + state = await_status(status, from, parent, debug, state) + close_loop(reason, parent, debug, state) + {:"$websockex_send", from, _frame} -> :gen.reply(from, {:error, %WebSockex.NotConnectedError{connection_state: :closing}}) close_loop(reason, parent, debug, state) @@ -696,6 +753,52 @@ defmodule WebSockex do end end + defp await_status( + :connected = status, + from, + _parent, + _debug, + %{connection_status: status} = state + ) do + :gen.reply(from, :ok) + state + end + + defp await_status(:connected = status, from, _parent, _debug, state) do + update_in(state.await_status_reqs, fn reqs -> + Map.update(reqs, status, [from], &[from | &1]) + end) + end + + defp await_status(status, from, _parent, _debug, state) do + :gen.reply(from, {:error, :unsupported_status, status}) + state + end + + defp handle_status_change(:connected = new_status, _parent, _debug, state) do + execute_telemetry([:websockex, :connected], state) + state = %{state | attempt_number: 1} + + state = + case Map.get(state.await_status_reqs, new_status) do + nil -> + state + + reqs -> + Enum.each(reqs, &:gen.reply(&1, :ok)) + + update_in(state.await_status_reqs, fn reqs -> + Map.delete(reqs, new_status) + end) + end + + Map.put(state, :connection_status, new_status) + end + + defp handle_status_change(new_status, _parent, _debug, state) do + Map.put(state, :connection_status, new_status) + end + # Frame Handling defp handle_frame(:ping, parent, debug, state) do @@ -938,61 +1041,34 @@ defmodule WebSockex do # Connection Handling - defp init_conn_failure(reason, parent, debug, state, attempt \\ 1) do - case handle_disconnect(reason, state, attempt) do - {:ok, new_module_state} -> - init_failure(reason, parent, debug, %{state | module_state: new_module_state}) - - {:reconnect, new_conn, new_module_state} -> - state = %{state | conn: new_conn, module_state: new_module_state} - debug = Utils.sys_debug(debug, :reconnect, state) - - case open_connection(parent, debug, state) do - {:ok, new_state} -> - debug = Utils.sys_debug(debug, :connected, state) - module_init(parent, debug, new_state) - - {:error, new_reason, new_state} -> - init_conn_failure(new_reason, parent, debug, new_state, attempt + 1) - end - - {:"$EXIT", reason} -> - init_failure(reason, parent, debug, state) - end - end + defp on_disconnect(reason, parent, debug, state) do + state = handle_status_change(:disconnected, parent, debug, state) - defp on_disconnect(reason, parent, debug, state, attempt \\ 1) do - case handle_disconnect(reason, state, attempt) do + case handle_disconnect(reason, state) do {:ok, new_module_state} when is_tuple(reason) and elem(reason, 0) == :error -> terminate(elem(reason, 1), parent, debug, %{state | module_state: new_module_state}) {:ok, new_module_state} -> terminate(reason, parent, debug, %{state | module_state: new_module_state}) - {:reconnect, new_conn, new_module_state} -> - state = %{state | conn: new_conn, module_state: new_module_state} - debug = Utils.sys_debug(debug, :reconnect, state) - - case open_connection(parent, debug, state) do - {:ok, new_state} -> - debug = Utils.sys_debug(debug, :reconnected, state) - reconnect(parent, debug, new_state) - - {:error, new_reason, new_state} -> - on_disconnect(new_reason, parent, debug, new_state, attempt + 1) - end + {:backoff, backoff_timeout, new_conn, new_module_state} -> + backoff = start_backoff(backoff_timeout) + state = %{state | conn: new_conn, backoff: backoff, module_state: new_module_state} + state = handle_status_change(:backoff, parent, debug, state) + backoff_loop(parent, debug, state) {:"$EXIT", reason} -> terminate(reason, parent, debug, state) end end - defp reconnect(parent, debug, state) do + defp connect(parent, debug, state) do result = try_callback(state.module, :handle_connect, [state.conn, state.module_state]) case result do {:ok, new_module_state} -> state = Map.merge(state, %{buffer: <<>>, fragment: nil, module_state: new_module_state}) + state = handle_status_change(:connected, parent, debug, state) websocket_loop(parent, debug, state) {:"$EXIT", reason} -> @@ -1032,39 +1108,6 @@ defmodule WebSockex do end # Other State Functions - - defp module_init(parent, debug, state) do - execute_telemetry([:websockex, :connected], state) - - result = try_callback(state.module, :handle_connect, [state.conn, state.module_state]) - - case result do - {:ok, new_module_state} -> - state.reply_fun.({:ok, self()}) - - state = - Map.put(state, :module_state, new_module_state) - |> Map.delete(:reply_fun) - - websocket_loop(parent, debug, state) - - {:"$EXIT", reason} -> - state.reply_fun.(reason) - - badreply -> - reason = - {:error, - %WebSockex.BadResponseError{ - module: state.module, - function: :handle_connect, - args: [state.conn, state.module_state], - response: badreply - }} - - state.reply_fun.(reason) - end - end - @spec terminate(any, pid, any, any) :: no_return defp terminate(reason, parent, debug, state) do execute_telemetry([:websockex, :terminate], state, %{reason: reason}) @@ -1092,8 +1135,8 @@ defmodule WebSockex do end end - defp handle_disconnect(reason, state, attempt) do - status_map = %{conn: state.conn, reason: reason, attempt_number: attempt} + defp handle_disconnect(reason, state) do + status_map = %{conn: state.conn, reason: reason, attempt_number: state.attempt_number} execute_telemetry([:websockex, :disconnected], state, status_map) @@ -1103,11 +1146,11 @@ defmodule WebSockex do {:ok, new_state} -> {:ok, new_state} - {:reconnect, new_state} -> - {:reconnect, state.conn, new_state} + {:backoff, timeout, new_conn, new_state} -> + {:backoff, timeout, new_conn, new_state} - {:reconnect, new_conn, new_state} -> - {:reconnect, new_conn, new_state} + {:backoff, timeout, new_state} -> + {:backoff, timeout, state.conn, new_state} {:"$EXIT", _} = res -> res @@ -1137,21 +1180,6 @@ defmodule WebSockex do {:"$EXIT", payload} end - defp init_failure(reason, _parent, _debug, state) do - state.reply_fun.({:error, reason}) - end - - defp async_init_fun({:ok, _}), do: :noop - defp async_init_fun(exit_reason), do: exit(exit_reason) - - defp sync_init_fun(parent, {error, stacktrace}) when is_list(stacktrace) do - :proc_lib.init_ack(parent, {:error, error}) - end - - defp sync_init_fun(parent, reply) do - :proc_lib.init_ack(parent, reply) - end - defp validate_handshake(headers, key) do challenge = :crypto.hash(:sha, key <> @handshake_guid) |> Base.encode64() @@ -1186,4 +1214,36 @@ defmodule WebSockex do else defp execute_telemetry(_, _, _ \\ %{}), do: :ok end + + ## backoff helpers + + defp start_backoff(:infinity), do: nil + + defp start_backoff(timeout) do + :erlang.start_timer(timeout, self(), :backoff) + end + + defp cancel_backoff(%{backoff: nil} = state), do: state + + defp cancel_backoff(%{backoff: backoff} = state) do + case :erlang.cancel_timer(backoff) do + false -> + flush_backoff(backoff) + + _ -> + :ok + end + + %{state | backoff: nil} + end + + defp flush_backoff(backoff) do + receive do + {:timeout, ^backoff, :backoff} -> + :ok + after + 0 -> + :ok + end + end end diff --git a/lib/websockex/errors.ex b/lib/websockex/errors.ex index 725ff24..b3bf8fd 100644 --- a/lib/websockex/errors.ex +++ b/lib/websockex/errors.ex @@ -24,9 +24,7 @@ defmodule WebSockex.RequestError do defexception [:code, :message] def message(%__MODULE__{code: code, message: message}) do - "Didn't get a proper response from the server. The response was: #{inspect(code)} #{ - inspect(message) - }" + "Didn't get a proper response from the server. The response was: #{inspect(code)} #{inspect(message)}" end end @@ -56,9 +54,7 @@ defmodule WebSockex.BadResponseError do defexception [:response, :module, :function, :args] def message(%__MODULE__{} = error) do - "Bad Response: Got #{inspect(error.response)} from #{ - inspect(Exception.format_mfa(error.module, error.function, error.args)) - }" + "Bad Response: Got #{inspect(error.response)} from #{inspect(Exception.format_mfa(error.module, error.function, error.args))}" end end @@ -70,9 +66,7 @@ defmodule WebSockex.FrameError do end def message(%__MODULE__{reason: :control_frame_too_large} = exception) do - "Control Frame Too Large: Control Frames Can't Be Larger Than 125 Bytes\nbuffer: #{ - exception.buffer - }" + "Control Frame Too Large: Control Frames Can't Be Larger Than 125 Bytes\nbuffer: #{exception.buffer}" end def message(%__MODULE__{reason: :invalid_utf8} = exception) do @@ -80,9 +74,7 @@ defmodule WebSockex.FrameError do end def message(%__MODULE__{reason: :invalid_close_code} = exception) do - "Invalid Close Code: Close Codes must be in range of 1000 through 4999\nbuffer: #{ - exception.buffer - }" + "Invalid Close Code: Close Codes must be in range of 1000 through 4999\nbuffer: #{exception.buffer}" end def message(%__MODULE__{} = exception) do @@ -103,9 +95,7 @@ defmodule WebSockex.FrameEncodeError do def message(%__MODULE__{reason: :close_code_out_of_range} = error) do """ Close Code Out of Range: Close code must be between 1000-4999. - Frame: {#{inspect(error.frame_type)}, #{inspect(error.close_code)}, #{ - inspect(error.frame_payload) - }} + Frame: {#{inspect(error.frame_type)}, #{inspect(error.close_code)}, #{inspect(error.frame_payload)}} """ end end diff --git a/lib/websockex/utils.ex b/lib/websockex/utils.ex index 85468b4..937e0a0 100644 --- a/lib/websockex/utils.ex +++ b/lib/websockex/utils.ex @@ -155,7 +155,7 @@ defmodule WebSockex.Utils do IO.puts(io_dev, "*DBG* #{inspect(name)} sending frame: #{inspect(frame)}") end - defp print_event(io_dev, :reconnect, %{name: name}) do + defp print_event(io_dev, :backoff, %{name: name}) do IO.puts(io_dev, "*DBG* #{inspect(name)} attempting to reconnect") end diff --git a/test/support/test_client.ex b/test/support/test_client.ex index e4f943e..993932d 100644 --- a/test/support/test_client.ex +++ b/test/support/test_client.ex @@ -52,6 +52,10 @@ defmodule WebSockex.TestClient do end end + def cast(pid, message) do + WebSockex.cast(pid, message) + end + def handle_connect(_conn, %{connect_badreply: true}), do: :lemons def handle_connect(_conn, %{connect_error: true}), do: raise("Connect Error") def handle_connect(_conn, %{connect_exit: true}), do: exit("Connect Exit") @@ -176,11 +180,6 @@ defmodule WebSockex.TestClient do def handle_disconnect(_, %{disconnect_error: true}), do: raise("Disconnect Error") def handle_disconnect(_, %{disconnect_exit: true}), do: exit("Disconnect Exit") - def handle_disconnect(_, %{catch_init_connect_failure: pid} = state) do - send(pid, :caught_initial_conn_failure) - {:ok, state} - end - def handle_disconnect(%{attempt_number: 3} = failure_map, %{multiple_reconnect: pid} = state) do send(pid, {:stopping_retry, failure_map}) {:ok, state} @@ -192,18 +191,18 @@ defmodule WebSockex.TestClient do ) do send(pid, {:retry_connect, failure_map}) send(pid, {:check_retry_state, %{attempt: attempt, state: state}}) - {:reconnect, Map.put(state, :attempt, attempt)} + {:backoff, 0, Map.put(state, :attempt, attempt)} end def handle_disconnect(_, %{change_conn_reconnect: pid, good_url: url} = state) do uri = URI.parse(url) conn = WebSockex.Conn.new(uri) send(pid, :retry_change_conn) - {:reconnect, conn, state} + {:backoff, 0, conn, state} end def handle_disconnect(%{reason: {:local, 4985, _}}, state) do - {:reconnect, state} + {:backoff, 0, state} end def handle_disconnect( @@ -211,11 +210,30 @@ defmodule WebSockex.TestClient do %{catch_disconnect: pid, reconnect: true} = state ) do send(pid, {:caught_disconnect, :reconnecting}) - {:reconnect, state} + {:backoff, 0, state} end def handle_disconnect(_, %{reconnect: true} = state) do - {:reconnect, state} + {:backoff, 0, state} + end + + def handle_disconnect(%{attempt_number: attempt}, %{signal_reconnect: pid} = state) do + send(pid, {:check_retry_state, %{attempt: attempt}}) + + receive do + :continue_reconnect -> :ok + end + + {:backoff, 0, state} + end + + def handle_disconnect(_, %{backoff: backoff, catch_disconnect: pid} = state) do + send(pid, :caught_disconnect) + {:backoff, backoff, state} + end + + def handle_disconnect(_, %{backoff: backoff} = state) do + {:backoff, backoff, state} end def handle_disconnect( diff --git a/test/support/test_server.ex b/test/support/test_server.ex index 0f4386d..dda7a5d 100644 --- a/test/support/test_server.ex +++ b/test/support/test_server.ex @@ -25,7 +25,7 @@ defmodule WebSockex.TestServer do case Plug.Adapters.Cowboy.http(__MODULE__, [], opts) do {:ok, _} -> - {:ok, {ref, url}} + {:ok, {agent_pid, ref, url}} {:error, :eaddrinuse} -> start(pid) @@ -48,7 +48,7 @@ defmodule WebSockex.TestServer do case Plug.Adapters.Cowboy.https(__MODULE__, [], opts) do {:ok, _} -> - {:ok, {ref, url}} + {:ok, {agent_pid, ref, url}} {:error, :eaddrinuse} -> require Logger @@ -57,14 +57,8 @@ defmodule WebSockex.TestServer do end end - def new_conn_mode(socket_pid, mode) do - ref = make_ref() - send(socket_pid, {:mode, mode, ref, self()}) - - receive do - {^ref, :ok} -> - :ok - end + def new_conn_mode(server_pid, mode) do + Agent.update(server_pid, fn _ -> mode end) end def shutdown(ref) do diff --git a/test/websockex/conn_test.exs b/test/websockex/conn_test.exs index bdf8687..ca6146c 100644 --- a/test/websockex/conn_test.exs +++ b/test/websockex/conn_test.exs @@ -2,7 +2,7 @@ defmodule WebSockex.ConnTest do use ExUnit.Case, async: true setup do - {:ok, {server_ref, url}} = WebSockex.TestServer.start(self()) + {:ok, {_, server_ref, url}} = WebSockex.TestServer.start(self()) on_exit(fn -> WebSockex.TestServer.shutdown(server_ref) end) @@ -137,7 +137,7 @@ defmodule WebSockex.ConnTest do describe "secure connection" do setup do - {:ok, {server_ref, url}} = WebSockex.TestServer.start_https(self()) + {:ok, {_, server_ref, url}} = WebSockex.TestServer.start_https(self()) on_exit(fn -> WebSockex.TestServer.shutdown(server_ref) end) @@ -214,7 +214,7 @@ defmodule WebSockex.ConnTest do end test "works on wss connections" do - {:ok, {server_ref, url}} = WebSockex.TestServer.start_https(self()) + {:ok, {_, server_ref, url}} = WebSockex.TestServer.start_https(self()) on_exit(fn -> WebSockex.TestServer.shutdown(server_ref) end) uri = URI.parse(url) conn = WebSockex.Conn.new(uri) diff --git a/test/websockex_telemetry_test.exs b/test/websockex_telemetry_test.exs index 4c31add..c557a3d 100644 --- a/test/websockex_telemetry_test.exs +++ b/test/websockex_telemetry_test.exs @@ -22,9 +22,10 @@ defmodule WebSockexTelemetryTest do %{} ) - {:ok, {server_ref, url}} = WebSockex.TestServer.start(self()) + {:ok, {_, server_ref, url}} = WebSockex.TestServer.start(self()) {:ok, pid} = TestClient.start(url, %{catch_text: self()}) + TestClient.await_status(pid, :connected) on_exit(fn -> :telemetry.detach(handler_id) diff --git a/test/websockex_test.exs b/test/websockex_test.exs index 9775d9b..2d2d678 100644 --- a/test/websockex_test.exs +++ b/test/websockex_test.exs @@ -18,14 +18,15 @@ defmodule WebSockexTest do end setup do - {:ok, {server_ref, url}} = TestServer.start(self()) + {:ok, {server_pid, server_ref, url}} = TestServer.start(self()) on_exit(fn -> TestServer.shutdown(server_ref) end) {:ok, pid} = TestClient.start_link(url, %{}) - server_pid = TestServer.receive_socket_pid() + TestClient.await_status(pid, :connected) + socket_pid = TestServer.receive_socket_pid() - [pid: pid, url: url, server_pid: server_pid, server_ref: server_ref] + [pid: pid, url: url, socket_pid: socket_pid, server_ref: server_ref, server_pid: server_pid] end describe "local named processes" do @@ -111,14 +112,14 @@ defmodule WebSockexTest do conn = WebSockex.Conn.new(URI.parse(context.url)) assert {:ok, _} = WebSockex.start(conn, TestClient, %{catch_text: self()}) - server_pid = TestServer.receive_socket_pid() + socket_pid = TestServer.receive_socket_pid() - send(server_pid, {:send, {:text, "Start Link Conn"}}) + send(socket_pid, {:send, {:text, "Start Link Conn"}}) assert_receive {:caught_text, "Start Link Conn"} end - test "with async option failure", context do - assert {:ok, pid} = TestClient.start(context.url, %{async_test: true}, async: true) + test "with failure", context do + assert {:ok, pid} = TestClient.start(context.url, %{async_test: true}) Process.monitor(pid) @@ -128,13 +129,6 @@ defmodule WebSockexTest do assert_receive {:DOWN, _, :process, ^pid, "Async Test"} end - test "without async option", context do - Process.flag(:trap_exit, true) - - assert TestClient.start(context.url, %{async_test: true}) == - {:error, %RuntimeError{message: "Async Timeout"}} - end - test "returns an error with a bad url" do assert TestClient.start_link("lemon_pie", :ok) == {:error, %WebSockex.URLError{url: "lemon_pie"}} @@ -146,31 +140,24 @@ defmodule WebSockexTest do conn = WebSockex.Conn.new(URI.parse(context.url)) assert {:ok, _} = WebSockex.start_link(conn, TestClient, %{catch_text: self()}) - server_pid = TestServer.receive_socket_pid() + socket_pid = TestServer.receive_socket_pid() - send(server_pid, {:send, {:text, "Start Link Conn"}}) + send(socket_pid, {:send, {:text, "Start Link Conn"}}) assert_receive {:caught_text, "Start Link Conn"} end - test "with async option", context do - assert {:ok, _} = TestClient.start_link(context.url, %{catch_text: self()}, async: true) + test "with url", context do + assert {:ok, _} = TestClient.start_link(context.url, %{catch_text: self()}) - server_pid = TestServer.receive_socket_pid() + socket_pid = TestServer.receive_socket_pid() - send(server_pid, {:send, {:text, "Hello"}}) + send(socket_pid, {:send, {:text, "Hello"}}) assert_receive {:caught_text, "Hello"} end - test "without async option", context do - Process.flag(:trap_exit, true) - - assert TestClient.start_link(context.url, %{async_test: true}) == - {:error, %RuntimeError{message: "Async Timeout"}} - end - - test "with async option failure", context do + test "with failure", context do Process.flag(:trap_exit, true) - assert {:ok, pid} = TestClient.start_link(context.url, %{async_test: true}, async: true) + assert {:ok, pid} = TestClient.start_link(context.url, %{async_test: true}) send(pid, {:continue_async, self()}) @@ -185,7 +172,7 @@ defmodule WebSockexTest do end test "can handle initial connect headers" do - {:ok, {server_ref, url}} = TestServer.start_https(self()) + {:ok, {_server_pid, server_ref, url}} = TestServer.start_https(self()) on_exit(fn -> TestServer.shutdown(server_ref) end) @@ -201,17 +188,17 @@ defmodule WebSockexTest do end test "can connect to secure server" do - {:ok, {server_ref, url}} = TestServer.start_https(self()) + {:ok, {_server_pid, server_ref, url}} = TestServer.start_https(self()) on_exit(fn -> TestServer.shutdown(server_ref) end) {:ok, pid} = TestClient.start_link(url, %{}, cacerts: TestServer.cacerts()) - server_pid = TestServer.receive_socket_pid() + socket_pid = TestServer.receive_socket_pid() TestClient.catch_attr(pid, :pong, self()) # Test server -> client ping - send(server_pid, :send_ping) + send(socket_pid, :send_ping) assert_receive :received_pong # Test client -> server ping @@ -228,7 +215,7 @@ defmodule WebSockexTest do end test "handles a ssl message send right after connecting" do - {:ok, {server_ref, url}} = TestServer.start_https(self()) + {:ok, {server_pid, server_ref, url}} = TestServer.start_https(self()) on_exit(fn -> TestServer.shutdown(server_ref) end) @@ -245,8 +232,7 @@ defmodule WebSockexTest do ] ) - TestServer.receive_socket_pid() - |> TestServer.new_conn_mode(:immediate_reply) + TestServer.new_conn_mode(server_pid, :immediate_reply) assert {:ok, _pid} = TestClient.start_link(url, %{catch_text: self()}) @@ -283,8 +269,8 @@ defmodule WebSockexTest do assert_receive conn = %WebSockex.Conn{} :inet.setopts(conn.socket, active: false) - send(context.server_pid, {:send, {:text, "Hello"}}) - send(context.server_pid, {:send, {:text, "Bye"}}) + send(context.socket_pid, {:send, {:text, "Hello"}}) + send(context.socket_pid, {:send, {:text, "Bye"}}) :inet.setopts(conn.socket, active: true) @@ -413,7 +399,7 @@ defmodule WebSockexTest do test "returns an error while closing", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, :stall) + send(context.socket_pid, :stall) WebSockex.cast(pid, :close) @@ -424,6 +410,44 @@ defmodule WebSockexTest do end end + describe "await_status" do + test "connected: called when connecting", context do + TestServer.new_conn_mode(context.server_pid, :connection_wait) + {:ok, pid} = TestClient.start_link(context.url, %{}) + socket_pid = TestServer.receive_socket_pid() + + task = + Task.async(fn -> + TestClient.await_status(pid, :connected) + end) + + send(socket_pid, :connection_continue) + + assert Task.await(task) == :ok + end + + test "connected: called when connected", context do + {:ok, pid} = TestClient.start_link(context.url, %{catch_connect: self()}) + + assert_receive :caught_connect + + assert :ok = TestClient.await_status(pid, :connected) + end + + test "connected: timeout", context do + TestServer.new_conn_mode(context.server_pid, :connection_wait) + {:ok, pid} = TestClient.start_link(context.url, %{}) + + res = catch_exit(TestClient.await_status(pid, :connected, 20)) + assert {:timeout, {WebSockex, :await_status, [_, :connected]}} = res + end + + test "disconnected: not yet supported", context do + res = TestClient.await_status(context.pid, :disconnected) + assert {:error, :unsupported_status, :disconnected} = res + end + end + describe "handle_cast callback" do test "is called", context do WebSockex.cast(context.pid, {:pid_reply, self()}) @@ -489,7 +513,7 @@ defmodule WebSockexTest do TestClient.catch_attr(context.pid, :connect, self()) WebSockex.cast(context.pid, {:set_attr, :reconnect, true}) - send(context.server_pid, :close) + send(context.socket_pid, :close) assert_receive :caught_connect end @@ -499,7 +523,7 @@ defmodule WebSockexTest do test "can handle a binary frame", context do TestClient.catch_attr(context.pid, :binary, self()) binary = :erlang.term_to_binary(:hello) - send(context.server_pid, {:send, {:binary, binary}}) + send(context.socket_pid, {:send, {:binary, binary}}) assert_receive {:caught_binary, ^binary} end @@ -507,7 +531,7 @@ defmodule WebSockexTest do test "can handle a text frame", context do TestClient.catch_attr(context.pid, :text, self()) text = "Murky is green" - send(context.server_pid, {:send, {:text, text}}) + send(context.socket_pid, {:send, {:text, text}}) assert_receive {:caught_text, ^text} end @@ -637,7 +661,7 @@ defmodule WebSockexTest do test "executes in handle_frame bad reply", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:text, "Bad Reply"}}) + send(context.socket_pid, {:send, {:text, "Bad Reply"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, %WebSockex.BadResponseError{}} @@ -646,7 +670,7 @@ defmodule WebSockexTest do test "executes in handle_frame error", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:text, "Error"}}) + send(context.socket_pid, {:send, {:text, "Error"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, {%RuntimeError{message: "Frame Error"}, _}} @@ -655,7 +679,7 @@ defmodule WebSockexTest do test "executes in handle_frame exit", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:text, "Exit"}}) + send(context.socket_pid, {:send, {:text, "Exit"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, "Frame Exit"} @@ -664,7 +688,7 @@ defmodule WebSockexTest do test "executes in handle_ping bad reply", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:ping, "Bad Reply"}}) + send(context.socket_pid, {:send, {:ping, "Bad Reply"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, %WebSockex.BadResponseError{}}, 500 @@ -673,7 +697,7 @@ defmodule WebSockexTest do test "executes in handle_ping error", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:ping, "Error"}}) + send(context.socket_pid, {:send, {:ping, "Error"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, {%RuntimeError{message: "Ping Error"}, _}} @@ -682,7 +706,7 @@ defmodule WebSockexTest do test "executes in handle_ping exit", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:ping, "Exit"}}) + send(context.socket_pid, {:send, {:ping, "Exit"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, "Ping Exit"} @@ -691,7 +715,7 @@ defmodule WebSockexTest do test "executes in handle_pong bad reply", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:pong, "Bad Reply"}}) + send(context.socket_pid, {:send, {:pong, "Bad Reply"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, %WebSockex.BadResponseError{}} @@ -700,7 +724,7 @@ defmodule WebSockexTest do test "executes in handle_pong error", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:pong, "Error"}}) + send(context.socket_pid, {:send, {:pong, "Error"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, {%RuntimeError{message: "Pong Error"}, _}} @@ -709,7 +733,7 @@ defmodule WebSockexTest do test "executes in handle_pong exit", %{pid: pid} = context do Process.flag(:trap_exit, true) - send(context.server_pid, {:send, {:pong, "Exit"}}) + send(context.socket_pid, {:send, {:pong, "Exit"}}) assert_receive {1011, ""} assert_receive {:EXIT, ^pid, "Pong Exit"} @@ -746,17 +770,6 @@ defmodule WebSockexTest do assert_received :terminate end - test "is not executed in handle_disconnect before initialized", context do - assert {:error, %WebSockex.BadResponseError{}} = - TestClient.start_link( - context.url <> "bad", - %{disconnect_badreply: true}, - handle_initial_conn_failure: true - ) - - refute_received :terminate - end - test "executes in handle_connect bad reply", %{pid: pid} do Process.flag(:trap_exit, true) WebSockex.cast(pid, {:set_attr, :connect_badreply, true}) @@ -790,17 +803,6 @@ defmodule WebSockexTest do assert_received :terminate end - test "is not executed in handle_connect before initialized", context do - assert {:error, %WebSockex.BadResponseError{}} = - TestClient.start_link( - context.url, - %{connect_badreply: true}, - handle_initial_conn_failure: true - ) - - refute_received :terminate - end - test "executes in a frame close", context do WebSockex.cast(context.pid, :close) @@ -836,13 +838,13 @@ defmodule WebSockexTest do describe "handle_ping callback" do test "can handle a ping frame", context do - send(context.server_pid, :send_ping) + send(context.socket_pid, :send_ping) assert_receive :received_pong end test "can handle a ping frame with a payload", context do - send(context.server_pid, :send_payload_ping) + send(context.socket_pid, :send_payload_ping) assert_receive :received_payload_pong end @@ -915,7 +917,7 @@ defmodule WebSockexTest do assert_receive :caught_disconnect # Test HTTPS - {:ok, {server_ref, url}} = TestServer.start_https(self()) + {:ok, {_server_pid, server_ref, url}} = TestServer.start_https(self()) on_exit(fn -> TestServer.shutdown(server_ref) end) {:ok, pid} = TestClient.start_link(url, %{}) @@ -948,13 +950,13 @@ defmodule WebSockexTest do end test "is invoked when receiving a close frame", context do - send(context.server_pid, :close) + send(context.socket_pid, :close) assert_receive :caught_disconnect, 1250 end test "is invoked when receiving a close frame with a payload", context do - send(context.server_pid, {:close, 4025, "Testing"}) + send(context.socket_pid, {:close, 4025, "Testing"}) assert_receive {:caught_disconnect, 4025, "Testing"}, 1250 end @@ -978,8 +980,8 @@ defmodule WebSockexTest do assert_receive {4985, "Testing Reconnect"} - server_pid = TestServer.receive_socket_pid() - send(server_pid, {:send, {:text, "Hello"}}) + socket_pid = TestServer.receive_socket_pid() + send(socket_pid, {:send, {:text, "Hello"}}) assert_receive {:caught_text, "Hello"}, 500 end @@ -988,7 +990,7 @@ defmodule WebSockexTest do WebSockex.cast(client_pid, {:set_attr, :multiple_reconnect, self()}) TestServer.new_conn_mode(context.server_pid, {:code, 403}) - send(context.server_pid, :shutdown) + send(context.socket_pid, :shutdown) assert_receive {:retry_connect, %{conn: %WebSockex.Conn{}, attempt_number: 1}} assert_receive {:check_retry_state, %{attempt: 1}} @@ -1004,8 +1006,31 @@ defmodule WebSockexTest do assert_receive {:DOWN, _ref, :process, ^client_pid, %WebSockex.RequestError{code: 403}} end + @tag :current + test "successful reconnect resets attempt_number", %{pid: client_pid} = context do + WebSockex.cast(client_pid, {:set_attr, :signal_reconnect, self()}) + + TestServer.new_conn_mode(context.server_pid, {:code, 403}) + send(context.socket_pid, :shutdown) + + assert_receive {:check_retry_state, %{attempt: 1}} + send(client_pid, :continue_reconnect) + assert_receive {:check_retry_state, %{attempt: 2}} + send(client_pid, :continue_reconnect) + assert_receive {:check_retry_state, %{attempt: 3}} + + TestServer.new_conn_mode(context.server_pid, :ok) + send(client_pid, :continue_reconnect) + :ok = TestClient.await_status(client_pid, :connected) + + socket_pid = TestServer.receive_socket_pid() + TestServer.new_conn_mode(context.server_pid, {:code, 403}) + send(socket_pid, :shutdown) + assert_receive {:check_retry_state, %{attempt: 1}} + end + test "can provide new conn struct during reconnect", context do - {:ok, {server_ref, new_url}} = TestServer.start(self()) + {:ok, {_server_pid, server_ref, new_url}} = TestServer.start(self()) on_exit(fn -> TestServer.shutdown(server_ref) end) WebSockex.cast(context.pid, {:set_attr, :change_conn_reconnect, self()}) @@ -1013,26 +1038,26 @@ defmodule WebSockexTest do TestClient.catch_attr(context.pid, :text, self()) TestServer.new_conn_mode(context.server_pid, {:code, 403}) - send(context.server_pid, :shutdown) + send(context.socket_pid, :shutdown) - server_pid = TestServer.receive_socket_pid() + socket_pid = TestServer.receive_socket_pid() assert_received :retry_change_conn - send(server_pid, {:send, {:text, "Hello"}}) + send(socket_pid, {:send, {:text, "Hello"}}) assert_receive {:caught_text, "Hello"} end test "can handle remote closures during client close initiation", context do WebSockex.cast(context.pid, :delayed_close) - Process.exit(context.server_pid, :kill) + Process.exit(context.socket_pid, :kill) assert_receive {:caught_disconnect, {:remote, :closed}} end test "can handle socket terminations", context do - Process.exit(context.server_pid, :kill) + Process.exit(context.socket_pid, :kill) assert_receive {:caught_disconnect, {:remote, :closed}} end @@ -1055,20 +1080,20 @@ defmodule WebSockexTest do WebSockex.cast(context.pid, {:set_attr, :reconnect, true}) - Process.exit(context.server_pid, :kill) + Process.exit(context.socket_pid, :kill) send(context.pid, {conn.transport, conn.socket, part}) assert_receive {:caught_disconnect, :reconnecting} - server_pid = TestServer.receive_socket_pid() - send(server_pid, {:send, {:text, "Hello"}}) + socket_pid = TestServer.receive_socket_pid() + send(socket_pid, {:send, {:text, "Hello"}}) assert_receive {:caught_text, "Hello"} end test "local close timeout", context do - send(context.server_pid, :stall) + send(context.socket_pid, :stall) WebSockex.cast(context.pid, :close) @@ -1076,65 +1101,6 @@ defmodule WebSockexTest do assert_receive :caught_disconnect end - - test "gets invoked during initial connect with handle_initial_conn_failure", context do - assert {:error, _} = - TestClient.start_link( - context.url <> "bad", - %{catch_init_connect_failure: self()}, - handle_initial_conn_failure: true - ) - - assert_receive :caught_initial_conn_failure - end - - test "doesn't get invoked during initial connect without retry", context do - assert {:error, _} = - TestClient.start_link(context.url <> "bad", %{catch_init_connect_failure: self()}) - - refute_receive :caught_initial_conn_failure - end - - test "can attempt to reconnect during an initial connect", context do - assert {:error, _} = - TestClient.start_link( - context.url <> "bad", - %{multiple_reconnect: self()}, - handle_initial_conn_failure: true - ) - - assert_received {:retry_connect, - %{conn: %WebSockex.Conn{}, reason: %{code: 404}, attempt_number: 1}} - - assert_received {:check_retry_state, %{attempt: 1}} - - assert_received {:retry_connect, - %{conn: %WebSockex.Conn{}, reason: %{code: 404}, attempt_number: 2}} - - assert_received {:check_retry_state, %{attempt: 2, state: %{attempt: 1}}} - - assert_received {:stopping_retry, - %{conn: %WebSockex.Conn{}, reason: %{code: 404}, attempt_number: 3}} - end - - test "can reconnect with a new conn struct during an initial connection retry", context do - state_map = %{change_conn_reconnect: self(), good_url: context.url, catch_text: self()} - - assert {:ok, _} = - TestClient.start_link( - context.url <> "bad", - state_map, - handle_initial_conn_failure: true - ) - - server_pid = TestServer.receive_socket_pid() - - assert_received :retry_change_conn - - send(server_pid, {:send, {:text, "Hello"}}) - - assert_receive {:caught_text, "Hello"} - end end describe "format_status callback" do @@ -1156,10 +1122,10 @@ defmodule WebSockexTest do end test "is invoked when implemented", context do - WebSockex.cast(context.pid, {:set_attr, :custom_status, true}) + {:ok, pid} = TestClient.start_link(context.url, %{custom_status: true}) {{:data, data}, _} = - elem(:sys.get_status(context.pid), 3) + elem(:sys.get_status(pid), 3) |> List.last() |> List.keydelete(:data, 0) |> List.keytake(:data, 0) @@ -1195,23 +1161,18 @@ defmodule WebSockexTest do end end - test "Won't exit on a request error", context do - assert TestClient.start_link(context.url <> "blah", %{}) == - {:error, %WebSockex.RequestError{code: 404, message: "Not Found"}} - end - describe "default implementation errors" do setup context do {:ok, pid} = WebSockex.start_link(context.url, BareClient, %{}) - server_pid = TestServer.receive_socket_pid() + socket_pid = TestServer.receive_socket_pid() - [pid: pid, server_pid: server_pid] + [pid: pid, socket_pid: socket_pid] end test "handle_frame", context do Process.flag(:trap_exit, true) frame = {:text, "Hello"} - send(context.server_pid, {:send, frame}) + send(context.socket_pid, {:send, frame}) message = "No handle_frame/2 clause in #{__MODULE__}.BareClient provided for #{inspect(frame)}" @@ -1242,7 +1203,8 @@ defmodule WebSockexTest do describe "OTP Compliance" do test "requires the child to exit when receiving a parent exit signal", context do Process.flag(:trap_exit, true) - {:ok, pid} = WebSockex.start_link(context.url, BareClient, []) + {:ok, pid} = WebSockex.start_link(context.url, BareClient, %{}) + :ok = BareClient.await_status(pid, :connected) send(pid, {:EXIT, self(), "OTP Compliance Test"}) @@ -1256,7 +1218,7 @@ defmodule WebSockexTest do WebSockex.cast(pid, {:set_attr, :reconnect, true}) TestClient.catch_attr(pid, :terminate, self()) TestServer.new_conn_mode(context.server_pid, :connection_wait) - send(context.server_pid, :close) + send(context.socket_pid, :close) _new_server_pid = TestServer.receive_socket_pid() @@ -1272,11 +1234,10 @@ defmodule WebSockexTest do assert_received :terminate end - test "a parent exit signal doesn't call terminate on initial connect", context do - Process.flag(:trap_exit, true) + test "can send system messages while connecting", context do TestServer.new_conn_mode(context.server_pid, :connection_wait) - {:ok, pid} = TestClient.start_link(context.url, %{catch_terminate: self()}, async: true) + {:ok, pid} = WebSockex.start_link(context.url, BareClient, []) {:data, data} = elem(:sys.get_status(pid), 3) @@ -1285,41 +1246,74 @@ defmodule WebSockexTest do assert {"Connection Status", :connecting} in data - send(pid, {:EXIT, self(), "OTP Compliance Test"}) - assert_receive {:EXIT, ^pid, "OTP Compliance Test"} - refute_received :terminate + new_server_pid = TestServer.receive_socket_pid() + send(new_server_pid, :connection_continue) + + connected_server_pid = TestServer.receive_socket_pid() + + send(connected_server_pid, :send_ping) + assert_receive :received_pong + + {:data, data} = + elem(:sys.get_status(pid), 3) + |> List.flatten() + |> List.keyfind(:data, 0) + + assert {"Connection Status", :connected} in data end - test "can send system messages while connecting", context do - TestServer.new_conn_mode(context.server_pid, :connection_wait) + test "exits with a parent exit signal while backoff", context do + Process.flag(:trap_exit, true) + TestServer.new_conn_mode(context.server_pid, {:code, 403}) - {:ok, pid} = WebSockex.start_link(context.url, BareClient, [], async: true) + {:ok, pid} = + TestClient.start_link(context.url, %{ + backoff: :infinity, + catch_disconnect: self(), + catch_terminate: self() + }) + + assert_receive :caught_disconnect {:data, data} = elem(:sys.get_status(pid), 3) |> List.flatten() |> List.keyfind(:data, 0) - assert {"Connection Status", :connecting} in data + assert {"Connection Status", :backoff} in data - new_server_pid = TestServer.receive_socket_pid() - send(new_server_pid, :connection_continue) + send(pid, {:EXIT, self(), "OTP Compliance Test"}) + assert_receive {:EXIT, ^pid, "OTP Compliance Test"} + assert_received :terminate + end - connected_server_pid = TestServer.receive_socket_pid() + test "can send system messages while backoff", context do + Process.flag(:trap_exit, true) + TestServer.new_conn_mode(context.server_pid, {:code, 403}) - send(connected_server_pid, :send_ping) - assert_receive :received_pong + {:ok, pid} = + TestClient.start_link(context.url, %{ + backoff: :infinity, + catch_disconnect: self(), + catch_terminate: self() + }) + + assert_receive :caught_disconnect {:data, data} = elem(:sys.get_status(pid), 3) |> List.flatten() |> List.keyfind(:data, 0) - assert {"Connection Status", :connected} in data + assert {"Connection Status", :backoff} in data + + TestClient.cast(pid, {:pid_reply, self()}) + + assert_receive :cast end test "can send system messages while closing", context do - send(context.server_pid, :stall) + send(context.socket_pid, :stall) TestClient.catch_attr(context.pid, :terminate, self()) WebSockex.cast(context.pid, :close) @@ -1338,7 +1332,7 @@ defmodule WebSockexTest do TestClient.catch_attr(pid, :terminate, self()) WebSockex.cast(context.pid, :close) - send(context.server_pid, :stall) + send(context.socket_pid, :stall) {:data, data} = elem(:sys.get_status(pid), 3) @@ -1362,7 +1356,6 @@ defmodule WebSockexTest do test ":sys.get_state only returns module_state", context do get_state = :sys.get_state(context.pid) - WebSockex.cast(context.pid, {:get_state, self()}) assert_receive ^get_state