Skip to content

Commit

Permalink
Fix deprecation against embulk 0.10.41 upgrade
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Nakayama <[email protected]>
  • Loading branch information
civitaspo committed Feb 14, 2023
1 parent 356f10f commit 084c579
Show file tree
Hide file tree
Showing 24 changed files with 200 additions and 167 deletions.
4 changes: 2 additions & 2 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# https://scalameta.org/scalafmt/#Configuration

version = "2.3.2"
version = "3.7.1"
runner.dialect = "scala3"
newlines.alwaysBeforeElseAfterCurlyIf = true
newlines.alwaysBeforeTopLevelStatements = true
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,11 @@ You can see more examples [here](./example).
```shell
$ ./run_dynamodb_local.sh
# Set dummy credentials (access_key_id=dummy and secret_access_key=dummy)
$ aws configure
$ ./example/prepare_dynamodb_table.sh
$ ./gradlew classpath
$ embulk run example/config-query.yml -Ilib
$ ./gradlew gem
$ embulk run example/config-query.yml -Ibuild/gemContents/lib
```

### Run tests
Expand Down Expand Up @@ -230,4 +232,4 @@ $ ./gradlew gemPush

## License

[MIT LICENSE](./LICENSE)
[MIT LICENSE](./LICENSE.txt)
32 changes: 17 additions & 15 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ plugins {
id "scala"
id "maven-publish"
id "org.embulk.embulk-plugins" version "0.5.5"
id "com.diffplug.spotless" version "6.15.0"
// note: Incompatible because this component declares an API of a component compatible with Java 11 and the consumer needed a runtime of a component compatible with Java 8
// id "com.diffplug.spotless" version "6.15.0"
// https://github.com/diffplug/spotless/blob/main/plugin-gradle/CHANGES.md#6130---2023-01-14
id "com.diffplug.spotless" version "6.13.0"
// note: We cannot use the latest version because of the following error.
// > org/eclipse/jgit/lib/Repository has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
// id "com.palantir.git-version" version "0.15.0"
Expand All @@ -26,25 +29,24 @@ sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
compileOnly "org.scala-lang:scala-library:2.13.1"

implementation "org.embulk:embulk-core:0.9.23"
// note: The upper versions of embulk includes Guava Dependency removal,
// so we need to specify the version of embulk to avoid some errors.
// We may need to update the version of embulk in the future.
def embulkVersion = "0.10.41"
compileOnly "org.embulk:embulk-api:${embulkVersion}"
compileOnly "org.embulk:embulk-spi:${embulkVersion}"
implementation "org.scala-lang:scala-library:2.13.1"

implementation "org.embulk:embulk-util-config:0.3.2"
implementation "org.embulk:embulk-util-json:0.1.1"
implementation "org.embulk:embulk-util-timestamp:0.2.1"
implementation "com.amazonaws:aws-java-sdk-dynamodb:1.11.711"
implementation "com.amazonaws:aws-java-sdk-sts:1.11.711"
// For @delegate macro.
implementation "dev.zio:zio-macros-core_2.13:0.6.2"

testImplementation "junit:junit:4.+"
testImplementation "org.embulk:embulk-core:0.9.23:tests"
testImplementation "org.embulk:embulk-standards:0.9.23"
testImplementation "org.embulk:embulk-deps-buffer:0.9.23"
testImplementation "org.embulk:embulk-deps-config:0.9.23"
}
compileScala {
scalaCompileOptions.additionalParameters = [
"-Ymacro-annotations"
]
testImplementation "org.embulk:embulk-junit4:${embulkVersion}"
testImplementation "org.embulk:embulk-deps:${embulkVersion}"
testImplementation "org.embulk:embulk-input-config:0.10.36"
}
test {
jvmArgs '-Xms4g', '-Xmx4g', '-XX:MaxMetaspaceSize=1g'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class DynamodbInputPlugin extends InputPlugin {
val schema: Schema = DynamodbItemSchema(task).getEmbulkSchema
val taskCount: Int = DynamodbOperationProxy(task).getEmbulkTaskCount

control.run(task.dump(), schema, taskCount)
Exec.newConfigDiff()
control.run(task.toTaskSource(), schema, taskCount)
PluginTask.newConfigDiff()
}

override def resume(
Expand All @@ -38,7 +38,7 @@ class DynamodbInputPlugin extends InputPlugin {
output: PageOutput
): TaskReport = {
val task: PluginTask = PluginTask.load(taskSource)
val pageBuilder = new PageBuilder(task.getBufferAllocator, schema, output)
val pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)

Aws(task).withDynamodb { dynamodb =>
DynamodbOperationProxy(task).run(
Expand All @@ -48,16 +48,15 @@ class DynamodbInputPlugin extends InputPlugin {
)
}
pageBuilder.finish()
Exec.newTaskReport()
PluginTask.newTaskReport()
}

override def cleanup(
taskSource: TaskSource,
schema: Schema,
taskCount: Int,
successTaskReports: JList[TaskReport]
): Unit = {
}
): Unit = {}

override def guess(config: ConfigSource): ConfigDiff = {
throw new UnsupportedOperationException
Expand Down
36 changes: 20 additions & 16 deletions src/main/scala/org/embulk/input/dynamodb/PluginTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,45 @@ package org.embulk.input.dynamodb
import java.util.Optional

import org.embulk.config.{
Config,
ConfigDefault,
ConfigDiff,
ConfigException,
ConfigInject,
ConfigSource,
Task,
TaskSource
TaskSource,
TaskReport
}
import org.embulk.util.config.{
Config,
ConfigDefault,
ConfigMapperFactory,
Task => EmbulkTask
}
import org.embulk.input.dynamodb.aws.Aws
import org.embulk.input.dynamodb.item.DynamodbItemSchema
import org.embulk.input.dynamodb.operation.DynamodbOperationProxy
import org.embulk.spi.BufferAllocator
import zio.macros.annotation.delegate

import scala.util.chaining._

trait PluginTask
extends Task
extends EmbulkTask
with Aws.Task
with DynamodbItemSchema.Task
with DynamodbOperationProxy.Task {

@ConfigInject
def getBufferAllocator: BufferAllocator
}
with DynamodbOperationProxy.Task {}

object PluginTask {
private val configMapperFactory: ConfigMapperFactory =
ConfigMapperFactory.builder().addDefaultModules().build()

def load(configSource: ConfigSource): PluginTask = {
configSource
.loadConfig(classOf[PluginTask])
configMapperFactory
.createConfigMapper()
.map(configSource, classOf[PluginTask])
}

def load(taskSource: TaskSource): PluginTask = {
taskSource
.loadTask(classOf[PluginTask])
configMapperFactory.createTaskMapper().map(taskSource, classOf[PluginTask])
}

def newConfigDiff(): ConfigDiff = configMapperFactory.newConfigDiff()
def newTaskReport(): TaskReport = configMapperFactory.newTaskReport()
}
4 changes: 3 additions & 1 deletion src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import com.amazonaws.services.dynamodbv2.{
AmazonDynamoDB,
AmazonDynamoDBClientBuilder
}
import org.embulk.util.config.{Task => EmbulkTask}

object Aws {

trait Task
extends AwsCredentials.Task
extends EmbulkTask
with AwsCredentials.Task
with AwsEndpointConfiguration.Task
with AwsClientConfiguration.Task
with AwsDynamodbConfiguration.Task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import java.util.Optional

import com.amazonaws.ClientConfiguration
import com.amazonaws.client.builder.AwsClientBuilder
import org.embulk.config.{Config, ConfigDefault}
import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask}
import org.embulk.input.dynamodb.aws.AwsClientConfiguration.Task

object AwsClientConfiguration {

trait Task {
trait Task extends EmbulkTask {

@Config("http_proxy")
@ConfigDefault("null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import com.amazonaws.auth.profile.{
ProfileCredentialsProvider,
ProfilesConfigFile
}
import org.embulk.config.{Config, ConfigDefault, ConfigException}
import org.embulk.config.ConfigException
import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask}
import org.embulk.input.dynamodb.aws.AwsCredentials.Task
import org.embulk.input.dynamodb.logger
import org.embulk.spi.unit.LocalFile
import zio.macros.annotation.delegate
import org.embulk.util.config.units.LocalFile

object AwsCredentials {

trait Task {
trait Task extends EmbulkTask {

@Config("auth_method")
@ConfigDefault("\"default\"")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package org.embulk.input.dynamodb.aws
import java.util.Optional

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import org.embulk.config.{Config, ConfigDefault}
import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask}
import org.embulk.input.dynamodb.aws.AwsDynamodbConfiguration.Task

object AwsDynamodbConfiguration {

trait Task {
trait Task extends EmbulkTask {

@Config("enable_endpoint_discovery")
@ConfigDefault("null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import java.util.Optional
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions}
import org.embulk.config.{Config, ConfigDefault, ConfigException}
import org.embulk.config.ConfigException
import org.embulk.util.config.{Task => EmbulkTask, Config, ConfigDefault}
import org.embulk.input.dynamodb.aws.AwsEndpointConfiguration.Task
import org.embulk.input.dynamodb.logger
import zio.macros.annotation.delegate

import scala.util.Try

object AwsEndpointConfiguration {

trait Task {
trait Task extends EmbulkTask {
@Config("endpoint")
@ConfigDefault("null")
def getEndpoint: Optional[String]
Expand Down
9 changes: 5 additions & 4 deletions src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package org.embulk.input.dynamodb.aws
import java.util.Optional

import com.amazonaws.{ClientConfiguration, Protocol}
import org.embulk.config.{Config, ConfigDefault, ConfigException}
import org.embulk.config.ConfigException
import org.embulk.util.config.{Task => EmbulkTask, Config, ConfigDefault}
import org.embulk.input.dynamodb.aws.HttpProxy.Task

object HttpProxy {

trait Task {
trait Task extends EmbulkTask {

@Config("host")
@ConfigDefault("null")
Expand Down Expand Up @@ -50,8 +51,8 @@ class HttpProxy(task: Task) {
case None =>
throw new ConfigException(
s"'${task.getProtocol}' is unsupported: `protocol` must be one of [${Protocol.values
.map(v => s"'$v'")
.mkString(", ")}]."
.map(v => s"'$v'")
.mkString(", ")}]."
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import java.nio.charset.StandardCharsets
import java.util.{Optional, List => JList, Map => JMap}

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.embulk.config.{Config, ConfigDefault, Task => EmbulkTask}
import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask}

import scala.jdk.CollectionConverters._
import scala.util.chaining._

/**
* TODO: I want to bind directly `org.embulk.config.Config`` to `com.amazonaws.services.dynamodbv2.model.AttributeValue`.
* Should I implement `com.amazonaws.transform.JsonUnmarshallerContext`?
**/
/** TODO: I want to bind directly `org.embulk.util.config.Config`` to
* `com.amazonaws.services.dynamodbv2.model.AttributeValue`. Should I implement
* `com.amazonaws.transform.JsonUnmarshallerContext`?
*/
object DynamodbAttributeValue {

trait Task extends EmbulkTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.embulk.input.dynamodb.logger
import org.embulk.spi.time.{Timestamp, TimestampParser}
import org.embulk.spi.time.Timestamp
import org.embulk.util.timestamp.TimestampFormatter
import org.msgpack.value.{Value, ValueFactory}

import scala.jdk.CollectionConverters._
import scala.util.chaining._
import java.time.Instant

object DynamodbAttributeValueEmbulkTypeTransformable {

Expand Down Expand Up @@ -50,7 +52,7 @@ object DynamodbAttributeValueEmbulkTypeTransformable {
case class DynamodbAttributeValueEmbulkTypeTransformable(
attributeValue: DynamodbAttributeValue,
typeEnforcer: Option[DynamodbAttributeValueType] = None,
timestampParser: Option[TimestampParser] = None
timestampFormatter: Option[TimestampFormatter] = None
) {

private def fromAttributeValueType: DynamodbAttributeValueType =
Expand Down Expand Up @@ -142,10 +144,14 @@ case class DynamodbAttributeValueEmbulkTypeTransformable(

Option(fromAttributeValueType match {
case DynamodbAttributeValueType.S =>
if (DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS
.contains(attributeValue.getS)) true
else if (DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS
.contains(attributeValue.getS)) false
if (
DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS
.contains(attributeValue.getS)
) true
else if (
DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS
.contains(attributeValue.getS)
) false
else return None
case DynamodbAttributeValueType.N =>
convertNAsLongOrDouble(attributeValue.getN) match {
Expand All @@ -154,11 +160,15 @@ case class DynamodbAttributeValueEmbulkTypeTransformable(
}
case DynamodbAttributeValueType.B =>
val s = convertBAsString(attributeValue.getB)
if (DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS
.contains(s))
if (
DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS
.contains(s)
)
true
else if (DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS
.contains(s)) false
else if (
DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS
.contains(s)
) false
else return None
case DynamodbAttributeValueType.BOOL => attributeValue.getBOOL
case unsupported =>
Expand Down Expand Up @@ -221,9 +231,9 @@ case class DynamodbAttributeValueEmbulkTypeTransformable(
if (attributeValue.isNull) return None

Option(fromAttributeValueType match {
case DynamodbAttributeValueType.S => attributeValue.getS
case DynamodbAttributeValueType.N => attributeValue.getN
case DynamodbAttributeValueType.B => convertBAsString(attributeValue.getB)
case DynamodbAttributeValueType.S => attributeValue.getS
case DynamodbAttributeValueType.N => attributeValue.getN
case DynamodbAttributeValueType.B => convertBAsString(attributeValue.getB)
case DynamodbAttributeValueType.SS => asMessagePack.map(_.toJson).get
case DynamodbAttributeValueType.NS => asMessagePack.map(_.toJson).get
case DynamodbAttributeValueType.BS => asMessagePack.map(_.toJson).get
Expand All @@ -238,8 +248,8 @@ case class DynamodbAttributeValueEmbulkTypeTransformable(
})
}

def asTimestamp: Option[Timestamp] = {
timestampParser.flatMap(p => asString.map(p.parse))
def asTimestamp: Option[Instant] = {
timestampFormatter.flatMap(p => asString.map(p.parse))
}

}
Loading

0 comments on commit 084c579

Please sign in to comment.