diff --git a/build.sbt b/build.sbt
index 0e4675401..4460bd345 100644
--- a/build.sbt
+++ b/build.sbt
@@ -747,6 +747,7 @@ lazy val jmh: Project = project
     cats % Test,
     datastore % Test,
     guava % Test,
+    parquet % Test,
     protobuf % "test->test",
     scalacheck % Test,
     tensorflow % Test,
@@ -766,7 +767,13 @@ lazy val jmh: Project = project
       "com.google.apis" % "google-api-services-bigquery" % bigqueryVersion % Test,
       "com.google.cloud.datastore" % "datastore-v1-proto-client" % datastoreVersion % Test,
       "org.apache.avro" % "avro" % avroVersion % Test,
-      "org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test
+      "org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test,
+      "joda-time" % "joda-time" % jodaTimeVersion % Test,
+      "org.apache.parquet" % "parquet-avro" % parquetVersion % Test,
+      "org.apache.parquet" % "parquet-column" % parquetVersion % Test,
+      "org.apache.parquet" % "parquet-hadoop" % parquetVersion % Test,
+      "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Test,
+      "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % Test
     )
   )
 
diff --git a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
index ccf745160..e3fd8335b 100644
--- a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
+++ b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
@@ -17,12 +17,14 @@
 package magnolify.jmh
 
 import java.util.concurrent.TimeUnit
-
 import magnolify.scalacheck.auto._
 import magnolify.test.Simple._
 import org.scalacheck._
 import org.openjdk.jmh.annotations._
 
+import scala.annotation.nowarn
+import scala.jdk.CollectionConverters._
+
 object MagnolifyBench {
   val seed: rng.Seed = rng.Seed(0)
   val prms: Gen.Parameters = Gen.Parameters.default
@@ -157,7 +159,148 @@ class ExampleBench {
   private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get
   private val example = exampleType.to(exampleNested).build()
   @Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested)
-  @Benchmark def exampleFrom: ExampleNested = exampleType.from(example)
+  @Benchmark def exampleFrom: ExampleNested =
+    exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap)
+}
+
+@BenchmarkMode(Array(Mode.AverageTime))
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@State(Scope.Thread)
+class ParquetBench {
+  import MagnolifyBench._
+  import ParquetStates._
+  import magnolify.avro._
+  import org.apache.avro.generic.GenericRecord
+
+  private val genericRecord = AvroType[Nested].to(nested)
+
+  @Benchmark def parquetWriteMagnolify(state: ParquetCaseClassWriteState): Unit =
+    state.writer.write(nested)
+  @Benchmark def parquetWriteAvro(state: ParquetAvroWriteState): Unit =
+    state.writer.write(genericRecord)
+
+  @Benchmark def parquetReadMagnolify(state: ParquetCaseClassReadState): Nested =
+    state.reader.read()
+  @Benchmark def parquetReadAvro(state: ParquetAvroReadState): GenericRecord = state.reader.read()
+}
+
+object ParquetStates {
+  import MagnolifyBench._
+  import magnolify.avro._
+  import magnolify.parquet._
+  import magnolify.parquet.ParquetArray.AvroCompat._
+  import org.apache.avro.generic.{GenericData, GenericRecord}
+  import org.apache.hadoop.conf.Configuration
+  import org.apache.parquet.conf.PlainParquetConfiguration
+  import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}
+  import org.apache.parquet.column.ParquetProperties
+  import org.apache.parquet.hadoop.api.{ReadSupport, WriteSupport}
+  import org.apache.parquet.schema.MessageType
+  import org.apache.parquet.io._
+  import org.apache.parquet.io.api.{Binary, RecordConsumer}
+  import org.apache.parquet.column.impl.ColumnWriteStoreV1
+
+  @State(Scope.Benchmark)
+  class ReadState[T](
+    schema: MessageType,
+    writeSupport: WriteSupport[T],
+    readSupport: ReadSupport[T],
+    record: T
+  ) {
+    import org.apache.parquet.hadoop.api.InitContext
+
+    var reader: RecordReader[T] = null
+
+    @Setup(Level.Trial)
+    def setup(): Unit = {
+      // Write page
+      val columnIO = new ColumnIOFactory(true).getColumnIO(schema)
+      val pageStore = new ParquetInMemoryPageStore(1)
+      val columnWriteStore = new ColumnWriteStoreV1(
+        schema,
+        pageStore,
+        ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
+      )
+      val recordConsumer = columnIO.getRecordWriter(columnWriteStore)
+      writeSupport.init(new PlainParquetConfiguration())
+      writeSupport.prepareForWrite(recordConsumer)
+      writeSupport.write(record)
+      recordConsumer.flush()
+      columnWriteStore.flush()
+
+      // Set up reader
+      val conf = new Configuration()
+      reader = columnIO.getRecordReader(
+        pageStore,
+        readSupport.prepareForRead(
+          conf,
+          new java.util.HashMap,
+          schema,
+          readSupport.init(new InitContext(conf, new java.util.HashMap, schema))
+        )
+      ): @nowarn("cat=deprecation")
+    }
+  }
+
+  @State(Scope.Benchmark)
+  class WriteState[T](writeSupport: WriteSupport[T]) {
+    val writer = writeSupport
+
+    @Setup(Level.Trial)
+    def setup(): Unit = {
+      writeSupport.init(new PlainParquetConfiguration())
+      // Use a no-op RecordConsumer; we want to measure only the record -> group conversion, and not pollute the
+      // benchmark with background tasks like flushing pages/blocks or validating records
+      writeSupport.prepareForWrite(new RecordConsumer {
+        override def startMessage(): Unit = {}
+        override def endMessage(): Unit = {}
+        override def startField(field: String, index: Int): Unit = {}
+        override def endField(field: String, index: Int): Unit = {}
+        override def startGroup(): Unit = {}
+        override def endGroup(): Unit = {}
+        override def addInteger(value: Int): Unit = {}
+        override def addLong(value: Long): Unit = {}
+        override def addBoolean(value: Boolean): Unit = {}
+        override def addBinary(value: Binary): Unit = {}
+        override def addFloat(value: Float): Unit = {}
+        override def addDouble(value: Double): Unit = {}
+      })
+    }
+  }
+
+  // R/W support for Group <-> Case Class Conversion (magnolify-parquet)
+  private val parquetType = ParquetType[Nested]
+  class ParquetCaseClassReadState
+      extends ParquetStates.ReadState[Nested](
+        parquetType.schema,
+        parquetType.writeSupport,
+        parquetType.readSupport,
+        nested
+      )
+  class ParquetCaseClassWriteState
+      extends ParquetStates.WriteState[Nested](parquetType.writeSupport)
+
+  // R/W support for Group <-> Avro Conversion (parquet-avro)
+  private val avroType = AvroType[Nested]
+  class ParquetAvroReadState
+      extends ParquetStates.ReadState[GenericRecord](
+        parquetType.schema,
+        new AvroWriteSupport[GenericRecord](
+          parquetType.schema,
+          parquetType.avroSchema,
+          GenericData.get()
+        ),
+        new AvroReadSupport[GenericRecord](GenericData.get()),
+        avroType.to(nested)
+      )
+  class ParquetAvroWriteState
+      extends ParquetStates.WriteState[GenericRecord](
+        new AvroWriteSupport[GenericRecord](
+          parquetType.schema,
+          parquetType.avroSchema,
+          GenericData.get()
+        )
+      )
 }
 
 // Collections are not supported
diff --git a/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala b/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
new file mode 100644
index 000000000..b80e75c5d
--- /dev/null
+++ b/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2024 Spotify AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package magnolify.jmh
+
+import org.apache.parquet.bytes.{ByteBufferReleaser, BytesInput, HeapByteBufferAllocator}
+import org.apache.parquet.column.{ColumnDescriptor, Encoding}
+import org.apache.parquet.column.page._
+import org.apache.parquet.column.statistics._
+
+import scala.collection.mutable
+
+/**
+ * An in-memory Parquet page store modeled after parquet-java's MemPageStore, used to benchmark
+ * ParquetType conversion between Parquet Groups and Scala case classes
+ */
+class ParquetInMemoryPageStore(rowCount: Long) extends PageReadStore with PageWriteStore {
+  lazy val writers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryWriter]()
+  lazy val readers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryReader]()
+
+  override def getPageReader(path: ColumnDescriptor): PageReader =
+    readers.getOrElseUpdate(
+      path, {
+        val writer = writers(path)
+        new ParquetInMemoryReader(writer.pages.toList, writer.dictionaryPage)
+      }
+    )
+
+  override def getPageWriter(path: ColumnDescriptor): PageWriter =
+    writers.getOrElseUpdate(path, new ParquetInMemoryWriter)
+
+  override def getRowCount: Long = rowCount
+}
+
+class ParquetInMemoryReader(pages: List[DataPageV1], dictionaryPage: DictionaryPage)
+    extends PageReader {
+  // Infinitely return the first page; for the purposes of benchmarking, we don't care about the data itself
+  private val page = pages.head
+
+  override def readDictionaryPage(): DictionaryPage = dictionaryPage
+  override def getTotalValueCount: Long = Long.MaxValue
+  override def readPage(): DataPage = new DataPageV1(
+    page.getBytes.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)),
+    page.getValueCount,
+    page.getUncompressedSize,
+    page.getStatistics,
+    page.getRlEncoding,
+    page.getDlEncoding,
+    page.getValueEncoding
+  )
+}
+
+class ParquetInMemoryWriter extends PageWriter {
+  var numRows = 0
+  var numValues: Long = 0
+  var memSize: Long = 0
+  val pages = new mutable.ListBuffer[DataPageV1]()
+  var dictionaryPage: DictionaryPage = null
+
+  override def writePage(
+    bytesInput: BytesInput,
+    valueCount: Int,
+    statistics: Statistics[_],
+    rlEncoding: Encoding,
+    dlEncoding: Encoding,
+    valuesEncoding: Encoding
+  ): Unit =
+    writePage(bytesInput, valueCount, 1, statistics, rlEncoding, dlEncoding, valuesEncoding)
+
+  override def writePage(
+    bytesInput: BytesInput,
+    valueCount: Int,
+    rowCount: Int,
+    statistics: Statistics[_],
+    sizeStatistics: SizeStatistics,
+    rlEncoding: Encoding,
+    dlEncoding: Encoding,
+    valuesEncoding: Encoding
+  ): Unit =
+    writePage(bytesInput, valueCount, rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding)
+
+  override def writePage(
+    bytesInput: BytesInput,
+    valueCount: Int,
+    rowCount: Int,
+    statistics: Statistics[_],
+    rlEncoding: Encoding,
+    dlEncoding: Encoding,
+    valuesEncoding: Encoding
+  ): Unit = {
+    pages.addOne(
+      new DataPageV1(
+        bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)),
+        valueCount,
+        bytesInput.size().toInt,
+        statistics,
+        rlEncoding,
+        dlEncoding,
+        valuesEncoding
+      )
+    )
+    memSize += bytesInput.size()
+    numRows += rowCount
+    numValues += valueCount
+  }
+
+  override def writePageV2(
+    rowCount: Int,
+    nullCount: Int,
+    valueCount: Int,
+    repetitionLevels: BytesInput,
+    definitionLevels: BytesInput,
+    dataEncoding: Encoding,
+    data: BytesInput,
+    statistics: Statistics[_]
+  ): Unit = ???
+
+  override def getMemSize: Long = memSize
+
+  override def allocatedSize(): Long = memSize
+
+  override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit =
+    this.dictionaryPage = dictionaryPage
+
+  override def memUsageString(prefix: String): String = s"$prefix $memSize bytes"
+}