From b3958fc633696bc1992459672f336cb105c5d1cd Mon Sep 17 00:00:00 2001 From: Huw Campbell Date: Fri, 24 Apr 2020 23:15:10 +1000 Subject: [PATCH 1/4] Fork devgrok's changes --- zebra-cli/src/Zebra/Command/Merge.hs | 11 +- zebra-core/src/Zebra/Merge/Table.hs | 272 +++++++++++----------- zebra-core/test/Test/Zebra/Merge/Table.hs | 2 +- 3 files changed, 133 insertions(+), 152 deletions(-) diff --git a/zebra-cli/src/Zebra/Command/Merge.hs b/zebra-cli/src/Zebra/Command/Merge.hs index 87417b3..0da0ea9 100644 --- a/zebra-cli/src/Zebra/Command/Merge.hs +++ b/zebra-cli/src/Zebra/Command/Merge.hs @@ -36,7 +36,7 @@ import qualified Viking.ByteStream as ByteStream import qualified X.Data.Vector.Cons as Cons import Zebra.Command.Util -import Zebra.Merge.Table (UnionTableError) +import Zebra.Merge.Table (UnionTableError, MergeRowsPerBlock(..)) import qualified Zebra.Merge.Table as Merge import Zebra.Serial.Binary (BinaryStripedEncodeError, BinaryStripedDecodeError) import Zebra.Serial.Binary (BinaryVersion(..)) @@ -58,11 +58,6 @@ data Merge = , mergeMaximumRowSize :: !(Maybe MergeMaximumRowSize) } deriving (Eq, Ord, Show) -newtype MergeRowsPerBlock = - MergeRowsPerBlock { - unMergeRowsPerBlock :: Int - } deriving (Eq, Ord, Show) - newtype MergeMaximumRowSize = MergeMaximumRowSize { unMergeMaximumRowSize :: Int64 @@ -127,8 +122,6 @@ zebraMerge x = do writeFileOrStdout (mergeOutput x) . hoist (firstJoin MergeBinaryStripedEncodeError) . Binary.encodeStripedWith (mergeVersion x) . - hoist (firstJoin MergeStripedError) . - Striped.rechunk (unMergeRowsPerBlock $ mergeRowsPerChunk x) . hoist (firstJoin MergeUnionTableError) $ - union msize inputs + union msize (mergeRowsPerChunk x) inputs {-# SPECIALIZE zebraMerge :: Merge -> EitherT MergeError (ResourceT IO) () #-} diff --git a/zebra-core/src/Zebra/Merge/Table.hs b/zebra-core/src/Zebra/Merge/Table.hs index 9760fd0..333dc0c 100644 --- a/zebra-core/src/Zebra/Merge/Table.hs +++ b/zebra-core/src/Zebra/Merge/Table.hs @@ -6,27 +6,26 @@ {-# LANGUAGE OverloadedStrings #-} module Zebra.Merge.Table ( MaximumRowSize(..) + , MergeRowsPerBlock(..) , UnionTableError(..) , renderUnionTableError - , unionLogical , unionStriped , unionStripedWith ) where import Control.Monad.Morph (hoist, squash) import Control.Monad.Trans.Class (lift) -import Control.Monad.Trans.State.Strict (StateT, runStateT, modify') -import Control.Monad.Trans.Either (EitherT, hoistEither, left) +import Control.Monad.Trans.Either (EitherT, newEitherT, runEitherT, hoistEither, left) -import Data.Map (Map) import qualified Data.Map.Strict as Map import qualified Data.Vector as Boxed import P -import Viking (Stream, Of) +import Streaming.Internal (Stream (..)) +import Viking (Of (..)) import qualified Viking.Stream as Stream import X.Data.Vector.Cons (Cons) @@ -45,17 +44,16 @@ newtype MaximumRowSize = unMaximumRowSize :: Int64 } deriving (Eq, Ord, Show) -data Input m = - Input { - inputData :: !(Map Logical.Value Logical.Value) - , inputStream :: !(Maybe (Stream (Of Logical.Table) m ())) - } +newtype MergeRowsPerBlock = + MergeRowsPerBlock { + unMergeRowsPerBlock :: Int + } deriving (Eq, Ord, Show) -data Step m = - Step { - _stepComplete :: !(Map Logical.Value Logical.Value) - , _stepRemaining :: !(Cons Boxed.Vector (Input m)) - } +data Row = + Row { + rowKey :: !Logical.Value + , rowValue :: !Logical.Value + } deriving (Eq, Ord, Show) data UnionTableError = UnionEmptyInput @@ -63,6 +61,7 @@ data UnionTableError = | UnionLogicalSchemaError !LogicalSchemaError | UnionLogicalMergeError !LogicalMergeError | UnionSchemaError !SchemaUnionError + | UnionNotMap deriving (Eq, Show) renderUnionTableError :: UnionTableError -> Text @@ -77,6 +76,8 @@ renderUnionTableError = \case Logical.renderLogicalMergeError err UnionSchemaError err -> Schema.renderSchemaUnionError err + UnionNotMap -> + "Can not merge zebra files which aren't maps" ------------------------------------------------------------------------ -- General @@ -96,156 +97,143 @@ peekHead input = do pure (hd, Stream.cons hd tl) {-# INLINABLE peekHead #-} -hasData :: Input m -> Bool -hasData = - not . Map.null . inputData -{-# INLINABLE hasData #-} - -replaceData :: Map Logical.Value Logical.Value -> Input m -> Input m -replaceData values input = - input { - inputData = - values - } -{-# INLINABLE replaceData #-} - -dropData :: Map Logical.Value a -> Input m -> Input m -dropData drops input = - input { - inputData = - inputData input `Map.difference` drops - } -{-# INLINABLE dropData #-} - -isClosed :: Input m -> Bool -isClosed = - isNothing . inputStream -{-# INLINABLE isClosed #-} - -closeStream :: Input m -> Input m -closeStream input = - input { - inputStream = - Nothing - } -{-# INLINABLE closeStream #-} - -updateInput :: +streamStripedAsRows :: Monad m - => Input m - -> StateT (Map Logical.Value Int64) (EitherT UnionTableError m) (Input m) -updateInput input = - case inputStream input of - Nothing -> - pure input - Just stream -> - if hasData input then - pure input - else do - e <- lift . lift $ Stream.next stream - case e of - Left () -> - pure $ - closeStream input - - Right (table, remaining) -> do - values <- lift . firstT UnionLogicalSchemaError . hoistEither $ Logical.takeMap table - modify' $ Map.unionWith (+) (Map.map Logical.sizeValue values) - pure $ Input values (Just remaining) -{-# INLINABLE updateInput #-} - -takeExcessiveValues :: Maybe MaximumRowSize -> Map Logical.Value Int64 -> Map Logical.Value Int64 -takeExcessiveValues = \case - Nothing -> - const Map.empty - Just size -> - Map.filter (> unMaximumRowSize size) -{-# INLINABLE takeExcessiveValues #-} - -unionStep :: Monad m => Logical.Value -> Cons Boxed.Vector (Input m) -> EitherT UnionTableError m (Step m) -unionStep key inputs = do - step <- firstT UnionLogicalMergeError . hoistEither . (Logical.unionStep key) $ fmap inputData inputs - pure $ - Step - (Logical.unionComplete step) - (Cons.zipWith replaceData (Logical.unionRemaining step) inputs) -{-# INLINABLE unionStep #-} - -maximumKey :: Map Logical.Value Logical.Value -> Maybe Logical.Value -maximumKey kvs = - if Map.null kvs then - Nothing - else - pure . fst $ Map.findMax kvs -{-# INLINABLE maximumKey #-} - -unionInput :: + => Int + -> Stream (Of Striped.Table) m () + -> Stream (Of Row) (EitherT UnionTableError m) () +streamStripedAsRows _num stream = + Stream.map (uncurry Row) $ + Stream.concat $ + Stream.mapM (hoistEither . logicalPairs) $ + hoist lift + stream +{-# INLINABLE streamStripedAsRows #-} + +mergeStreams :: Monad m - => Maybe MaximumRowSize - -> Cons Boxed.Vector (Input m) - -> Map Logical.Value Int64 - -> Stream (Of Logical.Table) (EitherT UnionTableError m) () -unionInput msize inputs0 sizes0 = do - (inputs1, sizes1) <- lift $ runStateT (traverse updateInput inputs0) sizes0 - unless (Cons.all isClosed inputs1) $ do - let - drops = - takeExcessiveValues msize sizes1 - - inputs2 = - fmap (dropData drops) inputs1 - - maximums = - Cons.mapMaybe (maximumKey . inputData) inputs1 - - if Boxed.null maximums then - unionInput msize inputs2 sizes1 - else do - Step values inputs3 <- lift $ unionStep (Boxed.minimum maximums) inputs2 + => Stream (Of Row) (EitherT UnionTableError m) () + -> Stream (Of Row) (EitherT UnionTableError m) () + -> Stream (Of Row) (EitherT UnionTableError m) () +mergeStreams leftStream rightStream = Effect . newEitherT $ do + left0 <- runEitherT $ Stream.next leftStream + right0 <- runEitherT $ Stream.next rightStream + + return $ do + left1 <- left0 + right1 <- right0 + case (left1, right1) of + (Left (), Left ()) -> + return (Return ()) + + (Left (), Right (right2, rightRest)) -> + return $ + Step (right2 :> rightRest) + + (Right (left2, leftRest), Left ()) -> + return $ + Step (left2 :> leftRest) + + (Right (left2@(Row leftKey leftValue), leftRest), Right (right2@(Row rightKey rightValue), rightRest)) -> + case compare leftKey rightKey of + LT -> + return $ + Step (left2 :> mergeStreams leftRest (Step (right2 :> rightRest))) + EQ -> do + merged <- + first UnionLogicalMergeError $ + Row leftKey <$> Logical.mergeValue leftValue rightValue + return $ + Step (merged :> mergeStreams leftRest rightRest) + GT -> + return $ + Step (right2 :> mergeStreams (Step (left2 :> leftRest)) rightRest) + +-- merge streams in a binary tree fashion +mergeStreamsBinary :: + Monad m + => Cons Boxed.Vector (Stream (Of Row) (EitherT UnionTableError m) ()) + -> Stream (Of Row) (EitherT UnionTableError m) () +mergeStreamsBinary kvss = + case Cons.length kvss of + 1 -> + Cons.head kvss + + 2 -> do + let k = Cons.toVector kvss + mergeStreams + (k Boxed.! 0) + (k Boxed.! 1) + + n -> do let - unyieldedSizes - = sizes1 `Map.difference` values + (kvss0, kvss1) = Boxed.splitAt (n `div` 2) $ Cons.toVector kvss + kvs0 = mergeStreamsBinary $ Cons.unsafeFromVector kvss0 + kvs1 = mergeStreamsBinary $ Cons.unsafeFromVector kvss1 + mergeStreamsBinary $ Cons.from2 kvs0 kvs1 +{-# INLINABLE mergeStreamsBinary #-} - Stream.yield $ Logical.Map values - unionInput msize inputs3 unyieldedSizes -{-# INLINABLE unionInput #-} - -unionLogical :: - Monad m - => Schema.Table - -> Maybe MaximumRowSize - -> Cons Boxed.Vector (Stream (Of Logical.Table) m ()) - -> Stream (Of Logical.Table) (EitherT UnionTableError m) () -unionLogical schema msize inputs = do - Stream.whenEmpty (Logical.empty schema) $ - unionInput msize (fmap (Input Map.empty . Just) inputs) Map.empty -{-# INLINABLE unionLogical #-} unionStripedWith :: Monad m => Schema.Table -> Maybe MaximumRowSize + -> MergeRowsPerBlock -> Cons Boxed.Vector (Stream (Of Striped.Table) m ()) -> Stream (Of Striped.Table) (EitherT UnionTableError m) () -unionStripedWith schema msize inputs0 = do +unionStripedWith schema _msize blockRows inputs0 = do let fromStriped = - Stream.mapM (hoistEither . first UnionStripedError . Striped.toLogical) . Stream.mapM (hoistEither . first UnionStripedError . Striped.transmute schema) . hoist lift - hoist squash . + hoist squash $ Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema) $ - unionLogical schema msize (fmap fromStriped inputs0) + Stream.whenEmpty (Logical.empty schema) $ + chunkRows blockRows $ + mergeStreamsBinary $ + Cons.imap streamStripedAsRows $ + (fmap fromStriped inputs0) {-# INLINABLE unionStripedWith #-} unionStriped :: Monad m => Maybe MaximumRowSize + -> MergeRowsPerBlock -> Cons Boxed.Vector (Stream (Of Striped.Table) m ()) -> Stream (Of Striped.Table) (EitherT UnionTableError m) () -unionStriped msize inputs0 = do +unionStriped msize blockRows inputs0 = do (heads, inputs1) <- fmap Cons.unzip . lift $ traverse peekHead inputs0 - schema <- lift . hoistEither . unionSchemas $ fmap Striped.schema heads - unionStripedWith schema msize inputs1 + schema <- lift . hoistEither . unionSchemas $ fmap Striped.schema heads + unionStripedWith schema msize blockRows inputs1 {-# INLINABLE unionStriped #-} + +-- | Groups together the rows as per Chunk Size and forms a logical table from them +chunkRows :: + Monad m + => MergeRowsPerBlock + -> Stream (Of Row) (EitherT UnionTableError m) () + -> Stream (Of Logical.Table) (EitherT UnionTableError m) () +chunkRows blockRows inputs = + let + rowsToTable = + Logical.Map . Map.fromDistinctAscList . fmap (\(Row k v) -> (k, v)) + in + Stream.map rowsToTable $ + Stream.mapped Stream.toList $ + Stream.chunksOf (unMergeRowsPerBlock blockRows) + inputs +{-# INLINABLE chunkRows #-} + +-- | Convert striped table to a vector of logical key, value pairs +-- tries to be strict on the key, lazy on the value +logicalPairs :: + Striped.Table + -> Either UnionTableError (Boxed.Vector (Logical.Value, Logical.Value)) +logicalPairs (Striped.Map _ k v) = do + !ks <- first UnionStripedError $ Striped.toValues k + vs <- first UnionStripedError $ Striped.toValues v + pure $ Boxed.zip ks vs +logicalPairs _ = + Left UnionNotMap +{-# INLINABLE logicalPairs #-} diff --git a/zebra-core/test/Test/Zebra/Merge/Table.hs b/zebra-core/test/Test/Zebra/Merge/Table.hs index 02df6a6..a3b73a4 100644 --- a/zebra-core/test/Test/Zebra/Merge/Table.hs +++ b/zebra-core/test/Test/Zebra/Merge/Table.hs @@ -86,7 +86,7 @@ unionList :: -> Cons Boxed.Vector (NonEmpty Striped.Table) -> Either String (Maybe Striped.Table) unionList msize xss0 = - case runIdentity . runEitherT . Stream.toList . Merge.unionStriped msize $ fmap Stream.each xss0 of + case runIdentity . runEitherT . Stream.toList . Merge.unionStriped msize (Merge.MergeRowsPerBlock 256000000) $ fmap Stream.each xss0 of Left (UnionLogicalMergeError _) -> pure Nothing Left err -> From a1e3a36f12a4dc7727289bd5736bac5a438ea686 Mon Sep 17 00:00:00 2001 From: Huw Campbell Date: Fri, 24 Apr 2020 23:47:21 +1000 Subject: [PATCH 2/4] Concurrent binary tree merges --- zebra-core/ambiata-zebra-core.cabal | 1 + zebra-core/src/Zebra/Merge/Table.hs | 53 +++++++++++------------ zebra-core/test/Test/Zebra/Merge/Table.hs | 7 ++- 3 files changed, 32 insertions(+), 29 deletions(-) diff --git a/zebra-core/ambiata-zebra-core.cabal b/zebra-core/ambiata-zebra-core.cabal index f6e1e96..ba4efba 100644 --- a/zebra-core/ambiata-zebra-core.cabal +++ b/zebra-core/ambiata-zebra-core.cabal @@ -30,6 +30,7 @@ library , ambiata-x-vector , aeson >= 1.0 && < 1.5 , aeson-pretty == 0.8.* + , async , attoparsec >= 0.13 && < 0.14 , base64-bytestring == 1.0.* , bifunctors >= 4.2 && < 5.6 diff --git a/zebra-core/src/Zebra/Merge/Table.hs b/zebra-core/src/Zebra/Merge/Table.hs index 333dc0c..9612298 100644 --- a/zebra-core/src/Zebra/Merge/Table.hs +++ b/zebra-core/src/Zebra/Merge/Table.hs @@ -15,9 +15,12 @@ module Zebra.Merge.Table ( , unionStripedWith ) where +import Control.Concurrent.Async + import Control.Monad.Morph (hoist, squash) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Either (EitherT, newEitherT, runEitherT, hoistEither, left) +import Zebra.X.Either (firstJoin) import qualified Data.Map.Strict as Map import qualified Data.Vector as Boxed @@ -38,6 +41,7 @@ import qualified Zebra.Table.Schema as Schema import Zebra.Table.Striped (StripedError) import qualified Zebra.Table.Striped as Striped +import System.IO newtype MaximumRowSize = MaximumRowSize { @@ -98,29 +102,28 @@ peekHead input = do {-# INLINABLE peekHead #-} streamStripedAsRows :: - Monad m - => Int - -> Stream (Of Striped.Table) m () - -> Stream (Of Row) (EitherT UnionTableError m) () + Int + -> Stream (Of Striped.Table) (EitherT UnionTableError IO) () + -> Stream (Of Row) (EitherT UnionTableError IO) () streamStripedAsRows _num stream = Stream.map (uncurry Row) $ Stream.concat $ Stream.mapM (hoistEither . logicalPairs) $ - hoist lift stream {-# INLINABLE streamStripedAsRows #-} mergeStreams :: - Monad m - => Stream (Of Row) (EitherT UnionTableError m) () - -> Stream (Of Row) (EitherT UnionTableError m) () - -> Stream (Of Row) (EitherT UnionTableError m) () + Stream (Of Row) (EitherT UnionTableError IO) () + -> Stream (Of Row) (EitherT UnionTableError IO) () + -> Stream (Of Row) (EitherT UnionTableError IO) () mergeStreams leftStream rightStream = Effect . newEitherT $ do - left0 <- runEitherT $ Stream.next leftStream - right0 <- runEitherT $ Stream.next rightStream + (left0, right0) <- + concurrently + (runEitherT $ Stream.next leftStream) + (runEitherT $ Stream.next rightStream) return $ do - left1 <- left0 + left1 <- left0 right1 <- right0 case (left1, right1) of (Left (), Left ()) -> @@ -151,9 +154,8 @@ mergeStreams leftStream rightStream = Effect . newEitherT $ do -- merge streams in a binary tree fashion mergeStreamsBinary :: - Monad m - => Cons Boxed.Vector (Stream (Of Row) (EitherT UnionTableError m) ()) - -> Stream (Of Row) (EitherT UnionTableError m) () + Cons Boxed.Vector (Stream (Of Row) (EitherT UnionTableError IO) ()) + -> Stream (Of Row) (EitherT UnionTableError IO) () mergeStreamsBinary kvss = case Cons.length kvss of 1 -> @@ -175,33 +177,30 @@ mergeStreamsBinary kvss = unionStripedWith :: - Monad m - => Schema.Table + Schema.Table -> Maybe MaximumRowSize -> MergeRowsPerBlock - -> Cons Boxed.Vector (Stream (Of Striped.Table) m ()) - -> Stream (Of Striped.Table) (EitherT UnionTableError m) () + -> Cons Boxed.Vector (Stream (Of Striped.Table) IO ()) + -> Stream (Of Striped.Table) (EitherT UnionTableError IO) () unionStripedWith schema _msize blockRows inputs0 = do let fromStriped = Stream.mapM (hoistEither . first UnionStripedError . Striped.transmute schema) . hoist lift - hoist squash $ - Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema) $ + Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema) $ Stream.whenEmpty (Logical.empty schema) $ chunkRows blockRows $ mergeStreamsBinary $ - Cons.imap streamStripedAsRows $ - (fmap fromStriped inputs0) + Cons.imap streamStripedAsRows $ + fmap fromStriped inputs0 {-# INLINABLE unionStripedWith #-} unionStriped :: - Monad m - => Maybe MaximumRowSize + Maybe MaximumRowSize -> MergeRowsPerBlock - -> Cons Boxed.Vector (Stream (Of Striped.Table) m ()) - -> Stream (Of Striped.Table) (EitherT UnionTableError m) () + -> Cons Boxed.Vector (Stream (Of Striped.Table) IO ()) + -> Stream (Of Striped.Table) (EitherT UnionTableError IO) () unionStriped msize blockRows inputs0 = do (heads, inputs1) <- fmap Cons.unzip . lift $ traverse peekHead inputs0 schema <- lift . hoistEither . unionSchemas $ fmap Striped.schema heads diff --git a/zebra-core/test/Test/Zebra/Merge/Table.hs b/zebra-core/test/Test/Zebra/Merge/Table.hs index a3b73a4..842204e 100644 --- a/zebra-core/test/Test/Zebra/Merge/Table.hs +++ b/zebra-core/test/Test/Zebra/Merge/Table.hs @@ -1,5 +1,6 @@ {-# LANGUAGE NoImplicitPrelude #-} -{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE LambdaCase #-} module Test.Zebra.Merge.Table where import Data.Functor.Identity (runIdentity) @@ -34,6 +35,8 @@ import Zebra.Table.Striped (StripedError(..)) import qualified Zebra.Table.Striped as Striped import qualified Zebra.Table.Logical as Logical +import System.IO.Unsafe (unsafePerformIO) + jFileTable :: Schema.Table -> Gen Striped.Table jFileTable schema = do Gen.sized $ \size -> do @@ -86,7 +89,7 @@ unionList :: -> Cons Boxed.Vector (NonEmpty Striped.Table) -> Either String (Maybe Striped.Table) unionList msize xss0 = - case runIdentity . runEitherT . Stream.toList . Merge.unionStriped msize (Merge.MergeRowsPerBlock 256000000) $ fmap Stream.each xss0 of + case unsafePerformIO . runEitherT . Stream.toList . Merge.unionStriped msize (Merge.MergeRowsPerBlock 256000000) $ fmap Stream.each xss0 of Left (UnionLogicalMergeError _) -> pure Nothing Left err -> From 8c89b9980bf19753b40e6e77d5151f76cf2bf3e9 Mon Sep 17 00:00:00 2001 From: Huw Campbell Date: Sat, 25 Apr 2020 12:58:35 +1000 Subject: [PATCH 3/4] Add extra helper --- zebra-core/ambiata-zebra-core.cabal | 1 + zebra-core/src/Zebra/Merge/Table.hs | 15 ++++++--------- zebra-core/src/Zebra/X/Stream.hs | 27 +++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 9 deletions(-) create mode 100644 zebra-core/src/Zebra/X/Stream.hs diff --git a/zebra-core/ambiata-zebra-core.cabal b/zebra-core/ambiata-zebra-core.cabal index ba4efba..659c404 100644 --- a/zebra-core/ambiata-zebra-core.cabal +++ b/zebra-core/ambiata-zebra-core.cabal @@ -121,6 +121,7 @@ library Zebra.Time Zebra.X.Either + Zebra.X.Stream Zebra.X.Vector.Generic Zebra.X.Vector.Segment Zebra.X.Vector.Storable diff --git a/zebra-core/src/Zebra/Merge/Table.hs b/zebra-core/src/Zebra/Merge/Table.hs index 9612298..a3835d4 100644 --- a/zebra-core/src/Zebra/Merge/Table.hs +++ b/zebra-core/src/Zebra/Merge/Table.hs @@ -17,10 +17,8 @@ module Zebra.Merge.Table ( import Control.Concurrent.Async -import Control.Monad.Morph (hoist, squash) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Either (EitherT, newEitherT, runEitherT, hoistEither, left) -import Zebra.X.Either (firstJoin) import qualified Data.Map.Strict as Map import qualified Data.Vector as Boxed @@ -30,6 +28,7 @@ import P import Streaming.Internal (Stream (..)) import Viking (Of (..)) import qualified Viking.Stream as Stream +import qualified Zebra.X.Stream as Stream import X.Data.Vector.Cons (Cons) import qualified X.Data.Vector.Cons as Cons @@ -102,10 +101,9 @@ peekHead input = do {-# INLINABLE peekHead #-} streamStripedAsRows :: - Int - -> Stream (Of Striped.Table) (EitherT UnionTableError IO) () + Stream (Of Striped.Table) (EitherT UnionTableError IO) () -> Stream (Of Row) (EitherT UnionTableError IO) () -streamStripedAsRows _num stream = +streamStripedAsRows stream = Stream.map (uncurry Row) $ Stream.concat $ Stream.mapM (hoistEither . logicalPairs) $ @@ -185,15 +183,14 @@ unionStripedWith :: unionStripedWith schema _msize blockRows inputs0 = do let fromStriped = - Stream.mapM (hoistEither . first UnionStripedError . Striped.transmute schema) . - hoist lift + Stream.hoistEither (first UnionStripedError . Striped.transmute schema) Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema) $ Stream.whenEmpty (Logical.empty schema) $ chunkRows blockRows $ mergeStreamsBinary $ - Cons.imap streamStripedAsRows $ - fmap fromStriped inputs0 + fmap (streamStripedAsRows . fromStriped) + inputs0 {-# INLINABLE unionStripedWith #-} unionStriped :: diff --git a/zebra-core/src/Zebra/X/Stream.hs b/zebra-core/src/Zebra/X/Stream.hs new file mode 100644 index 0000000..5c00bae --- /dev/null +++ b/zebra-core/src/Zebra/X/Stream.hs @@ -0,0 +1,27 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE OverloadedStrings #-} +module Zebra.X.Stream ( + hoistEither + ) where + +import P +import Control.Monad.Trans.Class (lift) +import qualified Control.Monad.Trans.Either as EitherT +import Streaming.Internal (Stream (..)) +import Viking + + +hoistEither :: Monad m => (a -> Either e b) -> Stream (Of a) m r -> Stream (Of b) (EitherT.EitherT e m) r +hoistEither f = loop + where + loop stream = case stream of + Return r -> Return r + Effect m -> Effect (lift (fmap loop m)) + Step (a :> rest) -> Effect $ do + a0 <- EitherT.hoistEither (f a) + return $ + Step (a0 :> loop rest) + From 46a75be3b996336aba2067a240dbd1f17abafa86 Mon Sep 17 00:00:00 2001 From: Huw Campbell Date: Sun, 26 Apr 2020 11:34:38 +1000 Subject: [PATCH 4/4] Use lifted-async --- zebra-cli/ambiata-zebra-cli.cabal | 1 + zebra-cli/main/zebra.hs | 3 +- zebra-cli/src/Zebra/Command/Merge.hs | 6 ++- zebra-core/ambiata-zebra-core.cabal | 5 ++- zebra-core/src/Zebra/Merge/Table.hs | 35 ++++++++++------- zebra-core/src/Zebra/X/WrappedResourceT.hs | 44 ++++++++++++++++++++++ 6 files changed, 76 insertions(+), 18 deletions(-) create mode 100644 zebra-core/src/Zebra/X/WrappedResourceT.hs diff --git a/zebra-cli/ambiata-zebra-cli.cabal b/zebra-cli/ambiata-zebra-cli.cabal index 7043595..b990980 100644 --- a/zebra-cli/ambiata-zebra-cli.cabal +++ b/zebra-cli/ambiata-zebra-cli.cabal @@ -30,6 +30,7 @@ library , bytestring >= 0.10 , containers == 0.5.* , exceptions >= 0.10 + , monad-control , mmorph >= 1.0 , pretty-show == 1.6.* , resourcet >= 1.1 diff --git a/zebra-cli/main/zebra.hs b/zebra-cli/main/zebra.hs index ede33b0..2dd2a5b 100644 --- a/zebra-cli/main/zebra.hs +++ b/zebra-cli/main/zebra.hs @@ -27,6 +27,7 @@ import Zebra.Command.Import import Zebra.Command.Merge import Zebra.Command.Summary import Zebra.Serial.Binary (BinaryVersion(..)) +import Zebra.X.WrappedResourceT (usingResourceT) main :: IO () @@ -324,7 +325,7 @@ run = \case zebraExport export ZebraMerge merge -> - orDie renderMergeError . hoist runResourceT $ + orDie renderMergeError . hoist (runResourceT . usingResourceT) $ zebraMerge merge ZebraAdapt adapt -> diff --git a/zebra-cli/src/Zebra/Command/Merge.hs b/zebra-cli/src/Zebra/Command/Merge.hs index 0da0ea9..5898eb0 100644 --- a/zebra-cli/src/Zebra/Command/Merge.hs +++ b/zebra-cli/src/Zebra/Command/Merge.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DoAndIfThenElse #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -15,6 +16,7 @@ module Zebra.Command.Merge ( , renderMergeError ) where +import Control.Monad.Trans.Control (MonadBaseControl (..)) import Control.Monad.Catch (MonadCatch) import Control.Monad.IO.Class (liftIO) import Control.Monad.Morph (hoist) @@ -104,7 +106,7 @@ readSchema = \case fmap Just . firstT MergeTextSchemaDecodeError . hoistEither $ Text.decodeSchema schema -zebraMerge :: (MonadResource m, MonadCatch m) => Merge -> EitherT MergeError m () +zebraMerge :: (MonadResource m, MonadCatch m, MonadBaseControl IO m) => Merge -> EitherT MergeError m () zebraMerge x = do mschema <- readSchema (mergeSchema x) @@ -124,4 +126,4 @@ zebraMerge x = do Binary.encodeStripedWith (mergeVersion x) . hoist (firstJoin MergeUnionTableError) $ union msize (mergeRowsPerChunk x) inputs -{-# SPECIALIZE zebraMerge :: Merge -> EitherT MergeError (ResourceT IO) () #-} + diff --git a/zebra-core/ambiata-zebra-core.cabal b/zebra-core/ambiata-zebra-core.cabal index 659c404..a5f847a 100644 --- a/zebra-core/ambiata-zebra-core.cabal +++ b/zebra-core/ambiata-zebra-core.cabal @@ -30,7 +30,7 @@ library , ambiata-x-vector , aeson >= 1.0 && < 1.5 , aeson-pretty == 0.8.* - , async + , lifted-async , attoparsec >= 0.13 && < 0.14 , base64-bytestring == 1.0.* , bifunctors >= 4.2 && < 5.6 @@ -41,6 +41,7 @@ library , exceptions >= 0.10 && < 0.11 , ghc-prim >= 0.4 && < 0.6 , lens >= 4.7 + , monad-control , mmorph >= 1.1 , mtl >= 2.2 , old-locale == 1.0.* @@ -54,6 +55,7 @@ library , text == 1.2.* , thyme == 0.3.* , transformers == 0.5.* + , transformers-base , transformers-either , unordered-containers == 0.2.* , vector >= 0.11 && < 0.13 @@ -125,6 +127,7 @@ library Zebra.X.Vector.Generic Zebra.X.Vector.Segment Zebra.X.Vector.Storable + Zebra.X.WrappedResourceT include-dirs: csrc diff --git a/zebra-core/src/Zebra/Merge/Table.hs b/zebra-core/src/Zebra/Merge/Table.hs index a3835d4..aef4f32 100644 --- a/zebra-core/src/Zebra/Merge/Table.hs +++ b/zebra-core/src/Zebra/Merge/Table.hs @@ -1,6 +1,7 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE DoAndIfThenElse #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NoImplicitPrelude #-} {-# LANGUAGE OverloadedStrings #-} @@ -15,8 +16,9 @@ module Zebra.Merge.Table ( , unionStripedWith ) where -import Control.Concurrent.Async +import Control.Concurrent.Async.Lifted +import Control.Monad.Trans.Control (MonadBaseControl (..)) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Either (EitherT, newEitherT, runEitherT, hoistEither, left) @@ -101,8 +103,9 @@ peekHead input = do {-# INLINABLE peekHead #-} streamStripedAsRows :: - Stream (Of Striped.Table) (EitherT UnionTableError IO) () - -> Stream (Of Row) (EitherT UnionTableError IO) () + MonadBaseControl IO m + => Stream (Of Striped.Table) (EitherT UnionTableError m) () + -> Stream (Of Row) (EitherT UnionTableError m) () streamStripedAsRows stream = Stream.map (uncurry Row) $ Stream.concat $ @@ -111,9 +114,10 @@ streamStripedAsRows stream = {-# INLINABLE streamStripedAsRows #-} mergeStreams :: - Stream (Of Row) (EitherT UnionTableError IO) () - -> Stream (Of Row) (EitherT UnionTableError IO) () - -> Stream (Of Row) (EitherT UnionTableError IO) () + MonadBaseControl IO m + => Stream (Of Row) (EitherT UnionTableError m) () + -> Stream (Of Row) (EitherT UnionTableError m) () + -> Stream (Of Row) (EitherT UnionTableError m) () mergeStreams leftStream rightStream = Effect . newEitherT $ do (left0, right0) <- concurrently @@ -152,8 +156,9 @@ mergeStreams leftStream rightStream = Effect . newEitherT $ do -- merge streams in a binary tree fashion mergeStreamsBinary :: - Cons Boxed.Vector (Stream (Of Row) (EitherT UnionTableError IO) ()) - -> Stream (Of Row) (EitherT UnionTableError IO) () + MonadBaseControl IO m + => Cons Boxed.Vector (Stream (Of Row) (EitherT UnionTableError m) ()) + -> Stream (Of Row) (EitherT UnionTableError m) () mergeStreamsBinary kvss = case Cons.length kvss of 1 -> @@ -175,11 +180,12 @@ mergeStreamsBinary kvss = unionStripedWith :: - Schema.Table + MonadBaseControl IO m + => Schema.Table -> Maybe MaximumRowSize -> MergeRowsPerBlock - -> Cons Boxed.Vector (Stream (Of Striped.Table) IO ()) - -> Stream (Of Striped.Table) (EitherT UnionTableError IO) () + -> Cons Boxed.Vector (Stream (Of Striped.Table) m ()) + -> Stream (Of Striped.Table) (EitherT UnionTableError m) () unionStripedWith schema _msize blockRows inputs0 = do let fromStriped = @@ -194,10 +200,11 @@ unionStripedWith schema _msize blockRows inputs0 = do {-# INLINABLE unionStripedWith #-} unionStriped :: - Maybe MaximumRowSize + MonadBaseControl IO m + => Maybe MaximumRowSize -> MergeRowsPerBlock - -> Cons Boxed.Vector (Stream (Of Striped.Table) IO ()) - -> Stream (Of Striped.Table) (EitherT UnionTableError IO) () + -> Cons Boxed.Vector (Stream (Of Striped.Table) m ()) + -> Stream (Of Striped.Table) (EitherT UnionTableError m) () unionStriped msize blockRows inputs0 = do (heads, inputs1) <- fmap Cons.unzip . lift $ traverse peekHead inputs0 schema <- lift . hoistEither . unionSchemas $ fmap Striped.schema heads diff --git a/zebra-core/src/Zebra/X/WrappedResourceT.hs b/zebra-core/src/Zebra/X/WrappedResourceT.hs new file mode 100644 index 0000000..614cfaf --- /dev/null +++ b/zebra-core/src/Zebra/X/WrappedResourceT.hs @@ -0,0 +1,44 @@ +-- | Extremely silly module to add back MonadBaseControl IO +-- to ResourceT. +-- +-- We use lifted async for concurrency inside Zebra, and +-- need both exception handling and multi-threading. +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +module Zebra.X.WrappedResourceT ( + WrappedResourceT (..) +) where + +import P + +import Control.Monad.Catch (MonadThrow, MonadCatch) +import Control.Monad.IO.Class (MonadIO) +import Control.Monad.Trans.Class (MonadTrans (..)) +import Control.Monad.Trans.Resource.Internal (ResourceT (..), MonadResource) +import Control.Monad.Trans.Control (MonadTransControl (..), MonadBaseControl (..), defaultRestoreM) +import Control.Monad.Base (MonadBase, liftBase) + +newtype WrappedResourceT m a = + WrappedResourceT { + usingResourceT :: ResourceT m a + } deriving (Functor, Applicative, Monad, MonadResource, MonadIO, MonadTrans, MonadThrow, MonadCatch) + +instance MonadBase b m => MonadBase b (WrappedResourceT m) where + liftBase = lift . liftBase + +instance MonadTransControl WrappedResourceT where + type StT WrappedResourceT a = a + liftWith f = WrappedResourceT $ ResourceT $ \r -> f $ \(WrappedResourceT (ResourceT t)) -> t r + restoreT = WrappedResourceT . ResourceT . const + +instance MonadBaseControl b m => MonadBaseControl b (WrappedResourceT m) where + type StM (WrappedResourceT m) a = StM m a + liftBaseWith f = WrappedResourceT $ + ResourceT $ \reader' -> + liftBaseWith $ \runInBase -> + f $ runInBase . (\(WrappedResourceT (ResourceT r)) -> r reader' ) + restoreM = defaultRestoreM