Skip to content

Commit

Permalink
Merge pull request #23 from lulichn/#21
Browse files Browse the repository at this point in the history
Fix a bug: Caused by: scala.MatchError: Some(xxxx) (of class scala.Some)
  • Loading branch information
civitaspo authored Mar 28, 2020
2 parents 264b481 + f10d17f commit ecc885f
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 172 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ dependencies {
compile "dev.zio:zio-macros-core_2.13:0.6.2"

testCompile "junit:junit:4.+"
testCompile "org.embulk:embulk-core:0.9.23:tests"
testCompile "org.embulk:embulk-standards:0.9.23"
testCompile "org.embulk:embulk-deps-buffer:0.9.23"
testCompile "org.embulk:embulk-deps-config:0.9.23"
testCompile "org.embulk:embulk-test:0.9.23"
}

compileScala {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ case class DynamodbQueryOperation(task: DynamodbQueryOperation.Task)
loadableRecords match {
case Some(v) if (result.getCount > v) =>
f(result.getItems.asScala.take(v.toInt).map(_.asScala.toMap).toSeq)
case None =>
case _ =>
f(result.getItems.asScala.map(_.asScala.toMap).toSeq)
Option(result.getLastEvaluatedKey).foreach { lastEvaluatedKey =>
runInternal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ case class DynamodbScanOperation(task: DynamodbScanOperation.Task)
loadableRecords match {
case Some(v) if (result.getCount > v) =>
f(result.getItems.asScala.take(v.toInt).map(_.asScala.toMap).toSeq)
case None =>
case _ =>
f(result.getItems.asScala.map(_.asScala.toMap).toSeq)
Option(result.getLastEvaluatedKey).foreach { lastEvaluatedKey =>
runInternal(
Expand Down
121 changes: 62 additions & 59 deletions src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.embulk.input.dynamodb.aws.AwsCredentials
import org.embulk.input.dynamodb.testutil.EmbulkTestBase
import org.hamcrest.CoreMatchers._
import org.hamcrest.MatcherAssert.assertThat
import org.junit.{Assume, Test}
import org.junit.{Assert, Test}

class AwsCredentialsTest extends EmbulkTestBase {

Expand Down Expand Up @@ -44,32 +44,31 @@ class AwsCredentialsTest extends EmbulkTestBase {
}

def defaultInConfig: ConfigSource = {
embulk.configLoader().fromYamlString(s"""
|type: dynamodb
|region: us-east-1
|table: hoge
|operation: scan
|columns:
| - {name: key1, type: string}
| - {name: key2, type: long}
| - {name: value1, type: string}
|""".stripMargin)
loadConfigSourceFromYamlString(s"""
|type: dynamodb
|region: us-east-1
|table: hoge
|operation: scan
|columns:
| - {name: key1, type: string}
| - {name: key2, type: long}
| - {name: value1, type: string}
|""".stripMargin)
}

@deprecated(since = "0.3.0")
@Test
def notSetAuthMethod_SetCredentials_deprecated(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
val inConfig: ConfigSource = defaultInConfig
.set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
.set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY)
def notSetAuthMethod_SetCredentials_deprecated(): Unit =
if (runAwsCredentialsTest) {
val inConfig: ConfigSource = defaultInConfig
.set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
.set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY)

doTest(inConfig)
}
doTest(inConfig)
}

@Test
def notSetAuthMethod_SetCredentials(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
def notSetAuthMethod_SetCredentials(): Unit = if (runAwsCredentialsTest) {
val inConfig: ConfigSource = defaultInConfig
.set("access_key_id", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
.set("secret_access_key", EMBULK_DYNAMODB_TEST_SECRET_KEY)
Expand All @@ -79,8 +78,7 @@ class AwsCredentialsTest extends EmbulkTestBase {

@deprecated(since = "0.3.0")
@Test
def setAuthMethod_Basic_deprecated(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
def setAuthMethod_Basic_deprecated(): Unit = if (runAwsCredentialsTest) {
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "basic")
.set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
Expand All @@ -90,8 +88,7 @@ class AwsCredentialsTest extends EmbulkTestBase {
}

@Test
def setAuthMethod_Basic(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
def setAuthMethod_Basic(): Unit = if (runAwsCredentialsTest) {
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "basic")
.set("access_key_id", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
Expand All @@ -101,42 +98,46 @@ class AwsCredentialsTest extends EmbulkTestBase {
}

@deprecated(since = "0.3.0")
@Test(expected = classOf[ConfigException])
def throwIfSetAccessKeyAndAccessKeyId(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
@Test
def throwIfSetAccessKeyAndAccessKeyId(): Unit = if (runAwsCredentialsTest) {
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "basic")
.set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
.set("access_key_id", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
.set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY)

doTest(inConfig)
Assert.assertThrows(classOf[ConfigException], () => {
doTest(inConfig)
})
}

@deprecated(since = "0.3.0")
@Test(expected = classOf[ConfigException])
def throwIfSetSecretKeyAndSecretAccessKeyId(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "basic")
.set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
.set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY)
.set("secret_access_key", EMBULK_DYNAMODB_TEST_SECRET_KEY)

doTest(inConfig)
}
@Test
def throwIfSetSecretKeyAndSecretAccessKeyId(): Unit =
if (runAwsCredentialsTest) {
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "basic")
.set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY)
.set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY)
.set("secret_access_key", EMBULK_DYNAMODB_TEST_SECRET_KEY)

Assert.assertThrows(classOf[ConfigException], () => {
doTest(inConfig)
})
}

@Test(expected = classOf[ConfigException])
@Test
def setAuthMethod_Basic_NotSet(): Unit = {
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "basic")

doTest(inConfig)
Assert.assertThrows(classOf[ConfigException], () => {
doTest(inConfig)
})
}

@Test
def setAuthMethod_Env(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
def setAuthMethod_Env(): Unit = if (runAwsCredentialsTest) {
// NOTE: Requires to set the env vars like 'AWS_ACCESS_KEY_ID' and so on when testing.
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "env")
Expand All @@ -145,8 +146,7 @@ class AwsCredentialsTest extends EmbulkTestBase {
}

@Test
def setAuthMethod_Profile(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
def setAuthMethod_Profile(): Unit = if (runAwsCredentialsTest) {
// NOTE: Requires to set credentials to '~/.aws' when testing.
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "profile")
Expand All @@ -155,18 +155,19 @@ class AwsCredentialsTest extends EmbulkTestBase {
doTest(inConfig)
}

@Test(expected = classOf[IllegalArgumentException])
@Test
def setAuthMethod_Profile_NotExistProfileName(): Unit = {
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "profile")
.set("profile_name", "DO_NOT_EXIST")

doTest(inConfig)
Assert.assertThrows(classOf[IllegalArgumentException], () => {
doTest(inConfig)
})
}

@Test
def setAuthMethod_assume_role(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
def setAuthMethod_assume_role(): Unit = if (runAwsCredentialsTest) {
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "assume_role")
.set("role_arn", EMBULK_DYNAMODB_TEST_ASSUME_ROLE_ROLE_ARN)
Expand All @@ -175,14 +176,16 @@ class AwsCredentialsTest extends EmbulkTestBase {
doTest(inConfig)
}

@Test(expected = classOf[AWSSecurityTokenServiceException])
def setAuthMethod_assume_role_NotExistRoleArn(): Unit = {
Assume.assumeTrue(runAwsCredentialsTest)
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "assume_role")
.set("role_arn", "DO_NOT_EXIST")
.set("role_session_name", "dummy")

doTest(inConfig)
}
@Test
def setAuthMethod_assume_role_NotExistRoleArn(): Unit =
if (runAwsCredentialsTest) {
val inConfig: ConfigSource = defaultInConfig
.set("auth_method", "assume_role")
.set("role_arn", "DO_NOT_EXIST")
.set("role_session_name", "dummy")

Assert.assertThrows(classOf[AWSSecurityTokenServiceException], () => {
doTest(inConfig)
})
}
}
131 changes: 131 additions & 0 deletions src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package org.embulk.input.dynamodb

import com.amazonaws.services.dynamodbv2.model.{
AttributeDefinition,
AttributeValue,
BillingMode,
CreateTableRequest,
KeySchemaElement,
KeyType,
PutItemRequest,
ScalarAttributeType
}
import org.embulk.config.ConfigSource
import org.embulk.input.dynamodb.testutil.EmbulkTestBase
import org.junit.Test

import scala.jdk.CollectionConverters._

class DynamodbOperationTest extends EmbulkTestBase {

@Test
def limitTest(): Unit = {
val tableName = "limit_test"
cleanupTable(tableName)
withDynamodb { dynamodb =>
dynamodb.createTable(
new CreateTableRequest()
.withTableName(tableName)
.withAttributeDefinitions(
new AttributeDefinition()
.withAttributeName("pk")
.withAttributeType(ScalarAttributeType.S)
)
.withKeySchema(
new KeySchemaElement()
.withAttributeName("pk")
.withKeyType(KeyType.HASH)
)
.withBillingMode(BillingMode.PAY_PER_REQUEST)
)
dynamodb.putItem(
new PutItemRequest()
.withTableName(tableName)
.withItem(
Map
.newBuilder[String, AttributeValue]
.addOne("pk", new AttributeValue().withS("a"))
.result()
.asJava
)
)
dynamodb.putItem(
new PutItemRequest()
.withTableName(tableName)
.withItem(
Map
.newBuilder[String, AttributeValue]
.addOne("pk", new AttributeValue().withS("a"))
.result()
.asJava
)
)
dynamodb.putItem(
new PutItemRequest()
.withTableName(tableName)
.withItem(
Map
.newBuilder[String, AttributeValue]
.addOne("pk", new AttributeValue().withS("a"))
.result()
.asJava
)
)
dynamodb.putItem(
new PutItemRequest()
.withTableName(tableName)
.withItem(
Map
.newBuilder[String, AttributeValue]
.addOne("pk", new AttributeValue().withS("b"))
.result()
.asJava
)
)
dynamodb.putItem(
new PutItemRequest()
.withTableName(tableName)
.withItem(
Map
.newBuilder[String, AttributeValue]
.addOne("pk", new AttributeValue().withS("b"))
.result()
.asJava
)
)
}

val inScanConfig: ConfigSource = loadConfigSourceFromYamlString(s"""
|type: dynamodb
|table: $tableName
|endpoint: http://$dynamoDBHost:$dynamoDBPort/
|auth_method: basic
|access_key_id: dummy
|secret_access_key: dummy
|scan:
| limit: 1
|""".stripMargin)
runInput(inScanConfig, { result =>
assert(result.size.equals(1))
})

val inQueryConfig: ConfigSource = loadConfigSourceFromYamlString(s"""
|type: dynamodb
|table: $tableName
|endpoint: http://$dynamoDBHost:$dynamoDBPort/
|auth_method: basic
|access_key_id: dummy
|secret_access_key: dummy
|query:
| key_condition_expression: "#x = :v"
| expression_attribute_names:
| "#x": pk
| expression_attribute_values:
| ":v": {S: a}
| limit: 1
|""".stripMargin)
runInput(inQueryConfig, { result =>
assert(result.size.equals(1))
})
}
}
Loading

0 comments on commit ecc885f

Please sign in to comment.