Skip to content

Commit

Permalink
Add backoff and reconnect support
Browse files Browse the repository at this point in the history
Currently, Stargate prevents the application from starting up,
or causes it to crash, if Pulsar is or becomes unavailable.
This is a consequence of how WebSockex is implemented.
This builds on a refactor of WebSockex at
Azolo/websockex#112
which performs connection management asynchronously from process startup.
This makes Stargate more production-ready, since it allows
an application to degrade gracefully when Pulsar is temporarily
unavailable.

Main changes as a result of this:
1. A reconnect backoff feature is added, which the user can customize.
2. Since Stargate will continue running even when not connected to
   Pulsar, the user needs to know when messages aren't being produced
   successfully.
   A breaking change was made to the async ACK MFA: its
   first argument is now the result of the produce, which is either
   `:ok` or `{:error, reason}`.
  • Loading branch information
Brendan Ball committed Jun 24, 2022
1 parent 3c33e0a commit 192ca4d
Show file tree
Hide file tree
Showing 14 changed files with 538 additions and 68 deletions.
141 changes: 135 additions & 6 deletions lib/stargate/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ defmodule Stargate.Connection do
topic: String.t()
}

@typedoc """
Calculates custom connection backoff when unable to connect to Pulsar.
Can be used to implement exponential backoff.
Note that for an MFA tuple, the arity of the function should be `length(args) + 1` since the first
arg will be the attempt.
"""
@type backoff_calculator ::
(attempt :: non_neg_integer() -> timeout()) | {module(), fun :: atom(), args :: list()}

# Invoked by the injected `handle_send_result/4` clauses when a non-ping frame fails to send.
# `reason` is `:not_connected` or `:unknown`, and `ref` is the reference passed along with the
# send result. The call site matches on `:ok`, so any other return value raises a MatchError.
@callback handle_send_error(reason :: term, ref :: term, state :: term) :: :ok
# Invoked by the injected `handle_connect/2` each time the websocket connection is established.
# The call site matches on `:ok`, so any other return value raises a MatchError.
@callback handle_connected(state :: term) :: :ok

@doc """
The Connection using macro provides the common websocket connection
and keepalive functionality into a single line for replicating connection
Expand All @@ -24,15 +36,22 @@ defmodule Stargate.Connection do
quote do
use WebSockex

@behaviour Stargate.Connection
@ping_interval 30_000

require Logger

# Called by WebSockex once the websocket connection is established.
#
# Schedules the first keepalive ping as a one-shot timer and stores its reference in the
# state under `:ping_timer_ref` so that `handle_disconnect/2` can cancel it. A repeating
# `:timer.send_interval/2` is deliberately not used here: it could not be cancelled on
# disconnect and would start one more interval on every reconnect.
#
# Also notifies the using module through its `handle_connected/1` callback, which must
# return `:ok`.
@impl WebSockex
def handle_connect(_conn, state) do
  tref = Process.send_after(self(), :send_ping, @ping_interval)
  :ok = apply(__MODULE__, :handle_connected, [state])
  {:ok, Map.put(state, :ping_timer_ref, tref)}
end

# Handles the `:send_ping` timer message: schedules the next ping and replies with a
# `:ping` frame. The new timer reference replaces the previous one under
# `:ping_timer_ref`, so the reference in the state always belongs to the pending timer.
@impl WebSockex
def handle_info(:send_ping, state) do
  tref = Process.send_after(self(), :send_ping, @ping_interval)
  {:reply, :ping, Map.put(state, :ping_timer_ref, tref)}
end

@impl WebSockex
Expand All @@ -44,6 +63,91 @@ defmodule Stargate.Connection do
def handle_pong(_pong_frame, state) do
{:ok, state}
end

# Called by WebSockex when the connection is lost or a connection attempt fails.
#
# Cancels the pending keepalive timer (when one was scheduled), asks the configured
# backoff calculator how long to wait for this attempt, logs the disconnect, and
# returns `{:backoff, delay, state}`.
@impl WebSockex
def handle_disconnect(
      %{reason: reason, attempt_number: attempt},
      %{backoff_calculator: calculator} = state
    ) do
  # No timer reference is stored until the first successful connect.
  timer = Map.get(state, :ping_timer_ref)

  if not is_nil(timer) do
    Process.cancel_timer(timer)
  end

  delay = Stargate.Connection.calculate_backoff(calculator, attempt)

  Logger.warning(
    "[Stargate] disconnected",
    reason: inspect(reason),
    url: state.url,
    attempt: attempt,
    backoff_ms: delay
  )

  {:backoff, delay, state}
end

# Handles the outcome of sending a frame over the websocket.
#
# Clauses, in match order:
#   * `:ok`                                - nothing to do.
#   * error while sending a `:ping`        - log and close the connection.
#   * `WebSockex.ConnError`                - log, report `:not_connected` to the using
#                                            module via `handle_send_error/3`, then close.
#   * `WebSockex.NotConnectedError`        - log and report `:not_connected`; the
#                                            connection is left as is.
#   * any other error                      - log and report `:unknown`.
#
# `ref` is passed through to `handle_send_error/3` unchanged. The frame itself is not
# needed by any error clause, hence the underscore-prefixed bindings.
@impl WebSockex
def handle_send_result(:ok, _frame, _ref, state) do
  {:ok, state}
end

@impl WebSockex
def handle_send_result({:error, reason}, :ping, _ref, state) do
  Logger.warning("[Stargate] error sending ping", reason: inspect(reason))
  {:close, state}
end

@impl WebSockex
def handle_send_result({:error, %WebSockex.ConnError{} = reason}, _frame, ref, state) do
  Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)

  :ok = apply(__MODULE__, :handle_send_error, [:not_connected, ref, state])
  {:close, state}
end

@impl WebSockex
def handle_send_result({:error, %WebSockex.NotConnectedError{} = reason}, _frame, ref, state) do
  Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)

  :ok = apply(__MODULE__, :handle_send_error, [:not_connected, ref, state])
  {:ok, state}
end

@impl WebSockex
def handle_send_result({:error, reason}, _frame, ref, state) do
  Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)

  :ok = apply(__MODULE__, :handle_send_error, [:unknown, ref, state])
  {:ok, state}
end

# Called when the connection process terminates.
#
# When the reason is an `{exception, stacktrace}` pair, the formatted exception is logged
# at error level; every other reason is ignored. The state is not used by either clause.
@impl WebSockex
def terminate({reason, stacktrace}, _state) when is_exception(reason) do
  Logger.error(Exception.format(:error, reason, stacktrace))
end

@impl WebSockex
def terminate(_reason, _state) do
  :ok
end

# Default no-op implementation of the `handle_send_error/3` callback. Using modules may
# override it to react to frames that could not be sent.
@impl Stargate.Connection
def handle_send_error(_reason, _ref, _state) do
  :ok
end

# Default no-op implementation of the `handle_connected/1` callback. Using modules may
# override it to be notified whenever the connection is established.
@impl Stargate.Connection
def handle_connected(_state) do
  :ok
end

defoverridable handle_send_error: 3,
handle_connected: 1
end
end

Expand All @@ -69,9 +173,7 @@ defmodule Stargate.Connection do
end

base_url =
"#{protocol}://#{host}/ws/v2/#{type}/#{persistence}/#{tenant}/#{namespace}/#{topic}#{
subscription
}"
"#{protocol}://#{host}/ws/v2/#{type}/#{persistence}/#{tenant}/#{namespace}/#{topic}#{subscription}"

url =
case params do
Expand Down Expand Up @@ -102,6 +204,33 @@ defmodule Stargate.Connection do
|> Enum.map(&transform_auth/1)
end

@doc false
# Default backoff: a constant 2000 regardless of the attempt number.
@spec calculate_backoff(attempt :: non_neg_integer()) :: timeout()
def calculate_backoff(_attempt), do: 2_000

@doc false
# Runs the given backoff calculator for `attempt` and returns its result.
#
#   * `{module, function, args}` - calls `module.function(attempt, args...)`; the attempt
#     is prepended to `args`.
#   * a one-arity function       - called with the attempt.
#
# Anything else (including a tuple whose elements are not atom/atom/list, or a function of
# another arity) raises `ArgumentError` with a message describing the expected shapes.
@spec calculate_backoff(backoff_calculator(), attempt :: non_neg_integer()) :: timeout()
def calculate_backoff({module, function, args}, attempt)
    when is_atom(module) and is_atom(function) and is_list(args) do
  apply(module, function, [attempt | args])
end

def calculate_backoff(fun, attempt) when is_function(fun, 1) do
  fun.(attempt)
end

def calculate_backoff(_invalid, _attempt) do
  raise ArgumentError,
        "Backoff calculator does not conform to spec of {module, function, args} or fun/1"
end

@doc false
# The calculator used when none is configured: an MFA tuple pointing at
# `Stargate.Connection.calculate_backoff/1` (the attempt is the only argument).
def default_backoff_calculator, do: {Stargate.Connection, :calculate_backoff, []}

defp transform_auth({:ssl_options, _opts} = ssl_opts), do: ssl_opts

defp transform_auth({:auth_token, token}) do
Expand Down
36 changes: 28 additions & 8 deletions lib/stargate/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Stargate.Producer do
use Puid
import Stargate.Supervisor, only: [via: 2]
alias Stargate.Producer.{Acknowledger, QueryParams}
alias Stargate.Connection

@typedoc """
A URL defining the host and topic to which a Stargate producer can
Expand Down Expand Up @@ -99,7 +100,8 @@ defmodule Stargate.Producer do
When calling `produce/3` the third argument must be an MFA tuple which is used by
the producer's acknowledger process to asynchronously perform acknowledgement that the
message was received by the cluster successfully. This is used to avoid blocking the
calling process for performance reasons.
calling process for performance reasons. The result of the produce, either `:ok` or
`{:error, reason}`, is prepended as the first argument when the MFA is invoked.
"""
@spec produce(producer(), message() | [message()], {module(), atom(), [term()]}) ::
:ok | {:error, term()}
Expand Down Expand Up @@ -130,7 +132,8 @@ defmodule Stargate.Producer do
:tenant,
:namespace,
:topic,
:query_params
:query_params,
:backoff_calculator
]
end

Expand All @@ -153,6 +156,7 @@ defmodule Stargate.Producer do
* `persistence` can be one of "persistent" or "non-persistent" per the Pulsar
specification of topics as being in-memory only or persisted to the brokers' disks.
Defaults to "persistent".
* `backoff_calculator` computes the reconnect backoff. See `t:Stargate.Connection.backoff_calculator/0`.
* `query_params` is a map containing any or all of the following:
* `send_timeout` the time at which a produce operation will time out; defaults to 30 seconds
Expand All @@ -178,11 +182,15 @@ defmodule Stargate.Producer do
query_params = QueryParams.build_params(query_params_config)
registry = Keyword.fetch!(args, :registry)

backoff_calculator =
Keyword.get(args, :backoff_calculator, Stargate.Connection.default_backoff_calculator())

state =
args
|> Stargate.Connection.connection_settings(:producer, query_params)
|> Map.put(:query_params, query_params_config)
|> Map.put(:registry, registry)
|> Map.put(:backoff_calculator, backoff_calculator)
|> (fn fields -> struct(State, fields) end).()

server_opts =
Expand All @@ -202,17 +210,16 @@ defmodule Stargate.Producer do

# Handles an asynchronous produce request.
#
# Registers the pending acknowledgement (`ack`) for message context `ctx` with this
# topic's producer acknowledger, then replies with the payload as a text frame. `ctx` is
# included in the reply tuple as the send reference, so a failed send is reported back
# through `handle_send_result/4` with the same context.
@impl WebSockex
def handle_cast({:send, payload, ctx, ack}, state) do
  ack_name =
    via(
      state.registry,
      {:producer_ack, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
       "#{state.topic}"}
    )

  Acknowledger.produce(ack_name, ctx, ack)

  {:reply, {:text, payload}, ctx, state}
end

@impl WebSockex
Expand All @@ -235,6 +242,19 @@ defmodule Stargate.Producer do
{:ok, state}
end

# Reports a failed send to this topic's producer acknowledger as
# `{:error, reason, ctx}`, where `ctx` is the context of the message that failed.
@impl Connection
def handle_send_error(reason, ctx, state) do
  registry = state.registry

  ack_key =
    {:producer_ack, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
     "#{state.topic}"}

  acknowledger = via(registry, ack_key)
  :ok = Acknowledger.ack(acknowledger, {:error, reason, ctx})

  :ok
end

defp construct_payload(%{"payload" => _payload, "context" => context} = message) do
encoded_message =
message
Expand Down
6 changes: 3 additions & 3 deletions lib/stargate/producer/acknowledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ defmodule Stargate.Producer.Acknowledger do
send(pid, {ref, :ack})

{module, function, args} ->
apply(module, function, args)
apply(module, function, [:ok | args])
end

{:noreply, new_state}
Expand All @@ -86,8 +86,8 @@ defmodule Stargate.Producer.Acknowledger do
{pid, ref} when is_pid(pid) ->
send(pid, {ref, :error, reason})

_mfa ->
Logger.error("Failed to execute produce for reason : #{inspect(reason)}")
{module, function, args} ->
apply(module, function, [{:error, reason} | args])
end

{:noreply, new_state}
Expand Down
24 changes: 22 additions & 2 deletions lib/stargate/receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Stargate.Receiver do
import Stargate.Supervisor, only: [via: 2]
alias Stargate.{Consumer, Reader}
alias Stargate.Receiver.Dispatcher
alias Stargate.Connection

@typedoc "A string identifier assigned to each message by the cluster"
@type message_id :: String.t()
Expand Down Expand Up @@ -57,7 +58,8 @@ defmodule Stargate.Receiver do
:tenant,
:namespace,
:topic,
:query_params
:query_params,
:backoff_calculator
]
end

Expand Down Expand Up @@ -89,6 +91,7 @@ defmodule Stargate.Receiver do
on the received messages. Defaults to 1.
* `handler_init_args` is any term that will be passed to the message handler to initialize
its state when a stateful handler is desired. Defaults to an empty list.
* `backoff_calculator` computes the reconnect backoff. See `t:Stargate.Connection.backoff_calculator/0`.
* `query_params` is a map containing any or all of the following:
# Consumer
Expand Down Expand Up @@ -122,6 +125,9 @@ defmodule Stargate.Receiver do
registry = Keyword.fetch!(args, :registry)
query_params_config = Keyword.get(args, :query_params)

backoff_calculator =
Keyword.get(args, :backoff_calculator, Stargate.Connection.default_backoff_calculator())

query_params =
case type do
:consumer -> Consumer.QueryParams.build_params(query_params_config)
Expand All @@ -130,7 +136,8 @@ defmodule Stargate.Receiver do

setup_state = %{
registry: registry,
query_params: query_params_config
query_params: query_params_config,
backoff_calculator: backoff_calculator
}

state =
Expand All @@ -154,6 +161,19 @@ defmodule Stargate.Receiver do
WebSockex.start_link(state.url, __MODULE__, state, server_opts)
end

# Tells this topic's dispatcher that the receiver's websocket connection is established.
@impl Connection
def handle_connected(state) do
  registry = state.registry

  dispatcher_key =
    {:dispatcher, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
     "#{state.topic}"}

  dispatcher = via(registry, dispatcher_key)
  :ok = Dispatcher.connected(dispatcher)

  :ok
end

@impl WebSockex
def handle_frame(
{:text, msg},
Expand Down
Loading

0 comments on commit 192ca4d

Please sign in to comment.