Skip to content

Commit

Permalink
Event-based reaper instead of polling one
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Jul 5, 2016
1 parent f4be4fb commit 9a994b8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 41 deletions.
68 changes: 28 additions & 40 deletions Data/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ module Data.Pool
, destroyAllResources
) where

import Control.Applicative ((<$>))
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Applicative ((<$>), (<*>))
import Control.Concurrent (myThreadId)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM3, unless, when)
import Control.Exception (SomeException, onException)
import Control.Monad (forM_, join, unless, when)
import Control.Concurrent.AlarmClock
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime, addUTCTime)
import Data.Typeable (Typeable)
import GHC.Conc.Sync (labelThread)
import qualified Control.Exception as E
import qualified Data.Vector as V

Expand Down Expand Up @@ -86,6 +86,8 @@ data LocalPool a = LocalPool {
-- ^ Count of open entries (both idle and in use).
, entries :: TVar [Entry a]
-- ^ Idle entries.
, setLastUsedTime :: UTCTime -> STM ()
-- ^ Record the last-used time of a resource, to request a reaper wakeup.
, lfin :: IORef ()
-- ^ empty value used to attach a finalizer to (internal)
} deriving (Typeable)
Expand Down Expand Up @@ -158,10 +160,17 @@ createPool create destroy numStripes idleTime maxResources = do
modError "pool " $ "invalid idle time " ++ show idleTime
when (maxResources < 1) $
modError "pool " $ "invalid maximum resource count " ++ show maxResources
localPools <- V.replicateM numStripes $
liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ())
reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
unmask $ reaper destroy idleTime localPools

localPoolStates <- V.replicateM numStripes $ (,) <$> newTVarIO 0 <*> newTVarIO []
alarmClock <- newAlarmClock $ reapStaleEntries destroy idleTime localPoolStates

let handleSetLastUsedTime t = do
isSet <- isAlarmSetSTM alarmClock
unless isSet $ setAlarmSTM alarmClock $ addUTCTime idleTime t

localPools <- V.forM localPoolStates $ \(inUse, entries) ->
LocalPool inUse entries handleSetLastUsedTime <$> newIORef ()

fin <- newIORef ()
let p = Pool {
create
Expand All @@ -172,47 +181,24 @@ createPool create destroy numStripes idleTime maxResources = do
, localPools
, fin
}
mkWeakIORef fin (killThread reaperId) >>
mkWeakIORef fin (destroyAlarmClock alarmClock) >>
V.mapM_ (\lp -> mkWeakIORef (lfin lp) (purgeLocalPool destroy lp)) localPools
return p

-- TODO: Propose 'forkIOLabeledWithUnmask' for the base library.

-- | Sparks off a new thread using 'forkIOWithUnmask' to run the given
-- IO computation, but first labels the thread with the given label
-- (using 'labelThread').
--
-- The implementation makes sure that asynchronous exceptions are
-- masked until the given computation is executed. This ensures the
-- thread will always be labeled which guarantees you can always
-- easily find it in the GHC event log.
--
-- Like 'forkIOWithUnmask', the given computation is given a function
-- to unmask asynchronous exceptions. See the documentation of that
-- function for the motivation of this.
--
-- Returns the 'ThreadId' of the newly created thread.
forkIOLabeledWithUnmask :: String
-> ((forall a. IO a -> IO a) -> IO ())
-> IO ThreadId
forkIOLabeledWithUnmask label m = mask_ $ forkIOWithUnmask $ \unmask -> do
tid <- myThreadId
labelThread tid label
m unmask

-- | Periodically go through all pools, closing any resources that
-- have been left idle for too long.
reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO ()
reaper destroy idleTime pools = forever $ do
threadDelay (1 * 1000000)
reapStaleEntries :: (a -> IO ()) -> NominalDiffTime -> V.Vector (TVar Int, TVar [Entry a]) -> AlarmClock UTCTime -> IO ()
reapStaleEntries destroy idleTime poolStates alarmClock = do
now <- getCurrentTime
let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime
V.forM_ pools $ \LocalPool{..} -> do
V.forM_ poolStates $ \(inUse, entries) -> do
resources <- atomically $ do
(stale,fresh) <- partition isStale <$> readTVar entries
unless (null stale) $ do
writeTVar entries fresh
modifyTVar_ inUse (subtract (length stale))
unless (null fresh) $
setAlarmSTM alarmClock $ addUTCTime idleTime $ minimum $ map lastUse fresh
return (map entry stale)
forM_ resources $ \resource -> do
destroy resource `E.catch` \(_::SomeException) -> return ()
Expand Down Expand Up @@ -363,7 +349,9 @@ destroyResource Pool{..} LocalPool{..} resource = do
putResource :: LocalPool a -> a -> IO ()
putResource LocalPool{..} resource = do
now <- getCurrentTime
atomically $ modifyTVar_ entries (Entry resource now:)
atomically $ do
modifyTVar_ entries (Entry resource now:)
setLastUsedTime now
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE putResource #-}
#endif
Expand Down
3 changes: 2 additions & 1 deletion resource-pool.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ library
transformers-base >= 0.4,
stm >= 2.3,
time,
vector >= 0.7
vector >= 0.7,
alarmclock >= 0.4.0.2

if flag(developer)
ghc-options: -Werror
Expand Down

0 comments on commit 9a994b8

Please sign in to comment.