From 741dda613062bface2bc32841db97c890cc3e691 Mon Sep 17 00:00:00 2001 From: Stefan Berthold Date: Fri, 8 Jul 2022 08:30:14 +0000 Subject: [PATCH] add CLUSTER INFO command and use it for new checkedConnectCluster Redis nodes in cluster mode can be pinged, but a successful pong from one cluster node would not determine whether the cluster and thus all its nodes are up and running. The cluster is up only if the `CLUSTER INFO` command reports `cluster_state:ok`. This PR adds the `CLUSTER INFO` command and the `checkedConnectCluster` helper function for safely connecting to Redis clusters only when the cluster is up. --- src/Database/Redis.hs | 12 ++- src/Database/Redis/Commands.hs | 3 + src/Database/Redis/Connection.hs | 22 +++++ src/Database/Redis/ManualCommands.hs | 141 ++++++++++++++++++++++++++- 4 files changed, 172 insertions(+), 6 deletions(-) diff --git a/src/Database/Redis.hs b/src/Database/Redis.hs index 41eb39c1..62653fc6 100644 --- a/src/Database/Redis.hs +++ b/src/Database/Redis.hs @@ -162,10 +162,10 @@ module Database.Redis ( RedisCtx(..), MonadRedis(..), -- * Connection - Connection, ConnectError(..), connect, checkedConnect, disconnect, - withConnect, withCheckedConnect, - ConnectInfo(..), defaultConnectInfo, parseConnectInfo, connectCluster, - PortID(..), + Connection, ConnectError(..), connect, checkedConnect, + ClusterConnectError (..), connectCluster, checkedConnectCluster, + disconnect, withConnect, withCheckedConnect, + ConnectInfo(..), defaultConnectInfo, parseConnectInfo, PortID(..), -- * Commands module Database.Redis.Commands, @@ -197,13 +197,15 @@ module Database.Redis ( import Database.Redis.Core import Database.Redis.Connection ( runRedis - , connectCluster , defaultConnectInfo , ConnectInfo(..) , disconnect , checkedConnect , connect + , checkedConnectCluster + , connectCluster , ConnectError(..) + , ClusterConnectError(..) , Connection(..) , withConnect , withCheckedConnect) diff --git a/src/Database/Redis/Commands.hs b/src/Database/Redis/Commands.hs index 621f9723..4a58aae4 100644 --- a/src/Database/Redis/Commands.hs +++ b/src/Database/Redis/Commands.hs @@ -261,6 +261,9 @@ xinfoStream, -- |Get info about a stream. The Redis command @XINFO@ is split int xdel, -- |Delete messages from a stream. Since Redis 5.0.0 xtrim, -- |Set the upper bound for number of messages in a stream. Since Redis 5.0.0 inf, -- |Constructor for `inf` Redis argument values +ClusterInfoResponse (..), +ClusterInfoResponseState (..), +clusterInfo, ClusterNodesResponse(..), ClusterNodesResponseEntry(..), ClusterNodesResponseSlotSpec(..), diff --git a/src/Database/Redis/Connection.hs b/src/Database/Redis/Connection.hs index 507abb74..ef973788 100644 --- a/src/Database/Redis/Connection.hs +++ b/src/Database/Redis/Connection.hs @@ -30,8 +30,11 @@ import Database.Redis.Commands ( ping , select , auth + , clusterInfo , clusterSlots , command + , ClusterInfoResponseState (..) + , ClusterInfoResponse (..) , ClusterSlotsResponse(..) , ClusterSlotsResponseEntry(..) , ClusterSlotsNode(..)) @@ -158,6 +161,25 @@ checkedConnect connInfo = do runRedis conn $ void ping return conn +-- |Constructs a 'Connection' pool to a Redis cluster designated by the +-- given 'ConnectInfo', then tests if the server is actually there. +-- Throws an exception if the connection to the Redis server can't be +-- established. +checkedConnectCluster :: ConnectInfo -> IO Connection +checkedConnectCluster connInfo = do + conn <- connectCluster connInfo + res <- runRedis conn clusterInfo + case res of + Right r -> case clusterInfoResponseState r of + OK -> pure conn + Down -> throwIO $ ClusterDownError r + Left e -> throwIO $ ClusterConnectError e + +newtype ClusterDownError = ClusterDownError ClusterInfoResponse + deriving (Eq, Show, Typeable) + +instance Exception ClusterDownError + -- |Destroy all idle resources in the pool. disconnect :: Connection -> IO () disconnect (NonClusteredConnection pool) = destroyAllResources pool diff --git a/src/Database/Redis/ManualCommands.hs b/src/Database/Redis/ManualCommands.hs index b0f616b0..61359a58 100644 --- a/src/Database/Redis/ManualCommands.hs +++ b/src/Database/Redis/ManualCommands.hs @@ -6,7 +6,7 @@ import Prelude hiding (min, max) import Data.ByteString (ByteString, empty, append) import qualified Data.ByteString.Char8 as Char8 import qualified Data.ByteString as BS -import Data.Maybe (maybeToList, catMaybes) +import Data.Maybe (maybeToList, catMaybes, fromMaybe) #if __GLASGOW_HASKELL__ < 808 import Data.Semigroup ((<>)) #endif @@ -1267,6 +1267,145 @@ ping => m (f Status) ping = sendRequest (["PING"] ) +-- https://redis.io/commands/cluster-info/ +data ClusterInfoResponse = ClusterInfoResponse + { clusterInfoResponseState :: ClusterInfoResponseState, + clusterInfoResponseSlotsAssigned :: Integer, + clusterInfoResponseSlotsOK :: Integer, + clusterInfoResponseSlotsPfail :: Integer, + clusterInfoResponseSlotsFail :: Integer, + clusterInfoResponseKnownNodes :: Integer, + clusterInfoResponseSize :: Integer, + clusterInfoResponseCurrentEpoch :: Integer, + clusterInfoResponseMyEpoch :: Integer, + clusterInfoResponseStatsMessagesSent :: Integer, + clusterInfoResponseStatsMessagesReceived :: Integer, + clusterInfoResponseTotalLinksBufferLimitExceeded :: Integer, + clusterInfoResponseStatsMessagesPingSent :: Maybe Integer, + clusterInfoResponseStatsMessagesPingReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesPongSent :: Maybe Integer, + clusterInfoResponseStatsMessagesPongReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesMeetSent :: Maybe Integer, + clusterInfoResponseStatsMessagesMeetReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesFailSent :: Maybe Integer, + clusterInfoResponseStatsMessagesFailReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesPublishSent :: Maybe Integer, + clusterInfoResponseStatsMessagesPublishReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesAuthReqSent :: Maybe Integer, + clusterInfoResponseStatsMessagesAuthReqReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesAuthAckSent :: Maybe Integer, + clusterInfoResponseStatsMessagesAuthAckReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesUpdateSent :: Maybe Integer, + clusterInfoResponseStatsMessagesUpdateReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesMfstartSent :: Maybe Integer, + clusterInfoResponseStatsMessagesMfstartReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesModuleSent :: Maybe Integer, + clusterInfoResponseStatsMessagesModuleReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesPublishshardSent :: Maybe Integer, + clusterInfoResponseStatsMessagesPublishshardReceived :: Maybe Integer + } + deriving (Show, Eq) + +data ClusterInfoResponseState + = OK + | Down + deriving (Show, Eq) + +defClusterInfoResponse :: ClusterInfoResponse +defClusterInfoResponse = + ClusterInfoResponse + { clusterInfoResponseState = Down, + clusterInfoResponseSlotsAssigned = 0, + clusterInfoResponseSlotsOK = 0, + clusterInfoResponseSlotsPfail = 0, + clusterInfoResponseSlotsFail = 0, + clusterInfoResponseKnownNodes = 0, + clusterInfoResponseSize = 0, + clusterInfoResponseCurrentEpoch = 0, + clusterInfoResponseMyEpoch = 0, + clusterInfoResponseStatsMessagesSent = 0, + clusterInfoResponseStatsMessagesReceived = 0, + clusterInfoResponseTotalLinksBufferLimitExceeded = 0, + clusterInfoResponseStatsMessagesPingSent = Nothing, + clusterInfoResponseStatsMessagesPingReceived = Nothing, + clusterInfoResponseStatsMessagesPongSent = Nothing, + clusterInfoResponseStatsMessagesPongReceived = Nothing, + clusterInfoResponseStatsMessagesMeetSent = Nothing, + clusterInfoResponseStatsMessagesMeetReceived = Nothing, + clusterInfoResponseStatsMessagesFailSent = Nothing, + clusterInfoResponseStatsMessagesFailReceived = Nothing, + clusterInfoResponseStatsMessagesPublishSent = Nothing, + clusterInfoResponseStatsMessagesPublishReceived = Nothing, + clusterInfoResponseStatsMessagesAuthReqSent = Nothing, + clusterInfoResponseStatsMessagesAuthReqReceived = Nothing, + clusterInfoResponseStatsMessagesAuthAckSent = Nothing, + clusterInfoResponseStatsMessagesAuthAckReceived = Nothing, + clusterInfoResponseStatsMessagesUpdateSent = Nothing, + clusterInfoResponseStatsMessagesUpdateReceived = Nothing, + clusterInfoResponseStatsMessagesMfstartSent = Nothing, + clusterInfoResponseStatsMessagesMfstartReceived = Nothing, + clusterInfoResponseStatsMessagesModuleSent = Nothing, + clusterInfoResponseStatsMessagesModuleReceived = Nothing, + clusterInfoResponseStatsMessagesPublishshardSent = Nothing, + clusterInfoResponseStatsMessagesPublishshardReceived = Nothing + } + +parseClusterInfoResponse :: [[ByteString]] -> ClusterInfoResponse -> Maybe ClusterInfoResponse +parseClusterInfoResponse fields resp = case fields of + [] -> pure resp + (["cluster_state", state] : fs) -> parseState state >>= \s -> parseClusterInfoResponse fs $ resp {clusterInfoResponseState = s} + (["cluster_slots_assigned", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsAssigned = v} + (["cluster_slots_ok", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsOK = v} + (["cluster_slots_pfail", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsPfail = v} + (["cluster_slots_fail", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsFail = v} + (["cluster_known_nodes", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseKnownNodes = v} + (["cluster_size", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSize = v} + (["cluster_current_epoch", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseCurrentEpoch = v} + (["cluster_my_epoch", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseMyEpoch = v} + (["cluster_stats_messages_sent", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesSent = v} + (["cluster_stats_messages_received", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesReceived = v} + (["total_cluster_links_buffer_limit_exceeded", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseTotalLinksBufferLimitExceeded = fromMaybe 0 $ parseInteger value} -- this value should be mandatory according to the spec, but isn't necessarily set in Redis 6 + (["cluster_stats_messages_ping_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPingSent = parseInteger value} + (["cluster_stats_messages_ping_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPingReceived = parseInteger value} + (["cluster_stats_messages_pong_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPongSent = parseInteger value} + (["cluster_stats_messages_pong_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPongReceived = parseInteger value} + (["cluster_stats_messages_meet_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMeetSent = parseInteger value} + (["cluster_stats_messages_meet_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMeetReceived = parseInteger value} + (["cluster_stats_messages_fail_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesFailSent = parseInteger value} + (["cluster_stats_messages_fail_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesFailReceived = parseInteger value} + (["cluster_stats_messages_publish_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishSent = parseInteger value} + (["cluster_stats_messages_publish_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishReceived = parseInteger value} + (["cluster_stats_messages_auth_req_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthReqSent = parseInteger value} + (["cluster_stats_messages_auth_req_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthReqReceived = parseInteger value} + (["cluster_stats_messages_auth_ack_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthAckSent = parseInteger value} + (["cluster_stats_messages_auth_ack_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthAckReceived = parseInteger value} + (["cluster_stats_messages_update_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesUpdateSent = parseInteger value} + (["cluster_stats_messages_update_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesUpdateReceived = parseInteger value} + (["cluster_stats_messages_mfstart_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMfstartSent = parseInteger value} + (["cluster_stats_messages_mfstart_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMfstartReceived = parseInteger value} + (["cluster_stats_messages_module_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesModuleSent = parseInteger value} + (["cluster_stats_messages_module_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesModuleReceived = parseInteger value} + (["cluster_stats_messages_publishshard_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishshardSent = parseInteger value} + (["cluster_stats_messages_publishshard_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishshardReceived = parseInteger value} + (_ : fs) -> parseClusterInfoResponse fs resp + where + parseState bs = case bs of + "ok" -> Just OK + "fail" -> Just Down + _ -> Nothing + parseInteger = fmap fst . Char8.readInteger + +instance RedisResult ClusterInfoResponse where + decode r@(Bulk (Just bulkData)) = + maybe (Left r) Right + . flip parseClusterInfoResponse defClusterInfoResponse + . map (Char8.split ':' . Char8.takeWhile (/= '\r')) + $ Char8.lines bulkData + decode r = Left r + +clusterInfo :: RedisCtx m f => m (f ClusterInfoResponse) +clusterInfo = sendRequest ["CLUSTER", "INFO"] + data ClusterNodesResponse = ClusterNodesResponse { clusterNodesResponseEntries :: [ClusterNodesResponseEntry] } deriving (Show, Eq)