Skip to content

Commit

Permalink
Add MessagePack parser
Browse files Browse the repository at this point in the history
  • Loading branch information
jarmuszz committed Jun 2, 2024
1 parent 08fa967 commit 4c55887
Show file tree
Hide file tree
Showing 11 changed files with 820 additions and 1 deletion.
1 change: 1 addition & 0 deletions benchmarks/src/main/resources/twitter_msgpack.txt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 fs2-data Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data.benchmarks

import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._

import cats.effect.SyncIO

import scodec.bits._
import fs2._


@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
@State(org.openjdk.jmh.annotations.Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3, time = 2)
@Measurement(iterations = 10, time = 2)
class MsgPackItemParserBenchmarks {

// The file contains hex representation of the values so we have to convert it
val msgpackBytes: ByteVector = ByteVector.fromHex(
fs2.io
.readClassLoaderResource[SyncIO]("twitter_msgpack.txt", 4096)
.through(fs2.text.utf8.decode)
.compile
.string
.unsafeRunSync()
).get

@Benchmark
def parseMsgpackItems() =
Stream
.chunk(Chunk.byteVector(msgpackBytes))
.through(fs2.data.msgpack.low.items[SyncIO])
.compile
.drain
.unsafeRunSync()
}
12 changes: 11 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ val root = tlCrossRootProject
scalaXml,
cbor,
cborJson,
msgpack,
finiteState,
unidocs,
exampleJq
Expand Down Expand Up @@ -462,6 +463,15 @@ lazy val cborJson = crossProject(JVMPlatform, JSPlatform, NativePlatform)
tlVersionIntroduced := Map("3" -> "1.5.1", "2.13" -> "1.5.1", "2.12" -> "1.5.1")
)

lazy val msgpack = crossProject(JVMPlatform, JSPlatform, NativePlatform)
.crossType(CrossType.Full)
.in(file("msgpack"))
.settings(commonSettings)
.settings(
name := "fs2-data-msgpack",
description := "Streaming MessagePack library"
)

lazy val benchmarks = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("benchmarks"))
Expand All @@ -474,7 +484,7 @@ lazy val benchmarks = crossProject(JVMPlatform)
"io.circe" %%% "circe-fs2" % "0.14.1"
)
)
.dependsOn(csv, scalaXml, jsonCirce)
.dependsOn(csv, scalaXml, jsonCirce, msgpack)

lazy val exampleJq = crossProject(JVMPlatform, NativePlatform, JSPlatform)
.crossType(CrossType.Pure)
Expand Down
63 changes: 63 additions & 0 deletions msgpack/shared/src/main/scala/fs2/data/msgpack/Headers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2024 fs2-data Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2.data.msgpack

object Headers {
final val Bin8 = 0xc4.toByte
final val Bin16 = 0xc5.toByte
final val Bin32 = 0xc6.toByte

final val Ext8 = 0xc7.toByte
final val Ext16 = 0xc8.toByte
final val Ext32 = 0xc9.toByte

final val Float32 = 0xca.toByte
final val Float64 = 0xcb.toByte

final val Uint8 = 0xcc.toByte
final val Uint16 = 0xcd.toByte
final val Uint32 = 0xce.toByte
final val Uint64 = 0xcf.toByte

final val Int8 = 0xd0.toByte
final val Int16 = 0xd1.toByte
final val Int32 = 0xd2.toByte
final val Int64 = 0xd3.toByte

final val FixExt1 = 0xd4.toByte
final val FixExt2 = 0xd5.toByte
final val FixExt4 = 0xd6.toByte
final val FixExt8 = 0xd7.toByte
final val FixExt16 = 0xd8.toByte

final val Str8 = 0xd9.toByte
final val Str16 = 0xda.toByte
final val Str32 = 0xdb.toByte

final val Array16 = 0xdc.toByte
final val Array32 = 0xdd.toByte

final val Map16 = 0xde.toByte
final val Map32 = 0xdf.toByte

final val Nil = 0xc0.toByte
final val NeverUsed: Byte = 0xc1.toByte
final val False = 0xc2.toByte
final val True = 0xc3.toByte

final val Timestamp = 0xff.toByte
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2024 fs2-data Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data
package msgpack
package low
package internal

import fs2.data.msgpack.low.MsgpackItem
import fs2.data.msgpack.low.internal.Helpers._
import scodec.bits._

private[internal] object FormatParsers {
def parseSimpleType[F[_]](lift: ByteVector => MsgpackItem)(length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(length, ctx) map { case (ctx, result) =>
ctx.prepend(lift(result))
}
}

def parseUnsignedInt[F[_]](length: Int, ctx: ParserContext[F])(implicit F: RaiseThrowable[F]) =
parseSimpleType(MsgpackItem.UnsignedInt)(length, ctx)

def parseArray[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(length, ctx) map { case (ctx, result) =>
ctx.prepend(MsgpackItem.Array(result.toInt(false, ByteOrdering.BigEndian)))
}
}

def parseMap[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(length, ctx) map { case (ctx, result) =>
ctx.prepend(MsgpackItem.Map(result.toInt(false, ByteOrdering.BigEndian)))
}
}

def parseTimestamp[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
length match {
case 4 =>
requireBytes(4, ctx) map { case (ctx, result) =>
ctx.prepend(MsgpackItem.Timestamp32(result))
}
case 8 =>
requireBytes(8, ctx) map {
case (ctx, result) => {
val seconds = result & hex"00000003ffffffff"
val nanosec = result >> 34

ctx.prepend(MsgpackItem.Timestamp64(nanosec.drop(4), seconds.drop(3)))
}
}
case 12 =>
for {
(ctx, nanosec) <- requireBytes(4, ctx)
(ctx, seconds) <- requireBytes(8, ctx)
} yield ctx.prepend(MsgpackItem.Timestamp96(nanosec, seconds))
case _ => Pull.raiseError(new MsgpackParsingException(s"Invalid timestamp length: ${length}"))
}
}

def parseFixExt[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireOneByte(ctx) flatMap {
case (ctx, header) => {
if (header == Headers.Timestamp) {
parseTimestamp(length, ctx)
} else {
requireBytes(length, ctx) map { case (ctx, bytes) =>
ctx.prepend(MsgpackItem.Extension(header, bytes))
}
}
}
}
}

def parsePlainExt[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
for {
(ctx, size) <- requireBytes(length, ctx)
(ctx, header) <- requireOneByte(ctx)
sizeI = size.toInt(false, ByteOrdering.BigEndian)
out <-
if (header == Headers.Timestamp) {
parseTimestamp(sizeI, ctx)
} else {
requireBytes(sizeI, ctx) map { case (ctx, bytes) =>
ctx.prepend(MsgpackItem.Extension(header, bytes))
}
}
} yield out
}

def parseBin[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(length, ctx) flatMap { case (ctx, result) =>
requireBytes(result.toInt(false, ByteOrdering.BigEndian), ctx) map { case (ctx, result) =>
ctx.prepend(MsgpackItem.Bin(result))
}
}
}

def parseString[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(length, ctx) flatMap { case (ctx, result) =>
requireBytes(result.toInt(false, ByteOrdering.BigEndian), ctx) map { case (ctx, result) =>
ctx.prepend(MsgpackItem.Str(result))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2024 fs2-data Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2.data.msgpack.low.internal

import fs2.Chunk
import fs2.Pull
import fs2.RaiseThrowable
import fs2.Stream
import fs2.data.msgpack.low.MsgpackItem
import scodec.bits.ByteVector

private[internal] object Helpers {
case class MsgpackParsingException(str: String) extends Exception

/** @param chunk Current chunk
* @param idx Index of the current [[Byte]] in `chunk`
* @param rest Rest of the stream
* @param acc Accumulator of which contents are emitted when acquiring a new chunk
*/
case class ParserContext[F[_]](chunk: Chunk[Byte], idx: Int, rest: Stream[F, Byte], acc: List[MsgpackItem]) {
def prepend(item: MsgpackItem) = ParserContext(chunk, idx, rest, item :: acc)
def next = ParserContext(chunk, idx + 1, rest, acc)
def toResult[T](result: T) = (this, result)
}

type ParserResult[F[_], T] = (ParserContext[F], T)

/** Ensures that a computation `cont` will happen inside a valid context.
* @param cont function to be run with a chunk ensured
* @param onEos ran when out of stream
*/
def ensureChunk[F[_], T](ctx: ParserContext[F])(cont: ParserContext[F] => Pull[F, MsgpackItem, T])(
onEos: => Pull[F, MsgpackItem, T]): Pull[F, MsgpackItem, T] = {
if (ctx.idx >= ctx.chunk.size) {
Pull.output(Chunk.from(ctx.acc.reverse)) >> ctx.rest.pull.uncons flatMap {
case Some((hd, tl)) => ensureChunk(ParserContext(hd, 0, tl, Nil))(cont)(onEos)
case None => onEos
}
} else {
cont(ctx)
}
}

def requireOneByte[F[_]](ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserResult[F, Byte]] = {
ensureChunk(ctx) { ctx =>
// Inbounds chunk access is guaranteed by `ensureChunk`
Pull.pure((ctx.next, ctx.chunk(ctx.idx)))
} {
Pull.raiseError(new MsgpackParsingException("Unexpected end of input"))
}
}

def requireBytes[F[_]](count: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserResult[F, ByteVector]] = {
def go(count: Int, ctx: ParserContext[F], bytes: ByteVector): Pull[F, MsgpackItem, ParserResult[F, ByteVector]] = {
if (count <= 0) {
Pull.pure(ctx.toResult(bytes.reverse))
} else {
requireOneByte(ctx) flatMap { case (ctx, byte) =>
go(count - 1, ctx, byte +: bytes)
}
}
}

go(count, ctx, ByteVector.empty)
}
}
Loading

0 comments on commit 4c55887

Please sign in to comment.