Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
domenkozar committed Dec 31, 2023
1 parent 6c1a19e commit dc51f55
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 3 deletions.
65 changes: 65 additions & 0 deletions src/Network/WebSockets/Simple/AckProtocol.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module Network.WebSockets.Simple.AckProtocol (AckProtocol (..), resendTimedoutEvents) where

import Control.Concurrent (threadDelay)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (asks)
import Data.HashMap.Strict qualified as HashMap
import Data.IORef (atomicModifyIORef', readIORef)
import Data.Time.Clock (addUTCTime, getCurrentTime, secondsToNominalDiffTime)
import GHC.Generics (Generic)
import Network.WebSockets.Simple.Session qualified as Session

-- inspired by https://socket.io/docs/v4/socket-io-protocol/#exchange-protocol
data AckProtocol message
= Send message
| Event Integer message
| EventAck Integer
deriving (Show, Generic)

instance
( MonadIO m,
Session.Codec (AckProtocol send),
Session.Codec (AckProtocol receive),
Session.Codec send,
Session.Codec receive
) =>
Session.SessionProtocol m (AckProtocol send) (AckProtocol receive)
where
send (Send msg) = do
timestamp <- liftIO getCurrentTime
ackProtocol <- asks Session.ackProtocol
id_ <- liftIO $ atomicModifyIORef' ackProtocol $ \(current, hashMap) ->
let next = current + 1
-- inefficient since we're converting to bytestring twice and on each retry
newHashMap = HashMap.insert next (timestamp, Session.toByteString msg) hashMap
in ((next, newHashMap), next)
Session.send $ Event id_ msg

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.6 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.4 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.2 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on windows-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.6 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on macos-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.4 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.2 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on windows-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 37 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on macos-latest

• Overlapping instances for Session.SessionProtocol
send (Event _ _) = error "send: unexpected Event message"
send (EventAck _) = error "send: unexpected EventAck message"

receive = do
msg <- Session.receive

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.6 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.4 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.2 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on windows-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.6 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on macos-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.4 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.2 on ubuntu-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on windows-latest

• Overlapping instances for Session.SessionProtocol

Check failure on line 42 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on macos-latest

• Overlapping instances for Session.SessionProtocol
case msg of
EventAck id_ -> do
ackProtocol <- asks Session.ackProtocol
_ <- liftIO $ atomicModifyIORef' ackProtocol $ \(current, hashMap) ->
((current, HashMap.delete id_ hashMap), ())
return $ EventAck id_
Event id_ msg2 -> do
Session.send $ EventAck id_
return $ Event id_ msg2
Send _ -> error "receive: unexpected Send message"

resendTimedoutEvents :: (MonadIO m, Session.Codec send, Session.Codec receive) => Session.Session m send receive ()
resendTimedoutEvents = do
ackProtocol <- asks Session.ackProtocol
(_, hashMap) <- liftIO $ readIORef ackProtocol
currentTime <- liftIO getCurrentTime
let timedout = HashMap.filter (\(msgTimestamp, _) -> addUTCTime (secondsToNominalDiffTime $ fromIntegral interval) msgTimestamp < currentTime) hashMap
forM_ (HashMap.toList timedout) $ \(id_, (_, msg)) ->
Session.send $ Event id_ $ Session.fromByteString msg

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on ubuntu-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.6 on ubuntu-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.4 on ubuntu-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.2 on ubuntu-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on ubuntu-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on windows-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.6 on ubuntu-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on macos-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.4 on ubuntu-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.2 on ubuntu-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on windows-latest

• Couldn't match expected type ‘send’

Check failure on line 61 in src/Network/WebSockets/Simple/AckProtocol.hs

View workflow job for this annotation

GitHub Actions / GHC 9.8 on macos-latest

• Couldn't match expected type ‘send’
liftIO $ threadDelay (fromIntegral interval * 1000 * 1000)
resendTimedoutEvents
where
interval = 10
12 changes: 10 additions & 2 deletions src/Network/WebSockets/Simple/Session.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Network.WebSockets.Simple.Session
run,
Session (..),
SessionProtocol (..),
ackProtocol,
)
where

Expand All @@ -13,6 +14,9 @@ import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Reader (MonadReader, ReaderT, asks, runReaderT)
import Data.ByteString (ByteString, toStrict)
import Data.HashMap.Strict qualified as HashMap
import Data.IORef (IORef, newIORef)
import Data.Time.Clock (UTCTime)
import Network.WebSockets qualified as WS

-- Allows decoding from ByteString to any format like JSON or CBOR.
Expand All @@ -23,7 +27,10 @@ class Codec a where
-- State for the session
data SessionEnv = SessionEnv
{ sendChan :: Unagi.InChan ByteString,
receiveChan :: Unagi.OutChan ByteString
receiveChan :: Unagi.OutChan ByteString,
-- TODO: ideally we'd implement a way for each WebsocketMonad instance to specify how env is created
-- maybe order by timestamp?
ackProtocol :: IORef (Integer, HashMap.HashMap Integer (UTCTime, ByteString))
}

newtype Session m send receive a = Session (ReaderT SessionEnv m a)
Expand All @@ -50,7 +57,8 @@ run :: (Codec send, Codec receive) => Int -> WS.Connection -> Session IO send re
run limit conn sendApp receiveApp = do
(sendChanWrite, sendChanRead) <- liftIO $ Unagi.newChan limit
(receiveChanWrite, receiveChanRead) <- liftIO $ Unagi.newChan limit
let clientEnv = SessionEnv sendChanWrite receiveChanRead
ackProtocol <- liftIO $ newIORef (0, HashMap.empty)
let clientEnv = SessionEnv sendChanWrite receiveChanRead ackProtocol

-- Use async to queue the send and receive channels in parallel
sendAsync <- liftIO $ async $ forever $ do
Expand Down
5 changes: 4 additions & 1 deletion websockets-simple.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ common common
unliftio-core,
bytestring,
exceptions,
stamina
stamina,
time,
unordered-containers

default-extensions: OverloadedStrings

Expand All @@ -36,6 +38,7 @@ library
exposed-modules:
Network.WebSockets.Simple.Server
Network.WebSockets.Simple.Client
Network.WebSockets.Simple.AckProtocol
other-modules:
Network.WebSockets.Simple.Session
Network.WebSockets.Simple.Utils
Expand Down

0 comments on commit dc51f55

Please sign in to comment.