From ced6592890eacfb4e35303757bacf98850f86f6f Mon Sep 17 00:00:00 2001 From: Alexander Vershilov Date: Sat, 30 May 2015 03:01:12 +0300 Subject: [PATCH] Revert "Named send is intra-node, so we can use the optimised routing / encoding" This reverts commit 9f10533cc0edd741f6472157d581d4b8e9e0bf2c. Using names send to intra-nodes break ability to store remote processes inside registry, and cloud haskell used that functionality. So we need to revert commit in order to unbreak that. --- .../Process/Internal/Primitives.hs | 2 +- src/Control/Distributed/Process/Node.hs | 20 +++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/Control/Distributed/Process/Internal/Primitives.hs b/src/Control/Distributed/Process/Internal/Primitives.hs index ee3d4945..ed23d591 100644 --- a/src/Control/Distributed/Process/Internal/Primitives.hs +++ b/src/Control/Distributed/Process/Internal/Primitives.hs @@ -1130,7 +1130,7 @@ whereisRemoteAsync nid label = -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () nsend label msg = - sendCtrlMsg Nothing (NamedSend label (createUnencodedMessage msg)) + sendCtrlMsg Nothing (NamedSend label (createMessage msg)) -- | Named send to a process in the local registry (asynchronous). -- This function makes /no/ attempt to serialize and (in the case when the diff --git a/src/Control/Distributed/Process/Node.hs b/src/Control/Distributed/Process/Node.hs index 69cec3a5..bb94ae9d 100644 --- a/src/Control/Distributed/Process/Node.hs +++ b/src/Control/Distributed/Process/Node.hs @@ -157,10 +157,11 @@ import Control.Distributed.Process.Internal.Types , RegisterReply(..) , WhereIsReply(..) , payloadToMessage + , messageToPayload , createUnencodedMessage , runLocalProcess , firstNonReservedProcessId - , ImplicitReconnect(WithImplicitReconnect) + , ImplicitReconnect(WithImplicitReconnect,NoImplicitReconnect) ) import Control.Distributed.Process.Management.Internal.Agent ( mxAgentController @@ -188,6 +189,7 @@ import Control.Distributed.Process.Serializable (Serializable) import Control.Distributed.Process.Internal.Messaging ( sendBinary , sendMessage + , sendPayload , closeImplicitReconnections , impliesDeathOf ) @@ -692,8 +694,8 @@ nodeController = do ncEffectRegister from label atnode pid force NCMsg (ProcessIdentifier from) (WhereIs label) -> ncEffectWhereIs from label - NCMsg _ (NamedSend label msg') -> - ncEffectNamedSend label msg' + NCMsg (ProcessIdentifier from) (NamedSend label msg') -> + ncEffectNamedSend from label msg' NCMsg _ (UnreliableSend lpid msg') -> ncEffectLocalSend node (ProcessId (localNodeId node) lpid) msg' NCMsg _ (LocalSend to msg') -> @@ -912,11 +914,17 @@ ncEffectWhereIs from label = do (WhereIsReply label mPid) -- [Unified: Table 14] -ncEffectNamedSend :: String -> Message -> NC () -ncEffectNamedSend label msg = do +ncEffectNamedSend :: ProcessId -> String -> Message -> NC () +ncEffectNamedSend from label msg = do mPid <- gets (^. registeredHereFor label) + node <- ask -- If mPid is Nothing, we just ignore the named send (as per Table 14) - forM_ mPid $ \pid -> postMessage pid msg + forM_ mPid $ \pid -> + liftIO $ sendPayload node + (ProcessIdentifier from) + (ProcessIdentifier pid) + NoImplicitReconnect + (messageToPayload msg) -- [Issue #DP-20] ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()