diff --git a/Data/Attoparsec/ByteString/Streaming.hs b/Data/Attoparsec/ByteString/Streaming.hs new file mode 100644 index 0000000..86b2465 --- /dev/null +++ b/Data/Attoparsec/ByteString/Streaming.hs @@ -0,0 +1,137 @@ +{- | Here is a simple use of 'parsed' and standard @Streaming@ segmentation devices to + parse a file in which groups of numbers are separated by blank lines. Such a problem + of \'nesting streams\' is described in the @conduit@ context in + + +> -- $ cat nums.txt +> -- 1 +> -- 2 +> -- 3 +> -- +> -- 4 +> -- 5 +> -- 6 +> -- +> -- 7 +> -- 8 + + We will sum the groups and stream the results to standard output: + +> import Streaming +> import qualified Streaming.Prelude as S +> import qualified Data.ByteString.Streaming.Char8 as Q +> import qualified Data.Attoparsec.ByteString.Char8 as A +> import qualified Data.Attoparsec.ByteString.Streaming as A +> import Data.Function ((&)) +> +> main = Q.getContents -- raw bytes +> & A.parsed lineParser -- stream of parsed `Maybe Scientific`s; blank lines are `Nothing` +> & void -- drop any unparsed nonsense at the end +> & S.split Nothing -- split on blank lines +> & S.maps S.concat -- keep `Just x` values in the sub-streams (cp. catMaybes) +> & S.mapped S.sum -- sum each substream +> & S.print -- stream results to stdout +> +> lineParser = Just <$> A.scientific <* A.endOfLine <|> Nothing <$ A.endOfLine + +> -- $ cat nums.txt | ./atto +> -- 6.0 +> -- 15.0 +> -- 15.0 + +-} + +module Data.Attoparsec.ByteString.Streaming + (Message + , parse + , parsed + ) where + +import qualified Data.Attoparsec.ByteString as A +import qualified Data.Attoparsec.Internal.Types as T +import qualified Data.ByteString as B +import Data.ByteString.Streaming +import Data.ByteString.Streaming.Internal +import Streaming hiding (concats, unfold) +import Streaming.Internal (Stream (..)) + +--- + +type Message = ([String], String) -- ^ What a failed parse reports: the @[String]@ and @String@ fields of attoparsec's @Fail@ result (its error contexts and its error message). + +{- | The result of a parse (@Either a ([String], String)@), with the unconsumed byte stream. 
+ +>>> :set -XOverloadedStrings -- the string literal below is a streaming bytestring +>>> (r,rest1) <- AS.parse (A.scientific <* A.many' A.space) "12.3 4.56 78.3" +>>> print r +Left 12.3 +>>> (s,rest2) <- AS.parse (A.scientific <* A.many' A.space) rest1 +>>> print s +Left 4.56 +>>> (t,rest3) <- AS.parse (A.scientific <* A.many' A.space) rest2 +>>> print t +Left 78.3 +>>> Q.putStrLn rest3 + +-} +parse :: Monad m + => A.Parser a + -> ByteString m x -> m (Either a Message, ByteString m x) +parse parser bs = do + (e,rest) <- apply parser bs + return (either Right Left e, rest) -- swap sides: parsed value on the Left, failure Message on the Right +{-#INLINE parse #-} +{- | Run the parser once over the byte stream: @Left@ the failure 'Message' or @Right@ the parsed value, paired with the rest of the stream. 'parse' is this result with the two sides swapped. -} +apply :: Monad m + => A.Parser a + -> ByteString m x -> m (Either Message a, ByteString m x) +apply parser = begin + where begin p0 = case p0 of + Go m -> m >>= begin + Empty r -> step id (A.parse parser mempty) (return r) + Chunk bs p1 | B.null bs -> begin p1 -- attoparsec understands "" as eof + | otherwise -> step (chunk bs >>) (A.parse parser bs) p1 + + step diff res p0 = case res of + T.Fail _ c m -> return (Left (c,m), diff p0) -- diff puts every chunk already fed to the parser back in front of p0 + T.Done a b -> return (Right b, chunk a >> p0) -- a: leftover input, put back in front of p0; b: the parsed value + T.Partial k -> do + let clean p = case p of -- inspect for null chunks before + Go m -> m >>= clean -- feeding attoparsec + Empty r -> step diff (k mempty) (return r) -- out of input: feed "" so the parser sees eof + Chunk bs p1 | B.null bs -> clean p1 + | otherwise -> step (diff . (chunk bs >>)) (k bs) p1 + clean p0 +{-#INLINABLE apply #-} + +{-| Apply a parser repeatedly to a stream of bytes, streaming the parsed values, but + ending when the parser fails or the bytes run out. 
+ +>>> S.print $ AS.parsed (A.scientific <* A.many' A.space) $ "12.3 4.56 78.9" +12.3 +4.56 +78.9 +18.282 +-} +parsed + :: Monad m + => A.Parser a -- ^ Attoparsec parser + -> ByteString m r -- ^ Raw input + -> Stream (Of a) m (Either (Message, ByteString m r) r) -- ^ Parsed values; ends with @Left@ (error, unparsed input) if the parser fails, else @Right@ the input's own return value +parsed parser = begin + where begin p0 = case p0 of -- inspect for null chunks before + Go m -> lift m >>= begin -- feeding attoparsec + Empty r -> Return (Right r) -- no input left between parses: finish with the byte stream's result + Chunk bs p1 | B.null bs -> begin p1 + | otherwise -> step (chunk bs >>) (A.parse parser bs) p1 + step diffP res p0 = case res of + A.Fail _ c m -> Return (Left ((c,m), diffP p0)) -- diffP puts the chunks fed to this failed parse back in front of p0 + A.Done bs a | B.null bs -> Step (a :> begin p0) -- bs: leftover input; a: the parsed value to yield + | otherwise -> Step (a :> begin (chunk bs >> p0)) -- put the leftover back before parsing the next value + A.Partial k -> do + x <- lift (nextChunk p0) + case x of + Left e -> step diffP (k mempty) (return e) -- out of input: feed "" so the parser sees eof + Right (bs,p1) | B.null bs -> step diffP res p1 -- skip an empty chunk and retry the same Partial + | otherwise -> step (diffP . (chunk bs >>)) (k bs) p1 +{-# INLINABLE parsed #-} diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e4d7d9f --- /dev/null +++ b/LICENSE @@ -0,0 +1,30 @@ +Copyright (c) 2015, Michael Thompson, 2014 Gabriel Gonzalez, 2014 Renzo Carbonara + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of michaelt nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..57a0cae --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +# streaming-utils + +*http, json, attoparsec and pipes material for* `streaming` *and* `streaming-bytestring` + +`Streaming.Pipes` reimplements some of the standard pipes splitting and joining operations with `Stream` in place of `FreeT`. The operations are all plain functions, not lenses. They will thus be simpler to use, unless of course you are using pipes' `StateT` parsing. Another module is planned to recover this style of parsing. + +`Data.ByteString.Streaming.HTTP` just replicates [`Pipes.HTTP`](https://hackage.haskell.org/package/pipes-http-1.0.2/docs/Pipes-HTTP.html) (barely a character is changed) so that the response takes the form of a `ByteString m ()` rather than `Producer ByteString m ()`. Something like this is the intuitively correct response type, insofar as a pipes Producer (like a conduit Source and an io-streams InputStream) is properly a succession of independent semantically significant values. + +`Data.ByteString.Streaming.Aeson` replicates Renzo Carbonara's [`Pipes.Aeson`](https://hackage.haskell.org/package/pipes-aeson-0.4.1.5/docs/Pipes-Aeson.html). 
It also includes materials for applying the additional parsers exported by the `json-streams` library, which have some advantages for properly streaming applications as explained in the documentation. + +`Data.Attoparsec.ByteString.Streaming` in turn pretty much replicates Renzo Carbonara's [`Pipes.Attoparsec` module](https://hackage.haskell.org/package/pipes-attoparsec-0.5.1.2/docs/Pipes-Attoparsec.html). It permits parsing an effectful bytestring with an attoparsec parser, and also the conversion of an effectful bytestring into a stream of parsed values. + diff --git a/package.yaml b/package.yaml new file mode 100644 index 0000000..a91c626 --- /dev/null +++ b/package.yaml @@ -0,0 +1,33 @@ +name: streaming-attoparsec +version: 0.1.0.0 +synopsis: Attoparsec integration for the streaming ecosystem +description: Attoparsec integration for the streaming ecosystem. +homepage: https://github.com/haskell-streaming/streaming-attoparsec +license: BSD3 +author: Michael Thompson +maintainer: Colin Woodbury, colingw@gmail.com +category: Streaming + +extra-source-files: + - README.md + - CHANGELOG.md + +ghc-options: + - -fwarn-unused-imports + - -fwarn-unused-binds + - -fwarn-name-shadowing + - -fwarn-unused-matches + - -fwarn-incomplete-patterns + - -fwarn-incomplete-uni-patterns + +dependencies: + - base >= 4.7 && < 5 + - aeson >= 1.1 && < 1.4 + - attoparsec >= 0.13 && < 0.14 + - bytestring + - streaming >= 0.1.4.5 && < 0.3 + - streaming-bytestring >= 0.1.4.0 && < 0.2 + +library: + source-dirs: . + other-modules: [] diff --git a/stack.yaml b/stack.yaml new file mode 100644 index 0000000..44e8d17 --- /dev/null +++ b/stack.yaml @@ -0,0 +1,4 @@ +resolver: lts-11.1 + +packages: + - .