Skip to content

Commit

Permalink
reimplement existing functionality via r2
Browse files Browse the repository at this point in the history
  • Loading branch information
indiscrete_void committed Jun 23, 2024
1 parent c17e032 commit a349c00
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 68 deletions.
26 changes: 12 additions & 14 deletions cli/Pnet/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import Data.ByteString.Base58.Internal
import Data.ByteString.Char8 qualified as BC
import Network.Socket hiding (close)
import Pnet
import Pnet.Routing
import Pnet.Options
import Pnet.Routing
import Polysemy hiding (run)
import Polysemy.Async
import Polysemy.Extra.Async
Expand All @@ -20,26 +20,24 @@ import Transport.Maybe
parseNodeID :: String -> Maybe Address
parseNodeID = fmap (fromInteger . bsToInteger) . decodeBase58 bitcoinAlphabet . BC.pack

pnet :: (Member ByteInputWithEOF r, Member ByteOutput r, Member (InputWithEOF NodeToManagerMessage) r, Member (Output ManagerToNodeMessage) r, Member Fail r, Member Trace r, Member Close r, Member Async r) => Command -> Sem r ()
pnet Ls = output ListNodes >> (inputOrFail @NodeToManagerMessage >>= traceTagged "Ls" . show)
pnet :: (Member (InputWithEOF RouteTo) r, Member (Output RoutedFrom) r, Member ByteInputWithEOF r, Member ByteOutput r, Member (InputWithEOF Response) r, Member (Output Request) r, Member Fail r, Member Trace r, Member Close r, Member Async r) => Command -> Sem r ()
pnet Ls = output ListNodes >> (inputOrFail @Response >>= traceTagged "Ls" . show)
pnet (Connect transport maybeNodeID) = do
maybeNodeID' <- maybe (pure Nothing) (fmap Just . maybeFail "invalid node ID" . parseNodeID) maybeNodeID
output (ConnectNode transport maybeNodeID') >> case transport of
Stdio -> async_ nodeToDaemon >> daemonToNode
_ -> _
where
nodeToDaemon = transferStream (msg . Just) (msg Nothing)
output (ConnectNode transport maybeNodeID')
case transport of
Stdio -> async_ (handle nodeToIO) >> handle ioToNode
where
msg = ManagerNodeData . TunnelMessage
daemonToNode = handle go
where
go (DaemonNodeData (TunnelMessage maybeStr)) = maybe close output maybeStr
go _ = _
ioToNode msg = traceTagged ("RoutedFrom " <> show tmpAddr) (show msg) >> output (RoutedFrom tmpAddr $ Just msg)
nodeToIO (RouteTo address maybeMsg)
| address == tmpAddr = traceTagged ("RouteTo " <> show address) (show maybeMsg) >> maybe close output maybeMsg
| otherwise = _
_ -> _
pnet _ = _

main :: IO ()
main =
let runUnserialized = runDecoder . deserializeInput @NodeToManagerMessage . serializeOutput @ManagerToNodeMessage
let runUnserialized = runDecoder . deserializeInput @Response . serializeOutput @Request . deserializeInput @RouteTo . serializeOutput @RoutedFrom
runTransport s = inputToSocket bufferSize s . outputToSocket s . runUnserialized
runStdio = outputToIO stdout . inputToIO bufferSize stdin . closeToIO stdout
run s = runFinal . asyncToIOFinal . embedToFinal @IO . failToEmbed @IO . traceToStderrBuffered . runTransport s . runStdio
Expand Down
29 changes: 9 additions & 20 deletions common/Pnet.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module Pnet
( TunnelMessage (..),
NodeToManagerMessage (..),
ManagerToNodeMessage (..),
( Request (..),
Response (..),
pnetSocketAddr,
pnetSocket,
withPnetSocket,
Expand All @@ -14,7 +13,6 @@ where

import Control.Applicative ((<|>))
import Control.Exception
import Data.ByteString (ByteString)
import Data.Maybe
import Data.Serialize
import Debug.Trace
Expand All @@ -29,20 +27,13 @@ data Transport
| Process String
deriving stock (Show, Generic)

newtype TunnelMessage = TunnelMessage
{ tunnelMessageData :: Maybe ByteString
}
data Request where
ListNodes :: Request
ConnectNode :: Transport -> Maybe Address -> Request
deriving stock (Show, Generic)

data NodeToManagerMessage where
NodeList :: [Address] -> NodeToManagerMessage
DaemonNodeData :: TunnelMessage -> NodeToManagerMessage
deriving stock (Show, Generic)

data ManagerToNodeMessage where
ListNodes :: ManagerToNodeMessage
ConnectNode :: Transport -> Maybe Address -> ManagerToNodeMessage
ManagerNodeData :: TunnelMessage -> ManagerToNodeMessage
data Response where
NodeList :: [Address] -> Response
deriving stock (Show, Generic)

timeout :: Int
Expand Down Expand Up @@ -83,8 +74,6 @@ withPnetSocket = bracket pnetSocket (`gracefulClose` timeout)

instance Serialize Transport

instance Serialize TunnelMessage

instance Serialize NodeToManagerMessage
instance Serialize Request

instance Serialize ManagerToNodeMessage
instance Serialize Response
8 changes: 7 additions & 1 deletion common/Pnet/Routing.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Pnet.Routing (Address, RouteTo (..), RoutedFrom (..), r2, runR2) where
module Pnet.Routing (Address, RouteTo (..), RoutedFrom (..), r2, runR2, selfAddr, tmpAddr) where

import Data.ByteString (ByteString)
import Data.DoubleWord
Expand Down Expand Up @@ -39,6 +39,12 @@ runR2 node =
outputRouteTo :: (Member (Output RouteTo) r) => Maybe ByteString -> Sem r ()
outputRouteTo = output . RouteTo node

selfAddr :: Address
selfAddr = -1

tmpAddr :: Address
tmpAddr = 0

instance Serialize Int128

instance Serialize Word128
Expand Down
14 changes: 8 additions & 6 deletions daemon/Pnet/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ initialState = []
whenJust :: (Monad m) => (a -> m ()) -> Maybe a -> m ()
whenJust = maybe (pure ())

pnetd :: (Member (Accept s) r, Member (Sockets ManagerToNodeMessage NodeToManagerMessage s) r, Member (AtomicState (State s)) r, Member Trace r, Member Fail r, Member Decoder r, Member Async r, Eq s) => Sem r ()
pnetd :: (Member (Accept s) r, Member (Sockets Request Response s) r, Member (Sockets RoutedFrom RouteTo s) r, Member (AtomicState (State s)) r, Member Trace r, Member Fail r, Member Decoder r, Member Async r, Eq s) => Sem r ()
pnetd = foreverAcceptAsync \s -> socket s (handle (go s) >> close)
where
go _ ListNodes = do
Expand All @@ -39,7 +39,7 @@ pnetd = foreverAcceptAsync \s -> socket s (handle (go s) >> close)
go s (ConnectNode transport maybeNodeID) = do
traceTagged "NodeAvailability" (Text.printf "%s connected over `%s`" nodeIDStr (show transport))
whenJust (atomicModify' . (:) . entry) maybeNodeID
traceTagged "pnetnd" . show =<< runFail (mn2nn pnetnd)
traceTagged "pnetnd" . show =<< runFail (socket s $ runR2 tmpAddr pnetnd')
traceTagged "NodeAvailability" (Text.printf "%s disconnected from `%s`" nodeIDStr (show transport))
whenJust (atomicModify' . List.delete . entry) maybeNodeID
where
Expand All @@ -49,10 +49,12 @@ pnetd = foreverAcceptAsync \s -> socket s (handle (go s) >> close)

main :: IO ()
main =
let runUnserialized :: (Member Fail r, Member Decoder r, Member ByteInputWithEOF r, Member ByteOutput r) => InterpretersFor (InputWithEOF ManagerToNodeMessage ': Output NodeToManagerMessage ': '[]) r
runUnserialized = serializeOutput @NodeToManagerMessage . deserializeInput @ManagerToNodeMessage
runTransport s = closeToSocket timeout s . outputToSocket s . inputToSocket bufferSize s . runUnserialized . raise2Under @ByteInputWithEOF . raise2Under @ByteOutput
runSocket s = acceptToIO s . runScopedBundle @(SocketEffects ManagerToNodeMessage NodeToManagerMessage) runTransport
let runUnserialized :: (Member Fail r, Member Decoder r, Member ByteInputWithEOF r, Member ByteOutput r) => InterpretersFor (InputWithEOF Request ': Output Response ': '[]) r
runUnserialized = serializeOutput @Response . deserializeInput @Request
runUnserialized' :: (Member Fail r, Member Decoder r, Member ByteInputWithEOF r, Member ByteOutput r) => InterpretersFor (InputWithEOF RoutedFrom ': Output RouteTo ': '[]) r
runUnserialized' = serializeOutput @RouteTo . deserializeInput @RoutedFrom
runTransport f s = closeToSocket timeout s . outputToSocket s . inputToSocket bufferSize s . f . raise2Under @ByteInputWithEOF . raise2Under @ByteOutput
runSocket s = acceptToIO s . runScopedBundle @(SocketEffects Request Response) (runTransport runUnserialized) . runScopedBundle @(SocketEffects RoutedFrom RouteTo) (runTransport runUnserialized')
runAtomicState = void . atomicStateToIO initialState
run s =
runFinal @IO
Expand Down
42 changes: 15 additions & 27 deletions daemon/Pnet/Node.hs
Original file line number Diff line number Diff line change
@@ -1,43 +1,31 @@
module Pnet.Node
( NodeToNodeMessage (..),
pnetnd,
mn2nn,
( pnetnd,
pnetnd',
)
where

import Data.Serialize
import GHC.Generics
import Pnet
import Data.ByteString
import Pnet.Routing
import Polysemy hiding (send)
import Polysemy.Extra.Trace
import Polysemy.Fail
import Polysemy.Output
import Polysemy.Serialize
import Polysemy.Socket
import Polysemy.Trace
import Polysemy.Transport
import Transport.Maybe

data NodeToNodeMessage where
Ping :: NodeToNodeMessage
Pong :: NodeToNodeMessage
deriving stock (Show, Generic)
ping :: ByteString
ping = "ping"

mn2nn :: (Member (InputWithEOF ManagerToNodeMessage) r, Member (Output NodeToManagerMessage) r, Member Decoder r, Member Fail r) => InterpretersFor (InputWithEOF NodeToNodeMessage ': Output NodeToNodeMessage ': '[]) r
mn2nn = o2o . i2i
pnetnd :: (Members (SocketEffects RoutedFrom RouteTo) r, Member Trace r, Member Fail r) => Sem r ()
pnetnd = runR2 selfAddr (output ping >> traceTagged "pnetnd: r2 ping" (show ping) >> inputOrFail >>= go) >> close
where
i2i :: (Member (InputWithEOF ManagerToNodeMessage) r, Member Decoder r, Member Fail r) => InterpreterFor (InputWithEOF NodeToNodeMessage) r
i2i = interpret \Input ->
let managerNodeData (ManagerNodeData (TunnelMessage maybeStr)) = maybeStr
managerNodeData _ = _
in Just <$> deserializeFrom (inputOrFail >>= maybeFailEOF . managerNodeData)
o2o :: (Member (Output NodeToManagerMessage) r) => InterpreterFor (Output NodeToNodeMessage) r
o2o = interpret \(Output msg) -> output . DaemonNodeData . TunnelMessage . Just $ serialize msg
go (RoutedFrom addr maybeMsg)
| addr == selfAddr = traceTagged "pnetnd: r2 ping" (show maybeMsg)
| otherwise = fail "pnetnd: r2 ping: got no reply"

pnetnd :: (Members (SocketEffects NodeToNodeMessage NodeToNodeMessage) r, Member Trace r) => Sem r ()
pnetnd = trace "sending Ping" >> output Ping >> handle go >> close
pnetnd' :: (Members (SocketEffects ByteString ByteString) r, Member Trace r, Member Fail r) => Sem r ()
pnetnd' = runUnserialized pnetnd
where
go Ping = traceTagged "Ping" "Pong" >> output Pong
go Pong = traceTagged "Pong" "doing nothing"

instance Serialize NodeToNodeMessage
runUnserialized :: (Members '[InputWithEOF ByteString, Output ByteString] r, Member Fail r) => InterpretersFor '[InputWithEOF RoutedFrom, Output RouteTo, Decoder] r
runUnserialized = runDecoder . serializeOutput @RouteTo . deserializeInput @RoutedFrom

0 comments on commit a349c00

Please sign in to comment.