From 192ca4d2e59b65db30930e367f73cb3c3cf0ce6b Mon Sep 17 00:00:00 2001 From: Brendan Ball Date: Mon, 20 Jun 2022 10:30:50 +0200 Subject: [PATCH] Add backoff and reconnect support Currently Stargate prevents the application from starting up or causes it to crash if pulsar is or becomes unavailable. This is a result of how WebSockex is implemented. This builds on a refactor of WebSockex at https://github.com/Azolo/websockex/pull/112 which implements connection management async to process startup. This makes Stargate more production ready since it will allow an application to degrade gracefully when pulsar isn't available temporarily. Main changes as a result of this: 1. Reconnect backoff feature is added to be customized by the user 2. Since Stargate will continue running even when not connected to Pulsar, the user needs to know that messages aren't being produced successfully. A breaking change was made to the async ACK MFA where the first argument is now the result of the produce, which may either be `:ok` or `{:error, reason}`. --- lib/stargate/connection.ex | 141 +++++++++++++++++- lib/stargate/producer.ex | 36 ++++- lib/stargate/producer/acknowledger.ex | 6 +- lib/stargate/receiver.ex | 24 ++- lib/stargate/receiver/dispatcher.ex | 27 +++- mix.exs | 2 +- mix.lock | 18 +-- test/integration/producer_receiver_test.exs | 8 + test/support/mock_consumer.ex | 4 +- test/support/mock_socket.ex | 40 ++++- test/unit/stargate/connection_test.exs | 84 ++++++++++- .../stargate/producer/acknowledger_test.exs | 31 ++-- test/unit/stargate/producer_test.exs | 89 +++++++++-- test/unit/stargate/receiver_test.exs | 96 +++++++++++- 14 files changed, 538 insertions(+), 68 deletions(-) diff --git a/lib/stargate/connection.ex b/lib/stargate/connection.ex index 8c2b2fb..c59f08f 100644 --- a/lib/stargate/connection.ex +++ b/lib/stargate/connection.ex @@ -15,6 +15,18 @@ defmodule Stargate.Connection do topic: String.t() } + @typedoc """ + Calculates custom connection backoff when unable to connect to Pulsar. + Can be used to implement exponential backoff. + Note that for an MFA tuple, the arity of the function should be `length(args) + 1` since the first + arg will be the attempt. + """ + @type backoff_calculator :: + (attempt :: non_neg_integer() -> timeout()) | {module(), fun :: atom(), args :: list()} + + @callback handle_send_error(reason :: term, ref :: term, state :: term) :: :ok + @callback handle_connected(state :: term) :: :ok + @doc """ The Connection using macro provides the common websocket connection and keepalive functionality into a single line for replicating connection @@ -24,15 +36,22 @@ defmodule Stargate.Connection do quote do use WebSockex + @behaviour Stargate.Connection + @ping_interval 30_000 + + require Logger + @impl WebSockex def handle_connect(_conn, state) do - :timer.send_interval(30_000, :send_ping) - {:ok, state} + tref = Process.send_after(self(), :send_ping, @ping_interval) + :ok = apply(__MODULE__, :handle_connected, [state]) + {:ok, Map.put(state, :ping_timer_ref, tref)} end @impl WebSockex def handle_info(:send_ping, state) do - {:reply, :ping, state} + tref = Process.send_after(self(), :send_ping, @ping_interval) + {:reply, :ping, Map.put(state, :ping_timer_ref, tref)} end @impl WebSockex @@ -44,6 +63,91 @@ defmodule Stargate.Connection do def handle_pong(_pong_frame, state) do {:ok, state} end + + @impl WebSockex + def handle_disconnect( + %{reason: reason, attempt_number: attempt}, + %{backoff_calculator: backoff_calculator} = state + ) do + case Map.get(state, :ping_timer_ref) do + nil -> + :ok + + ref -> + Process.cancel_timer(ref) + :ok + end + + backoff = Stargate.Connection.calculate_backoff(backoff_calculator, attempt) + + Logger.warning( + "[Stargate] disconnected", + reason: inspect(reason), + url: state.url, + attempt: attempt, + backoff_ms: backoff + ) + + {:backoff, backoff, state} + end + + @impl WebSockex + def handle_send_result(:ok, _frame, _ref, state) do + {:ok, state} + end + + @impl WebSockex + def handle_send_result({:error, reason}, :ping, _ref, state) do + Logger.warning("[Stargate] error sending ping", reason: inspect(reason)) + {:close, state} + end + + @impl WebSockex + def handle_send_result({:error, %WebSockex.ConnError{} = reason}, frame, ref, state) do + Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url) + + :ok = apply(__MODULE__, :handle_send_error, [:not_connected, ref, state]) + {:close, state} + end + + @impl WebSockex + def handle_send_result({:error, %WebSockex.NotConnectedError{} = reason}, frame, ref, state) do + Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url) + + :ok = apply(__MODULE__, :handle_send_error, [:not_connected, ref, state]) + {:ok, state} + end + + @impl WebSockex + def handle_send_result({:error, reason}, frame, ref, state) do + Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url) + + :ok = apply(__MODULE__, :handle_send_error, [:unknown, ref, state]) + {:ok, state} + end + + @impl WebSockex + def terminate({reason, stacktrace}, state) when is_exception(reason) do + Logger.error(Exception.format(:error, reason, stacktrace)) + end + + @impl WebSockex + def terminate(_reason, _state) do + :ok + end + + @impl Stargate.Connection + def handle_send_error(reason, ref, state) do + :ok + end + + @impl Stargate.Connection + def handle_connected(state) do + :ok + end + + defoverridable handle_send_error: 3, + handle_connected: 1 end end @@ -69,9 +173,7 @@ defmodule Stargate.Connection do end base_url = - "#{protocol}://#{host}/ws/v2/#{type}/#{persistence}/#{tenant}/#{namespace}/#{topic}#{ - subscription - }" + "#{protocol}://#{host}/ws/v2/#{type}/#{persistence}/#{tenant}/#{namespace}/#{topic}#{subscription}" url = case params do @@ -102,6 +204,33 @@ defmodule Stargate.Connection do |> Enum.map(&transform_auth/1) end + @doc false + @spec calculate_backoff(attempt :: non_neg_integer()) :: timeout() + def calculate_backoff(_attempt) do + 2_000 + end + + @doc false + @spec calculate_backoff(backoff_calculator(), attempt :: non_neg_integer()) :: timeout() + def calculate_backoff(calc, attempt) do + case calc do + {module, function, args} -> + apply(module, function, [attempt | args]) + + fun when is_function(fun, 1) -> + fun.(attempt) + + _ -> + raise ArgumentError, + "Backoff calculator does not conform to spec of {module, function, args} or fun/1" + end + end + + @doc false + def default_backoff_calculator() do + {Stargate.Connection, :calculate_backoff, []} + end + defp transform_auth({:ssl_options, _opts} = ssl_opts), do: ssl_opts defp transform_auth({:auth_token, token}) do diff --git a/lib/stargate/producer.ex b/lib/stargate/producer.ex index c05b304..3a94cac 100644 --- a/lib/stargate/producer.ex +++ b/lib/stargate/producer.ex @@ -12,6 +12,7 @@ defmodule Stargate.Producer do use Puid import Stargate.Supervisor, only: [via: 2] alias Stargate.Producer.{Acknowledger, QueryParams} + alias Stargate.Connection @typedoc """ A URL defining the host and topic to which a Stargate producer can @@ -99,7 +100,8 @@ defmodule Stargate.Producer do When calling `produce/3` the third argument must be an MFA tuple which is used by the producer's acknowledger process to asynchronously perform acknowledgement that the message was received by the cluster successfully. This is used to avoid blocking the - calling process for performance reasons. + calling process for performance reasons. The result of the produce is added as a first + argument when calling the MFA tuple which is either `:ok` or `{:error, reason}`. """ @spec produce(producer(), message() | [message()], {module(), atom(), [term()]}) :: :ok | {:error, term()} @@ -130,7 +132,8 @@ defmodule Stargate.Producer do :tenant, :namespace, :topic, - :query_params + :query_params, + :backoff_calculator ] end @@ -153,6 +156,7 @@ defmodule Stargate.Producer do * `persistence` can be one of "persistent" or "non-persistent" per the Pulsar specification of topics as being in-memory only or persisted to the brokers' disks. Defaults to "persistent". + * `backoff_calculator` See `Stargate.Connection.t:backoff_calculator/0`. * `query_params` is a map containing any or all of the following: * `send_timeout` the time at which a produce operation will time out; defaults to 30 seconds @@ -178,11 +182,15 @@ defmodule Stargate.Producer do query_params = QueryParams.build_params(query_params_config) registry = Keyword.fetch!(args, :registry) + backoff_calculator = + Keyword.get(args, :backoff_calculator, Stargate.Connection.default_backoff_calculator()) + state = args |> Stargate.Connection.connection_settings(:producer, query_params) |> Map.put(:query_params, query_params_config) |> Map.put(:registry, registry) + |> Map.put(:backoff_calculator, backoff_calculator) |> (fn fields -> struct(State, fields) end).() server_opts = @@ -202,17 +210,16 @@ defmodule Stargate.Producer do @impl WebSockex def handle_cast({:send, payload, ctx, ack}, state) do - Acknowledger.produce( + ack_name = via( state.registry, {:producer_ack, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}", "#{state.topic}"} - ), - ctx, - ack - ) + ) + + Acknowledger.produce(ack_name, ctx, ack) - {:reply, {:text, payload}, state} + {:reply, {:text, payload}, ctx, state} end @impl WebSockex @@ -235,6 +242,19 @@ defmodule Stargate.Producer do {:ok, state} end + @impl Connection + def handle_send_error(reason, ctx, state) do + :ok = + state.registry + |> via( + {:producer_ack, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}", + "#{state.topic}"} + ) + |> Acknowledger.ack({:error, reason, ctx}) + + :ok + end + defp construct_payload(%{"payload" => _payload, "context" => context} = message) do encoded_message = message diff --git a/lib/stargate/producer/acknowledger.ex b/lib/stargate/producer/acknowledger.ex index 49948db..73f094b 100644 --- a/lib/stargate/producer/acknowledger.ex +++ b/lib/stargate/producer/acknowledger.ex @@ -72,7 +72,7 @@ defmodule Stargate.Producer.Acknowledger do send(pid, {ref, :ack}) {module, function, args} -> - apply(module, function, args) + apply(module, function, [:ok | args]) end {:noreply, new_state} @@ -86,8 +86,8 @@ defmodule Stargate.Producer.Acknowledger do {pid, ref} when is_pid(pid) -> send(pid, {ref, :error, reason}) - _mfa -> - Logger.error("Failed to execute produce for reason : #{inspect(reason)}") + {module, function, args} -> + apply(module, function, [{:error, reason} | args]) end {:noreply, new_state} diff --git a/lib/stargate/receiver.ex b/lib/stargate/receiver.ex index f0f63d9..ea75182 100644 --- a/lib/stargate/receiver.ex +++ b/lib/stargate/receiver.ex @@ -9,6 +9,7 @@ defmodule Stargate.Receiver do import Stargate.Supervisor, only: [via: 2] alias Stargate.{Consumer, Reader} alias Stargate.Receiver.Dispatcher + alias Stargate.Connection @typedoc "A string identifier assigned to each message by the cluster" @type message_id :: String.t() @@ -57,7 +58,8 @@ defmodule Stargate.Receiver do :tenant, :namespace, :topic, - :query_params + :query_params, + :backoff_calculator ] end @@ -89,6 +91,7 @@ defmodule Stargate.Receiver do on the received messages. Defaults to 1. * `handler_init_args` is any term that will be passed to the message handler to initialize its state when a stateful handler is desired. Defaults to an empty list. + * `backoff_calculator` See `Stargate.Connection.t:backoff_calculator/0`. * `query_params` is a map containing any or all of the following: # Consumer @@ -122,6 +125,9 @@ defmodule Stargate.Receiver do registry = Keyword.fetch!(args, :registry) query_params_config = Keyword.get(args, :query_params) + backoff_calculator = + Keyword.get(args, :backoff_calculator, Stargate.Connection.default_backoff_calculator()) + query_params = case type do :consumer -> Consumer.QueryParams.build_params(query_params_config) @@ -130,7 +136,8 @@ defmodule Stargate.Receiver do setup_state = %{ registry: registry, - query_params: query_params_config + query_params: query_params_config, + backoff_calculator: backoff_calculator } state = @@ -154,6 +161,19 @@ defmodule Stargate.Receiver do WebSockex.start_link(state.url, __MODULE__, state, server_opts) end + @impl Connection + def handle_connected(state) do + :ok = + state.registry + |> via( + {:dispatcher, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}", + "#{state.topic}"} + ) + |> Dispatcher.connected() + + :ok + end + @impl WebSockex def handle_frame( {:text, msg}, diff --git a/lib/stargate/receiver/dispatcher.ex b/lib/stargate/receiver/dispatcher.ex index 0a0f0ef..8686420 100644 --- a/lib/stargate/receiver/dispatcher.ex +++ b/lib/stargate/receiver/dispatcher.ex @@ -43,6 +43,14 @@ defmodule Stargate.Receiver.Dispatcher do @spec push(GenServer.server(), [raw_message()] | raw_message()) :: :ok def push(dispatcher, messages), do: GenServer.cast(dispatcher, {:push, messages}) + @doc """ + Notify that producer is connected. + This is relevant when using `pull_mode` and `handle_demand` had an error because the + connection wasn't established, likely because it was in a backoff state. + """ + @spec push(GenServer.server(), [raw_message()] | raw_message()) :: :ok + def connected(dispatcher), do: GenServer.cast(dispatcher, :connected) + @doc """ Starts a `Stargate.Receiver.Dispatcher` GenStage process and links it to the calling process. @@ -67,6 +75,7 @@ defmodule Stargate.Receiver.Dispatcher do tenant = Keyword.fetch!(init_args, :tenant) ns = Keyword.fetch!(init_args, :namespace) topic = Keyword.fetch!(init_args, :topic) + registry = Keyword.fetch!(init_args, :registry) pull = case get_in(init_args, [:query_params, :pull_mode]) do @@ -75,13 +84,13 @@ defmodule Stargate.Receiver.Dispatcher do end state = %State{ - registry: Keyword.fetch!(init_args, :registry), + registry: registry, persistence: persistence, tenant: tenant, namespace: ns, topic: topic, pull_mode: pull, - receiver: {:"#{type}", "#{persistence}", "#{tenant}", "#{ns}", "#{topic}"} + receiver: via(registry, {:"#{type}", "#{persistence}", "#{tenant}", "#{ns}", "#{topic}"}) } {:ok, _receiver} = Stargate.Receiver.start_link(init_args) @@ -98,14 +107,22 @@ defmodule Stargate.Receiver.Dispatcher do def handle_cast({:push, message}, state), do: {:noreply, [message], state} @impl GenStage - def handle_demand(demand, %{pull_mode: true} = state) do - receiver = via(state.registry, state.receiver) - + def handle_cast(:connected, %{demand: demand, receiver: receiver} = state) do Stargate.Receiver.pull_permit(receiver, demand) + {:noreply, [], state} + end + @impl GenStage + def handle_cast(:connected, state) do {:noreply, [], state} end + @impl GenStage + def handle_demand(demand, %{pull_mode: true, receiver: receiver} = state) do + Stargate.Receiver.pull_permit(receiver, demand) + {:noreply, [], Map.put(state, :demand, demand)} + end + @impl GenStage def handle_demand(_, state), do: {:noreply, [], state} diff --git a/mix.exs b/mix.exs index 6639733..06a16f4 100644 --- a/mix.exs +++ b/mix.exs @@ -36,7 +36,7 @@ defmodule Stargate.MixProject do {:jason, "~> 1.2"}, {:plug_cowboy, "~> 2.5", only: [:test, :integration]}, {:puid, "~> 1.1.1"}, - {:websockex, "~> 0.4.3"}, + {:websockex, github: "BrendanBall/websockex", branch: "backoff_reconnect"}, {:credo, "~> 1.5", only: [:dev, :test], runtime: false} ] end diff --git a/mix.lock b/mix.lock index 3011e72..c1210ba 100644 --- a/mix.lock +++ b/mix.lock @@ -1,8 +1,8 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, - "cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"}, - "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"}, - "cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"}, + "cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"}, + "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, + "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, "credo": {:hex, :credo, "1.5.5", "e8f422026f553bc3bebb81c8e8bf1932f498ca03339856c7fec63d3faac8424b", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dd8623ab7091956a855dc9f3062486add9c52d310dfd62748779c4315d8247de"}, "crypto_rand": {:hex, :crypto_rand, "1.0.2", "e1381d3fbe743cfb6082150f4f95dc7ac3fcaf5b53dae0fc96f7e0f006d91dd1", [:mix], [], "hexpm", "ca0eebc811c4496446c4d32e18e3888d7cf8fb1f441af015578f3523b1e35be8"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, @@ -18,14 +18,14 @@ "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, "makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, - "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, + "mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"}, "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, "patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm", "c08cc5edc27def565647a9b55a0bea8025a5f81a4472e57692f28f2292c44c94"}, - "plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"}, - "plug_cowboy": {:hex, :plug_cowboy, "2.5.0", "51c998f788c4e68fc9f947a5eba8c215fbb1d63a520f7604134cab0270ea6513", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5b2c8925a5e2587446f33810a58c01e66b3c345652eeec809b76ba007acde71a"}, + "plug": {:hex, :plug, "1.13.6", "187beb6b67c6cec50503e940f0434ea4692b19384d47e5fdfd701e93cadb4cc2", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02b9c6b9955bce92c829f31d6284bf53c591ca63c4fb9ff81dfd0418667a34ff"}, + "plug_cowboy": {:hex, :plug_cowboy, "2.5.2", "62894ccd601cf9597e2c23911ff12798a8a18d237e9739f58a6b04e4988899fe", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "ea6e87f774c8608d60c8d34022a7d073bd7680a0a013f049fc62bf35efea1044"}, "plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"}, "puid": {:hex, :puid, "1.1.1", "e1155e141fe21141dba74a840b0a6b01336f972f08e9ce8fa489ffcd48ba9fa6", [:mix], [{:crypto_rand, "~> 1.0", [hex: :crypto_rand, repo: "hexpm", optional: false]}], "hexpm", "fc271ee494abf54b9cb896b30648c49f2f6e8112cf4990b7e3a92ca871be3338"}, - "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, - "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, - "websockex": {:hex, :websockex, "0.4.3", "92b7905769c79c6480c02daacaca2ddd49de936d912976a4d3c923723b647bf0", [:mix], [], "hexpm", "95f2e7072b85a3a4cc385602d42115b73ce0b74a9121d0d6dbbf557645ac53e4"}, + "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"}, + "websockex": {:git, "https://github.com/BrendanBall/websockex.git", "2e124d89eaa8ef44a9054076b599121939dcfe91", [branch: "backoff_reconnect"]}, } diff --git a/test/integration/producer_receiver_test.exs b/test/integration/producer_receiver_test.exs index fca9236..da171b5 100644 --- a/test/integration/producer_receiver_test.exs +++ b/test/integration/producer_receiver_test.exs @@ -103,6 +103,10 @@ defmodule Stargate.ProducerReceiverTest do consumer: consumer_opts ) + # Connections are established asynchronously to the process startup + # And subscriptions only receive messages from when they were created. + Process.sleep(100) + Stargate.registry_key(tenant, namespace, topic) |> Stargate.produce(input) @@ -153,6 +157,10 @@ defmodule Stargate.ProducerReceiverTest do ] ) + # Connections are established asynchronously to the process startup + # And subscriptions only receive messages from when they were created. + Process.sleep(100) + Stargate.registry_key(tenant, namespace, topic) |> Stargate.produce(input) diff --git a/test/support/mock_consumer.ex b/test/support/mock_consumer.ex index c647499..0cd3206 100644 --- a/test/support/mock_consumer.ex +++ b/test/support/mock_consumer.ex @@ -13,7 +13,9 @@ defmodule MockConsumer do source: Keyword.get(init_args, :source) } - {:consumer, state, subscribe_to: [{state.producer, []}]} + max_demand = Keyword.get(init_args, :max_demand, 5) + + {:consumer, state, subscribe_to: [{state.producer, [max_demand: max_demand]}]} end def handle_events(messages, _from, state) do diff --git a/test/support/mock_socket.ex b/test/support/mock_socket.ex index b2aedf9..8fe3834 100644 --- a/test/support/mock_socket.ex +++ b/test/support/mock_socket.ex @@ -64,6 +64,10 @@ defmodule MockSocket do {:reply, {:text, message}, %{state | count: count + 1}} end + def websocket_handle({:text, "stop"}, state) do + {:stop, state} + end + def websocket_handle( {:text, "{\"type\":\"permit" <> _permits = message}, %{source: pid} = state @@ -134,19 +138,53 @@ defmodule SampleClient do use Stargate.Connection def cast(message), do: WebSockex.cast(__MODULE__, {:send, message}) + def cast(message, ref), do: WebSockex.cast(__MODULE__, {:send, message, ref}) def ping_socket, do: send(__MODULE__, :send_ping) def start_link(init_args) do port = Keyword.get(init_args, :port) path = Keyword.get(init_args, :path) - WebSockex.start_link("http://localhost:#{port}/#{path}", __MODULE__, %{}, name: __MODULE__) + test_pid = self() + + backoff_calculator = + Keyword.get( + init_args, + :backoff_calculator, + Stargate.Connection.default_backoff_calculator() + ) + + url = "http://localhost:#{port}/#{path}" + + state = %{ + backoff_calculator: backoff_calculator, + test_pid: test_pid, + url: url + } + + WebSockex.start_link(url, __MODULE__, state, name: __MODULE__) end def handle_cast({:send, payload}, state) do {:reply, {:text, payload}, state} end + def handle_cast({:send, payload, ref}, state) do + {:reply, {:text, payload}, ref, state} + end + def handle_frame({:text, _message}, state) do {:ok, state} end + + @impl Stargate.Connection + def handle_connected(%{test_pid: pid}) do + send(pid, :client_connected) + :ok + end + + @impl Stargate.Connection + def handle_send_error(reason, ref, %{test_pid: pid}) do + send(pid, {:received_send_error, reason, ref}) + :ok + end end diff --git a/test/unit/stargate/connection_test.exs b/test/unit/stargate/connection_test.exs index abfbaee..dce8f78 100644 --- a/test/unit/stargate/connection_test.exs +++ b/test/unit/stargate/connection_test.exs @@ -5,16 +5,23 @@ defmodule Stargate.ConnectionTest do port = Enum.random(49_152..65_535) {:ok, server} = MockSocket.Supervisor.start_link(port: port, path: "ws_test", source: self()) - {:ok, client} = SampleClient.start_link(port: port, path: "ws_test") on_exit(fn -> - Enum.map([server, client], &kill/1) + Enum.map([server], &kill/1) end) [port: port] end describe "connection macro" do + setup %{port: port} do + {:ok, client} = SampleClient.start_link(port: port, path: "ws_test") + + on_exit(fn -> + Enum.map([client], &kill/1) + end) + end + test "establishes a socket connection" do :ok = SampleClient.cast(Jason.encode!(%{"context" => "connection test", "messageId" => "1"})) @@ -78,6 +85,79 @@ defmodule Stargate.ConnectionTest do end end + describe "backoff" do + defmodule Backoff do + def calc(attempt, :test) do + 1_000 * attempt + end + end + + test "calculate_backoff default calculator" do + assert 2_000 = + Stargate.Connection.default_backoff_calculator() + |> Stargate.Connection.calculate_backoff(1) + end + + test "calculate_backoff MFA" do + mfa = {Stargate.ConnectionTest.Backoff, :calc, [:test]} + assert 1_000 = Stargate.Connection.calculate_backoff(mfa, 1) + assert 2_000 = Stargate.Connection.calculate_backoff(mfa, 2) + end + + test "calculate_backoff function" do + fun = fn attempt -> 1_000 * attempt end + assert 1_000 = Stargate.Connection.calculate_backoff(fun, 1) + assert 2_000 = Stargate.Connection.calculate_backoff(fun, 2) + end + + test "calculate_backoff invalid function" do + fun = fn -> 1_000 end + assert_raise ArgumentError, fn -> Stargate.Connection.calculate_backoff(fun, 1) end + end + + test "sample client", %{port: port} do + test_pid = self() + + backoff_calc = fn _ -> + send(test_pid, :calculate_backoff) + 10 + end + + {:ok, _client} = + SampleClient.start_link(port: port, path: "ws_test", backoff_calculator: backoff_calc) + + assert_receive :client_connected + + SampleClient.cast("stop") + + assert_receive :calculate_backoff + assert_receive :client_connected + end + end + + describe "handle_send_error" do + test "sending message during backoff results in an error", %{port: port} do + test_pid = self() + + backoff_calc = fn _ -> + send(test_pid, :calculate_backoff) + :infinity + end + + {:ok, _client} = + SampleClient.start_link(port: port, path: "ws_test", backoff_calculator: backoff_calc) + + assert_receive :client_connected + + SampleClient.cast("stop") + assert_receive :calculate_backoff + + ref = make_ref() + SampleClient.cast("test", ref) + assert_receive {:received_send_error, :not_connected, ^ref} + end + end + defp kill(pid) do ref = Process.monitor(pid) Process.exit(pid, :normal) diff --git a/test/unit/stargate/producer/acknowledger_test.exs b/test/unit/stargate/producer/acknowledger_test.exs index 7119b7d..79c6811 100644 --- a/test/unit/stargate/producer/acknowledger_test.exs +++ b/test/unit/stargate/producer/acknowledger_test.exs @@ -1,6 +1,5 @@ defmodule Stargate.Producer.AcknowledgerTest do use ExUnit.Case - import ExUnit.CaptureLog alias Stargate.Producer.Acknowledger, as: ProdAcknowledger @@ -43,34 +42,36 @@ defmodule Stargate.Producer.AcknowledgerTest do end describe "asynchronous ack" do + defmodule Ack do + def ack(res, pid, msg) do + send(pid, {res, msg}) + end + end + test "tracks a produce and executes the saved function", %{acknowledger: acknowledger} do :ok = ProdAcknowledger.produce( acknowledger, "123", - {Kernel, :send, [self(), "async_ack"]} + {Stargate.Producer.AcknowledgerTest.Ack, :ack, [self(), "async_ack"]} ) :ok = ProdAcknowledger.ack(acknowledger, {:ack, "123"}) - assert_receive "async_ack" + assert_receive {:ok, "async_ack"} end test "logs errors when they occur during async ack", %{acknowledger: acknowledger} do - ack_function = fn -> - :ok = - ProdAcknowledger.produce( - acknowledger, - "234", - {Kernel, :send, [self(), "async_ack_error"]} - ) - - :ok = ProdAcknowledger.ack(acknowledger, {:error, "oh nooo", "234"}) + :ok = + ProdAcknowledger.produce( + acknowledger, + "234", + {Stargate.Producer.AcknowledgerTest.Ack, :ack, [self(), "async_ack_error"]} + ) - Process.sleep(10) - end + :ok = ProdAcknowledger.ack(acknowledger, {:error, "oh nooo", "234"}) - assert capture_log(ack_function) =~ "Failed to execute produce for reason : \"oh nooo\"" + assert_receive {{:error, "oh nooo"}, "async_ack_error"} end end diff --git a/test/unit/stargate/producer_test.exs b/test/unit/stargate/producer_test.exs index 727c190..a4e5d89 100644 --- a/test/unit/stargate/producer_test.exs +++ b/test/unit/stargate/producer_test.exs @@ -33,12 +33,12 @@ defmodule Stargate.ProducerTest do component: :producer ) - :ok = - Stargate.produce(producer, %{ - "payload" => "helloooo", - "context" => "123", - "properties" => %{"key" => "value"} - }) + assert :ok = + Stargate.produce(producer, %{ + "payload" => "helloooo", + "context" => "123", + "properties" => %{"key" => "value"} + }) assert_receive {:received_frame, "123, {\"context\":\"123\",\"payload\":\"aGVsbG9vb28=\",\"properties\":{\"key\":\"value\"}} loud and clear"} @@ -73,6 +73,31 @@ defmodule Stargate.ProducerTest do "123, {\"context\":\"123\",\"payload\":\"aGVsbG9vb28=\",\"properties\":{\"error\":\"something went wrong\"}} loud and clear"} end + test "produce fails because socket is not connected", %{port: port} do + opts = [ + registry: :sg_reg_producer, + host: [localhost: port + 1], + tenant: "default", + namespace: "public", + topic: "foobar" + ] + + {:ok, _registry} = Registry.start_link(keys: :unique, name: :sg_reg_producer) + {:ok, _} = Stargate.Producer.Supervisor.start_link(opts) + + producer = + Stargate.registry_key(opts[:tenant], opts[:namespace], opts[:topic], + registry: opts[:registry], + component: :producer + ) + + {:error, :not_connected} = + Stargate.produce(producer, %{ + "payload" => "helloooo", + "context" => "123" + }) + end + test "produces via one-off producer", %{port: port} do url = "ws://localhost:#{port}/ws/v2/producer/persistent/default/public/foobar" @@ -125,6 +150,12 @@ defmodule Stargate.ProducerTest do end describe "produce/3" do + defmodule Ack do + def ack(res, pid, msg) do + send(pid, {res, msg}) + end + end + test "handles produce acking out of band", %{port: port} do opts = [ name: "produce_3_test", @@ -147,16 +178,56 @@ defmodule Stargate.ProducerTest do ) test = self() + context = "123" + + spawn(fn -> + Stargate.produce( + producer, + %{"context" => context, "payload" => "message"}, + {Stargate.ProducerTest.Ack, :ack, [test, context]} + ) + end) + + assert_receive {:ok, "123"}, 1_000 + + assert_receive {:received_frame, + "123, {\"context\":\"123\",\"payload\":\"bWVzc2FnZQ==\"} loud and clear"}, + 1_000 + end + + test "handles produce acking out of band error", %{port: port} do + opts = [ + name: "produce_3_test", + host: [localhost: port + 1], + protocol: "ws", + producer: [ + persistence: "persistent", + tenant: "default", + namespace: "public", + topic: "foobar" + ] + ] + + {:ok, _supervisor} = Stargate.Supervisor.start_link(opts) + + [{producer, _}] = + Registry.lookup( + :sg_reg_produce_3_test, + {:producer, "persistent", "default", "public", "foobar"} + ) + + test = self() + context = "123" spawn(fn -> Stargate.produce( producer, - %{"context" => "123", "payload" => "message"}, - {Kernel, :send, [test, "message received"]} + %{"context" => context, "payload" => "message"}, + {Stargate.ProducerTest.Ack, :ack, [test, context]} ) end) - assert_receive "message received", 1_000 + assert_receive {{:error, :not_connected}, ^context}, 1_000 end end diff --git a/test/unit/stargate/receiver_test.exs b/test/unit/stargate/receiver_test.exs index 1267a74..82470ed 100644 --- a/test/unit/stargate/receiver_test.exs +++ b/test/unit/stargate/receiver_test.exs @@ -25,19 +25,28 @@ defmodule Stargate.ReceiverTest do {:ok, registry} = Registry.start_link(keys: :unique, name: :"sg_reg_#{reg_name}") {:ok, server} = MockSocket.Supervisor.start_link(port: port, path: path, source: self()) - {:ok, dispatcher} = Dispatcher.start_link(opts) - {:ok, consumer} = MockConsumer.start_link(producer: dispatcher, source: self()) receiver = Stargate.registry_key(tenant, ns, topic, component: type, name: reg_name) on_exit(fn -> - Enum.map([registry, server, dispatcher, consumer], &kill/1) + Enum.map([registry, server], &kill/1) end) - [receiver: receiver] + [opts: opts, receiver: receiver] end describe "handle_frame" do + setup %{opts: opts} do + {:ok, dispatcher} = Dispatcher.start_link(opts) + {:ok, consumer} = MockConsumer.start_link(producer: dispatcher, source: self()) + + on_exit(fn -> + Enum.map([dispatcher, consumer], &kill/1) + end) + + [] + end + test "receives messages from the socket", %{receiver: receiver} do Enum.each(0..2, fn _ -> WebSockex.send_frame(receiver, {:text, "push_message"}) end) @@ -48,6 +57,17 @@ defmodule Stargate.ReceiverTest do end describe "ack/2" do + setup %{opts: opts} do + {:ok, dispatcher} = Dispatcher.start_link(opts) + {:ok, consumer} = MockConsumer.start_link(producer: dispatcher, source: self()) + + on_exit(fn -> + Enum.map([dispatcher, consumer], &kill/1) + end) + + [] + end + test "sends receive acks to the socket", %{receiver: receiver} do Enum.map(["ack1", "ack2", "ack3"], &Stargate.Receiver.ack(receiver, &1)) @@ -57,8 +77,29 @@ defmodule Stargate.ReceiverTest do end end - describe "pull_permit/2" do - test "sends permit requests through the socket", %{receiver: receiver} do + describe "pull mode" do + setup %{opts: opts} do + opts = + Keyword.merge(opts, + backoff_calculator: fn _ -> 0 end, + query_params: %{ + pull_mode: true + } + ) + + {:ok, dispatcher} = Dispatcher.start_link(opts) + + {:ok, consumer} = + MockConsumer.start_link(producer: dispatcher, source: self(), max_demand: 1) + + on_exit(fn -> + Enum.map([dispatcher, consumer], &kill/1) + end) + + [] + end + + test "pull_permit sends permit requests through the socket", %{receiver: receiver} do Stargate.Receiver.pull_permit(receiver, 10) assert_receive {:permit_request, "permitting 10 messages"} @@ -68,6 +109,49 @@ defmodule Stargate.ReceiverTest do Stargate.Receiver.pull_permit(receiver, 100) assert_receive {:permit_request, "permitting 100 messages"} end + + test "continue on reconnect", %{receiver: receiver} do + assert_receive {:permit_request, "permitting 1 messages"} + WebSockex.send_frame(receiver, {:text, "push_message"}) + + assert_receive {:event_received, ["consumer message 0"]} + + :ok = WebSockex.send_frame(receiver, {:text, "stop"}) + + Process.sleep(100) + + assert_receive {:permit_request, "permitting 1 messages"} + + WebSockex.send_frame(receiver, {:text, "push_message"}) + assert_receive {:event_received, ["consumer message 0"]} + end + end + + describe "push mode" do + setup %{opts: opts} do + opts = Keyword.merge(opts, backoff_calculator: fn _ -> 0 end) + {:ok, dispatcher} = Dispatcher.start_link(opts) + {:ok, consumer} = MockConsumer.start_link(producer: dispatcher, source: self()) + + on_exit(fn -> + Enum.map([dispatcher, consumer], &kill/1) + end) + + [] + end + + test "continue on reconnect", %{receiver: receiver} do + WebSockex.send_frame(receiver, {:text, "push_message"}) + + assert_receive {:event_received, ["consumer message 0"]} + + :ok = WebSockex.send_frame(receiver, {:text, "stop"}) + + Process.sleep(100) + + WebSockex.send_frame(receiver, {:text, "push_message"}) + assert_receive {:event_received, ["consumer message 0"]} + end end defp kill(pid) do