Question about terminating gatherFrom in the event of an outer exception #57

Open
bbarker opened this issue Sep 16, 2020 · 5 comments

bbarker commented Sep 16, 2020

Looking at the gatherFrom code (pasted below for convenience): I have a scatter function that starts up a TCP server. Ideally, this TCP server would be restarted when gatherFrom is called again as a result of exception handling at the outer level. It quickly became obvious that this isn't possible, because the bound listening socket is never closed. I'm not sure of the best way to handle this; one possibility is rewriting the code to use withAsync. Even if that is possible, though, I'm not sure it would be desirable for all use cases.

gatherFrom :: (MonadIO m, MonadUnliftIO m)
           => Int                -- ^ Size of the queue to create
           -> (TBQueue o -> m ()) -- ^ Action that generates output values
           -> ConduitT () o m ()
gatherFrom size scatter = do
    chan   <- liftIO $ newTBQueueIO (fromIntegral size)
    worker <- lift $ async (scatter chan)
    gather worker chan
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $ do
            xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
            (xs,) <$> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan
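
To make the lifetime difference concrete, here is a minimal sketch that leaves conduit out entirely and only compares async with withAsync from the async package. The names fakeServer, runWith, viaAsync and viaWithAsync are hypothetical, and the MVar-based "server" is only a stand-in for a real listening socket; it is not the code from this library.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, withAsync)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar, tryReadMVar)
import Control.Exception (ErrorCall (..), finally, throwIO, try)
import Control.Monad (forever)
import Data.Maybe (isJust)

-- Hypothetical stand-in for the TCP server: it signals once it is
-- "listening", then idles until killed. The finally handler plays the
-- role of closing the bound socket.
fakeServer :: MVar () -> MVar () -> IO ()
fakeServer listening closed =
    (putMVar listening () >> forever (threadDelay 1000000))
        `finally` putMVar closed ()

-- Start the server with the given spawning strategy, simulate a failure
-- at the outer level, and report whether the cleanup handler has run.
runWith :: (IO () -> IO () -> IO ()) -> IO Bool
runWith spawn = do
    listening <- newEmptyMVar
    closed    <- newEmptyMVar
    r <- try (spawn (fakeServer listening closed)
                    (takeMVar listening >> throwIO (ErrorCall "outer failure")))
    case r of
        Left (ErrorCall _) -> pure ()  -- the simulated outer exception
        Right ()           -> pure ()
    isJust <$> tryReadMVar closed

-- Plain async: nothing cancels the worker when the body throws.
viaAsync :: IO Bool
viaAsync = runWith (\server body -> async server >> body)

-- withAsync: the worker is cancelled when the body exits, normally or not.
viaWithAsync :: IO Bool
viaWithAsync = runWith (\server body -> withAsync server (const body))
```

Under these assumptions, viaAsync should report False (the stand-in socket is still open after the outer failure) and viaWithAsync should report True. The catch is that withAsync needs a scope to run the body in, which does not map directly onto a ConduitT source, so this only illustrates the behaviour rather than a drop-in fix.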

bbarker commented Sep 16, 2020

I tried an alternative approach, but it has had interesting (detrimental) results...

gatherFromA :: (MonadIO m, MonadUnliftIO m)
            => Int                -- ^ Size of the queue to create
            -> (TBQueue o -> m ()) -- ^ Action that generates output values
            -> m (ConduitT () o m (), Async ())
gatherFromA size scatter = do
    chan <- newTBQueueIO (fromIntegral size)
    worker <- async (scatter chan)
    pure $ (gather worker chan, worker)
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $ do
            xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
            (xs,) <$> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan

When I cancel the returned Async () in bracket where gatherFromA is used, it actually kills my entire program!
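
For reference, a minimal sketch of the bracket usage described here, assuming the conduit, unliftio, async and stm packages. gatherFromA follows the version posted above, except that flushTBQueue stands in for the whileM/readTBQueue loop (a substitution, to avoid the monad-loops dependency). fakeServer and demo are hypothetical names, and the IORef is only a stand-in for closing the listening socket.

```haskell
import Conduit (ConduitT, runConduit, sinkList, takeC, yield, (.|))
import Control.Concurrent.STM
    (TBQueue, atomically, flushTBQueue, newTBQueueIO, writeTBQueue)
import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Data.Foldable (traverse_)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import UnliftIO.Async (Async, async, cancel, pollSTM)
import UnliftIO.Exception (SomeException, bracket, finally, throwIO)

-- gatherFromA as posted in this thread; flushTBQueue drains the queue in
-- one STM step (substituted for the whileM loop).
gatherFromA :: MonadUnliftIO m
            => Int                 -- size of the queue to create
            -> (TBQueue o -> m ()) -- action that generates output values
            -> m (ConduitT () o m (), Async ())
gatherFromA size scatter = do
    chan   <- liftIO $ newTBQueueIO (fromIntegral size)
    worker <- async (scatter chan)
    pure (gather worker chan, worker)
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $
            (,) <$> flushTBQueue chan <*> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan

-- Hypothetical stand-in for the TCP server: emits values forever and
-- records in 'closed' that its cleanup handler ran.
fakeServer :: IORef Bool -> TBQueue Int -> IO ()
fakeServer closed chan =
    forever (atomically (writeTBQueue chan 1))
        `finally` writeIORef closed True

-- Consume a few values, then let bracket cancel the worker on the way out,
-- whether the consumer finishes normally or throws.
demo :: IO ([Int], Bool)
demo = do
    closed <- newIORef False
    xs <- bracket
        (gatherFromA 16 (fakeServer closed))
        (\(_, worker) -> cancel worker)
        (\(src, _) -> runConduit (src .| takeC 3 .| sinkList))
    wasClosed <- readIORef closed
    pure (xs, wasClosed)
```

In this sketch the release action is what stops the worker, so the stand-in cleanup handler runs before bracket returns and a later call can start a fresh server.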


bbarker commented Sep 16, 2020

I spoke too soon: even if I comment out the call to cancel, my program still crashes for some unknown reason, though this is new behavior.

UPDATE: it appears to be a memory leak somewhere; I haven't yet narrowed down whether it is related to the above change.


bbarker commented Sep 17, 2020

Some profiling output below. At first glance, the crash does appear to be related to the implementation of gatherFromA, though my current guess is that it has more to do with how the code is being used; I will continue to investigate.

[Images: profiling output, "FarmDataServer exe_hc" and "FarmDataServer exe_hd"]


bbarker commented Sep 24, 2020

As it turns out, that was a memory leak unrelated to the implementation of gatherFromA, so I think this form of gatherFrom is still useful - it solved my original problem.

I can plan to make a PR of this that implements gatherFrom in terms of gatherFromA.

Collaborator

qnikst commented Oct 10, 2020

Sorry for the late response. Do I understand correctly that gatherFromA differs from gatherFrom in that it also returns the Async of the worker, so that we can wait on it or kill it?

And in that case gatherFrom can be implemented in terms of gatherFromA? If so, I don't see any problems with a PR that introduces it.
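
A minimal sketch of what gatherFrom in terms of gatherFromA could look like, assuming the conduit, unliftio, async and stm packages. This is not the PR itself. gatherFromA is repeated from the thread so the block stands alone, with flushTBQueue substituted for the whileM/readTBQueue loop; producer and demoGatherFrom are hypothetical names used only for illustration.

```haskell
import Conduit (ConduitT, runConduit, sinkList, yield, (.|))
import Control.Concurrent.STM
    (TBQueue, atomically, flushTBQueue, newTBQueueIO, writeTBQueue)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Trans.Class (lift)
import Data.Foldable (traverse_)
import UnliftIO.Async (Async, async, pollSTM)
import UnliftIO.Exception (SomeException, throwIO)

-- gatherFromA as posted in this thread (flushTBQueue substituted for the
-- whileM loop): returns the source together with the worker's Async.
gatherFromA :: MonadUnliftIO m
            => Int
            -> (TBQueue o -> m ())
            -> m (ConduitT () o m (), Async ())
gatherFromA size scatter = do
    chan   <- liftIO $ newTBQueueIO (fromIntegral size)
    worker <- async (scatter chan)
    pure (gather worker chan, worker)
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $
            (,) <$> flushTBQueue chan <*> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan

-- gatherFrom as a thin wrapper: start the worker inside the conduit and
-- discard the Async, which keeps the original signature.
gatherFrom :: MonadUnliftIO m
           => Int
           -> (TBQueue o -> m ())
           -> ConduitT () o m ()
gatherFrom size scatter = do
    (src, _worker) <- lift (gatherFromA size scatter)
    src

-- Hypothetical finite producer used only to exercise the wrapper.
producer :: TBQueue Int -> IO ()
producer chan = traverse_ (atomically . writeTBQueue chan) [1 .. 5]

demoGatherFrom :: IO [Int]
demoGatherFrom = runConduit (gatherFrom 4 producer .| sinkList)
```

Because the wrapper throws the Async away, it keeps the lifetime behaviour from the original report: callers who need to stop the worker on an outer exception would use gatherFromA directly.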
