Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
fosskers committed Mar 23, 2018
0 parents commit 5dde4bf
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 0 deletions.
137 changes: 137 additions & 0 deletions Data/Attoparsec/ByteString/Streaming.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
{- | Here is a simple use of 'parsed' and standard @Streaming@ segmentation devices to
parse a file in which groups of numbers are separated by blank lines. Such a problem
of \'nesting streams\' is described in the @conduit@ context in
<http://stackoverflow.com/questions/32957258/how-to-model-nested-streams-with-conduits/32961296 this StackOverflow question>
> -- $ cat nums.txt
> -- 1
> -- 2
> -- 3
> --
> -- 4
> -- 5
> -- 6
> --
> -- 7
> -- 8
We will sum the groups and stream the results to standard output:
> import Streaming
> import qualified Streaming.Prelude as S
> import qualified Data.ByteString.Streaming.Char8 as Q
> import qualified Data.Attoparsec.ByteString.Char8 as A
> import qualified Data.Attoparsec.ByteString.Streaming as A
> import Data.Function ((&))
>
> main = Q.getContents -- raw bytes
> & A.parsed lineParser -- stream of parsed `Maybe Scientific`s; blank lines are `Nothing`
> & void -- drop any unparsed nonsense at the end
> & S.split Nothing -- split on blank lines
> & S.maps S.concat -- keep `Just x` values in the sub-streams (cp. catMaybes)
> & S.mapped S.sum -- sum each substream
> & S.print -- stream results to stdout
>
> lineParser = Just <$> A.scientific <* A.endOfLine <|> Nothing <$ A.endOfLine
> -- $ cat nums.txt | ./atto
> -- 6.0
> -- 15.0
> -- 15.0
-}

module Data.Attoparsec.ByteString.Streaming
(Message
, parse
, parsed
) where

import qualified Data.Attoparsec.ByteString as A
import qualified Data.Attoparsec.Internal.Types as T
import qualified Data.ByteString as B
import Data.ByteString.Streaming
import Data.ByteString.Streaming.Internal
import Streaming hiding (concats, unfold)
import Streaming.Internal (Stream (..))

---

-- | Failure details taken from an attoparsec @Fail@ result: its @[String]@
-- field paired with its @String@ field. Per attoparsec's documentation these
-- are the list of contexts in which the error occurred and the error message.
type Message = ([String], String)

{- | Run a parser once against the front of an effectful byte stream, returning
     the outcome together with the bytes that were not consumed.

     Note the orientation of the 'Either': a successfully parsed value is
     returned in 'Left', and a parse failure (a 'Message') in 'Right'.

>>> :set -XOverloadedStrings -- the string literal below is a streaming bytestring
>>> (r,rest1) <- AS.parse (A.scientific <* A.many' A.space) "12.3 4.56 78.3"
>>> print r
Left 12.3
>>> (s,rest2) <- AS.parse (A.scientific <* A.many' A.space) rest1
>>> print s
Left 4.56
>>> (t,rest3) <- AS.parse (A.scientific <* A.many' A.space) rest2
>>> print t
Left 78.3
>>> Q.putStrLn rest3
-}
parse :: Monad m
      => A.Parser a
      -> ByteString m x
      -> m (Either a Message, ByteString m x)
parse parser input =
  apply parser input >>= \(outcome, leftover) ->
    return (flipSides outcome, leftover)
  where
    -- 'apply' reports failure on the left; the public result reports it on
    -- the right, so the two sides are exchanged here.
    flipSides (Left failure) = Right failure
    flipSides (Right value)  = Left value
{-# INLINE parse #-}

-- | Run a parser once against the front of a byte stream.
--
-- The result pairs the outcome ('Left' for a failure 'Message', 'Right' for
-- the parsed value) with a byte stream for the caller to continue from:
--
-- * on success, the input attoparsec left unconsumed followed by the rest of
--   the stream;
-- * on failure, every chunk that was fed to the parser followed by the rest
--   of the stream.
apply :: Monad m
      => A.Parser a
      -> ByteString m x
      -> m (Either Message a, ByteString m x)
apply parser = start
  where
    -- Skip leading empty chunks (attoparsec understands "" as eof), then
    -- give the first real chunk to a fresh run of the parser.
    start (Go act)  = act >>= start
    start (Empty r) = advance id (A.parse parser mempty) (return r)
    start (Chunk piece more)
      | B.null piece = start more
      | otherwise    = advance (chunk piece >>) (A.parse parser piece) more

    -- @replay@ prepends every chunk fed to the parser so far, so that a
    -- failure can hand the input back.
    advance replay (T.Fail _ ctxs msg) remaining =
      return (Left (ctxs, msg), replay remaining)
    advance _ (T.Done unused value) remaining =
      return (Right value, chunk unused >> remaining)
    advance replay (T.Partial resume) remaining = feed remaining
      where
        -- Inspect for null chunks before feeding attoparsec; an exhausted
        -- stream is signalled by feeding the empty string.
        feed (Go act)  = act >>= feed
        feed (Empty r) = advance replay (resume mempty) (return r)
        feed (Chunk piece more)
          | B.null piece = feed more
          | otherwise    =
              advance (replay . (chunk piece >>)) (resume piece) more
{-# INLINABLE apply #-}

{-| Apply a parser repeatedly to a stream of bytes, streaming the parsed values,
    and ending when the parser fails or the bytes run out.

    The stream returns 'Right' with the byte stream's own result when the input
    is exhausted between parses. It returns 'Left' when a parse fails, carrying
    the failure 'Message' and a byte stream made of the chunks fed to the
    failed attempt followed by the rest of the input.

>>> S.print $ AS.parsed (A.scientific <* A.many' A.space) $ "12.3 4.56 78.9"
12.3
4.56
78.9
-}
parsed
  :: Monad m
  => A.Parser a     -- ^ Attoparsec parser
  -> ByteString m r -- ^ Raw input
  -> Stream (Of a) m (Either (Message, ByteString m r) r)
parsed parser = loop
  where
    -- Inspect for null chunks before feeding attoparsec; each non-empty
    -- chunk found here starts a fresh run of the parser.
    loop (Go act)  = lift act >>= loop
    loop (Empty r) = Return (Right r)
    loop (Chunk piece more)
      | B.null piece = loop more
      | otherwise    = advance (chunk piece >>) (A.parse parser piece) more

    -- @replay@ prepends every chunk fed to the current parse attempt, so a
    -- failure can return that input along with the rest of the stream.
    advance replay (A.Fail _ ctxs msg) remaining =
      Return (Left ((ctxs, msg), replay remaining))
    advance _ (A.Done unused value) remaining
      | B.null unused = Step (value :> loop remaining)
      | otherwise     = Step (value :> loop (chunk unused >> remaining))
    advance replay waiting@(A.Partial resume) remaining =
      lift (nextChunk remaining) >>= \pulled -> case pulled of
        -- Out of bytes: feed the empty string to signal end of input.
        Left r -> advance replay (resume mempty) (return r)
        Right (piece, more)
          -- An empty chunk carries no input; keep waiting on the same result.
          | B.null piece -> advance replay waiting more
          | otherwise    ->
              advance (replay . (chunk piece >>)) (resume piece) more
{-# INLINABLE parsed #-}
30 changes: 30 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Copyright (c) 2015, Michael Thompson, 2014 Gabriel Gonzalez, 2014 Renzo Carbonara

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.

* Neither the name of michaelt nor the names of other
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# streaming-utils

*http, json, attoparsec and pipes material for* `streaming` *and* `streaming-bytestring`

`Streaming.Pipes` reimplements some of the standard pipes splitting and joining operations with `Stream` in place of `FreeT`. The operations are all plain functions, not lenses. They will thus be simpler to use, unless of course you are using pipes' `StateT` parsing. Another module is planned to recover this style of parsing.

`Data.ByteString.Streaming.HTTP` just replicates [`Pipes.HTTP`](https://hackage.haskell.org/package/pipes-http-1.0.2/docs/Pipes-HTTP.html) (barely a character is changed) so that the response takes the form of a `ByteString m ()` rather than `Producer ByteString m ()`. Something like this is the intuitively correct response type, insofar as a pipes Producer (like a conduit Source and an io-streams InputStream) is properly a succession of independent semantically significant values.

`Data.ByteString.Streaming.Aeson` replicates Renzo Carbonara's [`Pipes.Aeson`](https://hackage.haskell.org/package/pipes-aeson-0.4.1.5/docs/Pipes-Aeson.html). It also includes materials for applying the additional parsers exported by the `json-streams` library, which have some advantages for properly streaming applications as is explained in the documentation.

`Data.Attoparsec.ByteString.Streaming` in turn pretty much replicates Renzo Carbonara's [`Pipes.Attoparsec` module](https://hackage.haskell.org/package/pipes-attoparsec-0.5.1.2/docs/Pipes-Attoparsec.html). It permits parsing an effectful bytestring with an attoparsec parser, and also the conversion of an effectful bytestring into a stream of parsed values.

33 changes: 33 additions & 0 deletions package.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: streaming-attoparsec
version: 0.1.0.0
synopsis: Attoparsec integration for the streaming ecosystem
description: Attoparsec integration for the streaming ecosystem.
homepage: https://github.com/haskell-streaming/streaming-attoparsec
license: BSD3
author: Michael Thompson
maintainer: Colin Woodbury, [email protected]
category: Streaming

extra-source-files:
- README.md
- CHANGELOG.md

ghc-options:
- -fwarn-unused-imports
- -fwarn-unused-binds
- -fwarn-name-shadowing
- -fwarn-unused-matches
- -fwarn-incomplete-patterns
- -fwarn-incomplete-uni-patterns

dependencies:
- base >= 4.7 && < 5
- aeson >= 1.1 && < 1.4
- attoparsec >= 0.13 && < 0.14
- bytestring
- streaming >= 0.1.4.5 && < 0.3
- streaming-bytestring >= 0.1.4.0 && < 0.2

library:
source-dirs: .
other-modules: []
4 changes: 4 additions & 0 deletions stack.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
resolver: lts-11.1

packages:
- .

0 comments on commit 5dde4bf

Please sign in to comment.