diff --git a/lib/redix/pubsub.ex b/lib/redix/pubsub.ex index 1f9dce6..ba9226e 100644 --- a/lib/redix/pubsub.ex +++ b/lib/redix/pubsub.ex @@ -329,6 +329,17 @@ defmodule Redix.PubSub do :gen_statem.call(conn, {:subscribe, List.wrap(channels), subscriber}) end + @doc """ + Sends a ping and waits for a reply from the server. + + Upon a successful reply from the server, :ok will be returned. If no reply is received within 5 seconds, :error will be returned. + + """ + @spec ping(connection()) :: :ok | :error + def ping(conn) do + :gen_statem.call(conn, :ping) + end + @doc """ Subscribes `subscriber` to the given pattern or list of patterns. diff --git a/lib/redix/pubsub/connection.ex b/lib/redix/pubsub/connection.ex index b5c0037..c833dbe 100644 --- a/lib/redix/pubsub/connection.ex +++ b/lib/redix/pubsub/connection.ex @@ -15,7 +15,8 @@ defmodule Redix.PubSub.Connection do :connected_address, :client_id, subscriptions: %{}, - monitors: %{} + monitors: %{}, + pings: [] ] @backoff_exponent 1.5 @@ -161,6 +162,29 @@ defmodule Redix.PubSub.Connection do {:keep_state, data} end + def disconnected(:info, {:ping_callback, random_string}, data) do + data = + case List.first(data.pings) do + nil -> + # already processed + data + + ping -> + case ping.random_string do + ^random_string -> + :ok = :gen_statem.reply(ping.from, :error) + [_ping | pings] = data.pings + Map.put(data, :pings, pings) + + _ -> + # already processed + data + end + end + + {:keep_state, data} + end + def disconnected({:call, from}, {operation, targets, pid}, data) when operation in [:unsubscribe, :punsubscribe] do :ok = :gen_statem.reply(from, :ok) @@ -194,6 +218,11 @@ defmodule Redix.PubSub.Connection do {:keep_state, data} end + def disconnected({:call, from}, :ping, _data) do + reply = :error + {:keep_state_and_data, {:reply, from, reply}} + end + def disconnected({:call, from}, :get_client_id, _data) do reply = {:error, %ConnectionError{reason: :closed}} {:keep_state_and_data, {:reply, from, reply}} @@ -230,6 +259,19 @@ defmodule Redix.PubSub.Connection do end end + def connected({:call, from}, :ping, data) do + random_string = :crypto.strong_rand_bytes(12) |> Base.encode64() + pipeline = [["PING", "ping:#{random_string}"]] + data.transport.send(data.socket, Enum.map(pipeline, &Protocol.pack/1)) + + Process.send_after(self(), {:ping_callback, random_string}, 5000) + + pings = data.pings ++ [%{random_string: random_string, from: from}] + data = Map.put(data, :pings, pings) + + {:keep_state, data} + end + def connected({:call, from}, :get_client_id, data) do reply = if id = data.client_id do @@ -241,6 +283,29 @@ defmodule Redix.PubSub.Connection do {:keep_state_and_data, {:reply, from, reply}} end + def connected(:info, {:ping_callback, random_string}, data) do + data = + case List.first(data.pings) do + nil -> + # already processed + data + + ping -> + case ping.random_string do + ^random_string -> + :ok = :gen_statem.reply(ping.from, :error) + [_ping | pings] = data.pings + Map.put(data, :pings, pings) + + _ -> + # already processed + data + end + end + + {:keep_state, data} + end + def connected(:info, {transport_closed, socket}, %__MODULE__{socket: socket} = data) when transport_closed in [:tcp_closed, :ssl_closed] do disconnect(data, transport_closed, _handle_disconnection? = true) @@ -351,6 +416,25 @@ defmodule Redix.PubSub.Connection do handle_pubsub_message_with_payload(data, {:pattern, pattern}, :pmessage, properties) end + defp handle_pubsub_msg(data, "ping:" <> _rest = reply) do + handle_pubsub_msg(data, ["pong", reply]) + end + + defp handle_pubsub_msg(data, ["pong", "ping:" <> reply]) do + [ping | pings] = data.pings + data = Map.put(data, :pings, pings) + + res = + case ping.random_string do + ^reply -> :ok + _ -> :error + end + + :ok = :gen_statem.reply(ping.from, res) + + {:ok, data} + end + defp handle_pubsub_message_with_payload(data, target_key, kind, properties) do case data.subscriptions[target_key] do {:subscribed, subscribers} -> diff --git a/test/redix/pubsub_test.exs b/test/redix/pubsub_test.exs index e60ddce..ba9b399 100644 --- a/test/redix/pubsub_test.exs +++ b/test/redix/pubsub_test.exs @@ -365,6 +365,29 @@ defmodule Redix.PubSubTest do refute new_state.continuation end + test "pubsub: multiple pings succeeds", %{pubsub: pubsub} do + assert :ok = PubSub.ping(pubsub) + assert :ok = PubSub.ping(pubsub) + end + + test "pubsub: ping gives error when disconnected", %{pubsub: pubsub} do + {:connected, state} = :sys.get_state(pubsub) + socket = state.socket + + send(pubsub, {:tcp_closed, socket}) + assert {:disconnected, _new_state} = :sys.get_state(pubsub) + + assert :error = PubSub.ping(pubsub) + end + + test "pubsub: multiple pings succeeds when subscribed to channel", %{pubsub: pubsub} do + assert {:ok, ref} = PubSub.subscribe(pubsub, "foo", self()) + assert_receive {:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: "foo"}}, 1000 + + assert :ok = PubSub.ping(pubsub) + assert :ok = PubSub.ping(pubsub) + end + defp wait_until_passes(timeout, fun) when timeout <= 0 do fun.() end