Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fork devgrok's changes #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions zebra-cli/src/Zebra/Command/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import qualified Viking.ByteStream as ByteStream
import qualified X.Data.Vector.Cons as Cons

import Zebra.Command.Util
import Zebra.Merge.Table (UnionTableError)
import Zebra.Merge.Table (UnionTableError, MergeRowsPerBlock(..))
import qualified Zebra.Merge.Table as Merge
import Zebra.Serial.Binary (BinaryStripedEncodeError, BinaryStripedDecodeError)
import Zebra.Serial.Binary (BinaryVersion(..))
Expand All @@ -58,11 +58,6 @@ data Merge =
, mergeMaximumRowSize :: !(Maybe MergeMaximumRowSize)
} deriving (Eq, Ord, Show)

newtype MergeRowsPerBlock =
MergeRowsPerBlock {
unMergeRowsPerBlock :: Int
} deriving (Eq, Ord, Show)

newtype MergeMaximumRowSize =
MergeMaximumRowSize {
unMergeMaximumRowSize :: Int64
Expand Down Expand Up @@ -127,8 +122,6 @@ zebraMerge x = do
writeFileOrStdout (mergeOutput x) .
hoist (firstJoin MergeBinaryStripedEncodeError) .
Binary.encodeStripedWith (mergeVersion x) .
hoist (firstJoin MergeStripedError) .
Striped.rechunk (unMergeRowsPerBlock $ mergeRowsPerChunk x) .
hoist (firstJoin MergeUnionTableError) $
union msize inputs
union msize (mergeRowsPerChunk x) inputs
{-# SPECIALIZE zebraMerge :: Merge -> EitherT MergeError (ResourceT IO) () #-}
272 changes: 130 additions & 142 deletions zebra-core/src/Zebra/Merge/Table.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,26 @@
{-# LANGUAGE OverloadedStrings #-}
module Zebra.Merge.Table (
MaximumRowSize(..)
, MergeRowsPerBlock(..)

, UnionTableError(..)
, renderUnionTableError

, unionLogical
, unionStriped
, unionStripedWith
) where

import Control.Monad.Morph (hoist, squash)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (StateT, runStateT, modify')
import Control.Monad.Trans.Either (EitherT, hoistEither, left)
import Control.Monad.Trans.Either (EitherT, newEitherT, runEitherT, hoistEither, left)

import Data.Map (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Vector as Boxed

import P

import Viking (Stream, Of)
import Streaming.Internal (Stream (..))
import Viking (Of (..))
import qualified Viking.Stream as Stream

import X.Data.Vector.Cons (Cons)
Expand All @@ -45,24 +44,24 @@ newtype MaximumRowSize =
unMaximumRowSize :: Int64
} deriving (Eq, Ord, Show)

data Input m =
Input {
inputData :: !(Map Logical.Value Logical.Value)
, inputStream :: !(Maybe (Stream (Of Logical.Table) m ()))
}
-- | Row count used as the chunk size when merged rows are regrouped into
-- output tables: 'chunkRows' unwraps it and passes it to 'Stream.chunksOf'.
newtype MergeRowsPerBlock =
MergeRowsPerBlock {
unMergeRowsPerBlock :: Int
} deriving (Eq, Ord, Show)

data Step m =
Step {
_stepComplete :: !(Map Logical.Value Logical.Value)
, _stepRemaining :: !(Cons Boxed.Vector (Input m))
}
-- | A single key/value entry of a map table, the unit that the merge streams
-- operate on. Rows are built from the pairs returned by 'logicalPairs' and are
-- turned back into a 'Logical.Map' by 'chunkRows'. Both fields are strict.
data Row =
Row {
-- Key of the entry; 'mergeStreams' orders and matches rows by comparing it.
rowKey :: !Logical.Value
-- Value of the entry; combined with 'Logical.mergeValue' when keys are equal.
, rowValue :: !Logical.Value
} deriving (Eq, Ord, Show)

-- | Errors that can be reported while unioning tables; rendered for display
-- by 'renderUnionTableError'.
data UnionTableError =
-- NOTE(review): the site that raises this is not visible here; presumably it
-- signals that no inputs were supplied. Confirm against callers.
UnionEmptyInput
-- Wraps a 'StripedError', e.g. from 'Striped.toValues', 'Striped.transmute'
-- or 'Striped.fromLogical'.
| UnionStripedError !StripedError
-- Wraps a 'LogicalSchemaError'.
-- NOTE(review): no use is visible in the row-based merge path; confirm it is
-- still raised anywhere.
| UnionLogicalSchemaError !LogicalSchemaError
-- Wraps a 'LogicalMergeError', e.g. when 'Logical.mergeValue' fails on two
-- rows that share a key.
| UnionLogicalMergeError !LogicalMergeError
-- Wraps a 'SchemaUnionError'; rendered with 'Schema.renderSchemaUnionError'.
| UnionSchemaError !SchemaUnionError
-- Returned by 'logicalPairs' when a striped table is not a map.
| UnionNotMap
deriving (Eq, Show)

renderUnionTableError :: UnionTableError -> Text
Expand All @@ -77,6 +76,8 @@ renderUnionTableError = \case
Logical.renderLogicalMergeError err
UnionSchemaError err ->
Schema.renderSchemaUnionError err
UnionNotMap ->
"Can not merge zebra files which aren't maps"

------------------------------------------------------------------------
-- General
Expand All @@ -96,156 +97,143 @@ peekHead input = do
pure (hd, Stream.cons hd tl)
{-# INLINABLE peekHead #-}

hasData :: Input m -> Bool
hasData =
not . Map.null . inputData
{-# INLINABLE hasData #-}

replaceData :: Map Logical.Value Logical.Value -> Input m -> Input m
replaceData values input =
input {
inputData =
values
}
{-# INLINABLE replaceData #-}

dropData :: Map Logical.Value a -> Input m -> Input m
dropData drops input =
input {
inputData =
inputData input `Map.difference` drops
}
{-# INLINABLE dropData #-}

isClosed :: Input m -> Bool
isClosed =
isNothing . inputStream
{-# INLINABLE isClosed #-}

closeStream :: Input m -> Input m
closeStream input =
input {
inputStream =
Nothing
}
{-# INLINABLE closeStream #-}

updateInput ::
streamStripedAsRows ::
Monad m
=> Input m
-> StateT (Map Logical.Value Int64) (EitherT UnionTableError m) (Input m)
updateInput input =
case inputStream input of
Nothing ->
pure input
Just stream ->
if hasData input then
pure input
else do
e <- lift . lift $ Stream.next stream
case e of
Left () ->
pure $
closeStream input

Right (table, remaining) -> do
values <- lift . firstT UnionLogicalSchemaError . hoistEither $ Logical.takeMap table
modify' $ Map.unionWith (+) (Map.map Logical.sizeValue values)
pure $ Input values (Just remaining)
{-# INLINABLE updateInput #-}

takeExcessiveValues :: Maybe MaximumRowSize -> Map Logical.Value Int64 -> Map Logical.Value Int64
takeExcessiveValues = \case
Nothing ->
const Map.empty
Just size ->
Map.filter (> unMaximumRowSize size)
{-# INLINABLE takeExcessiveValues #-}

unionStep :: Monad m => Logical.Value -> Cons Boxed.Vector (Input m) -> EitherT UnionTableError m (Step m)
unionStep key inputs = do
step <- firstT UnionLogicalMergeError . hoistEither . (Logical.unionStep key) $ fmap inputData inputs
pure $
Step
(Logical.unionComplete step)
(Cons.zipWith replaceData (Logical.unionRemaining step) inputs)
{-# INLINABLE unionStep #-}

maximumKey :: Map Logical.Value Logical.Value -> Maybe Logical.Value
maximumKey kvs =
if Map.null kvs then
Nothing
else
pure . fst $ Map.findMax kvs
{-# INLINABLE maximumKey #-}

unionInput ::
=> Int
-> Stream (Of Striped.Table) m ()
-> Stream (Of Row) (EitherT UnionTableError m) ()
streamStripedAsRows _num stream =
Stream.map (uncurry Row) $
Stream.concat $
Stream.mapM (hoistEither . logicalPairs) $
hoist lift
stream
{-# INLINABLE streamStripedAsRows #-}

mergeStreams ::
Monad m
=> Maybe MaximumRowSize
-> Cons Boxed.Vector (Input m)
-> Map Logical.Value Int64
-> Stream (Of Logical.Table) (EitherT UnionTableError m) ()
unionInput msize inputs0 sizes0 = do
(inputs1, sizes1) <- lift $ runStateT (traverse updateInput inputs0) sizes0
unless (Cons.all isClosed inputs1) $ do
let
drops =
takeExcessiveValues msize sizes1

inputs2 =
fmap (dropData drops) inputs1

maximums =
Cons.mapMaybe (maximumKey . inputData) inputs1

if Boxed.null maximums then
unionInput msize inputs2 sizes1
else do
Step values inputs3 <- lift $ unionStep (Boxed.minimum maximums) inputs2
=> Stream (Of Row) (EitherT UnionTableError m) ()
-> Stream (Of Row) (EitherT UnionTableError m) ()
-> Stream (Of Row) (EitherT UnionTableError m) ()
mergeStreams leftStream rightStream = Effect . newEitherT $ do
left0 <- runEitherT $ Stream.next leftStream
right0 <- runEitherT $ Stream.next rightStream

return $ do
left1 <- left0
right1 <- right0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two are just monadic actions in `m`, which in our case is `IO`, so they could be run concurrently using `async`.

case (left1, right1) of
(Left (), Left ()) ->
return (Return ())

(Left (), Right (right2, rightRest)) ->
return $
Step (right2 :> rightRest)

(Right (left2, leftRest), Left ()) ->
return $
Step (left2 :> leftRest)

(Right (left2@(Row leftKey leftValue), leftRest), Right (right2@(Row rightKey rightValue), rightRest)) ->
case compare leftKey rightKey of
LT ->
return $
Step (left2 :> mergeStreams leftRest (Step (right2 :> rightRest)))
EQ -> do
merged <-
first UnionLogicalMergeError $
Row leftKey <$> Logical.mergeValue leftValue rightValue
return $
Step (merged :> mergeStreams leftRest rightRest)
GT ->
return $
Step (right2 :> mergeStreams (Step (left2 :> leftRest)) rightRest)

-- | Merge a collection of row streams by arranging them as a balanced binary
--   tree of two-way 'mergeStreams' calls.
--
--   A single stream is returned unchanged. Otherwise the collection is split
--   in half, each half is merged recursively, and the two results are merged
--   with each other.
--
--   NOTE(review): assumes 'Cons.length' is always at least 1 (the collection
--   type is expected to be non-empty) -- confirm against "X.Data.Vector.Cons".
mergeStreamsBinary ::
     Monad m
  => Cons Boxed.Vector (Stream (Of Row) (EitherT UnionTableError m) ())
  -> Stream (Of Row) (EitherT UnionTableError m) ()
mergeStreamsBinary kvss =
  case Cons.length kvss of
    1 ->
      Cons.head kvss

    n ->
      let
        -- For n >= 2 both halves hold at least one stream
        -- (n `div` 2 >= 1 and n - n `div` 2 >= 1), which is what makes the
        -- 'Cons.unsafeFromVector' calls below safe.
        (lower, upper) =
          Boxed.splitAt (n `div` 2) (Cons.toVector kvss)
      in
        mergeStreams
          (mergeStreamsBinary (Cons.unsafeFromVector lower))
          (mergeStreamsBinary (Cons.unsafeFromVector upper))
{-# INLINABLE mergeStreamsBinary #-}

Stream.yield $ Logical.Map values
unionInput msize inputs3 unyieldedSizes
{-# INLINABLE unionInput #-}

unionLogical ::
Monad m
=> Schema.Table
-> Maybe MaximumRowSize
-> Cons Boxed.Vector (Stream (Of Logical.Table) m ())
-> Stream (Of Logical.Table) (EitherT UnionTableError m) ()
unionLogical schema msize inputs = do
Stream.whenEmpty (Logical.empty schema) $
unionInput msize (fmap (Input Map.empty . Just) inputs) Map.empty
{-# INLINABLE unionLogical #-}

-- | Union striped map tables against an already-known target schema.
--
--   Every input table is transmuted to @schema@ and flattened into rows
--   ('streamStripedAsRows'). The per-input row streams are merged with
--   'mergeStreamsBinary', regrouped into logical tables by 'chunkRows' using
--   @blockRows@, and converted back to striped form with
--   'Striped.fromLogical'. 'Stream.whenEmpty' is given 'Logical.empty' for
--   @schema@, presumably so that an empty merge still yields one empty table.
--
--   Failures from the striped conversions are reported as 'UnionStripedError'.
--
--   NOTE(review): the 'MaximumRowSize' argument is accepted but ignored (it is
--   bound as @_msize@), so this function applies no row-size limit.
unionStripedWith ::
     Monad m
  => Schema.Table
  -> Maybe MaximumRowSize
  -> MergeRowsPerBlock
  -> Cons Boxed.Vector (Stream (Of Striped.Table) m ())
  -> Stream (Of Striped.Table) (EitherT UnionTableError m) ()
unionStripedWith schema _msize blockRows inputs0 =
  let
    -- Lift each input into the error-carrying monad and coerce every table to
    -- the target schema before it is split into rows.
    fromStriped =
      Stream.mapM (hoistEither . first UnionStripedError . Striped.transmute schema) .
      hoist lift
  in
    -- 'streamStripedAsRows' adds a second 'EitherT' layer on top of the one
    -- introduced by 'fromStriped'; 'squash' collapses the two into one.
    hoist squash $
      Stream.mapM (hoistEither . first UnionStripedError . Striped.fromLogical schema) $
      Stream.whenEmpty (Logical.empty schema) $
      chunkRows blockRows $
      mergeStreamsBinary $
      Cons.imap streamStripedAsRows $
      fmap fromStriped inputs0
{-# INLINABLE unionStripedWith #-}

-- | Union striped tables whose combined schema is not known up front.
--
--   The first table of every input is inspected with 'peekHead' (which hands
--   back a stream with that table re-attached), the schemas of those tables
--   are combined with 'unionSchemas', and the inputs are then merged by
--   'unionStripedWith' using the combined schema, @msize@ and @blockRows@.
unionStriped ::
     Monad m
  => Maybe MaximumRowSize
  -> MergeRowsPerBlock
  -> Cons Boxed.Vector (Stream (Of Striped.Table) m ())
  -> Stream (Of Striped.Table) (EitherT UnionTableError m) ()
unionStriped msize blockRows inputs0 = do
  (heads, inputs1) <- fmap Cons.unzip . lift $ traverse peekHead inputs0
  schema <- lift . hoistEither . unionSchemas $ fmap Striped.schema heads
  unionStripedWith schema msize blockRows inputs1
{-# INLINABLE unionStriped #-}

-- | Regroup a stream of rows into logical map tables, one table per chunk.
--
--   The rows are cut into consecutive chunks with 'Stream.chunksOf', using the
--   wrapped 'MergeRowsPerBlock' value as the chunk size; each chunk is
--   collected into a list and turned into a 'Logical.Map'.
--
--   NOTE(review): 'Map.fromDistinctAscList' does not check its precondition,
--   so this relies on the incoming rows arriving in strictly ascending key
--   order with no duplicate keys -- confirm the upstream merge guarantees it.
chunkRows ::
     Monad m
  => MergeRowsPerBlock
  -> Stream (Of Row) (EitherT UnionTableError m) ()
  -> Stream (Of Logical.Table) (EitherT UnionTableError m) ()
chunkRows (MergeRowsPerBlock rowsPerBlock) rows =
  Stream.map toTable . Stream.mapped Stream.toList $
    Stream.chunksOf rowsPerBlock rows
  where
    toPair (Row k v) =
      (k, v)

    toTable =
      Logical.Map . Map.fromDistinctAscList . fmap toPair
{-# INLINABLE chunkRows #-}

-- | Turn a striped map table into a vector of logical (key, value) pairs.
--
--   The key column is decoded with 'Striped.toValues' and forced with a bang
--   pattern; the value column is decoded the same way without the bang. The
--   intent is to be strict on the keys and lazy on the values.
--
--   Returns 'UnionStripedError' if either column fails to decode, and
--   'UnionNotMap' for any table that is not a 'Striped.Map'.
logicalPairs ::
     Striped.Table
  -> Either UnionTableError (Boxed.Vector (Logical.Value, Logical.Value))
logicalPairs = \case
  Striped.Map _ keyColumn valueColumn -> do
    !keys <- first UnionStripedError (Striped.toValues keyColumn)
    values <- first UnionStripedError (Striped.toValues valueColumn)
    pure (Boxed.zip keys values)

  _ ->
    Left UnionNotMap
{-# INLINABLE logicalPairs #-}
2 changes: 1 addition & 1 deletion zebra-core/test/Test/Zebra/Merge/Table.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ unionList ::
-> Cons Boxed.Vector (NonEmpty Striped.Table)
-> Either String (Maybe Striped.Table)
unionList msize xss0 =
case runIdentity . runEitherT . Stream.toList . Merge.unionStriped msize $ fmap Stream.each xss0 of
case runIdentity . runEitherT . Stream.toList . Merge.unionStriped msize (Merge.MergeRowsPerBlock 256000000) $ fmap Stream.each xss0 of
Left (UnionLogicalMergeError _) ->
pure Nothing
Left err ->
Expand Down