Skip to content

Commit

Permalink
scrooge: Optimize serialization performance
Browse files Browse the repository at this point in the history
Problem
TReusableMemoryTransport uses a ByteArrayInputStream with synchronized methods. The transport is used from a single thread, so synchronization is not really required, but it adds some overhead, especially on newer JVMs without biased locking. Additionally, this introduces an extra level of indirection that the JIT needs to optimize. Finally, if we controlled the underlying buffer, we could use methods from Unsafe to write primitives to the buffer without using an intermediate byte array and without using System.arraycopy, which could be expensive for small arrays.

Solution
Introduce a single-threaded implementation of MemoryTransport that directly manages the underlying byte array and uses direct memory writes for primitive types.

JDK21
```
Benchmark                                               (size)   Mode  Cnt         Score        Error  Units
TransportBenchmark.encodeAirlineBaseline                   N/A  thrpt    5      8975.741 ±     73.402  ops/s
TransportBenchmark.encodeAirlineByteArrayTransport         N/A  thrpt    5     14632.672 ±     18.125  ops/s
TransportBenchmark.encodeAirlineUnsafeTransport            N/A  thrpt    5     15742.416 ±    203.313  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                 5  thrpt    5   3165165.229 ±    865.324  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                10  thrpt    5   1996723.513 ±    813.442  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                50  thrpt    5    517236.667 ±   9625.233  ops/s
TransportBenchmark.encodeDoubleArrayBaseline               500  thrpt    5     55744.128 ±     31.492  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport       5  thrpt    5  13034582.982 ± 113618.553  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport      10  thrpt    5   7965754.309 ±  11327.671  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport      50  thrpt    5   1976067.803 ±   8051.731  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport     500  thrpt    5    212641.665 ±    279.586  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport          5  thrpt    5  34390166.003 ±  42283.346  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport         10  thrpt    5  26869677.811 ± 761342.592  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport         50  thrpt    5   8028375.541 ±  75541.740  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport        500  thrpt    5   1107122.950 ±   7175.220  ops/s
```

TwitterJDK 11
```
Benchmark                                               (size)   Mode  Cnt         Score       Error  Units
TransportBenchmark.encodeAirlineBaseline                   N/A  thrpt    5     11765.483 ±   192.296  ops/s
TransportBenchmark.encodeAirlineByteArrayTransport         N/A  thrpt    5     14030.813 ±    49.984  ops/s
TransportBenchmark.encodeAirlineUnsafeTransport            N/A  thrpt    5     15385.809 ±   428.500  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                 5  thrpt    5   7024298.511 ± 82939.013  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                10  thrpt    5   4436471.652 ± 76548.770  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                50  thrpt    5   1136314.914 ±  6571.789  ops/s
TransportBenchmark.encodeDoubleArrayBaseline               500  thrpt    5    131403.572 ±  1452.554  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport       5  thrpt    5  10450745.663 ± 23594.889  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport      10  thrpt    5   6542692.999 ± 63437.149  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport      50  thrpt    5   1710071.757 ± 10593.509  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport     500  thrpt    5    187700.152 ±   969.060  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport          5  thrpt    5  25352502.715 ± 65770.557  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport         10  thrpt    5  19030775.867 ± 76415.965  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport         50  thrpt    5   6069935.386 ± 19759.800  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport        500  thrpt    5    698704.098 ±  1804.536  ops/s
```

TwitterJDK with -XX:-UseBiasedLocking
```
Benchmark                                               (size)   Mode  Cnt         Score        Error  Units
TransportBenchmark.encodeAirlineBaseline                   N/A  thrpt    5      8908.851 ±     94.576  ops/s
TransportBenchmark.encodeAirlineByteArrayTransport         N/A  thrpt    5     13334.376 ±    208.466  ops/s
TransportBenchmark.encodeAirlineUnsafeTransport            N/A  thrpt    5     14206.590 ±    120.613  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                 5  thrpt    5   3305481.887 ±   9922.319  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                10  thrpt    5   2152912.133 ±   3121.711  ops/s
TransportBenchmark.encodeDoubleArrayBaseline                50  thrpt    5    605487.993 ±   4952.450  ops/s
TransportBenchmark.encodeDoubleArrayBaseline               500  thrpt    5     67344.671 ±     51.999  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport       5  thrpt    5   9273549.548 ±   5494.655  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport      10  thrpt    5   6435802.855 ±   6062.881  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport      50  thrpt    5   1712034.742 ±   2193.393  ops/s
TransportBenchmark.encodeDoubleArrayByteArrayTransport     500  thrpt    5    187845.588 ±    703.777  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport          5  thrpt    5  25114440.253 ± 272593.238  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport         10  thrpt    5  19643037.550 ± 207643.475  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport         50  thrpt    5   6659417.813 ±  51079.641  ops/s
TransportBenchmark.encodeDoubleArrayUnsafeTransport        500  thrpt    5    698239.188 ±   3356.311  ops/s
```

Differential Revision: https://phabricator.twitter.biz/D1176860
  • Loading branch information
mbezoyan authored and jenkins committed Oct 17, 2024
1 parent 05a6102 commit 5ead807
Show file tree
Hide file tree
Showing 41 changed files with 753 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package com.twitter.scrooge.benchmark

import com.twitter.scrooge.BinaryThriftStructSerializer
import com.twitter.scrooge.adapt.testutil.ReloadOnceAdaptBinarySerializer
import com.twitter.scrooge.benchmark.AdaptTProtocolBenchmark.AirlineThreadState
import com.twitter.scrooge.benchmark.AdaptTProtocol.AirlineThreadState
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
import thrift.benchmark._
import com.twitter.scrooge.ThriftStructSerializer

object AdaptTProtocolBenchmark {
object AdaptTProtocol {
// Pass in a seed and fixed number of airlines.
// Will be initialized with this object so separate from the benchmarks.
val (airlines: Array[Airline], airlinesBytes) =
Expand Down Expand Up @@ -37,8 +37,8 @@ object AdaptTProtocolBenchmark {

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class AdaptTProtocolBenchmark {
import AdaptTProtocolBenchmark._
class AdaptTProtocol {
import AdaptTProtocol._

// todo: Add benchmarks for toBytes, all fields accessed, more fields accessed etc.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object AirlineGenerator {
Airline(name, Some(headQuarter), Some(owner), Some(airports), Some(routes), Some(flights))
}

private[this] def buildAirlines(rng: Random, num: Int): Array[Airline] =
def buildAirlines(rng: Random, num: Int): Array[Airline] =
(0 until num).map { _ => buildAirline(rng) }.toArray

def buildAirlinesAndBytes(seed: Long, num: Int): (Array[Airline], Array[Array[Byte]]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,78 @@ package com.twitter.scrooge.benchmark

import com.twitter.scrooge.ThriftStruct
import com.twitter.scrooge.ThriftStructCodec
import java.io.ByteArrayOutputStream
import java.util.concurrent.TimeUnit
import java.util.Random
import java.util.concurrent.TimeUnit
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.transport.TTransport
import org.openjdk.jmh.annotations._
import thrift.benchmark._
import scala.collection.mutable
import thrift.benchmark._

private class ExposedBAOS extends ByteArrayOutputStream {
def get: Array[Byte] = buf
def len: Int = count
}

class TRewindable extends TTransport {
private[this] var pos = 0
private[this] val arr = new ExposedBAOS()
object CollectionsBenchmark {
@State(Scope.Thread)
class CollectionsState {
@Param(Array("5", "10", "50", "500"))
var size: Int = 1

override def isOpen = true
override def open(): Unit = {}
override def close(): Unit = {}
override def flush(): Unit = {}
var col: CollectionsBenchmarkState = _

override def read(buf: Array[Byte], off: Int, len: Int): Int = {
val amtToRead = if (len > arr.len - pos) arr.len - pos else len
if (amtToRead > 0) {
System.arraycopy(arr.get, pos, buf, off, amtToRead)
pos += amtToRead
@Setup(Level.Trial)
def setup(): Unit = {
col = new CollectionsBenchmarkState(size)
}
amtToRead
}
}

override def write(buf: Array[Byte], off: Int, len: Int): Unit = {
arr.write(buf, off, len)
}
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(value = 1 /*, jvmArgsAppend = Array[String] { "-XX:-UseBiasedLocking" }*/ )
@Warmup(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
class CollectionsBenchmark {
import CollectionsBenchmark._

def rewind(): Unit = {
pos = 0
arr.reset()
}
@Benchmark
def timeEncodeList(state: CollectionsState): Unit =
state.col.encode(
ListCollections,
state.col.listProt,
state.col.list,
state.col.listCollections
)

def inspect: String = {
var buf = ""
var i = 0
val bytes = arr.toByteArray()
bytes foreach { byte =>
buf += (if (pos == i) "==>" else "") + Integer.toHexString(byte & 0xff) + " "
i += 1
}
buf
}
@Benchmark
def timeEncodeArray(state: CollectionsState): Unit =
state.col.encode(
ListCollections,
state.col.listProt,
state.col.list,
state.col.arrayCollections
)

@Benchmark
def timeEncodeDoubleArray(state: CollectionsState): Unit =
state.col.encode(
ListDoubleCollections,
state.col.listDoubleProt,
state.col.listDouble,
state.col.arrayDoubleCollections
)

@Benchmark
def timeDecodeMap(state: CollectionsState): Unit =
state.col.decode(MapCollections, state.col.mapProt, state.col.map)

@Benchmark
def timeDecodeSet(state: CollectionsState): Unit =
state.col.decode(SetCollections, state.col.setProt, state.col.set)

@Benchmark
def timeDecodeList(state: CollectionsState): Unit =
state.col.decode(ListCollections, state.col.listProt, state.col.list)
}

class Collections(size: Int) {
class CollectionsBenchmarkState(size: Int) {
val map: TRewindable = new TRewindable
val mapProt: TBinaryProtocol = new TBinaryProtocol(map)

Expand Down Expand Up @@ -110,68 +127,6 @@ class Collections(size: Int) {
): Unit = {
codec.encode(obj, prot)
buff.rewind()
buff.resetBuf();
}
}

object CollectionsBenchmark {
@State(Scope.Thread)
class CollectionsState {
@Param(Array("1", "5", "10", "100", "500"))
var size: Int = 1

var col: Collections = _

@Setup(Level.Trial)
def setup(): Unit = {
col = new Collections(size)
}
}
}

@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Warmup(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
class CollectionsBenchmark {
import CollectionsBenchmark._

@Benchmark
def timeEncodeList(state: CollectionsState): Unit =
state.col.encode(
ListCollections,
state.col.listProt,
state.col.list,
state.col.listCollections
)

@Benchmark
def timeEncodeArray(state: CollectionsState): Unit =
state.col.encode(
ListCollections,
state.col.listProt,
state.col.list,
state.col.arrayCollections
)

@Benchmark
def timeEncodeDoubleArray(state: CollectionsState): Unit =
state.col.encode(
ListDoubleCollections,
state.col.listDoubleProt,
state.col.listDouble,
state.col.arrayDoubleCollections
)

@Benchmark
def timeDecodeMap(state: CollectionsState): Unit =
state.col.decode(MapCollections, state.col.mapProt, state.col.map)

@Benchmark
def timeDecodeSet(state: CollectionsState): Unit =
state.col.decode(SetCollections, state.col.setProt, state.col.set)

@Benchmark
def timeDecodeList(state: CollectionsState): Unit =
state.col.decode(ListCollections, state.col.listProt, state.col.list)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.twitter.scrooge.benchmark

import java.io.ByteArrayOutputStream
import org.apache.thrift.transport.TTransport

object TRewindable {

  /**
   * A `ByteArrayOutputStream` that exposes its protected internals so the
   * transport can read written bytes in place, without the copy that
   * `toByteArray()` would make.
   */
  private class ExposedBAOS extends ByteArrayOutputStream {

    /** The stream's backing array; only the first `len` bytes are valid data. */
    def get: Array[Byte] = this.buf

    /** Number of valid bytes currently held in the backing array. */
    def len: Int = this.count
  }
}
/**
 * An in-memory `TTransport` backed by a growable byte buffer.
 *
 * Writes append to the buffer; reads consume from an independent cursor.
 * `rewind()` moves the read cursor back to the start, and `resetBuf()`
 * discards everything written so far, so one instance can be reused
 * across many encode/decode rounds.
 */
class TRewindable extends TTransport {
  // Index of the next byte that `read` will hand out.
  private[this] var pos = 0
  private[this] val arr = new TRewindable.ExposedBAOS()

  override def isOpen: Boolean = true
  override def open(): Unit = {}
  override def close(): Unit = {}
  override def flush(): Unit = {}

  /**
   * Copies up to `len` unread bytes into `buf` starting at `off` and
   * advances the read cursor by the number of bytes copied.
   *
   * @return the number of bytes copied; 0 when nothing is left to read
   */
  override def read(buf: Array[Byte], off: Int, len: Int): Int = {
    // Clamp at zero: if `resetBuf()` was called without `rewind()`, the
    // cursor can sit past the end of the (now shorter) buffer, and the
    // unclamped difference would be reported as a negative byte count.
    val amtToRead = math.max(0, math.min(len, arr.len - pos))
    if (amtToRead > 0) {
      System.arraycopy(arr.get, pos, buf, off, amtToRead)
      pos += amtToRead
    }
    amtToRead
  }

  /** Appends `len` bytes of `buf`, starting at `off`, to the buffer. */
  override def write(buf: Array[Byte], off: Int, len: Int): Unit = {
    arr.write(buf, off, len)
  }

  /** Moves the read cursor back to the start; written bytes are kept. */
  def rewind(): Unit = {
    pos = 0
  }

  /** Discards all written bytes. The read cursor is left where it was. */
  def resetBuf(): Unit = {
    arr.reset()
  }

  /**
   * Renders the written bytes as space-separated lowercase hex (each byte
   * followed by one space), with `==>` placed directly before the byte at
   * the current read cursor.
   */
  def inspect: String = {
    val bytes = arr.toByteArray()
    // A builder keeps this linear in the buffer size; repeated `String +=`
    // re-copies the whole accumulated text on every byte.
    val out = new java.lang.StringBuilder(bytes.length * 3 + 3)
    var i = 0
    while (i < bytes.length) {
      if (pos == i) out.append("==>")
      out.append(Integer.toHexString(bytes(i) & 0xff)).append(' ')
      i += 1
    }
    out.toString
  }
}
Loading

0 comments on commit 5ead807

Please sign in to comment.