From 3a541844408585b940b2cfd454b716483cebd7c9 Mon Sep 17 00:00:00 2001 From: Kenneth Ito Date: Wed, 6 Mar 2024 12:56:06 -0800 Subject: [PATCH] Support client id in pubsub --- lib/redix/connector.ex | 8 +++++++- lib/redix/pubsub.ex | 12 ++++++++++++ lib/redix/pubsub/connection.ex | 35 ++++++++++++++++++++++++++++------ test/redix/pubsub_test.exs | 5 +++++ 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/lib/redix/connector.ex b/lib/redix/connector.ex index 81cc984..b3c5bb9 100644 --- a/lib/redix/connector.ex +++ b/lib/redix/connector.ex @@ -250,7 +250,13 @@ defmodule Redix.Connector do end end - defp sync_command(transport, socket, command, timeout) do + @spec sync_command( + :ssl | :gen_tcp, + :gen_tcp.socket() | :ssl.sslsocket(), + [String.t()], + integer() + ) :: {:ok, any} | {:error, any} + def sync_command(transport, socket, command, timeout) do with :ok <- transport.send(socket, Redix.Protocol.pack(command)), do: recv_response(transport, socket, &Redix.Protocol.parse/1, timeout) end diff --git a/lib/redix/pubsub.ex b/lib/redix/pubsub.ex index b75c45c..8d15b9d 100644 --- a/lib/redix/pubsub.ex +++ b/lib/redix/pubsub.ex @@ -416,4 +416,16 @@ defmodule Redix.PubSub do when is_binary(patterns) or is_list(patterns) do :gen_statem.call(conn, {:punsubscribe, List.wrap(patterns), subscriber}) end + + @doc """ + Gets the redis `CLIENT ID` associated with a connection + ## Examples + + iex> Redix.PubSub.client_id(conn) + {:ok, 123} + """ + @spec client_id(connection()) :: {:ok, integer()} | {:error, any()} + def client_id(conn) do + :gen_statem.call(conn, :client_id) + end end diff --git a/lib/redix/pubsub/connection.ex b/lib/redix/pubsub/connection.ex index 8f3de75..ac1a3ae 100644 --- a/lib/redix/pubsub/connection.ex +++ b/lib/redix/pubsub/connection.ex @@ -13,6 +13,7 @@ defmodule Redix.PubSub.Connection do :backoff_current, :last_disconnect_reason, :connected_address, + :client_id, subscriptions: %{}, monitors: %{} ] @@ -28,14 +29,14 @@ defmodule Redix.PubSub.Connection do data = %__MODULE__{opts: opts, transport: transport} if opts[:sync_connect] do - with {:ok, socket, address} <- Connector.connect(data.opts, _conn_pid = self()), - :ok <- setopts(data, socket, active: :once) do + with {:ok, socket, address, client_id} <- connect(data) do data = %__MODULE__{ data | socket: socket, last_disconnect_reason: nil, backoff_current: nil, - connected_address: address + connected_address: address, + client_id: client_id } {:ok, :connected, data} @@ -104,8 +105,7 @@ defmodule Redix.PubSub.Connection do end def disconnected(:internal, :connect, data) do - with {:ok, socket, address} <- Connector.connect(data.opts, _conn_pid = self()), - :ok <- setopts(data, socket, active: :once) do + with {:ok, socket, address, client_id} <- connect(data) do :telemetry.execute([:redix, :connection], %{}, %{ connection: self(), connection_name: data.opts[:name], @@ -118,7 +118,8 @@ defmodule Redix.PubSub.Connection do | socket: socket, last_disconnect_reason: nil, backoff_current: nil, - connected_address: address + connected_address: address, + client_id: client_id } {:next_state, :connected, data, {:next_event, :internal, :handle_connection}} @@ -193,6 +194,12 @@ defmodule Redix.PubSub.Connection do {:keep_state, data} end + def disconnected({:call, from}, :client_id, data) do + :ok = :gen_statem.reply(from, {:error, :disconnected}) + + {:keep_state, data} + end + def connected(:internal, :handle_connection, data) do if map_size(data.subscriptions) > 0 do case resubscribe_after_reconnection(data) do @@ -224,6 +231,12 @@ defmodule Redix.PubSub.Connection do end end + def connected({:call, from}, :client_id, data) do + :ok = :gen_statem.reply(from, {:ok, data.client_id}) + + {:keep_state, data} + end + def connected(:info, {transport_closed, socket}, %__MODULE__{socket: socket} = data) when transport_closed in [:tcp_closed, :ssl_closed] do disconnect(data, transport_closed, _handle_disconnection? = true) @@ -561,6 +574,16 @@ defmodule Redix.PubSub.Connection do defp key_for_target(:psubscribe, pattern), do: {:pattern, pattern} defp key_for_target(:punsubscribe, pattern), do: {:pattern, pattern} + defp connect(%__MODULE__{opts: opts, transport: transport} = data) do + timeout = Keyword.fetch!(opts, :timeout) + + with {:ok, socket, address} <- Connector.connect(opts, _conn_pid = self()), + {:ok, client_id} <- Connector.sync_command(transport, socket, ["CLIENT", "ID"], timeout), + :ok <- setopts(data, socket, active: :once) do + {:ok, socket, address, client_id} + end + end + defp setopts(data, socket, opts) do inets_mod(data.transport).setopts(socket, opts) end diff --git a/test/redix/pubsub_test.exs b/test/redix/pubsub_test.exs index a8aeeee..b7187fe 100644 --- a/test/redix/pubsub_test.exs +++ b/test/redix/pubsub_test.exs @@ -23,6 +23,11 @@ defmodule Redix.PubSubTest do assert info[:fullsweep_after] == fullsweep_after end + test "client_id should be available after start_link/2" do + {:ok, pid} = PubSub.start_link(port: @port) + assert match?({:ok, client_id} when is_number(client_id), PubSub.client_id(pid)) + end + test "subscribe/unsubscribe flow", %{pubsub: pubsub, conn: conn} do # First, we subscribe. assert {:ok, ref} = PubSub.subscribe(pubsub, ["foo", "bar"], self())