Skip to content

Commit

Permalink
implement stdio<->virtual-cat tunneling
Browse files Browse the repository at this point in the history
  • Loading branch information
indiscrete_void committed Oct 7, 2024
1 parent 557e33d commit 0d8ba2c
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 50 deletions.
11 changes: 8 additions & 3 deletions cli/Pnet/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ import Polysemy.Serialize
import Polysemy.Socket
import Polysemy.Transport
import System.IO
import System.Random.Stateful
import Text.Printf (hPrintf)

main :: IO ()
main =
let runUnserialized = runDecoder . deserializeInput @Response . serializeOutput @Handshake . deserializeInput @(RouteTo (Maybe ByteString)) . serializeOutput @(RoutedFrom (Maybe ByteString)) . deserializeInput @(RouteTo ByteString) . serializeOutput @(RoutedFrom ByteString)
let runUnserialized = runDecoder . deserializeInput @Response . serializeOutput @Handshake . deserializeInput @(RouteTo ByteString) . serializeOutput @(RoutedFrom ByteString) . deserializeInput @(RoutedFrom ByteString) . serializeOutput @(RouteTo ByteString) . deserializeInput @(RoutedFrom Connection) . serializeOutput @(RouteTo Connection) . deserializeInput @(RoutedFrom (Maybe ByteString)) . serializeOutput @(RouteTo (Maybe ByteString)) . serializeOutput @(RouteTo (Maybe NodeHandshake))
runTransport s = inputToSocket bufferSize s . outputToSocket s . runUnserialized
runStdio = outputToIO stdout . inputToIO bufferSize stdin . closeToIO stdout
run s = runFinal . asyncToIOFinal . embedToFinal @IO . failToEmbed @IO . traceToStderrBuffered . runTransport s . runStdio . scopedProcToIOFinal bufferSize
in withPnetSocket \s -> do
in hSetBuffering stderr LineBuffering >> withPnetSocket \s -> do
(Options command maybeSocketPath) <- parse
gen <- initStdGen >>= newIOGenM
self <- uniformM @Address gen
hPrintf stderr "me: %s\n" $ show self
connect s =<< pnetSocketAddr maybeSocketPath
run s $ pnet command
run s $ pnet self command
9 changes: 9 additions & 0 deletions common/Pnet.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Pnet
( Handshake (..),
Response (..),
NodeHandshake (..),
pnetSocketAddr,
pnetSocket,
withPnetSocket,
Expand Down Expand Up @@ -30,12 +31,18 @@ data Transport
data Handshake where
ListNodes :: Handshake
ConnectNode :: Transport -> Maybe Address -> Handshake
Route :: Address -> Handshake
deriving stock (Show, Generic)

data Response where
NodeList :: [Address] -> Response
deriving stock (Show, Generic)

data NodeHandshake where
NodeRoute :: NodeHandshake
NodeTunnel :: NodeHandshake
deriving stock (Show, Generic)

timeout :: Int
timeout = 16384

Expand Down Expand Up @@ -77,3 +84,5 @@ instance Serialize Transport
instance Serialize Handshake

instance Serialize Response

instance Serialize NodeHandshake
26 changes: 20 additions & 6 deletions common/Pnet/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Pnet.Routing
import Pnet.Routing.Transfer
import Polysemy
import Polysemy.Async
import Polysemy.Extra.Async
import Polysemy.Extra.Trace
import Polysemy.Fail
import Polysemy.Process
Expand All @@ -25,19 +26,32 @@ listNodes = traceTagged "Ls" $ output ListNodes >> (inputOrFail @Response >>= tr
connectNode :: (Members (TransportEffects (RouteTo ByteString) (RoutedFrom ByteString)) r, Member Async r, Member (Output Handshake) r, Members (TransportEffects ByteString ByteString) r, Member Trace r, Member (Scoped CreateProcess Process) r) => Transport -> Maybe Address -> Sem r ()
connectNode transport maybeAddress = output (ConnectNode transport maybeAddress) >> streamTransport transport

tunnelTransport :: (Members (TransportEffects (RoutedFrom (Maybe ByteString)) (RouteTo (Maybe ByteString))) r, Member (Output (RouteTo Connection)) r, Member (Output Handshake) r, Member (Output (RouteTo (Maybe NodeHandshake))) r, Members (TransportEffects ByteString ByteString) r, Member Trace r, Member Async r) => Address -> Address -> Sem r ()
tunnelTransport self address =
output (Route self)
>> connectR2 address
>> runR2Output @NodeHandshake address (output NodeTunnel)
>> async_ (runR2Output @ByteString address inputToOutput)
>> runR2Input @ByteString address inputToOutput

pnet ::
( Members (TransportEffects (RouteTo ByteString) (RoutedFrom ByteString)) r,
Member ByteInputWithEOF r,
Member ByteOutput r,
Member (Scoped CreateProcess Process) r,
Members (TransportEffects (RoutedFrom (Maybe ByteString)) (RouteTo (Maybe ByteString))) r,
Member (Output (RouteTo Connection)) r,
Member (Output (RouteTo (Maybe NodeHandshake))) r,
Member (InputWithEOF Response) r,
Member (Output Handshake) r,
Member (Scoped CreateProcess Process) r,
Member ByteInputWithEOF r,
Member ByteOutput r,
Member Fail r,
Member Trace r,
Member Async r
) =>
Address ->
Command ->
Sem r ()
pnet Ls = listNodes
pnet (Connect transport maybeAddress) = connectNode transport maybeAddress
pnet (Tunnel _ _) = _
pnet _ Ls = listNodes
pnet _ (Connect transport maybeAddress) = connectNode transport maybeAddress
pnet self (Tunnel address Stdio) = tunnelTransport self address
pnet _ (Tunnel _ _) = _
24 changes: 23 additions & 1 deletion common/Pnet/Daemon.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module Pnet.Daemon (pnetd) where

import Data.ByteString (ByteString)
import Pnet
import Pnet.Daemon.Node
import Pnet.Daemon.Server
import Pnet.Routing
import Polysemy
Expand All @@ -11,8 +12,29 @@ import Polysemy.Socket.Accept
import Polysemy.Sockets
import Polysemy.Trace

pnetd :: (Member (Accept s) r, Member (Sockets Handshake Response s) r, Member (Sockets (RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))) (RouteTo (Maybe (RouteTo (Maybe ByteString)))) s) r, Member (AtomicState (State s)) r, Member Trace r, Member Async r, Eq s) => Sem r ()
pnetd ::
( Member (Accept s) r,
Member (Sockets Handshake Response s) r,
Member (Sockets (RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))) (RouteTo (Maybe (RouteTo (Maybe ByteString)))) s) r,
Member (Sockets (RouteTo (Maybe (RouteTo (Maybe ByteString)))) (RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))) s) r,
Member (Sockets (RoutedFrom (Maybe (RoutedFrom Connection))) (RouteTo (Maybe (RouteTo Connection))) s) r,
Member (Sockets (RoutedFrom (Maybe (RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))))) (RouteTo (Maybe (RouteTo (Maybe (RouteTo (Maybe ByteString)))))) s) r,
Member (Sockets (RoutedFrom (Maybe (RoutedFrom (Maybe NodeHandshake)))) (RouteTo (Maybe (RouteTo (Maybe NodeHandshake)))) s) r,
Member (Sockets (RoutedFrom (Maybe (RouteTo ByteString))) (RouteTo (Maybe (RoutedFrom ByteString))) s) r,
Member (Sockets (RouteTo ByteString) (RoutedFrom ByteString) s) r,
Member (AtomicState (State s)) r,
Member Trace r,
Member Async r,
Eq s
) =>
Sem r ()
pnetd = foreverAcceptAsync \s ->
socket @Handshake @Response s
. socket @(RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))) @(RouteTo (Maybe (RouteTo (Maybe ByteString)))) s
. socket @(RouteTo (Maybe (RouteTo (Maybe ByteString)))) @(RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))) s
. socket @(RoutedFrom (Maybe (RoutedFrom Connection))) @(RouteTo (Maybe (RouteTo Connection))) s
. socket @(RoutedFrom (Maybe (RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))))) @(RouteTo (Maybe (RouteTo (Maybe (RouteTo (Maybe ByteString)))))) s
. socket @(RouteTo ByteString) @(RoutedFrom ByteString) s
. socket @(RoutedFrom (Maybe (RoutedFrom (Maybe NodeHandshake)))) @(RouteTo (Maybe (RouteTo (Maybe NodeHandshake)))) s
. socket @(RoutedFrom (Maybe (RouteTo ByteString))) @(RouteTo (Maybe (RoutedFrom ByteString))) s
$ pnetcd s
75 changes: 66 additions & 9 deletions common/Pnet/Daemon/Node.hs
Original file line number Diff line number Diff line change
@@ -1,19 +1,76 @@
module Pnet.Daemon.Node (pnetnd) where
module Pnet.Daemon.Node (State, initialState, stateAddNode, stateDeleteNode, route, tunnelProcess, pnetnd) where

import Data.ByteString (ByteString)
import Control.Monad.Extra
import Data.ByteString
import Data.List qualified as List
import Data.Maybe
import Pnet
import Pnet.Routing
import Polysemy
import Polysemy.AtomicState
import Polysemy.Extra.Trace
import Polysemy.Fail
import Polysemy.Sockets
import Polysemy.Trace
import Polysemy.Transport

ping :: ByteString
ping = "ping"
type State s = [(s, Address)]

pnetnd :: (Members (TransportEffects (RoutedFrom (Maybe ByteString)) (RouteTo (Maybe ByteString))) r, Member Trace r, Member Fail r) => Sem r ()
pnetnd = traceTagged "pnetnd" . traceTagged "r2 ping" $ runR2 selfAddr go >> close
initialState :: State s
initialState = []

stateAddNode :: (Member (AtomicState (State s)) r) => (s, Address) -> Sem r ()
stateAddNode = atomicModify' . (:)

stateDeleteNode :: (Member (AtomicState (State s)) r, Eq s) => (s, Address) -> Sem r ()
stateDeleteNode = atomicModify' . List.delete

runAddress :: (Member (AtomicState (State s)) r) => (s -> InterpreterFor (Output o) r) -> Address -> InterpreterFor (Output o) r
runAddress f addr m = do
s <- lookupSocket <$> atomicGet
f s m
where
go =
(output ping >> trace (show ping))
>> (inputOrFail >>= trace . show)
lookupSocket = fst . fromJust . List.find ((== addr) . snd)

route ::
forall s r.
( Member (AtomicState (State s)) r,
Member (InputWithEOF (RouteTo ByteString)) r,
Member Trace r
) =>
(s -> InterpreterFor (Output (RoutedFrom ByteString)) r) ->
Address ->
Sem r ()
route f sender = traceTagged "route" $ raise @Trace do
trace ("routing for " ++ show sender)
let sendTo :: Address -> RoutedFrom ByteString -> Sem r ()
sendTo addr = runAddress f addr . output
handle (r2Sem sendTo sender)

tunnelProcess :: (Members (TransportEffects (RoutedFrom (Maybe ByteString)) (RouteTo (Maybe ByteString))) r, Member Trace r, Member (Output (RouteTo (Maybe NodeHandshake))) r, Member (Output (RouteTo Connection)) r) => Address -> Sem r ()
tunnelProcess addr = traceTagged ("tunnel " <> show addr) do
trace ("tunneling for " ++ show addr)
connectR2 addr
runR2Output addr $ output NodeRoute
runR2 addr inputToOutput

pnetnd ::
( Members (TransportEffects (RoutedFrom (Maybe ByteString)) (RouteTo (Maybe ByteString))) r,
Member (Sockets (RouteTo ByteString) (RoutedFrom ByteString) s) r,
Member (InputWithEOF (RoutedFrom (Maybe NodeHandshake))) r,
Member (InputWithEOF (RouteTo ByteString)) r,
Member (Output (RouteTo (Maybe NodeHandshake))) r,
Member (AtomicState (State s)) r,
Member Fail r,
Member Trace r,
Member (Output (RouteTo Connection)) r
) =>
Address ->
Address ->
Sem r ()
pnetnd peer addr = traceTagged "pnetnd" $ raise @Trace do
trace ("accepted " <> show addr)
handshake <- runR2Input @NodeHandshake addr $ inputOrFail @NodeHandshake
case handshake of
NodeRoute -> route socketOutput peer
NodeTunnel -> tunnelProcess addr
84 changes: 64 additions & 20 deletions common/Pnet/Daemon/Server.hs
Original file line number Diff line number Diff line change
@@ -1,37 +1,81 @@
module Pnet.Daemon.Server (State, initialState, pnetcd) where
module Pnet.Daemon.Server (listNodes, connectNode, pnetcd) where

import Control.Monad.Extra
import Data.ByteString (ByteString)
import Data.List qualified as List
import Data.Maybe
import Pnet
import Pnet.Daemon.Node
import Pnet.Routing
import Polysemy
import Polysemy.AtomicState
import Polysemy.Extra.Trace
import Polysemy.Fail
import Polysemy.Sockets
import Polysemy.Trace
import Polysemy.Transport
import Text.Printf qualified as Text

type State s = [(s, Address)]
listNodes :: (Member (AtomicState (State s)) r, Member (Output Response) r, Member Trace r) => Sem r ()
listNodes = traceTagged "ListNodes" do
nodeList <- map snd <$> atomicGet
trace (Text.printf "responding with `%s`" (show nodeList))
output (NodeList nodeList)

initialState :: State s
initialState = []
connectNode ::
( Member (AtomicState (State s)) r,
Members (TransportEffects (RoutedFrom (Maybe (RoutedFrom Connection))) (RouteTo (Maybe (RouteTo Connection)))) r,
Members (TransportEffects (RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))) (RouteTo (Maybe (RouteTo (Maybe ByteString))))) r,
Member (Sockets (RouteTo ByteString) (RoutedFrom ByteString) s) r,
Member (Input (Maybe (RoutedFrom (Maybe (RoutedFrom (Maybe NodeHandshake)))))) r,
Member (Input (Maybe (RoutedFrom (Maybe (RouteTo ByteString))))) r,
Member (Output (RouteTo (Maybe (RouteTo (Maybe NodeHandshake))))) r,
Member Trace r,
Eq s
) =>
s ->
Transport ->
Maybe Address ->
Sem r ()
connectNode s transport maybeNodeID = traceTagged "connection" do
let nodeID = fromJust maybeNodeID
trace (Text.printf "%s connected over `%s`" nodeIDStr (show transport))
whenJust maybeNodeID (stateAddNode . entry)
traceTagged "pnetnd" . trace . show @(Either String ())
=<< runFail
( runR2 @(RoutedFrom Connection) @(RouteTo Connection) defaultAddr
. runR2 @(RoutedFrom (Maybe ByteString)) @(RouteTo (Maybe ByteString)) defaultAddr
. runR2 @(RoutedFrom (Maybe NodeHandshake)) @(RouteTo (Maybe NodeHandshake)) defaultAddr
. runR2Input @(RouteTo ByteString) defaultAddr
. runR2Input @(RoutedFrom (Maybe NodeHandshake)) defaultAddr
$ forever (acceptR2 >>= pnetnd nodeID)
)
trace (Text.printf "%s disconnected from `%s`" nodeIDStr (show transport))
whenJust maybeNodeID (stateDeleteNode . entry)
where
nodeIDStr = maybe "unknown node" show maybeNodeID
entry nodeID = (s, nodeID)

pnetcd :: (Members (TransportEffects Handshake Response) r, Members (TransportEffects (RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))) (RouteTo (Maybe (RouteTo (Maybe ByteString))))) r, Member (AtomicState (State s)) r, Member Trace r, Eq s) => s -> Sem r ()
pnetcd = traceTagged "pnetcd" . handle . go
pnetcd ::
( Members (TransportEffects Handshake Response) r,
Members (TransportEffects (RoutedFrom (Maybe (RoutedFrom Connection))) (RouteTo (Maybe (RouteTo Connection)))) r,
Members (TransportEffects (RoutedFrom (Maybe (RoutedFrom (Maybe ByteString)))) (RouteTo (Maybe (RouteTo (Maybe ByteString))))) r,
Member (Sockets (RoutedFrom (Maybe (RouteTo ByteString))) (RouteTo (Maybe (RoutedFrom ByteString))) s) r,
Member (Sockets (RouteTo ByteString) (RoutedFrom ByteString) s) r,
Member (InputWithEOF (RoutedFrom (Maybe (RoutedFrom (Maybe NodeHandshake))))) r,
Member (InputWithEOF (RoutedFrom (Maybe (RouteTo ByteString)))) r,
Member (InputWithEOF (RouteTo ByteString)) r,
Member (Output (RouteTo (Maybe (RouteTo (Maybe NodeHandshake))))) r,
Member (AtomicState (State s)) r,
Member Trace r,
Eq s
) =>
s ->
Sem r ()
pnetcd s = handle \case
ListNodes -> listNodes
(ConnectNode transport maybeNodeID) -> connectNode s transport maybeNodeID
Route sender ->
let entry = (s, sender)
in stateAddNode entry >> route runClientOutput sender >> stateDeleteNode entry
where
go _ ListNodes = traceTagged "ListNodes" do
nodeList <- map snd <$> atomicGet
trace (Text.printf "responding with `%s`" (show nodeList))
output (NodeList nodeList)
go s (ConnectNode transport maybeNodeID) = traceTagged "connection" do
trace (Text.printf "%s connected over `%s`" nodeIDStr (show transport))
whenJust maybeNodeID (atomicModify' . (:) . entry)
traceTagged "pnetnd" . trace . show =<< runFail (runR2 defaultAddr pnetnd)
trace (Text.printf "%s disconnected from `%s`" nodeIDStr (show transport))
whenJust maybeNodeID (atomicModify' . List.delete . entry)
where
nodeIDStr = maybe "unknown node" show maybeNodeID
entry nodeID = (s, nodeID)
runClientOutput s = socketOutput s . runR2Output defaultAddr . raiseUnder @(Output (RouteTo (Maybe (RoutedFrom ByteString))))
5 changes: 1 addition & 4 deletions common/Pnet/Routing.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Pnet.Routing (Address, RouteTo (..), RoutedFrom (..), Connection, r2, r2Sem, runR2, runR2Input, runR2Output, runR2Close, selfAddr, defaultAddr, connectR2, acceptR2) where
module Pnet.Routing (Address, RouteTo (..), RoutedFrom (..), Connection, r2, r2Sem, runR2, runR2Input, runR2Output, runR2Close, defaultAddr, connectR2, acceptR2) where

import Control.Monad
import Data.ByteString
Expand Down Expand Up @@ -78,9 +78,6 @@ connectR2 addr = traceTagged "connectR2" (trace $ "connecting to " <> show addr)
acceptR2 :: (Member (InputWithEOF (RoutedFrom Connection)) r, Member Fail r) => Sem r Address
acceptR2 = routedFromNode <$> inputOrFail

selfAddr :: Address
selfAddr = -1

defaultAddr :: Address
defaultAddr = 0

Expand Down
5 changes: 4 additions & 1 deletion common/Polysemy/Sockets.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Polysemy.Sockets (Sockets, bundleSocketEffects, socket) where
module Polysemy.Sockets (Sockets, bundleSocketEffects, socket, socketOutput) where

import Polysemy
import Polysemy.Bundle
Expand All @@ -17,3 +17,6 @@ type Sockets i o s = Scoped s (Socket i o)

socket :: (Member (Sockets i o s) r) => s -> InterpretersFor (TransportEffects i o) r
socket s = scoped s . bundleSocketEffects . raise3Under

socketOutput :: forall i o s r. (Member (Sockets i o s) r) => s -> InterpreterFor (Output o) r
socketOutput s = socket s . raise @(InputWithEOF i) . raiseUnder @Close
Loading

0 comments on commit 0d8ba2c

Please sign in to comment.