Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add CLUSTER INFO command and use it for new checkedConnectCluster #185

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/Database/Redis.hs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ module Database.Redis (
RedisCtx(..), MonadRedis(..),

-- * Connection
Connection, ConnectError(..), connect, checkedConnect, disconnect,
withConnect, withCheckedConnect,
ConnectInfo(..), defaultConnectInfo, parseConnectInfo, connectCluster,
PortID(..),
Connection, ConnectError(..), connect, checkedConnect,
ClusterConnectError (..), connectCluster, checkedConnectCluster,
disconnect, withConnect, withCheckedConnect,
ConnectInfo(..), defaultConnectInfo, parseConnectInfo, PortID(..),

-- * Commands
module Database.Redis.Commands,
Expand Down Expand Up @@ -197,13 +197,15 @@ module Database.Redis (
import Database.Redis.Core
import Database.Redis.Connection
( runRedis
, connectCluster
, defaultConnectInfo
, ConnectInfo(..)
, disconnect
, checkedConnect
, connect
, checkedConnectCluster
, connectCluster
, ConnectError(..)
, ClusterConnectError(..)
, Connection(..)
, withConnect
, withCheckedConnect)
Expand Down
3 changes: 3 additions & 0 deletions src/Database/Redis/Commands.hs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ xinfoStream, -- |Get info about a stream. The Redis command @XINFO@ is split int
xdel, -- |Delete messages from a stream. Since Redis 5.0.0
xtrim, -- |Set the upper bound for number of messages in a stream. Since Redis 5.0.0
inf, -- |Constructor for `inf` Redis argument values
ClusterInfoResponse (..),
ClusterInfoResponseState (..),
clusterInfo,
ClusterNodesResponse(..),
ClusterNodesResponseEntry(..),
ClusterNodesResponseSlotSpec(..),
Expand Down
22 changes: 22 additions & 0 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import Database.Redis.Commands
( ping
, select
, auth
, clusterInfo
, clusterSlots
, command
, ClusterInfoResponseState (..)
, ClusterInfoResponse (..)
, ClusterSlotsResponse(..)
, ClusterSlotsResponseEntry(..)
, ClusterSlotsNode(..))
Expand Down Expand Up @@ -158,6 +161,25 @@ checkedConnect connInfo = do
runRedis conn $ void ping
return conn

-- |Constructs a 'Connection' pool to a Redis cluster designated by the
-- given 'ConnectInfo', then tests if the server is actually there.
-- Throws an exception if the connection to the Redis server can't be
-- established.
checkedConnectCluster :: ConnectInfo -> IO Connection
checkedConnectCluster connInfo = do
conn <- connectCluster connInfo
res <- runRedis conn clusterInfo
case res of
Right r -> case clusterInfoResponseState r of
OK -> pure conn
Down -> throwIO $ ClusterDownError r
Left e -> throwIO $ ClusterConnectError e

newtype ClusterDownError = ClusterDownError ClusterInfoResponse
deriving (Eq, Show, Typeable)

instance Exception ClusterDownError

-- |Destroy all idle resources in the pool.
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection pool) = destroyAllResources pool
Expand Down
141 changes: 140 additions & 1 deletion src/Database/Redis/ManualCommands.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Prelude hiding (min, max)
import Data.ByteString (ByteString, empty, append)
import qualified Data.ByteString.Char8 as Char8
import qualified Data.ByteString as BS
import Data.Maybe (maybeToList, catMaybes)
import Data.Maybe (maybeToList, catMaybes, fromMaybe)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup ((<>))
#endif
Expand Down Expand Up @@ -1267,6 +1267,145 @@ ping
=> m (f Status)
ping = sendRequest (["PING"] )

-- https://redis.io/commands/cluster-info/
data ClusterInfoResponse = ClusterInfoResponse
{ clusterInfoResponseState :: ClusterInfoResponseState,
clusterInfoResponseSlotsAssigned :: Integer,
clusterInfoResponseSlotsOK :: Integer,
clusterInfoResponseSlotsPfail :: Integer,
clusterInfoResponseSlotsFail :: Integer,
clusterInfoResponseKnownNodes :: Integer,
clusterInfoResponseSize :: Integer,
clusterInfoResponseCurrentEpoch :: Integer,
clusterInfoResponseMyEpoch :: Integer,
clusterInfoResponseStatsMessagesSent :: Integer,
clusterInfoResponseStatsMessagesReceived :: Integer,
clusterInfoResponseTotalLinksBufferLimitExceeded :: Integer,
clusterInfoResponseStatsMessagesPingSent :: Maybe Integer,
clusterInfoResponseStatsMessagesPingReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesPongSent :: Maybe Integer,
clusterInfoResponseStatsMessagesPongReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesMeetSent :: Maybe Integer,
clusterInfoResponseStatsMessagesMeetReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesFailSent :: Maybe Integer,
clusterInfoResponseStatsMessagesFailReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesPublishSent :: Maybe Integer,
clusterInfoResponseStatsMessagesPublishReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesAuthReqSent :: Maybe Integer,
clusterInfoResponseStatsMessagesAuthReqReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesAuthAckSent :: Maybe Integer,
clusterInfoResponseStatsMessagesAuthAckReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesUpdateSent :: Maybe Integer,
clusterInfoResponseStatsMessagesUpdateReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesMfstartSent :: Maybe Integer,
clusterInfoResponseStatsMessagesMfstartReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesModuleSent :: Maybe Integer,
clusterInfoResponseStatsMessagesModuleReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesPublishshardSent :: Maybe Integer,
clusterInfoResponseStatsMessagesPublishshardReceived :: Maybe Integer
}
deriving (Show, Eq)

data ClusterInfoResponseState
= OK
| Down
deriving (Show, Eq)

defClusterInfoResponse :: ClusterInfoResponse
defClusterInfoResponse =
ClusterInfoResponse
{ clusterInfoResponseState = Down,
clusterInfoResponseSlotsAssigned = 0,
clusterInfoResponseSlotsOK = 0,
clusterInfoResponseSlotsPfail = 0,
clusterInfoResponseSlotsFail = 0,
clusterInfoResponseKnownNodes = 0,
clusterInfoResponseSize = 0,
clusterInfoResponseCurrentEpoch = 0,
clusterInfoResponseMyEpoch = 0,
clusterInfoResponseStatsMessagesSent = 0,
clusterInfoResponseStatsMessagesReceived = 0,
clusterInfoResponseTotalLinksBufferLimitExceeded = 0,
clusterInfoResponseStatsMessagesPingSent = Nothing,
clusterInfoResponseStatsMessagesPingReceived = Nothing,
clusterInfoResponseStatsMessagesPongSent = Nothing,
clusterInfoResponseStatsMessagesPongReceived = Nothing,
clusterInfoResponseStatsMessagesMeetSent = Nothing,
clusterInfoResponseStatsMessagesMeetReceived = Nothing,
clusterInfoResponseStatsMessagesFailSent = Nothing,
clusterInfoResponseStatsMessagesFailReceived = Nothing,
clusterInfoResponseStatsMessagesPublishSent = Nothing,
clusterInfoResponseStatsMessagesPublishReceived = Nothing,
clusterInfoResponseStatsMessagesAuthReqSent = Nothing,
clusterInfoResponseStatsMessagesAuthReqReceived = Nothing,
clusterInfoResponseStatsMessagesAuthAckSent = Nothing,
clusterInfoResponseStatsMessagesAuthAckReceived = Nothing,
clusterInfoResponseStatsMessagesUpdateSent = Nothing,
clusterInfoResponseStatsMessagesUpdateReceived = Nothing,
clusterInfoResponseStatsMessagesMfstartSent = Nothing,
clusterInfoResponseStatsMessagesMfstartReceived = Nothing,
clusterInfoResponseStatsMessagesModuleSent = Nothing,
clusterInfoResponseStatsMessagesModuleReceived = Nothing,
clusterInfoResponseStatsMessagesPublishshardSent = Nothing,
clusterInfoResponseStatsMessagesPublishshardReceived = Nothing
}

parseClusterInfoResponse :: [[ByteString]] -> ClusterInfoResponse -> Maybe ClusterInfoResponse
parseClusterInfoResponse fields resp = case fields of
[] -> pure resp
(["cluster_state", state] : fs) -> parseState state >>= \s -> parseClusterInfoResponse fs $ resp {clusterInfoResponseState = s}
(["cluster_slots_assigned", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsAssigned = v}
(["cluster_slots_ok", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsOK = v}
(["cluster_slots_pfail", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsPfail = v}
(["cluster_slots_fail", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsFail = v}
(["cluster_known_nodes", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseKnownNodes = v}
(["cluster_size", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSize = v}
(["cluster_current_epoch", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseCurrentEpoch = v}
(["cluster_my_epoch", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseMyEpoch = v}
(["cluster_stats_messages_sent", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesSent = v}
(["cluster_stats_messages_received", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesReceived = v}
(["total_cluster_links_buffer_limit_exceeded", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseTotalLinksBufferLimitExceeded = fromMaybe 0 $ parseInteger value} -- this value should be mandatory according to the spec, but isn't necessarily set in Redis 6
(["cluster_stats_messages_ping_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPingSent = parseInteger value}
(["cluster_stats_messages_ping_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPingReceived = parseInteger value}
(["cluster_stats_messages_pong_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPongSent = parseInteger value}
(["cluster_stats_messages_pong_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPongReceived = parseInteger value}
(["cluster_stats_messages_meet_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMeetSent = parseInteger value}
(["cluster_stats_messages_meet_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMeetReceived = parseInteger value}
(["cluster_stats_messages_fail_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesFailSent = parseInteger value}
(["cluster_stats_messages_fail_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesFailReceived = parseInteger value}
(["cluster_stats_messages_publish_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishSent = parseInteger value}
(["cluster_stats_messages_publish_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishReceived = parseInteger value}
(["cluster_stats_messages_auth_req_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthReqSent = parseInteger value}
(["cluster_stats_messages_auth_req_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthReqReceived = parseInteger value}
(["cluster_stats_messages_auth_ack_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthAckSent = parseInteger value}
(["cluster_stats_messages_auth_ack_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthAckReceived = parseInteger value}
(["cluster_stats_messages_update_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesUpdateSent = parseInteger value}
(["cluster_stats_messages_update_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesUpdateReceived = parseInteger value}
(["cluster_stats_messages_mfstart_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMfstartSent = parseInteger value}
(["cluster_stats_messages_mfstart_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMfstartReceived = parseInteger value}
(["cluster_stats_messages_module_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesModuleSent = parseInteger value}
(["cluster_stats_messages_module_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesModuleReceived = parseInteger value}
(["cluster_stats_messages_publishshard_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishshardSent = parseInteger value}
(["cluster_stats_messages_publishshard_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishshardReceived = parseInteger value}
(_ : fs) -> parseClusterInfoResponse fs resp
where
parseState bs = case bs of
"ok" -> Just OK
"fail" -> Just Down
_ -> Nothing
parseInteger = fmap fst . Char8.readInteger

instance RedisResult ClusterInfoResponse where
decode r@(Bulk (Just bulkData)) =
maybe (Left r) Right
. flip parseClusterInfoResponse defClusterInfoResponse
. map (Char8.split ':' . Char8.takeWhile (/= '\r'))
$ Char8.lines bulkData
decode r = Left r

clusterInfo :: RedisCtx m f => m (f ClusterInfoResponse)
clusterInfo = sendRequest ["CLUSTER", "INFO"]

data ClusterNodesResponse = ClusterNodesResponse
{ clusterNodesResponseEntries :: [ClusterNodesResponseEntry]
} deriving (Show, Eq)
Expand Down