
Typed Channels


Discussion

How do we build up an API for typed channels that can answer both local and remote use cases and remain performant and reliable?

  • Exercise: Sketch out what channels might look like, independently of actors

We will start with local only channels and then consider remote channels separately. We will also start out considering unidirectional channels only. Bi-directional channels (i.e. request-reply channels) can be layered on top of the basic primitives, I would've thought.

Let's assume we have a fan-out exchange, and want to disperse data using it. There are a number of options for doing this currently. One of them is to use distributed-process-execution, and for a fan-out specifically, to use a broadcast exchange.

fanOut :: [(Int -> Int)] -> Process ()
fanOut xfms = do
  us <- getSelfPid
  ex <- broadcastExchange
  pids <- forM xfms $ \f -> spawnLocal $ do
    -- (each worker would also need to bind to the exchange to receive posts)
    self <- getSelfPid
    expect >>= \i -> send us (self, f i)
  forM_ ([1..10] :: [Int]) (post ex)
  collectFrom pids

This model doesn't compose well or feel very appealing. The middle-man needs setting up explicitly before the end of the chain is added, and only then can the producers be started? Yuk...

On reflection though, I don't want to try to make this too compositional, or delve into routing too deeply. Doing so would effectively move us in the direction of a (reactive) streams API, which isn't the focus here. Instead, let's look at some pseudocode options for working with actors and channels. First let's consider channels in isolation.

-- Q: do we want to ensure channels are used in (MonadIO m) => Monad m -> ...?
chewingOnChannels = do
  -- in some monad or other then..
  forM_ ([1..50] :: [Int]) $
    source yield <$> pure ((*) 2 :: Int -> Int) <*> (sink $ liftIO . putStrLn)

Obviously we could make that work without any concurrency at all. Let's assume our source is a channel that is being written to by some other thread, and our sink behaves like a FIFO queue - something we could implement on the writing end of the sink with an STM TBQueue, for example.

This allows our reader to apply back-pressure to the sender rather easily. An even more direct form of back-pressure might involve using a TMVar or some such.
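As a small sketch of that idea (the names here are placeholders rather than a proposed API, and only the stm package is assumed): backing the channel with a bounded TBQueue means a writer blocks, inside STM, as soon as the queue fills up, which is exactly the back-pressure described above.

import Control.Concurrent.STM
import Numeric.Natural (Natural)

-- A channel backed by a bounded FIFO queue. Once `bound` items are pending,
-- writeChannel blocks (the STM transaction retries), so a slow reader
-- throttles the writer. Note: the size argument is Natural in recent stm
-- releases (Int in older ones).
newtype BoundedChan a = BoundedChan (TBQueue a)

newBoundedChan :: Natural -> IO (BoundedChan a)
newBoundedChan bound = BoundedChan <$> newTBQueueIO bound

writeChannel :: BoundedChan a -> a -> IO ()
writeChannel (BoundedChan q) x = atomically (writeTBQueue q x)

readChannel :: BoundedChan a -> IO a
readChannel (BoundedChan q) = atomically (readTBQueue q)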

I believe that different channel semantics are required for different situations. Our high-level API should define primitives that allow channels to behave in various different ways and be amenable to different implementation strategies.

producer chan = forever $ writeChannel chan 42
consumer chan = forever $ readChannel chan >>= print

main = do
  -- this API is speculative, ofc. The point is to choose at channel creation
  -- time, the semantics we require of the channel. I have used the terminology
  -- _acquire_ to indicate the fact that for channels to function properly, we
  -- might need to set up some background infrastructure...
  chan <- acquireChannel ChannelType_X

  void $ forkIO $ producer chan -- starts flooding the channel immediately
  tid <- forkIO $ consumer chan -- reads from the channel continuously

  -- assuming our channel is either size bounded or 1-for-1 (TMVar, etc..)
  -- when we kill the consumer, the producer should block on writeChannel
  killThread tid  -- speculative API, probably looks like throwTo

  -- we can allow the producer to progress a bit now
  replicateM_ 100 $ readChannel chan

  -- and now we can close the channel
  closeChannel chan

Regardless of whether or not a channel utilises back-pressure, we need signalling on its control plane.

API design question: does closing the channel terminate the producer, or does it provide a status datum as the result of writeChannel? Perhaps coupling the two APIs makes the most sense.

demo chan = do
  -- so writeChannel chan might throw ChannelClosedException
  -- API design question: do we wrap other synchronous/asynchronous exceptions
  -- in ChannelClosedException and re-throw?
  void $ forkIO $ forever $ void $
    (try (writeChannel chan "hello again") :: IO (Either ChannelClosedException ()))

  -- and an alternative that tells you if it succeeded or not...
  didWrite <- writeChannel chan "hi there" :: IO ChannelStatus
  case didWrite of
    ChannelOk      -> return ()
    ChannelClosed  -> putStrLn "hmm, that didn't work..."
    ChannelError e -> {- some exception -} print e
  -- lots of design questions about the above...

  -- now let's blow up that forkIO thread from above O_o
  closeChannel chan
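One way to let those two designs coexist, sketched under the assumption that the channel carries a closed flag alongside its queue: the status-returning write is the primitive, and the exception-throwing variant is a thin wrapper over it. ChannelStatus and ChannelClosedException here are the hypothetical types from the snippet above, cut down to the essentials.

import Control.Concurrent.STM
import Control.Exception (Exception, throwIO)

data ChannelStatus = ChannelOk | ChannelClosed deriving (Show)

data ChannelClosedException = ChannelClosedException deriving (Show)
instance Exception ChannelClosedException

-- a bounded queue plus a flag recording whether the channel has been closed
data BChan a = BChan (TBQueue a) (TVar Bool)

-- primitive: report the outcome rather than throwing
tryWriteChannel :: BChan a -> a -> IO ChannelStatus
tryWriteChannel (BChan q closedVar) x = atomically $ do
  isClosed <- readTVar closedVar
  if isClosed
    then pure ChannelClosed
    else ChannelOk <$ writeTBQueue q x

-- wrapper: the throwing flavour used in the forkIO example above
writeChannel :: BChan a -> a -> IO ()
writeChannel ch x = do
  st <- tryWriteChannel ch x
  case st of
    ChannelOk     -> pure ()
    ChannelClosed -> throwIO ChannelClosedException

closeChannel :: BChan a -> IO ()
closeChannel (BChan _ closedVar) = atomically (writeTVar closedVar True)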

I'm not sure there's any easy way to track producers and consumers of a channel automatically outside of an actor-based implementation. Aside from introducing indeterminate GHC magic, I think the best approach here would be to provide an explicit API for doing this kind of thing. That might also be a useful place to introduce monadic execution contexts, which would give clients of the API the ability to interact with the internals in a frictionless manner. This is all on the assumption that our channel abstractions can be made to fit with Applicative and Alternative, which I would very much like. (Once we start looking at bi-directional channels, do we then get into Bifunctor and Contravariant land, and end up using Profunctors?)

demo1 chan = do
  -- we register ourselves and get on with it (note the implicit binding to `chan`)
  registerProducer chan $ forever $ writeChannel "Well hello there!"

demo2 chan = do
  -- we register ourselves again, and get on with it (note the implicit binding again)
  registerConsumer chan $ forever $ readChannel >>= putStrLn

demo3 chan = do
  forkIO $ demo1 chan
  forkIO $ demo2 chan

  channelSubscriptions chan >>= print
  channelProducers chan >>= print
  channelConsumers chan >>= print
  channelStats chan >>= print
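To make the "implicit binding to chan" concrete, here is one hypothetical shape for it: registered producers and consumers run in a ReaderT that carries the channel, so the channel argument disappears from writeChannel/readChannel. Plain Control.Concurrent.Chan stands in for the channel purely for illustration; a real registerProducer/registerConsumer would presumably also record the forked ThreadId in the channel's registry so that channelProducers and channelConsumers have something to report.

import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.Chan (Chan, readChan, writeChan)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ReaderT, ask, runReaderT)

-- the "monadic execution context": a ReaderT carrying the channel
type ChannelM a = ReaderT (Chan a) IO

registerProducer, registerConsumer :: Chan a -> ChannelM a () -> IO ThreadId
registerProducer chan body = forkIO (runReaderT body chan)
registerConsumer chan body = forkIO (runReaderT body chan)

-- channel-implicit operations: fetch the channel from the environment
writeChannel :: a -> ChannelM a ()
writeChannel x = ask >>= \ch -> liftIO (writeChan ch x)

readChannel :: ChannelM a a
readChannel = ask >>= liftIO . readChan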

This is all still quite a low level API, and that's really the point. If we get the layers defined cleanly, with clear semantics - at the API level, so implementors are responsible for meeting the demand - then we should be able to build richer capabilities on top of these.

Design Consideration: Dependency Inversion

Abstractions should not depend on details; rather, details should depend on abstractions. Or so the saying goes. I think figuring out where the interface/API segregation needs to occur when we combine these channel-based APIs with remoting, and then with actors, will be tricky. Not to mention that we want to ensure we do not break the formal semantics of distributed-process - but I think we also need to find ways to derive its own set of formal semantics for this API. One purely illustrative sketch of where the segregation might sit is given below.
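The sketch: channel operations become the abstraction, and both a local STM-backed implementation and a remote, remoting-backed one live behind it as details, so the actor layer only ever programs against the class. Names here are hypothetical.

-- Illustrative only: the abstraction the higher layers depend on.
class ChannelLike chan where
  putChan   :: chan a -> a -> IO ()
  takeChan  :: chan a -> IO a
  closeChan :: chan a -> IO ()

-- Details then depend on the abstraction, not the other way around, e.g.
--   instance ChannelLike BoundedChan where ...   (local, STM-backed)
--   instance ChannelLike RemoteChan  where ...   (hypothetical, backed by remoting)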

  • Exercise: Sketch out different approaches for monitoring channels

All the examples above were predicated on some as-yet-undefined implementation that makes it possible to communicate across multiple threads via the channels. Monitoring channels is an altogether more complex task. Let's take a few high-level options and discuss them without pseudocode for the time being...

  1. Monitoring using asynchronous exceptions

This is a fairly simple approach. For a given channel, we maintain some registry of interested clients. When certain channel events occur, we notify said clients by throwing an asynchronous exception at them.

The advantages of this approach lie in its simplicity. Adding oneself to the registry is presumably not an operation that will occur with excessive frequency, so we can do it in a critical section, in the caller's thread. Let us assume that the generic definition of a channel is a record type, and that the specifics of how the channel is implemented are deferred to some ChannelType record, which supplies thunks that we evaluate in order to operate on it. In the outer record, a TMVar holds the set of client ThreadIds to which we throw asynchronous exceptions when certain channel state changes occur...
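To pin down the shape being assumed here (field and type names are illustrative, not fixed): the outer Channel record owns the monitor registry, and the ChannelType record supplies the implementation-specific thunks, all of which run in STM.

import Control.Concurrent (ThreadId)
import Control.Concurrent.STM (STM, TMVar)
import Data.Set (Set)

-- implementation-specific thunks, evaluated by the generic channel code
data ChannelType a = ChannelType
  { apiWrite :: a -> STM ()
  , apiRead  :: STM a
  , apiClose :: STM ()
  }

-- the generic outer record: behaviour plus the monitor registry
data Channel a = Channel
  { channelType :: ChannelType a
  , monitors    :: TMVar (Set ThreadId)
  }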

-- assuming a Channel record which has a `Set ThreadId` stored in a `TMVar`
closeChannel Channel{..} = do
  tids <- atomically $ do
    ms <- takeTMVar monitors
    apiClose channelType -- this assumes that implementations must close in STM
    putTMVar monitors ms -- Q: is this really necessary?
    return ms
  forM_ tids (flip throwTo ChannelClosedException) -- notify every registered monitor

Loads of assumptions there - for example, that closing a channel can take place in STM for any implementation. I think that can probably hold, since channels that rely on code running in IO - for example channels that are backed by remoting - will presumably have to spin off threads to handle their network interactions. Perhaps someone does want a synchronous, serialised channel where readChannel accesses the network directly, but I don't see that as a common case, and such an implementation wouldn't be able to use network-transport directly unless it was able to handle all the event types by itself (and was also comfortable with one channel consuming an entire TCP/IP connection all by itself!). Given that this is the case, apiClose can use STM to signal the networking thread and, if needed, wait for a reply using a TMVar or whatever.
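A compact sketch of that last point, under the assumption that a networking thread owns the connection and drains a control queue: apiClose just enqueues a request (a plain STM action, so it composes with the closeChannel transaction above), and any wait for acknowledgement happens afterwards, outside that transaction.

import Control.Concurrent.STM

data ControlMsg = CloseRequested

-- what a remoting-backed apiClose could reduce to: signal the network thread
signalClose :: TBQueue ControlMsg -> STM ()
signalClose ctrl = writeTBQueue ctrl CloseRequested

-- if a confirmation is needed, block on a reply TMVar after the transaction
awaitClosed :: TMVar () -> IO ()
awaitClosed ack = atomically (takeTMVar ack)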

  • Performance Considerations?

In the current implementation of distributed-process, there is already a good deal of inter-thread synchronisation using STM. For both the process mailbox and typed channels, incoming messages are written in an STM transaction by either the network listener or the node controller thread, depending on the source of the write. It looks like stm-channelize is trying to provide a generic API over the network side of things, but I'm not sure about the safety of that implementation either.

  2. Monitoring using a signalling mechanism

Now, an alternative implementation of closeChannel would be to use a signal. Looking at the code in Control.Concurrent.Event, I don't think it will work for this use case, but I suspect we can cook something up. The disadvantage of this approach, of course, is that the monitoring thread won't get the signal in a timely fashion - it will only know about the channel's demise (or whatever other event took place) when it polls the signalling mechanism. I'm not really sure how valuable that approach is in practice.
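For comparison, the polled flavour could be as small as a flag the channel sets on close; interested threads only learn about the event when they next look, although STM does let a caller turn the poll into a blocking wait if that is acceptable. Names are illustrative.

import Control.Concurrent.STM

-- the channel (e.g. closeChannel) sets the flag when the event occurs
announceClosed :: TVar Bool -> STM ()
announceClosed flag = writeTVar flag True

-- monitors poll at a time of their choosing...
pollClosed :: TVar Bool -> IO Bool
pollClosed = readTVarIO

-- ...or block until the flag is set, by retrying the transaction
awaitClose :: TVar Bool -> IO ()
awaitClose flag = atomically (readTVar flag >>= check)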
