Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store client ID in Redix.PubSub #261

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/redix/connector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,13 @@ defmodule Redix.Connector do
end
end

defp sync_command(transport, socket, command, timeout) do
@spec sync_command(
:ssl | :gen_tcp,
:gen_tcp.socket() | :ssl.sslsocket(),
[String.t()],
integer()
) :: {:ok, any} | {:error, any}
def sync_command(transport, socket, command, timeout) do
with :ok <- transport.send(socket, Redix.Protocol.pack(command)),
do: recv_response(transport, socket, &Redix.Protocol.parse/1, timeout)
end
Expand Down
12 changes: 12 additions & 0 deletions lib/redix/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -416,4 +416,16 @@ defmodule Redix.PubSub do
when is_binary(patterns) or is_list(patterns) do
:gen_statem.call(conn, {:punsubscribe, List.wrap(patterns), subscriber})
end

@doc """
Gets the redis `CLIENT ID` associated with a connection
## Examples

iex> Redix.PubSub.client_id(conn)
{:ok, 123}
"""
@spec client_id(connection()) :: {:ok, integer()} | {:error, any()}
def client_id(conn) do
:gen_statem.call(conn, :client_id)
end
end
35 changes: 29 additions & 6 deletions lib/redix/pubsub/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Redix.PubSub.Connection do
:backoff_current,
:last_disconnect_reason,
:connected_address,
:client_id,
subscriptions: %{},
monitors: %{}
]
Expand All @@ -28,14 +29,14 @@ defmodule Redix.PubSub.Connection do
data = %__MODULE__{opts: opts, transport: transport}

if opts[:sync_connect] do
with {:ok, socket, address} <- Connector.connect(data.opts, _conn_pid = self()),
:ok <- setopts(data, socket, active: :once) do
with {:ok, socket, address, client_id} <- connect(data) do
data = %__MODULE__{
data
| socket: socket,
last_disconnect_reason: nil,
backoff_current: nil,
connected_address: address
connected_address: address,
client_id: client_id
}

{:ok, :connected, data}
Expand Down Expand Up @@ -104,8 +105,7 @@ defmodule Redix.PubSub.Connection do
end

def disconnected(:internal, :connect, data) do
with {:ok, socket, address} <- Connector.connect(data.opts, _conn_pid = self()),
:ok <- setopts(data, socket, active: :once) do
with {:ok, socket, address, client_id} <- connect(data) do
:telemetry.execute([:redix, :connection], %{}, %{
connection: self(),
connection_name: data.opts[:name],
Expand All @@ -118,7 +118,8 @@ defmodule Redix.PubSub.Connection do
| socket: socket,
last_disconnect_reason: nil,
backoff_current: nil,
connected_address: address
connected_address: address,
client_id: client_id
}

{:next_state, :connected, data, {:next_event, :internal, :handle_connection}}
Expand Down Expand Up @@ -193,6 +194,12 @@ defmodule Redix.PubSub.Connection do
{:keep_state, data}
end

def disconnected({:call, from}, :client_id, data) do
:ok = :gen_statem.reply(from, {:error, :disconnected})

{:keep_state, data}
end

def connected(:internal, :handle_connection, data) do
if map_size(data.subscriptions) > 0 do
case resubscribe_after_reconnection(data) do
Expand Down Expand Up @@ -224,6 +231,12 @@ defmodule Redix.PubSub.Connection do
end
end

def connected({:call, from}, :client_id, data) do
:ok = :gen_statem.reply(from, {:ok, data.client_id})

{:keep_state, data}
end

def connected(:info, {transport_closed, socket}, %__MODULE__{socket: socket} = data)
when transport_closed in [:tcp_closed, :ssl_closed] do
disconnect(data, transport_closed, _handle_disconnection? = true)
Expand Down Expand Up @@ -561,6 +574,16 @@ defmodule Redix.PubSub.Connection do
defp key_for_target(:psubscribe, pattern), do: {:pattern, pattern}
defp key_for_target(:punsubscribe, pattern), do: {:pattern, pattern}

defp connect(%__MODULE__{opts: opts, transport: transport} = data) do
timeout = Keyword.fetch!(opts, :timeout)

with {:ok, socket, address} <- Connector.connect(opts, _conn_pid = self()),
{:ok, client_id} <- Connector.sync_command(transport, socket, ["CLIENT", "ID"], timeout),
:ok <- setopts(data, socket, active: :once) do
{:ok, socket, address, client_id}
end
end

defp setopts(data, socket, opts) do
inets_mod(data.transport).setopts(socket, opts)
end
Expand Down
5 changes: 5 additions & 0 deletions test/redix/pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ defmodule Redix.PubSubTest do
assert info[:fullsweep_after] == fullsweep_after
end

test "client_id should be available after start_link/2" do
{:ok, pid} = PubSub.start_link(port: @port)
assert match?({:ok, client_id} when is_number(client_id), PubSub.client_id(pid))
end

test "subscribe/unsubscribe flow", %{pubsub: pubsub, conn: conn} do
# First, we subscribe.
assert {:ok, ref} = PubSub.subscribe(pubsub, ["foo", "bar"], self())
Expand Down
Loading