Skip to content

Commit

Permalink
instead of RawPool, put the ref in Pool and pass the fields to manage
Browse files Browse the repository at this point in the history
  • Loading branch information
robx committed Feb 20, 2023
1 parent a84ede0 commit 0eda834
Showing 1 changed file with 22 additions and 25 deletions.
47 changes: 22 additions & 25 deletions library/Hasql/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ isAlive poolConfig now conn =
now <= connCreationTimeNSec conn + 1000 * fromIntegral (confMaxLifetime poolConfig)

-- | Pool of connections to DB.
data RawPool = RawPool
data Pool = Pool
{ -- | Configuration
poolConfig :: Config,
-- | Avail connections.
Expand All @@ -121,12 +121,9 @@ data RawPool = RawPool
-- connections.
poolCapacity :: TVar Int,
-- | Whether to return a connection to the pool.
poolReuse :: TVar (TVar Bool)
}

data Pool = Pool
{ pool :: RawPool,
reaperRef :: IORef ()
poolReuse :: TVar (TVar Bool),
-- | To stop the manager thread via garbage collection.
poolReaperRef :: IORef ()
}

-- | Create a connection-pool.
Expand All @@ -135,17 +132,17 @@ data Pool = Pool
-- to 'use'.
acquireConf :: Config -> IO Pool
acquireConf config = do
rawPool <-
RawPool config
<$> newTQueueIO
<*> newTVarIO (confSize config)
<*> (newTVarIO =<< newTVarIO True)
ref <- newIORef ()
manager <- forkIOWithUnmask $ \unmask -> unmask $ manage rawPool
void . mkWeakIORef ref $ do
connectionQueue <- newTQueueIO
capacity <- newTVarIO (confSize config)
reuse <- newTVarIO =<< newTVarIO True
reaperRef <- newIORef ()

manager <- forkIOWithUnmask $ \unmask -> unmask $ manage config connectionQueue capacity
void . mkWeakIORef reaperRef $ do
-- When the pool goes out of scope, stop the manager.
killThread manager
return $ Pool rawPool ref

return $ Pool config connectionQueue capacity reuse reaperRef

-- | Create a connection-pool, with default settings.
--
Expand Down Expand Up @@ -192,7 +189,7 @@ acquireDynamically poolSize acqTimeout fetchConnectionSettings =
-- So you can use this function to reset the connections in the pool.
-- Naturally, you can also use it to release the resources.
release :: Pool -> IO ()
release (Pool (RawPool {..}) _) =
release Pool {..} =
join . atomically $ do
prevReuse <- readTVar poolReuse
writeTVar prevReuse False
Expand All @@ -205,17 +202,17 @@ release (Pool (RawPool {..}) _) =

-- | Active pool management thread. (For now, this closes pooled connections
-- that are older than maxLifetime.)
manage :: RawPool -> IO ()
manage RawPool {..} = forever $ do
threadDelay (confManageInterval poolConfig)
manage :: Config -> TQueue Conn -> TVar Int -> IO ()
manage config connectionQueue capacity = forever $ do
threadDelay (confManageInterval config)
now <- getMonotonicTimeNSec
join . atomically $ do
conns <- flushTQueue poolConnectionQueue
let (keep, close) = partition (isAlive poolConfig now) conns
traverse_ (writeTQueue poolConnectionQueue) keep
conns <- flushTQueue connectionQueue
let (keep, close) = partition (isAlive config now) conns
traverse_ (writeTQueue connectionQueue) keep
return $ forM_ close $ \conn -> do
Connection.release (connConnection conn)
atomically $ modifyTVar' poolCapacity succ
atomically $ modifyTVar' capacity succ

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished.
Expand All @@ -225,7 +222,7 @@ manage RawPool {..} = forever $ do
-- and a slot gets freed up for a new connection to be established the next
-- time one is needed. The error still gets returned from this function.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use (Pool (RawPool {..}) _) sess = do
use Pool {..} sess = do
timeout <- do
delay <- registerDelay $ confAcquisitionTimeout poolConfig
return $ readTVar delay
Expand Down

0 comments on commit 0eda834

Please sign in to comment.