diff --git a/build.gradle b/build.gradle index 782a2e8..81a2d74 100644 --- a/build.gradle +++ b/build.gradle @@ -31,10 +31,10 @@ dependencies { compile "dev.zio:zio-macros-core_2.13:0.6.2" testCompile "junit:junit:4.+" + testCompile "org.embulk:embulk-core:0.9.23:tests" testCompile "org.embulk:embulk-standards:0.9.23" testCompile "org.embulk:embulk-deps-buffer:0.9.23" testCompile "org.embulk:embulk-deps-config:0.9.23" - testCompile "org.embulk:embulk-test:0.9.23" } compileScala { diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala index 33dcb47..3667880 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala @@ -51,7 +51,7 @@ case class DynamodbQueryOperation(task: DynamodbQueryOperation.Task) loadableRecords match { case Some(v) if (result.getCount > v) => f(result.getItems.asScala.take(v.toInt).map(_.asScala.toMap).toSeq) - case None => + case _ => f(result.getItems.asScala.map(_.asScala.toMap).toSeq) Option(result.getLastEvaluatedKey).foreach { lastEvaluatedKey => runInternal( diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala index a48d727..ca57916 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala @@ -71,7 +71,7 @@ case class DynamodbScanOperation(task: DynamodbScanOperation.Task) loadableRecords match { case Some(v) if (result.getCount > v) => f(result.getItems.asScala.take(v.toInt).map(_.asScala.toMap).toSeq) - case None => + case _ => f(result.getItems.asScala.map(_.asScala.toMap).toSeq) Option(result.getLastEvaluatedKey).foreach { lastEvaluatedKey => runInternal( diff --git a/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala b/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala index 0372ece..b9110b0 100644 --- a/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala @@ -6,7 +6,7 @@ import org.embulk.input.dynamodb.aws.AwsCredentials import org.embulk.input.dynamodb.testutil.EmbulkTestBase import org.hamcrest.CoreMatchers._ import org.hamcrest.MatcherAssert.assertThat -import org.junit.{Assume, Test} +import org.junit.{Assert, Test} class AwsCredentialsTest extends EmbulkTestBase { @@ -44,32 +44,31 @@ class AwsCredentialsTest extends EmbulkTestBase { } def defaultInConfig: ConfigSource = { - embulk.configLoader().fromYamlString(s""" - |type: dynamodb - |region: us-east-1 - |table: hoge - |operation: scan - |columns: - | - {name: key1, type: string} - | - {name: key2, type: long} - | - {name: value1, type: string} - |""".stripMargin) + loadConfigSourceFromYamlString(s""" + |type: dynamodb + |region: us-east-1 + |table: hoge + |operation: scan + |columns: + | - {name: key1, type: string} + | - {name: key2, type: long} + | - {name: value1, type: string} + |""".stripMargin) } @deprecated(since = "0.3.0") @Test - def notSetAuthMethod_SetCredentials_deprecated(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) - val inConfig: ConfigSource = defaultInConfig - .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) - .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) + def notSetAuthMethod_SetCredentials_deprecated(): Unit = + if (runAwsCredentialsTest) { + val inConfig: ConfigSource = defaultInConfig + .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) + .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - doTest(inConfig) - } + doTest(inConfig) + } @Test - def notSetAuthMethod_SetCredentials(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) + def notSetAuthMethod_SetCredentials(): Unit = if (runAwsCredentialsTest) { val inConfig: ConfigSource = defaultInConfig .set("access_key_id", EMBULK_DYNAMODB_TEST_ACCESS_KEY) .set("secret_access_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) @@ -79,8 +78,7 @@ class AwsCredentialsTest extends EmbulkTestBase { @deprecated(since = "0.3.0") @Test - def setAuthMethod_Basic_deprecated(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) + def setAuthMethod_Basic_deprecated(): Unit = if (runAwsCredentialsTest) { val inConfig: ConfigSource = defaultInConfig .set("auth_method", "basic") .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) @@ -90,8 +88,7 @@ class AwsCredentialsTest extends EmbulkTestBase { } @Test - def setAuthMethod_Basic(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) + def setAuthMethod_Basic(): Unit = if (runAwsCredentialsTest) { val inConfig: ConfigSource = defaultInConfig .set("auth_method", "basic") .set("access_key_id", EMBULK_DYNAMODB_TEST_ACCESS_KEY) @@ -101,42 +98,46 @@ class AwsCredentialsTest extends EmbulkTestBase { } @deprecated(since = "0.3.0") - @Test(expected = classOf[ConfigException]) - def throwIfSetAccessKeyAndAccessKeyId(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) + @Test + def throwIfSetAccessKeyAndAccessKeyId(): Unit = if (runAwsCredentialsTest) { val inConfig: ConfigSource = defaultInConfig .set("auth_method", "basic") .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) .set("access_key_id", EMBULK_DYNAMODB_TEST_ACCESS_KEY) .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - doTest(inConfig) + Assert.assertThrows(classOf[ConfigException], () => { + doTest(inConfig) + }) } @deprecated(since = "0.3.0") - @Test(expected = classOf[ConfigException]) - def throwIfSetSecretKeyAndSecretAccessKeyId(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) - val inConfig: ConfigSource = defaultInConfig - .set("auth_method", "basic") - .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) - .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - .set("secret_access_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - - doTest(inConfig) - } + @Test + def throwIfSetSecretKeyAndSecretAccessKeyId(): Unit = + if (runAwsCredentialsTest) { + val inConfig: ConfigSource = defaultInConfig + .set("auth_method", "basic") + .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) + .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) + .set("secret_access_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) + + Assert.assertThrows(classOf[ConfigException], () => { + doTest(inConfig) + }) + } - @Test(expected = classOf[ConfigException]) + @Test def setAuthMethod_Basic_NotSet(): Unit = { val inConfig: ConfigSource = defaultInConfig .set("auth_method", "basic") - doTest(inConfig) + Assert.assertThrows(classOf[ConfigException], () => { + doTest(inConfig) + }) } @Test - def setAuthMethod_Env(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) + def setAuthMethod_Env(): Unit = if (runAwsCredentialsTest) { // NOTE: Requires to set the env vars like 'AWS_ACCESS_KEY_ID' and so on when testing. val inConfig: ConfigSource = defaultInConfig .set("auth_method", "env") @@ -145,8 +146,7 @@ class AwsCredentialsTest extends EmbulkTestBase { } @Test - def setAuthMethod_Profile(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) + def setAuthMethod_Profile(): Unit = if (runAwsCredentialsTest) { // NOTE: Requires to set credentials to '~/.aws' when testing. val inConfig: ConfigSource = defaultInConfig .set("auth_method", "profile") @@ -155,18 +155,19 @@ class AwsCredentialsTest extends EmbulkTestBase { doTest(inConfig) } - @Test(expected = classOf[IllegalArgumentException]) + @Test def setAuthMethod_Profile_NotExistProfileName(): Unit = { val inConfig: ConfigSource = defaultInConfig .set("auth_method", "profile") .set("profile_name", "DO_NOT_EXIST") - doTest(inConfig) + Assert.assertThrows(classOf[IllegalArgumentException], () => { + doTest(inConfig) + }) } @Test - def setAuthMethod_assume_role(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) + def setAuthMethod_assume_role(): Unit = if (runAwsCredentialsTest) { val inConfig: ConfigSource = defaultInConfig .set("auth_method", "assume_role") .set("role_arn", EMBULK_DYNAMODB_TEST_ASSUME_ROLE_ROLE_ARN) @@ -175,14 +176,16 @@ class AwsCredentialsTest extends EmbulkTestBase { doTest(inConfig) } - @Test(expected = classOf[AWSSecurityTokenServiceException]) - def setAuthMethod_assume_role_NotExistRoleArn(): Unit = { - Assume.assumeTrue(runAwsCredentialsTest) - val inConfig: ConfigSource = defaultInConfig - .set("auth_method", "assume_role") - .set("role_arn", "DO_NOT_EXIST") - .set("role_session_name", "dummy") - - doTest(inConfig) - } + @Test + def setAuthMethod_assume_role_NotExistRoleArn(): Unit = + if (runAwsCredentialsTest) { + val inConfig: ConfigSource = defaultInConfig + .set("auth_method", "assume_role") + .set("role_arn", "DO_NOT_EXIST") + .set("role_session_name", "dummy") + + Assert.assertThrows(classOf[AWSSecurityTokenServiceException], () => { + doTest(inConfig) + }) + } } diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala new file mode 100644 index 0000000..3452a10 --- /dev/null +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala @@ -0,0 +1,131 @@ +package org.embulk.input.dynamodb + +import com.amazonaws.services.dynamodbv2.model.{ + AttributeDefinition, + AttributeValue, + BillingMode, + CreateTableRequest, + KeySchemaElement, + KeyType, + PutItemRequest, + ScalarAttributeType +} +import org.embulk.config.ConfigSource +import org.embulk.input.dynamodb.testutil.EmbulkTestBase +import org.junit.Test + +import scala.jdk.CollectionConverters._ + +class DynamodbOperationTest extends EmbulkTestBase { + + @Test + def limitTest(): Unit = { + val tableName = "limit_test" + cleanupTable(tableName) + withDynamodb { dynamodb => + dynamodb.createTable( + new CreateTableRequest() + .withTableName(tableName) + .withAttributeDefinitions( + new AttributeDefinition() + .withAttributeName("pk") + .withAttributeType(ScalarAttributeType.S) + ) + .withKeySchema( + new KeySchemaElement() + .withAttributeName("pk") + .withKeyType(KeyType.HASH) + ) + .withBillingMode(BillingMode.PAY_PER_REQUEST) + ) + dynamodb.putItem( + new PutItemRequest() + .withTableName(tableName) + .withItem( + Map + .newBuilder[String, AttributeValue] + .addOne("pk", new AttributeValue().withS("a")) + .result() + .asJava + ) + ) + dynamodb.putItem( + new PutItemRequest() + .withTableName(tableName) + .withItem( + Map + .newBuilder[String, AttributeValue] + .addOne("pk", new AttributeValue().withS("a")) + .result() + .asJava + ) + ) + dynamodb.putItem( + new PutItemRequest() + .withTableName(tableName) + .withItem( + Map + .newBuilder[String, AttributeValue] + .addOne("pk", new AttributeValue().withS("a")) + .result() + .asJava + ) + ) + dynamodb.putItem( + new PutItemRequest() + .withTableName(tableName) + .withItem( + Map + .newBuilder[String, AttributeValue] + .addOne("pk", new AttributeValue().withS("b")) + .result() + .asJava + ) + ) + dynamodb.putItem( + new PutItemRequest() + .withTableName(tableName) + .withItem( + Map + .newBuilder[String, AttributeValue] + .addOne("pk", new AttributeValue().withS("b")) + .result() + .asJava + ) + ) + } + + val inScanConfig: ConfigSource = loadConfigSourceFromYamlString(s""" + |type: dynamodb + |table: $tableName + |endpoint: http://$dynamoDBHost:$dynamoDBPort/ + |auth_method: basic + |access_key_id: dummy + |secret_access_key: dummy + |scan: + | limit: 1 + |""".stripMargin) + runInput(inScanConfig, { result => + assert(result.size.equals(1)) + }) + + val inQueryConfig: ConfigSource = loadConfigSourceFromYamlString(s""" + |type: dynamodb + |table: $tableName + |endpoint: http://$dynamoDBHost:$dynamoDBPort/ + |auth_method: basic + |access_key_id: dummy + |secret_access_key: dummy + |query: + | key_condition_expression: "#x = :v" + | expression_attribute_names: + | "#x": pk + | expression_attribute_values: + | ":v": {S: a} + | limit: 1 + |""".stripMargin) + runInput(inQueryConfig, { result => + assert(result.size.equals(1)) + }) + } +} diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala similarity index 72% rename from src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationTest.scala rename to src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala index 3a8c8c5..d911911 100644 --- a/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala @@ -12,7 +12,6 @@ import com.amazonaws.services.dynamodbv2.model.{ } import org.embulk.config.ConfigSource import org.embulk.input.dynamodb.testutil.EmbulkTestBase -import org.embulk.spi.util.Pages import org.hamcrest.CoreMatchers._ import org.hamcrest.MatcherAssert.assertThat import org.junit.Test @@ -20,7 +19,7 @@ import org.msgpack.value.Value import scala.jdk.CollectionConverters._ -class DynamodbQueryOperationTest extends EmbulkTestBase { +class DynamodbQueryOperationBackwardCompatibilityTest extends EmbulkTestBase { private def testBackwardCompatibility(embulkInConfig: ConfigSource): Unit = { cleanupTable("EMBULK_DYNAMODB_TEST_TABLE") @@ -86,58 +85,52 @@ class DynamodbQueryOperationTest extends EmbulkTestBase { .asJava ) ) + } - val path = embulk.createTempFile("csv") - val result = embulk - .inputBuilder() - .in(embulkInConfig) - .outputPath(path) - .preview() - - val pages = result.getPages - val head = Pages.toObjects(result.getSchema, pages.get(0)).get(0) - - assertThat(head(0).toString, is("key-1")) - assertThat(head(1).asInstanceOf[Long], is(0L)) - assertThat(head(2).asInstanceOf[Double], is(42.195)) - assertThat(head(3).asInstanceOf[Boolean], is(true)) + runInput( + embulkInConfig, { result: Seq[Seq[AnyRef]] => + val head = result.head + assertThat(head(0).toString, is("key-1")) + assertThat(head(1).asInstanceOf[Long], is(0L)) + assertThat(head(2).asInstanceOf[Double], is(42.195)) + assertThat(head(3).asInstanceOf[Boolean], is(true)) - val arrayValue = head(4).asInstanceOf[Value].asArrayValue() - assertThat(arrayValue.size(), is(2)) - assertThat(arrayValue.get(0).asStringValue().toString, is("list-value")) - assertThat(arrayValue.get(1).asIntegerValue().asLong(), is(123L)) + val arrayValue = head(4).asInstanceOf[Value].asArrayValue() + assertThat(arrayValue.size(), is(2)) + assertThat(arrayValue.get(0).asStringValue().toString, is("list-value")) + assertThat(arrayValue.get(1).asIntegerValue().asLong(), is(123L)) - val mapValue = head(5).asInstanceOf[Value].asMapValue() - assert(mapValue.keySet().asScala.map(_.toString).contains("map-key-1")) - assertThat( - mapValue - .entrySet() - .asScala - .filter(_.getKey.toString.equals("map-key-1")) - .head - .getValue - .toString, - is("map-value-1") - ) - assert(mapValue.keySet().asScala.map(_.toString).contains("map-key-2")) - assertThat( - mapValue - .entrySet() - .asScala - .filter(_.getKey.toString.equals("map-key-2")) - .head - .getValue - .asIntegerValue() - .asLong(), - is(456L) - ) - - } + val mapValue = head(5).asInstanceOf[Value].asMapValue() + assert(mapValue.keySet().asScala.map(_.toString).contains("map-key-1")) + assertThat( + mapValue + .entrySet() + .asScala + .filter(_.getKey.toString.equals("map-key-1")) + .head + .getValue + .toString, + is("map-value-1") + ) + assert(mapValue.keySet().asScala.map(_.toString).contains("map-key-2")) + assertThat( + mapValue + .entrySet() + .asScala + .filter(_.getKey.toString.equals("map-key-2")) + .head + .getValue + .asIntegerValue() + .asLong(), + is(456L) + ) + } + ) } @Test def deprecatedQueryOperationTest(): Unit = { - val inConfig: ConfigSource = embulk.configLoader().fromYamlString(s""" + val inConfig: ConfigSource = loadConfigSourceFromYamlString(s""" |type: dynamodb |end_point: http://${dynamoDBHost}:${dynamoDBPort}/ |table: EMBULK_DYNAMODB_TEST_TABLE @@ -161,7 +154,7 @@ class DynamodbQueryOperationTest extends EmbulkTestBase { @Test def keepTheSameBehaviourAsDeprecatedQueryOperationTest(): Unit = { - val inConfig: ConfigSource = embulk.configLoader().fromYamlString(s""" + val inConfig: ConfigSource = loadConfigSourceFromYamlString(s""" |type: dynamodb |endpoint: http://${dynamoDBHost}:${dynamoDBPort}/ |table: EMBULK_DYNAMODB_TEST_TABLE diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala similarity index 72% rename from src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationTest.scala rename to src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala index 0dc7fb4..a248f67 100644 --- a/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala @@ -12,7 +12,6 @@ import com.amazonaws.services.dynamodbv2.model.{ } import org.embulk.config.ConfigSource import org.embulk.input.dynamodb.testutil.EmbulkTestBase -import org.embulk.spi.util.Pages import org.hamcrest.CoreMatchers._ import org.hamcrest.MatcherAssert.assertThat import org.junit.Test @@ -20,7 +19,7 @@ import org.msgpack.value.Value import scala.jdk.CollectionConverters._ -class DynamodbScanOperationTest extends EmbulkTestBase { +class DynamodbScanOperationBackwardCompatibilityTest extends EmbulkTestBase { private def testBackwardCompatibility(embulkInConfig: ConfigSource): Unit = { cleanupTable("EMBULK_DYNAMODB_TEST_TABLE") @@ -88,56 +87,51 @@ class DynamodbScanOperationTest extends EmbulkTestBase { ) } - val path = embulk.createTempFile("csv") - val result = embulk - .inputBuilder() - .in(embulkInConfig) - .outputPath(path) - .preview() + runInput( + embulkInConfig, { result: Seq[Seq[AnyRef]] => + val head = result.head + assertThat(head(0).toString, is("key-1")) + assertThat(head(1).asInstanceOf[Long], is(0L)) + assertThat(head(2).asInstanceOf[Double], is(42.195)) + assertThat(head(3).asInstanceOf[Boolean], is(true)) - val pages = result.getPages - val head = Pages.toObjects(result.getSchema, pages.get(0)).get(0) + val arrayValue = head(4).asInstanceOf[Value].asArrayValue() + assertThat(arrayValue.size(), is(2)) + assertThat(arrayValue.get(0).asStringValue().toString, is("list-value")) + assertThat(arrayValue.get(1).asIntegerValue().asLong(), is(123L)) - assertThat(head(0).toString, is("key-1")) - assertThat(head(1).asInstanceOf[Long], is(0L)) - assertThat(head(2).asInstanceOf[Double], is(42.195)) - assertThat(head(3).asInstanceOf[Boolean], is(true)) - - val arrayValue = head(4).asInstanceOf[Value].asArrayValue() - assertThat(arrayValue.size(), is(2)) - assertThat(arrayValue.get(0).asStringValue().toString, is("list-value")) - assertThat(arrayValue.get(1).asIntegerValue().asLong(), is(123L)) - - val mapValue = head(5).asInstanceOf[Value].asMapValue() - assert(mapValue.keySet().asScala.map(_.toString).contains("map-key-1")) - assertThat( - mapValue - .entrySet() - .asScala - .filter(_.getKey.toString.equals("map-key-1")) - .head - .getValue - .toString, - is("map-value-1") - ) - assert(mapValue.keySet().asScala.map(_.toString).contains("map-key-2")) - assertThat( - mapValue - .entrySet() - .asScala - .filter(_.getKey.toString.equals("map-key-2")) - .head - .getValue - .asIntegerValue() - .asLong(), - is(456L) + val mapValue = head(5).asInstanceOf[Value].asMapValue() + assert(mapValue.keySet().asScala.map(_.toString).contains("map-key-1")) + assertThat( + mapValue + .entrySet() + .asScala + .filter(_.getKey.toString.equals("map-key-1")) + .head + .getValue + .toString, + is("map-value-1") + ) + assert(mapValue.keySet().asScala.map(_.toString).contains("map-key-2")) + assertThat( + mapValue + .entrySet() + .asScala + .filter(_.getKey.toString.equals("map-key-2")) + .head + .getValue + .asIntegerValue() + .asLong(), + is(456L) + ) + } ) } @Test def deprecatedScanOperationTest(): Unit = { - val inConfig: ConfigSource = embulk.configLoader().fromYamlString(s""" + val inConfig: ConfigSource = loadConfigSourceFromYamlString(s""" |type: dynamodb |end_point: http://${dynamoDBHost}:${dynamoDBPort}/ |table: EMBULK_DYNAMODB_TEST_TABLE @@ -159,7 +153,7 @@ class DynamodbScanOperationTest extends EmbulkTestBase { @Test def keepTheSameBehaviourAsDeprecatedScanOperationTest(): Unit = { - val inConfig: ConfigSource = embulk.configLoader().fromYamlString(s""" + val inConfig: ConfigSource = loadConfigSourceFromYamlString(s""" |type: dynamodb |endpoint: http://${dynamoDBHost}:${dynamoDBPort}/ |table: EMBULK_DYNAMODB_TEST_TABLE diff --git a/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala b/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala index 8eec403..0355666 100644 --- a/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala +++ b/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala @@ -6,11 +6,15 @@ import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDB, AmazonDynamoDBClientBuilder } +import org.embulk.config.{ConfigLoader, ConfigSource, TaskReport, TaskSource} import org.embulk.input.dynamodb.DynamodbInputPlugin -import org.embulk.spi.InputPlugin -import org.embulk.test.TestingEmbulk -import org.junit.{After, Rule} +import org.embulk.spi.Schema +import org.embulk.EmbulkTestRuntime +import org.embulk.spi.TestPageBuilderReader.MockPageOutput +import org.embulk.spi.util.Pages +import org.junit.Rule +import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} import scala.util.chaining._ @@ -49,20 +53,7 @@ trait EmbulkTestBase { } @Rule - def embulk: TestingEmbulk = - TestingEmbulk - .builder() - .registerPlugin( - classOf[InputPlugin], - "dynamodb", - classOf[DynamodbInputPlugin] - ) - .build() - - @After - def destroyEmbulk(): Unit = { - embulk.destroy() - } + def runtime: EmbulkTestRuntime = new EmbulkTestRuntime() def getEnvironmentVariableOrShowErrorMessage(name: String): String = { try { @@ -82,4 +73,27 @@ trait EmbulkTestBase { ) } } + + def runInput(inConfig: ConfigSource, test: Seq[Seq[AnyRef]] => Unit): Unit = { + runtime.getInstance(classOf[DynamodbInputPlugin]).tap { plugin => + plugin.transaction( + inConfig, + (taskSource: TaskSource, schema: Schema, taskCount: Int) => { + val output: MockPageOutput = new MockPageOutput() + val reports: Seq[TaskReport] = 0.until(taskCount).map { taskIndex => + plugin.run(taskSource, schema, taskIndex, output) + } + output.finish() + + test(Pages.toObjects(schema, output.pages).asScala.toSeq.map(_.toSeq)) + + reports.asJava + } + ) + } + } + + def loadConfigSourceFromYamlString(yaml: String): ConfigSource = { + new ConfigLoader(runtime.getModelManager).fromYamlString(yaml) + } }