Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ping command for PubSub connections #274

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/redix/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,17 @@ defmodule Redix.PubSub do
:gen_statem.call(conn, {:subscribe, List.wrap(channels), subscriber})
end

@doc """
Sends a ping and waits for a reply from the server.

Upon a successful reply from the server, :ok will be returned. If no reply is received within 5 seconds, :error will be returned.

"""
@spec ping(connection()) :: :ok | :error
def ping(conn) do
:gen_statem.call(conn, :ping)
end

@doc """
Subscribes `subscriber` to the given pattern or list of patterns.

Expand Down
86 changes: 85 additions & 1 deletion lib/redix/pubsub/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule Redix.PubSub.Connection do
:connected_address,
:client_id,
subscriptions: %{},
monitors: %{}
monitors: %{},
pings: []
]

@backoff_exponent 1.5
Expand Down Expand Up @@ -161,6 +162,29 @@ defmodule Redix.PubSub.Connection do
{:keep_state, data}
end

def disconnected(:info, {:ping_callback, random_string}, data) do
data =
case List.first(data.pings) do
nil ->
# already processed
data

ping ->
case ping.random_string do
^random_string ->
:ok = :gen_statem.reply(ping.from, :error)
[_ping | pings] = data.pings
Map.put(data, :pings, pings)

_ ->
# already processed
data
end
end

{:keep_state, data}
end

def disconnected({:call, from}, {operation, targets, pid}, data)
when operation in [:unsubscribe, :punsubscribe] do
:ok = :gen_statem.reply(from, :ok)
Expand Down Expand Up @@ -194,6 +218,11 @@ defmodule Redix.PubSub.Connection do
{:keep_state, data}
end

def disconnected({:call, from}, :ping, _data) do
reply = :error
{:keep_state_and_data, {:reply, from, reply}}
end

def disconnected({:call, from}, :get_client_id, _data) do
reply = {:error, %ConnectionError{reason: :closed}}
{:keep_state_and_data, {:reply, from, reply}}
Expand Down Expand Up @@ -230,6 +259,19 @@ defmodule Redix.PubSub.Connection do
end
end

def connected({:call, from}, :ping, data) do
random_string = :crypto.strong_rand_bytes(12) |> Base.encode64()
pipeline = [["PING", "ping:#{random_string}"]]
data.transport.send(data.socket, Enum.map(pipeline, &Protocol.pack/1))

Process.send_after(self(), {:ping_callback, random_string}, 5000)

pings = data.pings ++ [%{random_string: random_string, from: from}]
data = Map.put(data, :pings, pings)

{:keep_state, data}
end

def connected({:call, from}, :get_client_id, data) do
reply =
if id = data.client_id do
Expand All @@ -241,6 +283,29 @@ defmodule Redix.PubSub.Connection do
{:keep_state_and_data, {:reply, from, reply}}
end

def connected(:info, {:ping_callback, random_string}, data) do
data =
case List.first(data.pings) do
nil ->
# already processed
data

ping ->
case ping.random_string do
^random_string ->
:ok = :gen_statem.reply(ping.from, :error)
[_ping | pings] = data.pings
Map.put(data, :pings, pings)

_ ->
# already processed
data
end
end

{:keep_state, data}
end

def connected(:info, {transport_closed, socket}, %__MODULE__{socket: socket} = data)
when transport_closed in [:tcp_closed, :ssl_closed] do
disconnect(data, transport_closed, _handle_disconnection? = true)
Expand Down Expand Up @@ -351,6 +416,25 @@ defmodule Redix.PubSub.Connection do
handle_pubsub_message_with_payload(data, {:pattern, pattern}, :pmessage, properties)
end

defp handle_pubsub_msg(data, "ping:" <> _rest = reply) do
handle_pubsub_msg(data, ["pong", reply])
end

defp handle_pubsub_msg(data, ["pong", "ping:" <> reply]) do
[ping | pings] = data.pings
data = Map.put(data, :pings, pings)

res =
case ping.random_string do
^reply -> :ok
_ -> :error
end

:ok = :gen_statem.reply(ping.from, res)

{:ok, data}
end

defp handle_pubsub_message_with_payload(data, target_key, kind, properties) do
case data.subscriptions[target_key] do
{:subscribed, subscribers} ->
Expand Down
23 changes: 23 additions & 0 deletions test/redix/pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,29 @@ defmodule Redix.PubSubTest do
refute new_state.continuation
end

test "pubsub: multiple pings succeeds", %{pubsub: pubsub} do
assert :ok = PubSub.ping(pubsub)
assert :ok = PubSub.ping(pubsub)
end

test "pubsub: ping gives error when disconnected", %{pubsub: pubsub} do
{:connected, state} = :sys.get_state(pubsub)
socket = state.socket

send(pubsub, {:tcp_closed, socket})
assert {:disconnected, _new_state} = :sys.get_state(pubsub)

assert :error = PubSub.ping(pubsub)
end

test "pubsub: multiple pings succeeds when subscribed to channel", %{pubsub: pubsub} do
assert {:ok, ref} = PubSub.subscribe(pubsub, "foo", self())
assert_receive {:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: "foo"}}, 1000

assert :ok = PubSub.ping(pubsub)
assert :ok = PubSub.ping(pubsub)
end

defp wait_until_passes(timeout, fun) when timeout <= 0 do
fun.()
end
Expand Down