diff --git a/.scalafmt.conf b/.scalafmt.conf index 7a9fe74..b564ec8 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,5 +1,5 @@ # https://scalameta.org/scalafmt/#Configuration -version = "2.3.2" +version = "3.7.1" +runner.dialect = "scala3" newlines.alwaysBeforeElseAfterCurlyIf = true -newlines.alwaysBeforeTopLevelStatements = true diff --git a/README.md b/README.md index fb81be9..af2c10b 100644 --- a/README.md +++ b/README.md @@ -174,9 +174,11 @@ You can see more examples [here](./example). ```shell $ ./run_dynamodb_local.sh +# Set dummy credentials (access_key_id=dummy and secret_access_key=dummy) +$ aws configure $ ./example/prepare_dynamodb_table.sh -$ ./gradlew classpath -$ embulk run example/config-query.yml -Ilib +$ ./gradlew gem +$ embulk run example/config-query.yml -Ibuild/gemContents/lib ``` ### Run tests @@ -230,4 +232,4 @@ $ ./gradlew gemPush ## License -[MIT LICENSE](./LICENSE) +[MIT LICENSE](./LICENSE.txt) diff --git a/build.gradle b/build.gradle index 50ce3c8..410dee4 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,10 @@ plugins { id "scala" id "maven-publish" id "org.embulk.embulk-plugins" version "0.5.5" - id "com.diffplug.spotless" version "6.15.0" + // note: Incompatible because this component declares an API of a component compatible with Java 11 and the consumer needed a runtime of a component compatible with Java 8 + // id "com.diffplug.spotless" version "6.15.0" + // https://github.com/diffplug/spotless/blob/main/plugin-gradle/CHANGES.md#6130---2023-01-14 + id "com.diffplug.spotless" version "6.13.0" // note: We cannot use the latest version because of the following error. // > org/eclipse/jgit/lib/Repository has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0 // id "com.palantir.git-version" version "0.15.0" @@ -26,25 +29,24 @@ sourceCompatibility = 1.8 targetCompatibility = 1.8 dependencies { - compileOnly "org.scala-lang:scala-library:2.13.1" - - implementation "org.embulk:embulk-core:0.9.23" + // note: The upper versions of embulk includes Guava Dependency removal, + // so we need to specify the version of embulk to avoid some errors. + // We may need to update the version of embulk in the future. + def embulkVersion = "0.10.41" + compileOnly "org.embulk:embulk-api:${embulkVersion}" + compileOnly "org.embulk:embulk-spi:${embulkVersion}" + implementation "org.scala-lang:scala-library:2.13.1" + implementation "org.embulk:embulk-util-config:0.3.2" + implementation "org.embulk:embulk-util-json:0.1.1" + implementation "org.embulk:embulk-util-timestamp:0.2.1" implementation "com.amazonaws:aws-java-sdk-dynamodb:1.11.711" implementation "com.amazonaws:aws-java-sdk-sts:1.11.711" - // For @delegate macro. - implementation "dev.zio:zio-macros-core_2.13:0.6.2" testImplementation "junit:junit:4.+" - testImplementation "org.embulk:embulk-core:0.9.23:tests" - testImplementation "org.embulk:embulk-standards:0.9.23" - testImplementation "org.embulk:embulk-deps-buffer:0.9.23" - testImplementation "org.embulk:embulk-deps-config:0.9.23" -} -compileScala { - scalaCompileOptions.additionalParameters = [ - "-Ymacro-annotations" - ] + testImplementation "org.embulk:embulk-junit4:${embulkVersion}" + testImplementation "org.embulk:embulk-deps:${embulkVersion}" + testImplementation "org.embulk:embulk-input-config:0.10.36" } test { jvmArgs '-Xms4g', '-Xmx4g', '-XX:MaxMetaspaceSize=1g' diff --git a/src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala b/src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala index c783639..4f49f0d 100644 --- a/src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala +++ b/src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala @@ -18,8 +18,8 @@ class DynamodbInputPlugin extends InputPlugin { val schema: Schema = DynamodbItemSchema(task).getEmbulkSchema val taskCount: Int = DynamodbOperationProxy(task).getEmbulkTaskCount - control.run(task.dump(), schema, taskCount) - Exec.newConfigDiff() + control.run(task.toTaskSource(), schema, taskCount) + PluginTask.newConfigDiff() } override def resume( @@ -38,7 +38,7 @@ class DynamodbInputPlugin extends InputPlugin { output: PageOutput ): TaskReport = { val task: PluginTask = PluginTask.load(taskSource) - val pageBuilder = new PageBuilder(task.getBufferAllocator, schema, output) + val pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output) Aws(task).withDynamodb { dynamodb => DynamodbOperationProxy(task).run( @@ -48,7 +48,7 @@ class DynamodbInputPlugin extends InputPlugin { ) } pageBuilder.finish() - Exec.newTaskReport() + PluginTask.newTaskReport() } override def cleanup( @@ -56,8 +56,7 @@ class DynamodbInputPlugin extends InputPlugin { schema: Schema, taskCount: Int, successTaskReports: JList[TaskReport] - ): Unit = { - } + ): Unit = {} override def guess(config: ConfigSource): ConfigDiff = { throw new UnsupportedOperationException diff --git a/src/main/scala/org/embulk/input/dynamodb/PluginTask.scala b/src/main/scala/org/embulk/input/dynamodb/PluginTask.scala index 1c4e6af..c2862de 100644 --- a/src/main/scala/org/embulk/input/dynamodb/PluginTask.scala +++ b/src/main/scala/org/embulk/input/dynamodb/PluginTask.scala @@ -3,41 +3,45 @@ package org.embulk.input.dynamodb import java.util.Optional import org.embulk.config.{ - Config, - ConfigDefault, + ConfigDiff, ConfigException, - ConfigInject, ConfigSource, - Task, - TaskSource + TaskSource, + TaskReport +} +import org.embulk.util.config.{ + Config, + ConfigDefault, + ConfigMapperFactory, + Task => EmbulkTask } import org.embulk.input.dynamodb.aws.Aws import org.embulk.input.dynamodb.item.DynamodbItemSchema import org.embulk.input.dynamodb.operation.DynamodbOperationProxy import org.embulk.spi.BufferAllocator -import zio.macros.annotation.delegate import scala.util.chaining._ trait PluginTask - extends Task + extends EmbulkTask with Aws.Task with DynamodbItemSchema.Task - with DynamodbOperationProxy.Task { - - @ConfigInject - def getBufferAllocator: BufferAllocator -} + with DynamodbOperationProxy.Task {} object PluginTask { + private val configMapperFactory: ConfigMapperFactory = + ConfigMapperFactory.builder().addDefaultModules().build() def load(configSource: ConfigSource): PluginTask = { - configSource - .loadConfig(classOf[PluginTask]) + configMapperFactory + .createConfigMapper() + .map(configSource, classOf[PluginTask]) } def load(taskSource: TaskSource): PluginTask = { - taskSource - .loadTask(classOf[PluginTask]) + configMapperFactory.createTaskMapper().map(taskSource, classOf[PluginTask]) } + + def newConfigDiff(): ConfigDiff = configMapperFactory.newConfigDiff() + def newTaskReport(): TaskReport = configMapperFactory.newTaskReport() } diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala b/src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala index 9703410..6a9cca1 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala @@ -5,11 +5,13 @@ import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDB, AmazonDynamoDBClientBuilder } +import org.embulk.util.config.{Task => EmbulkTask} object Aws { trait Task - extends AwsCredentials.Task + extends EmbulkTask + with AwsCredentials.Task with AwsEndpointConfiguration.Task with AwsClientConfiguration.Task with AwsDynamodbConfiguration.Task diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/AwsClientConfiguration.scala b/src/main/scala/org/embulk/input/dynamodb/aws/AwsClientConfiguration.scala index a43bfdc..2e07a23 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/AwsClientConfiguration.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/AwsClientConfiguration.scala @@ -4,12 +4,12 @@ import java.util.Optional import com.amazonaws.ClientConfiguration import com.amazonaws.client.builder.AwsClientBuilder -import org.embulk.config.{Config, ConfigDefault} +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.input.dynamodb.aws.AwsClientConfiguration.Task object AwsClientConfiguration { - trait Task { + trait Task extends EmbulkTask { @Config("http_proxy") @ConfigDefault("null") diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/AwsCredentials.scala b/src/main/scala/org/embulk/input/dynamodb/aws/AwsCredentials.scala index 4efbd34..eab934c 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/AwsCredentials.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/AwsCredentials.scala @@ -19,15 +19,15 @@ import com.amazonaws.auth.profile.{ ProfileCredentialsProvider, ProfilesConfigFile } -import org.embulk.config.{Config, ConfigDefault, ConfigException} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.input.dynamodb.aws.AwsCredentials.Task import org.embulk.input.dynamodb.logger -import org.embulk.spi.unit.LocalFile -import zio.macros.annotation.delegate +import org.embulk.util.config.units.LocalFile object AwsCredentials { - trait Task { + trait Task extends EmbulkTask { @Config("auth_method") @ConfigDefault("\"default\"") diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/AwsDynamodbConfiguration.scala b/src/main/scala/org/embulk/input/dynamodb/aws/AwsDynamodbConfiguration.scala index ed91b52..7adcfd8 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/AwsDynamodbConfiguration.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/AwsDynamodbConfiguration.scala @@ -3,12 +3,12 @@ package org.embulk.input.dynamodb.aws import java.util.Optional import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder -import org.embulk.config.{Config, ConfigDefault} +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.input.dynamodb.aws.AwsDynamodbConfiguration.Task object AwsDynamodbConfiguration { - trait Task { + trait Task extends EmbulkTask { @Config("enable_endpoint_discovery") @ConfigDefault("null") diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/AwsEndpointConfiguration.scala b/src/main/scala/org/embulk/input/dynamodb/aws/AwsEndpointConfiguration.scala index bc497cb..bef8f91 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/AwsEndpointConfiguration.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/AwsEndpointConfiguration.scala @@ -5,16 +5,16 @@ import java.util.Optional import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} -import org.embulk.config.{Config, ConfigDefault, ConfigException} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Task => EmbulkTask, Config, ConfigDefault} import org.embulk.input.dynamodb.aws.AwsEndpointConfiguration.Task import org.embulk.input.dynamodb.logger -import zio.macros.annotation.delegate import scala.util.Try object AwsEndpointConfiguration { - trait Task { + trait Task extends EmbulkTask { @Config("endpoint") @ConfigDefault("null") def getEndpoint: Optional[String] diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala b/src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala index edd1fb0..c986f38 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala @@ -3,12 +3,13 @@ package org.embulk.input.dynamodb.aws import java.util.Optional import com.amazonaws.{ClientConfiguration, Protocol} -import org.embulk.config.{Config, ConfigDefault, ConfigException} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Task => EmbulkTask, Config, ConfigDefault} import org.embulk.input.dynamodb.aws.HttpProxy.Task object HttpProxy { - trait Task { + trait Task extends EmbulkTask { @Config("host") @ConfigDefault("null") @@ -50,8 +51,8 @@ class HttpProxy(task: Task) { case None => throw new ConfigException( s"'${task.getProtocol}' is unsupported: `protocol` must be one of [${Protocol.values - .map(v => s"'$v'") - .mkString(", ")}]." + .map(v => s"'$v'") + .mkString(", ")}]." ) } diff --git a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValue.scala b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValue.scala index 8d4449a..2345022 100644 --- a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValue.scala +++ b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValue.scala @@ -5,15 +5,15 @@ import java.nio.charset.StandardCharsets import java.util.{Optional, List => JList, Map => JMap} import com.amazonaws.services.dynamodbv2.model.AttributeValue -import org.embulk.config.{Config, ConfigDefault, Task => EmbulkTask} +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import scala.jdk.CollectionConverters._ import scala.util.chaining._ -/** - * TODO: I want to bind directly `org.embulk.config.Config`` to `com.amazonaws.services.dynamodbv2.model.AttributeValue`. - * Should I implement `com.amazonaws.transform.JsonUnmarshallerContext`? - **/ +/** TODO: I want to bind directly `org.embulk.util.config.Config`` to + * `com.amazonaws.services.dynamodbv2.model.AttributeValue`. Should I implement + * `com.amazonaws.transform.JsonUnmarshallerContext`? + */ object DynamodbAttributeValue { trait Task extends EmbulkTask { diff --git a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValueEmbulkTypeTransformable.scala b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValueEmbulkTypeTransformable.scala index 67c44f1..188e497 100644 --- a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValueEmbulkTypeTransformable.scala +++ b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValueEmbulkTypeTransformable.scala @@ -4,11 +4,13 @@ import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import org.embulk.input.dynamodb.logger -import org.embulk.spi.time.{Timestamp, TimestampParser} +import org.embulk.spi.time.Timestamp +import org.embulk.util.timestamp.TimestampFormatter import org.msgpack.value.{Value, ValueFactory} import scala.jdk.CollectionConverters._ import scala.util.chaining._ +import java.time.Instant object DynamodbAttributeValueEmbulkTypeTransformable { @@ -50,7 +52,7 @@ object DynamodbAttributeValueEmbulkTypeTransformable { case class DynamodbAttributeValueEmbulkTypeTransformable( attributeValue: DynamodbAttributeValue, typeEnforcer: Option[DynamodbAttributeValueType] = None, - timestampParser: Option[TimestampParser] = None + timestampFormatter: Option[TimestampFormatter] = None ) { private def fromAttributeValueType: DynamodbAttributeValueType = @@ -142,10 +144,14 @@ case class DynamodbAttributeValueEmbulkTypeTransformable( Option(fromAttributeValueType match { case DynamodbAttributeValueType.S => - if (DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS - .contains(attributeValue.getS)) true - else if (DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS - .contains(attributeValue.getS)) false + if ( + DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS + .contains(attributeValue.getS) + ) true + else if ( + DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS + .contains(attributeValue.getS) + ) false else return None case DynamodbAttributeValueType.N => convertNAsLongOrDouble(attributeValue.getN) match { @@ -154,11 +160,15 @@ case class DynamodbAttributeValueEmbulkTypeTransformable( } case DynamodbAttributeValueType.B => val s = convertBAsString(attributeValue.getB) - if (DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS - .contains(s)) + if ( + DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS + .contains(s) + ) true - else if (DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS - .contains(s)) false + else if ( + DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS + .contains(s) + ) false else return None case DynamodbAttributeValueType.BOOL => attributeValue.getBOOL case unsupported => @@ -221,9 +231,9 @@ case class DynamodbAttributeValueEmbulkTypeTransformable( if (attributeValue.isNull) return None Option(fromAttributeValueType match { - case DynamodbAttributeValueType.S => attributeValue.getS - case DynamodbAttributeValueType.N => attributeValue.getN - case DynamodbAttributeValueType.B => convertBAsString(attributeValue.getB) + case DynamodbAttributeValueType.S => attributeValue.getS + case DynamodbAttributeValueType.N => attributeValue.getN + case DynamodbAttributeValueType.B => convertBAsString(attributeValue.getB) case DynamodbAttributeValueType.SS => asMessagePack.map(_.toJson).get case DynamodbAttributeValueType.NS => asMessagePack.map(_.toJson).get case DynamodbAttributeValueType.BS => asMessagePack.map(_.toJson).get @@ -238,8 +248,8 @@ case class DynamodbAttributeValueEmbulkTypeTransformable( }) } - def asTimestamp: Option[Timestamp] = { - timestampParser.flatMap(p => asString.map(p.parse)) + def asTimestamp: Option[Instant] = { + timestampFormatter.flatMap(p => asString.map(p.parse)) } } diff --git a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemReader.scala b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemReader.scala index 96dd77f..34081fe 100644 --- a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemReader.scala +++ b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemReader.scala @@ -3,6 +3,7 @@ package org.embulk.input.dynamodb.item import org.embulk.spi.Column import org.embulk.spi.time.Timestamp import org.msgpack.value.Value +import java.time.Instant case class DynamodbItemReader( private val schema: DynamodbItemSchema, @@ -28,7 +29,7 @@ case class DynamodbItemReader( DynamodbAttributeValueEmbulkTypeTransformable( value, typeEnforcer = schema.getAttributeType(name), - timestampParser = schema.getTimestampParser(name) + timestampFormatter = schema.getTimestampFormatter(name) ) } @@ -52,7 +53,7 @@ case class DynamodbItemReader( .get(column.getName) .flatMap(v => getTransformable(column.getName, v).asDouble) - def getTimestamp(column: Column): Option[Timestamp] = + def getTimestamp(column: Column): Option[Instant] = currentItem .get(column.getName) .flatMap(v => getTransformable(column.getName, v).asTimestamp) diff --git a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemSchema.scala b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemSchema.scala index 24f00b5..b994ad1 100644 --- a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemSchema.scala +++ b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemSchema.scala @@ -4,10 +4,10 @@ import java.util.{Optional, List => JList} import com.amazonaws.services.dynamodbv2.model.AttributeValue import com.fasterxml.jackson.annotation.{JsonCreator, JsonValue} -import org.embulk.config.{Config, ConfigDefault, Task => EmbulkTask} +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.spi.{Column, PageBuilder, Schema} import org.embulk.spi.`type`.{Type, Types} -import org.embulk.spi.time.TimestampParser +import org.embulk.util.timestamp.TimestampFormatter import scala.jdk.CollectionConverters._ import scala.util.chaining._ @@ -15,9 +15,7 @@ import scala.util.Try object DynamodbItemSchema { - trait ColumnTask - extends EmbulkTask - with TimestampParser.TimestampColumnOption { + trait ColumnTask extends EmbulkTask { @Config("name") def getName: String @@ -28,34 +26,21 @@ object DynamodbItemSchema { @Config("attribute_type") @ConfigDefault("null") def getAttributeType: Optional[String] - } - @deprecated( - message = "for DeprecatedDynamodbInputPlugin", - since = "0.3.0" - ) - case class SchemaConfigCompat(columnTasks: Seq[ColumnTask]) { - @JsonCreator - def this(columnTasks: JList[ColumnTask]) = - this(columnTasks.asScala.toSeq) - - @JsonValue - def getColumnTasks: JList[ColumnTask] = columnTasks.asJava - - def toSchema: Schema = - Schema - .builder() - .tap { b => - columnTasks.foreach { t => - b.add(t.getName, t.getType) - } - } - .build() + @Config("timezone") + @ConfigDefault("null") + def getTimeZoneId: Optional[String] + + @Config("format") + @ConfigDefault("null") + def getFormat: Optional[String] - def isEmpty: Boolean = columnTasks.isEmpty + @Config("date") + @ConfigDefault("null") + def getDate: Optional[String] } - trait Task extends EmbulkTask with TimestampParser.Task { + trait Task extends EmbulkTask { @Config("json_column_name") @ConfigDefault("\"record\"") @@ -63,7 +48,7 @@ object DynamodbItemSchema { @Config("columns") @ConfigDefault("[]") - def getColumns: Jlist[ColumnTask] + def getColumns: JList[ColumnTask] @Config("default_timezone") @ConfigDefault("\"UTC\"") @@ -88,19 +73,30 @@ case class DynamodbItemSchema(task: DynamodbItemSchema.Task) { .tap { b => if (isItemAsJson) b.add(task.getJsonColumnName, Types.JSON) else - task.getColumns.toSeq.foreach { t => + task.getColumns.asScala.foreach { t => b.add(t.getName, t.getType) } } .build() - private lazy val timestampParsers: Map[String, TimestampParser] = - task.getColumns.toSeq.map { columnTask => - columnTask.getName -> TimestampParser.of(task, columnTask) + private lazy val timestampFormatters: Map[String, TimestampFormatter] = + task.getColumns.asScala.map { columnTask => + columnTask.getName -> TimestampFormatter + .builder( + columnTask.getFormat.orElse(task.getDefaultTimestampFormat), + true + ) + .setDefaultZoneFromString( + columnTask.getTimeZoneId.orElse(task.getDefaultTimeZoneId) + ) + .setDefaultDateFromString( + columnTask.getDate.orElse(task.getDefaultDate) + ) + .build() }.toMap private lazy val attributeTypes: Map[String, DynamodbAttributeValueType] = - task.getColumns.toSeq + task.getColumns.asScala .filter(_.getAttributeType.isPresent) .map { columnTask => columnTask.getName -> DynamodbAttributeValueType( @@ -116,11 +112,11 @@ case class DynamodbItemSchema(task: DynamodbItemSchema.Task) { def getEmbulkSchema: Schema = embulkSchema - def getTimestampParser(column: Column): Option[TimestampParser] = - timestampParsers.get(column.getName) + def getTimestampFormatter(column: Column): Option[TimestampFormatter] = + timestampFormatters.get(column.getName) - def getTimestampParser(columnName: String): Option[TimestampParser] = - getEmbulkColumn(columnName).flatMap(getTimestampParser) + def getTimestampFormatter(columnName: String): Option[TimestampFormatter] = + getEmbulkColumn(columnName).flatMap(getTimestampFormatter) def getAttributeType(column: Column): Option[DynamodbAttributeValueType] = attributeTypes.get(column.getName) @@ -136,7 +132,7 @@ case class DynamodbItemSchema(task: DynamodbItemSchema.Task) { def getEmbulkColumn(columnIndex: Int): Option[Column] = Try(getEmbulkSchema.getColumn(columnIndex)).toOption - def isItemAsJson: Boolean = task.getColumns.toSeq.isEmpty + def isItemAsJson: Boolean = task.getColumns.asScala.isEmpty def visitColumns(visitor: DynamodbItemColumnVisitor): Unit = getEmbulkSchema.visitColumns(visitor) diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/AbstractDynamodbOperation.scala b/src/main/scala/org/embulk/input/dynamodb/operation/AbstractDynamodbOperation.scala index 9a34352..9dcda78 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/AbstractDynamodbOperation.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/AbstractDynamodbOperation.scala @@ -13,12 +13,8 @@ import com.amazonaws.services.dynamodbv2.model.{ ReturnConsumedCapacity, Select } -import org.embulk.config.{ - Config, - ConfigDefault, - ConfigException, - Task => EmbulkTask -} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.input.dynamodb.item.DynamodbAttributeValue import scala.jdk.CollectionConverters._ @@ -159,7 +155,9 @@ abstract class AbstractDynamodbOperation( throw new ConfigException( "\"batch_size\" must be greater than or equal to 1." ) - req.setLimit(JInteger.valueOf(v)) // Note: Use BatchSize for the limit per a request. + req.setLimit( + JInteger.valueOf(v) + ) // Note: Use BatchSize for the limit per a request. } task.getProjectionExpression.ifPresent(req.setProjectionExpression) task.getReturnConsumedCapacity.ifPresent(req.setReturnConsumedCapacity) diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbOperationProxy.scala b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbOperationProxy.scala index 7b5edb7..8863b27 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbOperationProxy.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbOperationProxy.scala @@ -4,11 +4,12 @@ import java.util.Optional import com.amazonaws.services.dynamodbv2.model.AttributeValue import com.amazonaws.services.dynamodbv2.AmazonDynamoDB -import org.embulk.config.{ - Config, - ConfigDefault, - ConfigException, - Task => EmbulkTask +import org.embulk.config.ConfigException +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} +import org.embulk.input.dynamodb.operation.{ + DynamodbQueryOperation, + DynamodbScanOperation, + EmbulkDynamodbOperation } object DynamodbOperationProxy { diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala index 3667880..b4c99b3 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala @@ -2,7 +2,7 @@ package org.embulk.input.dynamodb.operation import com.amazonaws.services.dynamodbv2.model.{AttributeValue, QueryRequest} import com.amazonaws.services.dynamodbv2.AmazonDynamoDB -import org.embulk.config.{Config, ConfigDefault} +import org.embulk.util.config.{Config, ConfigDefault} import org.embulk.input.dynamodb.logger import scala.jdk.CollectionConverters._ diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala index ca57916..cc76d1d 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala @@ -4,7 +4,8 @@ import java.util.Optional import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ScanRequest} import com.amazonaws.services.dynamodbv2.AmazonDynamoDB -import org.embulk.config.{Config, ConfigDefault, ConfigException} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Config, ConfigDefault} import org.embulk.input.dynamodb.logger import scala.jdk.CollectionConverters._ diff --git a/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala b/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala index 9c7c29b..6bea047 100644 --- a/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala @@ -1,7 +1,7 @@ package org.embulk.input.dynamodb import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException -import org.embulk.config.{ConfigException, ConfigSource} +import org.embulk.config.{ConfigSource, ConfigException} import org.embulk.input.dynamodb.aws.AwsCredentials import org.embulk.input.dynamodb.testutil.EmbulkTestBase import org.hamcrest.CoreMatchers._ @@ -36,7 +36,7 @@ class AwsCredentialsTest extends EmbulkTestBase { ) def doTest(inConfig: ConfigSource): Unit = { - val task: PluginTask = inConfig.loadConfig(classOf[PluginTask]) + val task: PluginTask = PluginTask.load(inConfig) val provider = AwsCredentials(task).createAwsCredentialsProvider val cred = provider.getCredentials assertThat(cred.getAWSAccessKeyId, notNullValue()) @@ -80,9 +80,12 @@ class AwsCredentialsTest extends EmbulkTestBase { val inConfig: ConfigSource = defaultInConfig .set("auth_method", "basic") - Assert.assertThrows(classOf[ConfigException], () => { - doTest(inConfig) - }) + Assert.assertThrows( + classOf[ConfigException], + () => { + doTest(inConfig) + } + ) } @Test @@ -110,9 +113,12 @@ class AwsCredentialsTest extends EmbulkTestBase { .set("auth_method", "profile") .set("profile_name", "DO_NOT_EXIST") - Assert.assertThrows(classOf[IllegalArgumentException], () => { - doTest(inConfig) - }) + Assert.assertThrows( + classOf[IllegalArgumentException], + () => { + doTest(inConfig) + } + ) } @Test @@ -133,8 +139,11 @@ class AwsCredentialsTest extends EmbulkTestBase { .set("role_arn", "DO_NOT_EXIST") .set("role_session_name", "dummy") - Assert.assertThrows(classOf[AWSSecurityTokenServiceException], () => { - doTest(inConfig) - }) + Assert.assertThrows( + classOf[AWSSecurityTokenServiceException], + () => { + doTest(inConfig) + } + ) } } diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala index 3452a10..6005112 100644 --- a/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala @@ -105,9 +105,12 @@ class DynamodbOperationTest extends EmbulkTestBase { |scan: | limit: 1 |""".stripMargin) - runInput(inScanConfig, { result => - assert(result.size.equals(1)) - }) + runInput( + inScanConfig, + { result => + assert(result.size.equals(1)) + } + ) val inQueryConfig: ConfigSource = loadConfigSourceFromYamlString(s""" |type: dynamodb @@ -124,8 +127,11 @@ class DynamodbOperationTest extends EmbulkTestBase { | ":v": {S: a} | limit: 1 |""".stripMargin) - runInput(inQueryConfig, { result => - assert(result.size.equals(1)) - }) + runInput( + inQueryConfig, + { result => + assert(result.size.equals(1)) + } + ) } } diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala index 3db620e..d314c0c 100644 --- a/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala @@ -88,7 +88,8 @@ class DynamodbQueryOperationBackwardCompatibilityTest extends EmbulkTestBase { } runInput( - embulkInConfig, { result: Seq[Seq[AnyRef]] => + embulkInConfig, + { result: Seq[Seq[AnyRef]] => val head = result.head assertThat(head(0).toString, is("key-1")) assertThat(head(1).asInstanceOf[Long], is(0L)) diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala index 2bab719..a007329 100644 --- a/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala @@ -88,7 +88,8 @@ class DynamodbScanOperationBackwardCompatibilityTest extends EmbulkTestBase { } runInput( - embulkInConfig, { result: Seq[Seq[AnyRef]] => + embulkInConfig, + { result: Seq[Seq[AnyRef]] => val head = result.head assertThat(head(0).toString, is("key-1")) assertThat(head(1).asInstanceOf[Long], is(0L)) diff --git a/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala b/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala index 0355666..a45e031 100644 --- a/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala +++ b/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala @@ -75,22 +75,21 @@ trait EmbulkTestBase { } def runInput(inConfig: ConfigSource, test: Seq[Seq[AnyRef]] => Unit): Unit = { - runtime.getInstance(classOf[DynamodbInputPlugin]).tap { plugin => - plugin.transaction( - inConfig, - (taskSource: TaskSource, schema: Schema, taskCount: Int) => { - val output: MockPageOutput = new MockPageOutput() - val reports: Seq[TaskReport] = 0.until(taskCount).map { taskIndex => - plugin.run(taskSource, schema, taskIndex, output) - } - output.finish() + val plugin = new DynamodbInputPlugin() + plugin.transaction( + inConfig, + (taskSource: TaskSource, schema: Schema, taskCount: Int) => { + val output: MockPageOutput = new MockPageOutput() + val reports: Seq[TaskReport] = 0.until(taskCount).map { taskIndex => + plugin.run(taskSource, schema, taskIndex, output) + } + output.finish() - test(Pages.toObjects(schema, output.pages).asScala.toSeq.map(_.toSeq)) + test(Pages.toObjects(schema, output.pages).asScala.toSeq.map(_.toSeq)) - reports.asJava - } - ) - } + reports.asJava + } + ) } def loadConfigSourceFromYamlString(yaml: String): ConfigSource = {