diff --git a/build.gradle b/build.gradle
index d46e92b..024c3e4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,9 +48,10 @@ dependencies {
     }
     compile 'org.xerial.snappy:snappy-java:1.1.7.3'
 
-    ['test', 'standards', 'deps-buffer', 'deps-config'].each { v ->
+    ['core', 'standards', 'deps-buffer', 'deps-config'].each { v ->
         testImplementation "org.embulk:embulk-${v}:0.9.23"
     }
+    testImplementation "org.embulk:embulk-core:0.9.23:tests"
     testImplementation "org.scalatest:scalatest_2.13:3.1.1"
     testImplementation 'org.apache.parquet:parquet-tools:1.11.0'
     testImplementation 'org.apache.hadoop:hadoop-client:2.9.2'
diff --git a/src/test/resources/org/embulk/output/s3_parquet/in1.csv b/src/test/resources/org/embulk/output/s3_parquet/in1.csv
deleted file mode 100644
index 25c5e6c..0000000
--- a/src/test/resources/org/embulk/output/s3_parquet/in1.csv
+++ /dev/null
@@ -1,6 +0,0 @@
-c0:boolean,c1:long,c2:double,c3:string,c4:timestamp,c5:json
-true,0,0.0,c212c89f91,2017-10-22 19:53:31.000000 +0900,"{""a"":0,""b"":""99""}"
-false,1,-0.5,aaaaa,2017-10-22 19:53:31.000000 +0900,"{""a"":1,""b"":""a9""}"
-false,2,1.5,90823c6a1f,2017-10-23 23:42:43.000000 +0900,"{""a"":2,""b"":""96""}"
-true,3,0.44,,2017-10-22 06:12:13.000000 +0900,"{""a"":3,""b"":""86""}"
-false,9999,10000.33333,e56a40571c,2017-10-23 04:59:16.000000 +0900,"{""a"":4,""b"":""d2""}"
diff --git a/src/test/resources/org/embulk/output/s3_parquet/out1.tsv b/src/test/resources/org/embulk/output/s3_parquet/out1.tsv
deleted file mode 100644
index d76ec00..0000000
--- a/src/test/resources/org/embulk/output/s3_parquet/out1.tsv
+++ /dev/null
@@ -1,5 +0,0 @@
-true 0 0.0 c212c89f91 2017-10-22 19:53:31.000000 +0900 {"a":0,"b":"99"}
-false 1 -0.5 aaaaa 2017-10-22 19:53:31.000000 +0900 {"a":1,"b":"a9"}
-false 2 1.5 90823c6a1f 2017-10-23 23:42:43.000000 +0900 {"a":2,"b":"96"}
-true 3 0.44 2017-10-22 06:12:13.000000 +0900 {"a":3,"b":"86"}
-false 9999 10000.33333 e56a40571c 2017-10-23 04:59:16.000000 +0900 {"a":4,"b":"d2"}
diff --git a/src/test/scala/org/embulk/output/s3_parquet/EmbulkPluginTestHelper.scala b/src/test/scala/org/embulk/output/s3_parquet/EmbulkPluginTestHelper.scala
new file mode 100644
index 0000000..f841b9a
--- /dev/null
+++ b/src/test/scala/org/embulk/output/s3_parquet/EmbulkPluginTestHelper.scala
@@ -0,0 +1,276 @@
+package org.embulk.output.s3_parquet
+
+import java.io.File
+import java.nio.file.{Files, Path}
+import java.util.concurrent.ExecutionException
+
+import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
+import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
+import com.amazonaws.services.s3.model.ObjectListing
+import com.amazonaws.services.s3.transfer.{
+  TransferManager,
+  TransferManagerBuilder
+}
+import com.google.inject.{Binder, Guice, Module, Stage}
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.tools.read.{SimpleReadSupport, SimpleRecord}
+import org.embulk.{TestPluginSourceModule, TestUtilityModule}
+import org.embulk.config.{
+  ConfigLoader,
+  ConfigSource,
+  DataSourceImpl,
+  ModelManager,
+  TaskSource
+}
+import org.embulk.exec.{
+  ExecModule,
+  ExtensionServiceLoaderModule,
+  SystemConfigModule
+}
+import org.embulk.jruby.JRubyScriptingModule
+import org.embulk.plugin.{
+  BuiltinPluginSourceModule,
+  InjectedPluginSource,
+  PluginClassLoaderModule
+}
+import org.embulk.spi.{Exec, ExecSession, OutputPlugin, PageTestUtils, Schema}
+import org.msgpack.value.{Value, ValueFactory}
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.BeforeAndAfter
+import org.scalatest.diagrams.Diagrams
+
+import scala.jdk.CollectionConverters._
+import scala.util.Using
+
+object EmbulkPluginTestHelper {
+
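+  // Assembles the Guice modules of an Embulk runtime for tests and registers
+  // S3ParquetOutputPlugin under the plugin name "s3_parquet".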
+  case class TestRuntimeModule() extends Module {
+
+    override def configure(binder: Binder): Unit = {
+      val systemConfig = new DataSourceImpl(null)
+      new SystemConfigModule(systemConfig).configure(binder)
+      new ExecModule(systemConfig).configure(binder)
+      new ExtensionServiceLoaderModule(systemConfig).configure(binder)
+      new BuiltinPluginSourceModule().configure(binder)
+      new JRubyScriptingModule(systemConfig).configure(binder)
+      new PluginClassLoaderModule().configure(binder)
+      new TestUtilityModule().configure(binder)
+      new TestPluginSourceModule().configure(binder)
+      InjectedPluginSource.registerPluginTo(
+        binder,
+        classOf[OutputPlugin],
+        "s3_parquet",
+        classOf[S3ParquetOutputPlugin]
+      )
+    }
+  }
+
+  def getExecSession: ExecSession = {
+    val injector =
+      Guice.createInjector(Stage.PRODUCTION, TestRuntimeModule())
+    val execConfig = new DataSourceImpl(
+      injector.getInstance(classOf[ModelManager])
+    )
+    ExecSession.builder(injector).fromExecConfig(execConfig).build()
+  }
+}
+
+abstract class EmbulkPluginTestHelper
+    extends AnyFunSuite
+    with BeforeAndAfter
+    with Diagrams {
+  private var exec: ExecSession = _
+
+  val TEST_S3_ENDPOINT: String = "http://localhost:4572"
+  val TEST_S3_REGION: String = "us-east-1"
+  val TEST_S3_ACCESS_KEY_ID: String = "test"
+  val TEST_S3_SECRET_ACCESS_KEY: String = "test"
+  val TEST_BUCKET_NAME: String = "my-bucket"
+  val TEST_PATH_PREFIX: String = "path/to/parquet-"
+
+  before {
+    exec = EmbulkPluginTestHelper.getExecSession
+
+    withLocalStackS3Client(_.createBucket(TEST_BUCKET_NAME))
+  }
+  after {
+    exec.cleanup()
+    exec = null
+
+    withLocalStackS3Client { cli =>
+      @scala.annotation.tailrec
+      def rmRecursive(listing: ObjectListing): Unit = {
+        listing.getObjectSummaries.asScala.foreach(o =>
+          cli.deleteObject(TEST_BUCKET_NAME, o.getKey)
+        )
+        if (listing.isTruncated)
+          rmRecursive(cli.listNextBatchOfObjects(listing))
+      }
+      rmRecursive(cli.listObjects(TEST_BUCKET_NAME))
+    }
+    withLocalStackS3Client(_.deleteBucket(TEST_BUCKET_NAME))
+  }
+
+  def runOutput(
+      outConfig: ConfigSource,
+      schema: Schema,
+      data: Seq[Seq[Any]]
+  ): Seq[Seq[AnyRef]] = {
+    try {
+      Exec.doWith(
+        exec,
+        () => {
+          val plugin =
+            exec.getInjector.getInstance(classOf[S3ParquetOutputPlugin])
+          plugin.transaction(
+            outConfig,
+            schema,
+            1,
+            (taskSource: TaskSource) => {
+              Using.resource(plugin.open(taskSource, schema, 0)) { output =>
+                try {
+                  PageTestUtils
+                    .buildPage(
+                      exec.getBufferAllocator,
+                      schema,
+                      data.flatten: _*
+                    )
+                    .asScala
+                    .foreach(output.add)
+                  output.commit()
+                }
+                catch {
+                  case ex: Throwable =>
+                    output.abort()
+                    throw ex
+                }
+              }
+              Seq.empty.asJava
+            }
+          )
+        }
+      )
+    }
+    catch {
+      case ex: ExecutionException => throw ex.getCause
+    }
+
+    readS3Parquet(TEST_BUCKET_NAME, TEST_PATH_PREFIX)
+  }
+
+  private def withLocalStackS3Client[A](f: AmazonS3 => A): A = {
+    val client: AmazonS3 = AmazonS3ClientBuilder.standard
+      .withEndpointConfiguration(
+        new EndpointConfiguration(TEST_S3_ENDPOINT, TEST_S3_REGION)
+      )
+      .withCredentials(
+        new AWSStaticCredentialsProvider(
+          new BasicAWSCredentials(
+            TEST_S3_ACCESS_KEY_ID,
+            TEST_S3_SECRET_ACCESS_KEY
+          )
+        )
+      )
+      .withPathStyleAccessEnabled(true)
+      .build()
+
+    try f(client)
+    finally client.shutdown()
+  }
+
+  def readS3Parquet(bucket: String, prefix: String): Seq[Seq[AnyRef]] = {
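+    // Download every object under the given prefix from the test bucket into a
+    // temporary directory and decode each Parquet file into rows of plain values.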
+    val tmpDir: Path = Files.createTempDirectory("embulk-output-parquet")
+    withLocalStackS3Client { s3 =>
+      val xfer: TransferManager = TransferManagerBuilder
+        .standard()
+        .withS3Client(s3)
+        .build()
+      try xfer
+        .downloadDirectory(bucket, prefix, tmpDir.toFile)
+        .waitForCompletion()
+      finally xfer.shutdownNow()
+    }
+
+    def listFiles(file: File): Seq[File] = {
+      file
+        .listFiles()
+        .flatMap(f =>
+          if (f.isFile) Seq(f)
+          else listFiles(f)
+        )
+        .toSeq
+    }
+
+    listFiles(tmpDir.toFile)
+      .map(_.getAbsolutePath)
+      .foldLeft(Seq[Seq[AnyRef]]()) {
+        (result: Seq[Seq[AnyRef]], path: String) =>
+          result ++ readParquetFile(path)
+      }
+  }
+
+  private def readParquetFile(pathString: String): Seq[Seq[AnyRef]] = {
+    val reader: ParquetReader[SimpleRecord] = ParquetReader
+      .builder(
+        new SimpleReadSupport(),
+        new HadoopPath(pathString)
+      )
+      .build()
+
+    def read(
+        reader: ParquetReader[SimpleRecord],
+        records: Seq[Seq[AnyRef]] = Seq()
+    ): Seq[Seq[AnyRef]] = {
+      val simpleRecord: SimpleRecord = reader.read()
+      if (simpleRecord != null) {
+        val r: Seq[AnyRef] = simpleRecord.getValues.asScala
+          .map(_.getValue)
+          .toSeq
+        return read(reader, records :+ r)
+      }
+      records
+    }
+    try read(reader)
+    finally reader.close()
+  }
+
+  def loadConfigSourceFromYamlString(yaml: String): ConfigSource = {
+    new ConfigLoader(exec.getModelManager).fromYamlString(yaml)
+  }
+
+  def newJson(map: Map[String, Any]): Value = {
+    ValueFactory
+      .newMapBuilder()
+      .putAll(map.map {
+        case (k: String, v: Any) =>
+          val value: Value =
+            v match {
+              case str: String    => ValueFactory.newString(str)
+              case bool: Boolean  => ValueFactory.newBoolean(bool)
+              case long: Long     => ValueFactory.newInteger(long)
+              case int: Int       => ValueFactory.newInteger(int)
+              case double: Double => ValueFactory.newFloat(double)
+              case float: Float   => ValueFactory.newFloat(float)
+              case _              => ValueFactory.newNil()
+            }
+          ValueFactory.newString(k) -> value
+      }.asJava)
+      .build()
+  }
+
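+  // Base output configuration pointing at the LocalStack S3 endpoint; tests
+  // merge additional settings (e.g. type_options) on top of it.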
+  def newDefaultConfig: ConfigSource =
+    loadConfigSourceFromYamlString(
+      s"""
+         |endpoint: $TEST_S3_ENDPOINT
+         |bucket: $TEST_BUCKET_NAME
+         |path_prefix: $TEST_PATH_PREFIX
+         |auth_method: basic
+         |access_key_id: $TEST_S3_ACCESS_KEY_ID
+         |secret_access_key: $TEST_S3_SECRET_ACCESS_KEY
+         |path_style_access_enabled: true
+         |default_timezone: Asia/Tokyo
+         |""".stripMargin
+    )
+}
diff --git a/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala b/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala
index 43f7bb7..f078d62 100644
--- a/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala
+++ b/src/test/scala/org/embulk/output/s3_parquet/TestS3ParquetOutputPlugin.scala
@@ -1,187 +1,114 @@
 package org.embulk.output.s3_parquet
 
-import java.io.File
-import java.nio.file.FileSystems
-
-import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
-import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
-import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
-import com.amazonaws.services.s3.model.ObjectListing
-import com.amazonaws.services.s3.transfer.TransferManagerBuilder
-import com.google.common.io.Resources
-import org.apache.hadoop.fs.{Path => HadoopPath}
-import org.apache.parquet.hadoop.ParquetReader
-import org.apache.parquet.tools.read.{SimpleReadSupport, SimpleRecord}
-import org.embulk.config.ConfigSource
-import org.embulk.spi.OutputPlugin
-import org.embulk.test.{EmbulkTests, TestingEmbulk}
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.diagrams.Diagrams
-import org.scalatest.funsuite.AnyFunSuite
-
-import scala.jdk.CollectionConverters._
-
-class TestS3ParquetOutputPlugin
-    extends AnyFunSuite
-    with BeforeAndAfter
-    with BeforeAndAfterAll
-    with Diagrams {
-
-  val RESOURCE_NAME_PREFIX: String = "org/embulk/output/s3_parquet/"
-  val TEST_S3_ENDPOINT: String = "http://localhost:4572"
-  val TEST_S3_REGION: String = "us-east-1"
-  val TEST_S3_ACCESS_KEY_ID: String = "test"
-  val TEST_S3_SECRET_ACCESS_KEY: String = "test"
-  val TEST_BUCKET_NAME: String = "my-bucket"
-
-  val embulk: TestingEmbulk = TestingEmbulk
-    .builder()
-    .registerPlugin(
-      classOf[OutputPlugin],
-      "s3_parquet",
-      classOf[S3ParquetOutputPlugin]
+import org.embulk.spi.Schema
+import org.embulk.spi.`type`.Types
+import org.embulk.spi.time.{Timestamp, TimestampFormatter, TimestampParser}
+import org.msgpack.value.Value
+
+import scala.util.chaining._
+
+class TestS3ParquetOutputPlugin extends EmbulkPluginTestHelper {
+
+  test("minimal default case") {
+    val schema: Schema = Schema
+      .builder()
+      .add("c0", Types.BOOLEAN)
+      .add("c1", Types.LONG)
+      .add("c2", Types.DOUBLE)
+      .add("c3", Types.STRING)
+      .add("c4", Types.TIMESTAMP)
+      .add("c5", Types.JSON)
+      .build()
+    // scalafmt: { maxColumn = 200 }
+    val parser = TimestampParser.of("%Y-%m-%d %H:%M:%S.%N %z", "UTC")
+    val data: Seq[Seq[Any]] = Seq(
+      Seq(true, 0L, 0.0d, "c212c89f91", parser.parse("2017-10-22 19:53:31.000000 +0900"), newJson(Map("a" -> 0, "b" -> "00"))),
+      Seq(false, 1L, -0.5d, "aaaaa", parser.parse("2017-10-22 19:53:31.000000 +0900"), newJson(Map("a" -> 1, "b" -> "11"))),
+      Seq(false, 2L, 1.5d, "90823c6a1f", parser.parse("2017-10-23 23:42:43.000000 +0900"), newJson(Map("a" -> 2, "b" -> "22"))),
+      Seq(true, 3L, 0.44d, "", parser.parse("2017-10-22 06:12:13.000000 +0900"), newJson(Map("a" -> 3, "b" -> "33", "c" -> 3.3))),
+      Seq(false, 9999L, 10000.33333d, "e56a40571c", parser.parse("2017-10-23 04:59:16.000000 +0900"), newJson(Map("a" -> 4, "b" -> "44", "c" -> 4.4, "d" -> true)))
     )
-    .build()
-
-  before {
-    withLocalStackS3Client(_.createBucket(TEST_BUCKET_NAME))
-  }
-
-  after {
-    withLocalStackS3Client { cli =>
-      @scala.annotation.tailrec
-      def rmRecursive(listing: ObjectListing): Unit = {
-        listing.getObjectSummaries.asScala.foreach(o =>
-          cli.deleteObject(TEST_BUCKET_NAME, o.getKey)
-        )
-        if (listing.isTruncated)
-          rmRecursive(cli.listNextBatchOfObjects(listing))
+    // scalafmt: { maxColumn = 80 }
+
+    val result: Seq[Seq[AnyRef]] = runOutput(newDefaultConfig, schema, data)
+
+    assert(result.size == 5)
+    data.indices.foreach { i =>
+      data(i).indices.foreach { j =>
+        data(i)(j) match {
+          case timestamp: Timestamp =>
+            val formatter =
+              TimestampFormatter.of("%Y-%m-%d %H:%M:%S.%6N %z", "Asia/Tokyo")
+            assert(
+              formatter.format(timestamp) == result(i)(j),
+              s"A different timestamp value is found (Record Index: $i, Column Index: $j)"
+            )
+          case value: Value =>
+            assert(
+              value.toJson == result(i)(j),
+              s"A different json value is found (Record Index: $i, Column Index: $j)"
+            )
+          case _ =>
+            assert(
+              data(i)(j) == result(i)(j),
+              s"A different value is found (Record Index: $i, Column Index: $j)"
+            )
+        }
       }
-      rmRecursive(cli.listObjects(TEST_BUCKET_NAME))
     }
-    withLocalStackS3Client(_.deleteBucket(TEST_BUCKET_NAME))
-  }
-
-  def defaultOutConfig(): ConfigSource = {
-    embulk
-      .newConfig()
-      .set("type", "s3_parquet")
-      .set(
-        "endpoint",
-        "http://localhost:4572"
-      ) // See https://github.com/localstack/localstack#overview
-      .set("bucket", TEST_BUCKET_NAME)
-      .set("path_prefix", "path/to/p")
-      .set("auth_method", "basic")
-      .set("access_key_id", TEST_S3_ACCESS_KEY_ID)
-      .set("secret_access_key", TEST_S3_SECRET_ACCESS_KEY)
-      .set("path_style_access_enabled", true)
-      .set("default_timezone", "Asia/Tokyo")
   }
 
-  test("first test") {
-    val inPath = toPath("in1.csv")
-    val outConfig = defaultOutConfig()
-
-    val result: TestingEmbulk.RunResult = embulk.runOutput(outConfig, inPath)
-
-    val outRecords: Seq[Map[String, String]] =
-      result.getOutputTaskReports.asScala
-        .map { tr =>
-          val b = tr.get(classOf[String], "bucket")
-          val k = tr.get(classOf[String], "key")
-          readParquetFile(b, k)
-        }
-        .foldLeft(Seq[Map[String, String]]()) { (merged, records) =>
-          merged ++ records
-        }
-
-    val inRecords: Seq[Seq[String]] = EmbulkTests
-      .readResource(RESOURCE_NAME_PREFIX + "out1.tsv")
-      .stripLineEnd
-      .split("\n")
-      .map(record => record.split("\t").toSeq)
-      .toSeq
+  test("timestamp-millis") {
+    val schema = Schema.builder().add("c0", Types.TIMESTAMP).build()
+    val data: Seq[Seq[Timestamp]] = Seq(
+      Seq(Timestamp.ofEpochMilli(111_111_111L)),
+      Seq(Timestamp.ofEpochMilli(222_222_222L)),
+      Seq(Timestamp.ofEpochMilli(333_333_333L))
+    )
+    val cfg = newDefaultConfig.merge(
+      loadConfigSourceFromYamlString("""
+                                       |type_options:
+                                       |  timestamp:
+                                       |    logical_type: "timestamp-millis"
+                                       |""".stripMargin)
+    )
 
-    inRecords.zipWithIndex.foreach {
-      case (record, recordIndex) =>
-        0.to(5).foreach { columnIndex =>
-          val columnName = s"c$columnIndex"
-          val inData: String = inRecords(recordIndex)(columnIndex)
-          val outData: String =
-            outRecords(recordIndex).getOrElse(columnName, "")
+    runOutput(newDefaultConfig, schema, data)
+    val result: Seq[Seq[AnyRef]] = runOutput(cfg, schema, data)
 
-          assert(
-            outData === inData,
-            s"record: $recordIndex, column: $columnName"
-          )
-        }
+    assert(data.size == result.size)
+    data.indices.foreach { i =>
+      assert {
+        data(i).head.toEpochMilli == result(i).head.asInstanceOf[Long]
+      }
     }
   }
 
-  def readParquetFile(bucket: String, key: String): Seq[Map[String, String]] = {
-    val createdParquetFile = embulk.createTempFile("in")
-    withLocalStackS3Client { s3 =>
-      val xfer = TransferManagerBuilder
-        .standard()
-        .withS3Client(s3)
-        .build()
-      try xfer
-        .download(bucket, key, createdParquetFile.toFile)
-        .waitForCompletion()
-      finally xfer.shutdownNow()
-    }
+  test("timestamp-micros") {
+    val schema = Schema.builder().add("c0", Types.TIMESTAMP).build()
+    val data: Seq[Seq[Timestamp]] = Seq(
+      Seq(Timestamp.ofEpochSecond(111_111_111L, 111_111_000L)),
+      Seq(Timestamp.ofEpochSecond(222_222_222L, 222_222_222L)),
+      Seq(Timestamp.ofEpochSecond(333_333_333L, 333_000L))
+    )
+    val cfg = newDefaultConfig.merge(
+      loadConfigSourceFromYamlString("""
+                                       |type_options:
+                                       |  timestamp:
+                                       |    logical_type: "timestamp-micros"
+                                       |""".stripMargin)
+    )
 
-    val reader: ParquetReader[SimpleRecord] = ParquetReader
-      .builder(
-        new SimpleReadSupport(),
-        new HadoopPath(createdParquetFile.toString)
-      )
-      .build()
+    runOutput(newDefaultConfig, schema, data)
+    val result: Seq[Seq[AnyRef]] = runOutput(cfg, schema, data)
 
-    def read(
-        reader: ParquetReader[SimpleRecord],
-        records: Seq[Map[String, String]] = Seq()
-    ): Seq[Map[String, String]] = {
-      val simpleRecord: SimpleRecord = reader.read()
-      if (simpleRecord != null) {
-        val r: Map[String, String] = simpleRecord.getValues.asScala
-          .map(v => v.getName -> v.getValue.toString)
-          .toMap
-        return read(reader, records :+ r)
+    assert(data.size == result.size)
+    data.indices.foreach { i =>
+      assert {
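+        // Expected value in epoch microseconds: epoch seconds scaled by
+        // 1,000,000 plus the nanosecond part truncated to microseconds.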
+        data(i).head.pipe(ts =>
+          (ts.getEpochSecond * 1_000_000L) + (ts.getNano / 1_000L)
+        ) == result(i).head.asInstanceOf[Long]
       }
-      records
     }
-
-    try read(reader)
-    finally {
-      reader.close()
-
-    }
-  }
-
-  private def toPath(fileName: String) = {
-    val url = Resources.getResource(RESOURCE_NAME_PREFIX + fileName)
-    FileSystems.getDefault.getPath(new File(url.toURI).getAbsolutePath)
-  }
-
-  private def withLocalStackS3Client[A](f: AmazonS3 => A): A = {
-    val client: AmazonS3 = AmazonS3ClientBuilder.standard
-      .withEndpointConfiguration(
-        new EndpointConfiguration(TEST_S3_ENDPOINT, TEST_S3_REGION)
-      )
-      .withCredentials(
-        new AWSStaticCredentialsProvider(
-          new BasicAWSCredentials(
-            TEST_S3_ACCESS_KEY_ID,
-            TEST_S3_SECRET_ACCESS_KEY
-          )
-        )
-      )
-      .withPathStyleAccessEnabled(true)
-      .build()
-
-    try f(client)
-    finally client.shutdown()
   }
 }
diff --git a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala
index 842c520..4bbd99f 100644
--- a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala
+++ b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandler.scala
@@ -2,11 +2,12 @@ package org.embulk.output.s3_parquet.parquet
 
 import org.embulk.spi.DataException
 import org.embulk.spi.`type`.Types
+import org.scalatest.diagrams.Diagrams
 import org.scalatest.funsuite.AnyFunSuite
 
 import scala.util.Try
 
-class TestLogicalTypeHandler extends AnyFunSuite {
+class TestLogicalTypeHandler extends AnyFunSuite with Diagrams {
 
   test("IntLogicalTypeHandler.isConvertible() returns true for long") {
     val h = Int8LogicalTypeHandler
diff --git a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala
index 4ef2e0e..7d426a5 100644
--- a/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala
+++ b/src/test/scala/org/embulk/output/s3_parquet/parquet/TestLogicalTypeHandlerStore.scala
@@ -9,12 +9,13 @@ import org.embulk.output.s3_parquet.S3ParquetOutputPlugin.{
   TypeOptionTask
 }
 import org.embulk.spi.`type`.{Types, Type => EType}
+import org.scalatest.diagrams.Diagrams
 import org.scalatest.funsuite.AnyFunSuite
 
 import scala.jdk.CollectionConverters._
 import scala.util.Try
 
-class TestLogicalTypeHandlerStore extends AnyFunSuite {
+class TestLogicalTypeHandlerStore extends AnyFunSuite with Diagrams {
 
   test("empty() returns empty maps") {
     val rv = LogicalTypeHandlerStore.empty