Memory explodes trying to consume chunked pure stream #115

Open
qrpnxz opened this issue Jul 28, 2022 · 3 comments

qrpnxz commented Jul 28, 2022

I was trying to use mapsM and chunksOf to go from Stream (Of Word8) to Stream (Of ByteString), and found that consuming even just 200 MB allocated dozens of gigabytes of memory. I discovered that the problem was eagerness: my infinite source of (Of Word8) was a series of Steps, which are strict in the functor (why?). If I use repeats to construct the Stream, the code behaves sanely again, although it takes twice as long as using splitsAt by hand because, to add laziness, repeats interleaves pure Effects into the stream (which S.each does not do).
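The original code is not shown, so the following is only one possible reading of "using splitsAt by hand": a minimal sketch that pulls one chunk at a time with S.splitAt (the Of-specialised variant) and packs it. The name packBytesManual and the chunk-size parameter n are hypothetical and do not come from the issue.

```haskell
import Control.Monad.Trans.Class (lift)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Word (Word8)
import Streaming (Stream)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Hypothetical helper: split off n bytes, pack them, repeat.
-- Assumes n > 0; with n <= 0 the first chunk is empty and the
-- rest of the input is drained without yielding anything.
packBytesManual
  :: Monad m => Int -> Stream (Of Word8) m r -> Stream (Of ByteString) m r
packBytesManual n = loop
  where
    loop s = do
      -- Run the first n elements into a list and keep the remainder.
      bytes :> rest <- lift (S.toList (S.splitAt n s))
      if null bytes
        then lift (S.effects rest)          -- input exhausted: recover r
        else S.yield (B.pack bytes) >> loop rest

main :: IO ()
main = do
  chunks <- S.toList_ (packBytesManual 2 (S.each [65, 66, 67]))
  print chunks
```

Each chunk is materialised as a list before packing, so peak memory per chunk is proportional to n rather than to the length of the stream.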

Things to consider:

  1. Removing the bang from Step (unless there's a good reason for it to have one).
  2. Adding a non-strict Step constructor.
  3. Adding a lazy version of S.each (by adding Effects to it as repeats does, or better, by using another constructor).
  4. Adding a note about this problem and how to work around it.
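As a rough illustration of option 3, a list-only variant of S.each could wrap every Step in a pure Effect, mirroring what the comment above says repeats does. This is a sketch only: lazyEach is a hypothetical name, not part of the streaming package, and whether it avoids the reported blow-up depends on the original code, which is not shown here.

```haskell
import Streaming (Stream, effect, wrap)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Hypothetical lazy counterpart to S.each, restricted to lists.
-- Every element is emitted as Effect (pure (Step (a :> rest))),
-- so nothing past the Effect is built until the consumer runs it.
lazyEach :: Monad m => [a] -> Stream (Of a) m ()
lazyEach = foldr (\a rest -> effect (pure (wrap (a :> rest)))) (pure ())

main :: IO ()
main = S.print (S.take 3 (lazyEach [1 :: Int ..]))
```

The extra Effect layer costs one monadic bind per element, which is consistent with the slowdown reported for repeats.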

chessai commented Apr 23, 2023

Can you include a minimal reproduction?

chessai commented Apr 23, 2023

> Steps, which are strict on the Functor (why?)

Stream type:

data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

Of type:

data Of a b = !a :> b

So, Of is a left-strict pair, meaning that pattern-matching on an Of a b will evaluate a to WHNF.
This means that a single Step in a Stream (Of a) m r will look like:

Step (a :> rest)

When we reach a Step, that means we have reached the next step in the stream, and a here is the value contained in that step. For doing actual streaming work, where we grab the next thing in the stream and do some work with it, this typically makes a lot of sense.
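The difference between the left-strict pair and the ordinary tuple can be checked with base alone. The sketch below re-declares Of locally, copying the definition quoted above rather than importing it; whnfSucceeds and strictnessReport are hypothetical helper names introduced for illustration.

```haskell
import Control.Exception (SomeException, evaluate, try)

-- Local copy of the left-strict pair quoted above (not imported from streaming).
data Of a b = !a :> b
infixr 5 :>

-- True when evaluating the argument to WHNF does not throw.
whnfSucceeds :: a -> IO Bool
whnfSucceeds x = either (const False) (const True) <$> tryAny (evaluate x)
  where
    tryAny :: IO b -> IO (Either SomeException b)
    tryAny = try

strictnessReport :: IO (Bool, Bool, Bool)
strictnessReport = do
  lazyTuple  <- whnfSucceeds (undefined :: Int, ())             -- tuple: left untouched
  strictLeft <- whnfSucceeds ((undefined :: Int) :> ())         -- Of: left is forced
  lazyRight  <- whnfSucceeds ((1 :: Int) :> (undefined :: ()))  -- Of: right untouched
  pure (lazyTuple, strictLeft, lazyRight)

main :: IO ()
main = strictnessReport >>= print  -- prints (True,False,True)
```

Only the element is forced. The rest of the stream, which sits in the right-hand field, stays unevaluated.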

Note that none of the functions you listed actually require f ~ Of a, so you could just use (,) a, the lazy 2-tuple, if that is indeed the problem.
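A minimal sketch of that suggestion, assuming a list as the source: the stream is built over the lazy tuple functor, chunked with chunksOf (which works for any Functor), and converted to Of only inside each chunk via S.strictly. The names lazyPairs and chunkLists are hypothetical.

```haskell
import Streaming (Stream, chunksOf, maps, mapsM, wrap)
import Streaming.Prelude (Of)
import qualified Streaming.Prelude as S

-- Hypothetical source over the lazy 2-tuple functor instead of Of.
lazyPairs :: Monad m => [a] -> Stream ((,) a) m ()
lazyPairs = foldr (\a rest -> wrap (a, rest)) (pure ())

-- Chunk the tuple-based stream, then collect each chunk as a list.
-- S.strictly turns (a, b) into a :> b, which is where a gets forced.
chunkLists :: Monad m => Int -> Stream ((,) a) m r -> Stream (Of [a]) m r
chunkLists n = mapsM (S.toList . maps S.strictly) . chunksOf n

main :: IO ()
main = S.print (chunkLists 2 (lazyPairs "abcde"))
```

With the input above this should print the chunks "ab", "cd" and "e" on separate lines.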

chessai commented Apr 23, 2023

Here is something where I tried to replicate your issue:

❯ cat StreamingChunks.hs
{-# language ImportQualifiedPost #-}

module Main (main) where

import Data.Bifunctor (first)
import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Data.ByteString.Char8 qualified as BC8
import Data.List qualified as List
import Data.Word (Word8)
import Streaming (Stream)
import Streaming qualified as S
import Streaming.Prelude (Of)
import Streaming.Prelude qualified as S
import System.IO qualified as IO

main :: IO ()
main = do
  S.mapM_ BC8.putStr
  $ packBytes
  $ S.each (List.replicate streamSize asciiA)

packBytes :: (Monad m) => Stream (Of Word8) m r -> Stream (Of ByteString) m r
packBytes s = id
  $ S.mapsM (fmap (S.mapOf B.pack) . S.toList)
  $ S.chunksOf chunkSize
  $ s

asciiA :: Word8
asciiA = 65

streamSize :: Int
streamSize = 10_000_000

chunkSize :: Int
chunkSize = 32

The runtime stats:

❯ ghc StreamingChunks.hs -o main -O2 -rtsopts -fforce-recomp && ./main +RTS -s -RTS 1>/dev/null
[1 of 2] Compiling Main             ( StreamingChunks.hs, StreamingChunks.o )
[2 of 2] Linking main [Objects changed]
   3,667,802,560 bytes allocated in the heap
       1,936,712 bytes copied during GC
          36,024 bytes maximum residency (2 sample(s))
          25,416 bytes maximum slop
               6 MiB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       876 colls,     0 par    0.003s   0.004s     0.0000s    0.0007s
  Gen  1         2 colls,     0 par    0.000s   0.000s     0.0001s    0.0001s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.353s  (  0.353s elapsed)
  GC      time    0.004s  (  0.004s elapsed)
  EXIT    time    0.000s  (  0.004s elapsed)
  Total   time    0.357s  (  0.360s elapsed)

  %GC     time       0.0%  (0.0% elapsed)

  Alloc rate    10,382,332,804 bytes per MUT second

  Productivity  98.9% of total user, 97.9% of total elapsed

~3.7 GB total allocations, but only 36K max residency.
Total runtime ~0.36s.

So, my guess is that whatever you were doing must be more complicated than this.
