Commit

Add benchmarks for magnolify-parquet vs parquet-avro R/W (#1040)
clairemcginty authored Sep 20, 2024
1 parent d7827e2 commit 073c2e3
Showing 3 changed files with 292 additions and 3 deletions.
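
To try the new benchmarks locally, something like the following should work, assuming the repository's existing sbt-jmh setup (the fork/warmup/iteration flags here are illustrative, not part of this commit):

    sbt "jmh/Jmh/run -f 1 -wi 3 -i 5 .*ParquetBench.*"
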
9 changes: 8 additions & 1 deletion build.sbt
@@ -747,6 +747,7 @@ lazy val jmh: Project = project
cats % Test,
datastore % Test,
guava % Test,
parquet % Test,
protobuf % "test->test",
scalacheck % Test,
tensorflow % Test,
@@ -766,7 +767,13 @@ lazy val jmh: Project = project
"com.google.apis" % "google-api-services-bigquery" % bigqueryVersion % Test,
"com.google.cloud.datastore" % "datastore-v1-proto-client" % datastoreVersion % Test,
"org.apache.avro" % "avro" % avroVersion % Test,
"org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test
"org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test,
"joda-time" % "joda-time" % jodaTimeVersion % Test,
"org.apache.parquet" % "parquet-avro" % parquetVersion % Test,
"org.apache.parquet" % "parquet-column" % parquetVersion % Test,
"org.apache.parquet" % "parquet-hadoop" % parquetVersion % Test,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % Test,
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % Test
)
)

147 changes: 145 additions & 2 deletions jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
@@ -17,12 +17,14 @@
package magnolify.jmh

import java.util.concurrent.TimeUnit

import magnolify.scalacheck.auto._
import magnolify.test.Simple._
import org.scalacheck._
import org.openjdk.jmh.annotations._

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

object MagnolifyBench {
val seed: rng.Seed = rng.Seed(0)
val prms: Gen.Parameters = Gen.Parameters.default
@@ -157,7 +159,148 @@ class ExampleBench {
private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get
private val example = exampleType.to(exampleNested).build()
@Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested)
@Benchmark def exampleFrom: ExampleNested = exampleType.from(example)
@Benchmark def exampleFrom: ExampleNested =
exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap)
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
class ParquetBench {
import MagnolifyBench._
import ParquetStates._
import magnolify.avro._
import org.apache.avro.generic.GenericRecord

private val genericRecord = AvroType[Nested].to(nested)

@Benchmark def parquetWriteMagnolify(state: ParquetCaseClassWriteState): Unit =
state.writer.write(nested)
@Benchmark def parquetWriteAvro(state: ParquetAvroWriteState): Unit =
state.writer.write(genericRecord)

@Benchmark def parquetReadMagnolify(state: ParquetCaseClassReadState): Nested =
state.reader.read()
@Benchmark def parquetReadAvro(state: ParquetAvroReadState): GenericRecord = state.reader.read()
}

object ParquetStates {
import MagnolifyBench._
import magnolify.avro._
import magnolify.parquet._
import magnolify.parquet.ParquetArray.AvroCompat._
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.conf.PlainParquetConfiguration
import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.api.{ReadSupport, WriteSupport}
import org.apache.parquet.schema.MessageType
import org.apache.parquet.io._
import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.parquet.column.impl.ColumnWriteStoreV1

@State(Scope.Benchmark)
class ReadState[T](
schema: MessageType,
writeSupport: WriteSupport[T],
readSupport: ReadSupport[T],
record: T
) {
import org.apache.parquet.hadoop.api.InitContext

var reader: RecordReader[T] = null
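// assigned in setup(); JMH runs @Setup(Level.Trial) once per trial, before any benchmark invocations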

@Setup(Level.Trial)
def setup(): Unit = {
// Write page
val columnIO = new ColumnIOFactory(true).getColumnIO(schema)
val pageStore = new ParquetInMemoryPageStore(1)
val columnWriteStore = new ColumnWriteStoreV1(
schema,
pageStore,
ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
)
val recordConsumer = columnIO.getRecordWriter(columnWriteStore)
writeSupport.init(new PlainParquetConfiguration())
writeSupport.prepareForWrite(recordConsumer)
writeSupport.write(record)
recordConsumer.flush()
columnWriteStore.flush()

// Set up reader
val conf = new Configuration()
reader = columnIO.getRecordReader(
pageStore,
readSupport.prepareForRead(
conf,
new java.util.HashMap,
schema,
readSupport.init(new InitContext(conf, new java.util.HashMap, schema))
)
): @nowarn("cat=deprecation")
}
}

@State(Scope.Benchmark)
class WriteState[T](writeSupport: WriteSupport[T]) {
val writer = writeSupport

@Setup(Level.Trial)
def setup(): Unit = {
writeSupport.init(new PlainParquetConfiguration())
// Use a no-op RecordConsumer; we want to measure only the record -> group conversion, and not pollute the
// benchmark with background tasks like flushing pages/blocks or validating records
writeSupport.prepareForWrite(new RecordConsumer {
override def startMessage(): Unit = {}
override def endMessage(): Unit = {}
override def startField(field: String, index: Int): Unit = {}
override def endField(field: String, index: Int): Unit = {}
override def startGroup(): Unit = {}
override def endGroup(): Unit = {}
override def addInteger(value: Int): Unit = {}
override def addLong(value: Long): Unit = {}
override def addBoolean(value: Boolean): Unit = {}
override def addBinary(value: Binary): Unit = {}
override def addFloat(value: Float): Unit = {}
override def addDouble(value: Double): Unit = {}
})
}
}

// R/W support for Group <-> Case Class Conversion (magnolify-parquet)
private val parquetType = ParquetType[Nested]
class ParquetCaseClassReadState
extends ParquetStates.ReadState[Nested](
parquetType.schema,
parquetType.writeSupport,
parquetType.readSupport,
nested
)
class ParquetCaseClassWriteState
extends ParquetStates.WriteState[Nested](parquetType.writeSupport)

// R/W support for Group <-> Avro Conversion (parquet-avro)
private val avroType = AvroType[Nested]
class ParquetAvroReadState
extends ParquetStates.ReadState[GenericRecord](
parquetType.schema,
new AvroWriteSupport[GenericRecord](
parquetType.schema,
parquetType.avroSchema,
GenericData.get()
),
new AvroReadSupport[GenericRecord](GenericData.get()),
avroType.to(nested)
)
class ParquetAvroWriteState
extends ParquetStates.WriteState[GenericRecord](
new AvroWriteSupport[GenericRecord](
parquetType.schema,
parquetType.avroSchema,
GenericData.get()
)
)
}

// Collections are not supported
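For context, the magnolify-parquet derivation these benchmarks exercise looks roughly like this; a sketch that uses only members already referenced in the diff above (Nested comes from magnolify.test.Simple):

    import magnolify.parquet._
    import magnolify.parquet.ParquetArray.AvroCompat._
    import magnolify.test.Simple._

    val pt = ParquetType[Nested] // derives the Parquet schema plus read/write support at compile time
    pt.schema       // the MessageType shared by the case-class and Avro benchmark states
    pt.writeSupport // driven directly by parquetWriteMagnolify
    pt.readSupport  // driven by parquetReadMagnolify
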
139 changes: 139 additions & 0 deletions jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
@@ -0,0 +1,139 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package magnolify.jmh

import org.apache.parquet.bytes.{ByteBufferReleaser, BytesInput, HeapByteBufferAllocator}
import org.apache.parquet.column.{ColumnDescriptor, Encoding}
import org.apache.parquet.column.page._
import org.apache.parquet.column.statistics._

import scala.collection.mutable

/**
* An in-memory Parquet page store modeled after parquet-java's MemPageStore, used to benchmark
* ParquetType conversion between Parquet Groups and Scala case classes
*/
class ParquetInMemoryPageStore(rowCount: Long) extends PageReadStore with PageWriteStore {
lazy val writers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryWriter]()
lazy val readers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryReader]()

override def getPageReader(path: ColumnDescriptor): PageReader =
readers.getOrElseUpdate(
path, {
val writer = writers(path)
new ParquetInMemoryReader(writer.pages.toList, writer.dictionaryPage)
}
)

override def getPageWriter(path: ColumnDescriptor): PageWriter =
writers.getOrElseUpdate(path, new ParquetInMemoryWriter)

override def getRowCount: Long = rowCount
}

class ParquetInMemoryReader(pages: List[DataPageV1], dictionaryPage: DictionaryPage)
extends PageReader {
// Infinitely return the first page; for the purposes of benchmarking, we don't care about the data itself
private val page = pages.head

override def readDictionaryPage(): DictionaryPage = dictionaryPage
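// report an effectively unbounded value count so the reader is never exhausted mid-benchmark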
override def getTotalValueCount: Long = Long.MaxValue
override def readPage(): DataPage = new DataPageV1(
page.getBytes.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)),
page.getValueCount,
page.getUncompressedSize,
page.getStatistics,
page.getRlEncoding,
page.getDlEncoding,
page.getValueEncoding
)
}

class ParquetInMemoryWriter extends PageWriter {
var numRows = 0
var numValues: Long = 0
var memSize: Long = 0
val pages = new mutable.ListBuffer[DataPageV1]()
var dictionaryPage: DictionaryPage = null

override def writePage(
bytesInput: BytesInput,
valueCount: Int,
statistics: Statistics[_],
rlEncoding: Encoding,
dlEncoding: Encoding,
valuesEncoding: Encoding
): Unit =
writePage(bytesInput, valueCount, 1, statistics, rlEncoding, dlEncoding, valuesEncoding)

override def writePage(
bytesInput: BytesInput,
valueCount: Int,
rowCount: Int,
statistics: Statistics[_],
sizeStatistics: SizeStatistics,
rlEncoding: Encoding,
dlEncoding: Encoding,
valuesEncoding: Encoding
): Unit =
writePage(bytesInput, valueCount, rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding)

override def writePage(
bytesInput: BytesInput,
valueCount: Int,
rowCount: Int,
statistics: Statistics[_],
rlEncoding: Encoding,
dlEncoding: Encoding,
valuesEncoding: Encoding
): Unit = {
pages.addOne(
new DataPageV1(
bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)),
valueCount,
bytesInput.size().toInt,
statistics,
rlEncoding,
dlEncoding,
valuesEncoding
)
)
memSize += bytesInput.size()
numRows += rowCount
numValues += valueCount
}

override def writePageV2(
rowCount: Int,
nullCount: Int,
valueCount: Int,
repetitionLevels: BytesInput,
definitionLevels: BytesInput,
dataEncoding: Encoding,
data: BytesInput,
statistics: Statistics[_]
): Unit = ???
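// (never called here: ReadState.setup writes through ColumnWriteStoreV1, which emits only V1 data pages)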

override def getMemSize: Long = memSize

override def allocatedSize(): Long = memSize

override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit =
this.dictionaryPage = dictionaryPage

override def memUsageString(prefix: String): String = s"$prefix $memSize bytes"
}
