diff --git a/.scalafmt.conf b/.scalafmt.conf index 7a9fe74..b564ec8 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,5 +1,5 @@ # https://scalameta.org/scalafmt/#Configuration -version = "2.3.2" +version = "3.7.1" +runner.dialect = "scala3" newlines.alwaysBeforeElseAfterCurlyIf = true -newlines.alwaysBeforeTopLevelStatements = true diff --git a/LICENSE b/LICENSE.txt similarity index 95% rename from LICENSE rename to LICENSE.txt index 9c97600..2790bec 100644 --- a/LICENSE +++ b/LICENSE.txt @@ -1,6 +1,7 @@ The MIT License (MIT) Copyright (c) 2015 Daisuke Higashi +Copyright (c) 2020 Takahiro Nakayama (civitaspo) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -19,4 +20,3 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - diff --git a/README.md b/README.md index 806ebe6..af2c10b 100644 --- a/README.md +++ b/README.md @@ -123,27 +123,6 @@ This type of `DynamodbAttributeValue` is one that can express Dynamodb `Attribut - **NULL**: null or not. (boolean, optional) - **BOOL**: `true` or `false`. (boolean, optional) -### Deprecated Configuration - -You can use the below options yet for the backward compatibility before `v0.3.0`. However, these are already deprecated, so please use new options instead. - -- **access_key**: *[Deprecated: Use **access_key** instead]* aws access key id. this is required when **auth_method** is `"basic"` or `"session"`. (string, optional) -- **secret_key**: *[Deprecated: Use **secret_access_key** instead]* aws secret access key. this is required when **auth_method** is `"basic"` or `"session"`. (string, optional) -- **end_point**: *[Deprecated: Use **endpoint** instead]* The AWS Service endpoint (string, optional) -- **operation**: *[Deprecated: Use **scan** or **query** option instead]* Operation Type (`"scan"` or `"query"`, required) -- **filters**: *[Deprecated: Use **query.filter_expression** option or **query.filter_expression** instead]* Query Filters. (Required if **operation** is `"query"`, optional if **operation** is `"scan"`) - - **name**: Column name. - - **type**: Column type. - - **condition**: Comparison Operator. - - **value(s)**: Attribute Value(s). -- **limit**: *[Deprecated: Use **query.batch_size** option or **query.batch_size** instead]* DynamoDB 1-time Scan/Query Operation size limit (int, optional) -- **scan_limit**: *[Deprecated: Use **query.batch_size** option or **query.batch_size** instead]* DynamoDB 1-time Scan Query size limit (int, optional) -- **record_limit**: *[Deprecated: Use **query.limit** option or **query.limit** instead]* Max Record Search limit (long, optional) -- **columns**: *[Deprecated: This **columns** option for the deprecated operation. See the above **columns** option when using a new operation.]* a key-value pairs where key is a column name and value is options for the column (required) - - **name**: Column name. (string, required) - - **type**: Column values are converted to this embulk type. (`"boolean"`, `"long"`, `"double"`, `"string"`, `"json"`, required) - - NOTE: Be careful that storing values is skipped when you specify `"timestamp"`. - ## Example - Scan Operation @@ -195,9 +174,11 @@ You can see more examples [here](./example). ```shell $ ./run_dynamodb_local.sh +# Set dummy credentials (access_key_id=dummy and secret_access_key=dummy) +$ aws configure $ ./example/prepare_dynamodb_table.sh -$ ./gradlew classpath -$ embulk run example/config-query.yml -Ilib +$ ./gradlew gem +$ embulk run example/config-query.yml -Ibuild/gemContents/lib ``` ### Run tests @@ -251,4 +232,4 @@ $ ./gradlew gemPush ## License -[MIT LICENSE](./LICENSE) +[MIT LICENSE](./LICENSE.txt) diff --git a/build.gradle b/build.gradle index 1e375f0..410dee4 100644 --- a/build.gradle +++ b/build.gradle @@ -1,110 +1,84 @@ plugins { - id "com.jfrog.bintray" version "1.1" - id "com.github.jruby-gradle.base" version "1.5.0" id "scala" - id "com.diffplug.gradle.spotless" version "3.27.1" + id "maven-publish" + id "org.embulk.embulk-plugins" version "0.5.5" + // note: Incompatible because this component declares an API of a component compatible with Java 11 and the consumer needed a runtime of a component compatible with Java 8 + // id "com.diffplug.spotless" version "6.15.0" + // https://github.com/diffplug/spotless/blob/main/plugin-gradle/CHANGES.md#6130---2023-01-14 + id "com.diffplug.spotless" version "6.13.0" + // note: We cannot use the latest version because of the following error. + // > org/eclipse/jgit/lib/Repository has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0 + // id "com.palantir.git-version" version "0.15.0" + id "com.palantir.git-version" version "0.12.3" } - -import com.github.jrubygradle.JRubyExec repositories { mavenCentral() - jcenter() -} -configurations { - provided } - -version = "0.3.1" +group = "pro.civitaspo" +description = "Loads records from Dynamodb." +version = { + def vd = versionDetails() + if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) { + vd.lastTag + } else { + "0.0.0.${vd.gitHash}" + } +}() sourceCompatibility = 1.8 targetCompatibility = 1.8 dependencies { - compile "org.scala-lang:scala-library:2.13.1" - - compile "org.embulk:embulk-core:0.9.23" - provided "org.embulk:embulk-core:0.9.23" - - compile "com.amazonaws:aws-java-sdk-dynamodb:1.11.711" - compile "com.amazonaws:aws-java-sdk-sts:1.11.711" - // For @delegate macro. - compile "dev.zio:zio-macros-core_2.13:0.6.2" - - testCompile "junit:junit:4.+" - testCompile "org.embulk:embulk-core:0.9.23:tests" - testCompile "org.embulk:embulk-standards:0.9.23" - testCompile "org.embulk:embulk-deps-buffer:0.9.23" - testCompile "org.embulk:embulk-deps-config:0.9.23" -} - -compileScala { - scalaCompileOptions.additionalParameters = [ - "-Ymacro-annotations" - ] + // note: The upper versions of embulk includes Guava Dependency removal, + // so we need to specify the version of embulk to avoid some errors. + // We may need to update the version of embulk in the future. + def embulkVersion = "0.10.41" + compileOnly "org.embulk:embulk-api:${embulkVersion}" + compileOnly "org.embulk:embulk-spi:${embulkVersion}" + implementation "org.scala-lang:scala-library:2.13.1" + + implementation "org.embulk:embulk-util-config:0.3.2" + implementation "org.embulk:embulk-util-json:0.1.1" + implementation "org.embulk:embulk-util-timestamp:0.2.1" + implementation "com.amazonaws:aws-java-sdk-dynamodb:1.11.711" + implementation "com.amazonaws:aws-java-sdk-sts:1.11.711" + + testImplementation "junit:junit:4.+" + testImplementation "org.embulk:embulk-junit4:${embulkVersion}" + testImplementation "org.embulk:embulk-deps:${embulkVersion}" + testImplementation "org.embulk:embulk-input-config:0.10.36" } - test { jvmArgs '-Xms4g', '-Xmx4g', '-XX:MaxMetaspaceSize=1g' maxHeapSize = "4g" } - spotless { scala { - scalafmt('2.3.2').configFile('.scalafmt.conf') + scalafmt('3.7.1').configFile('.scalafmt.conf') } } - -task classpath(type: Copy, dependsOn: ["jar"]) { - doFirst { file("classpath").deleteDir() } - from (configurations.runtime - configurations.provided + files(jar.archivePath)) - into "classpath" +embulkPlugin { + mainClass = "org.embulk.input.dynamodb.DynamodbInputPlugin" + category = "input" + type = "dynamodb" } -clean { delete "classpath" } - -task gem(type: JRubyExec, dependsOn: ["gemspec", "classpath"]) { - jrubyArgs "-S" - script "gem" - scriptArgs "build", "${project.name}.gemspec" - doLast { ant.move(file: "${project.name}-${project.version}.gem", todir: "pkg") } -} - -task gemPush(type: JRubyExec, dependsOn: ["gem"]) { - jrubyArgs "-S" - script "gem" - scriptArgs "push", "pkg/${project.name}-${project.version}.gem" -} - -task "package"(dependsOn: ["gemspec", "classpath"]) { - doLast { - println "> Build succeeded." - println "> You can run embulk with '-L ${file(".").absolutePath}' argument." +publishing { + publications { + embulkPluginMaven(MavenPublication) { + from components.java + } } -} - -task gemspec { - ext.gemspecFile = file("${project.name}.gemspec") - inputs.file "build.gradle" - outputs.file gemspecFile - doLast { gemspecFile.write($/ -Gem::Specification.new do |spec| - spec.name = "${project.name}" - spec.version = "${project.version}" - spec.authors = ["Daisuke Higashi", "Civitaspo"] - spec.summary = %[Dynamodb input plugin for Embulk] - spec.description = %["Loads records from Dynamodb."] - spec.email = ["daisuke.develop@gmail.com", "civitaspo@gmail.com"] - spec.licenses = ["MIT"] - spec.homepage = "https://github.com/lulichn/embulk-input-dynamodb" - - spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"] - spec.test_files = spec.files.grep(%r"^(test|spec)/") - spec.require_paths = ["lib"] - - spec.add_development_dependency 'bundler', ['~> 1.0'] - spec.add_development_dependency 'rake', ['~> 12.0'] -end -/$) + repositories { + maven { + url = "${project.buildDir}/mavenPublishLocal" + } } } -clean { delete "${project.name}.gemspec" } - +gem { + from("LICENSE.txt") + authors = ["Daisuke Higashi", "Civitaspo"] + email = ["daisuke.develop@gmail.com", "civitaspo@gmail.com"] + summary = "An Embulk plugin to ingest data from Dynamodb." + homepage = "https://github.com/lulichn/embulk-input-dynamodb" + licenses = [ "MIT" ] +} diff --git a/example/config-deprecated.yml b/example/config-deprecated.yml deleted file mode 100644 index 1a64aea..0000000 --- a/example/config-deprecated.yml +++ /dev/null @@ -1,20 +0,0 @@ -in: - type: dynamodb - region: us-east-1 - endpoint: http://localhost:8000 - operation: scan - table: embulk-input-dynamodb_example - auth_method: basic - access_key_id: dummy - secret_access_key: dummy - columns: - - {name: primary-key, type: string} - - {name: sort-key, type: long} - - {name: doubleValue, type: double} - - {name: boolValue, type: boolean} - - {name: listValue, type: json} - - {name: mapValue, type: json} - -out: - type: stdout - diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index f3d88b1..943f0cb 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ba94df8..f398c33 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip +networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 2fe81a7..65dcd68 100755 --- a/gradlew +++ b/gradlew @@ -1,7 +1,7 @@ -#!/usr/bin/env sh +#!/bin/sh # -# Copyright 2015 the original author or authors. +# Copyright © 2015-2021 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,78 +17,113 @@ # ############################################################################## -## -## Gradle start up script for UN*X -## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# ############################################################################## # Attempt to set APP_HOME + # Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null -APP_NAME="Gradle" -APP_BASE_NAME=`basename "$0"` +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' # Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD="maximum" +MAX_FD=maximum warn () { echo "$*" -} +} >&2 die () { echo echo "$*" echo exit 1 -} +} >&2 # OS specific support (must be 'true' or 'false'). cygwin=false msys=false darwin=false nonstop=false -case "`uname`" in - CYGWIN* ) - cygwin=true - ;; - Darwin* ) - darwin=true - ;; - MINGW* ) - msys=true - ;; - NONSTOP* ) - nonstop=true - ;; +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + # Determine the Java command to use to start the JVM. if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then # IBM's JDK on AIX uses strange locations for the executables - JAVACMD="$JAVA_HOME/jre/sh/java" + JAVACMD=$JAVA_HOME/jre/sh/java else - JAVACMD="$JAVA_HOME/bin/java" + JAVACMD=$JAVA_HOME/bin/java fi if [ ! -x "$JAVACMD" ] ; then die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME @@ -97,7 +132,7 @@ Please set the JAVA_HOME variable in your environment to match the location of your Java installation." fi else - JAVACMD="java" + JAVACMD=java which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the @@ -105,79 +140,105 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then - MAX_FD_LIMIT=`ulimit -H -n` - if [ $? -eq 0 ] ; then - if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then - MAX_FD="$MAX_FD_LIMIT" - fi - ulimit -n $MAX_FD - if [ $? -ne 0 ] ; then - warn "Could not set maximum file descriptor limit: $MAX_FD" - fi - else - warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" - fi +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac fi -# For Darwin, add options to specify how the application appears in the dock -if $darwin; then - GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" -fi +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. # For Cygwin or MSYS, switch paths to Windows format before running java -if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then - APP_HOME=`cygpath --path --mixed "$APP_HOME"` - CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` - JAVACMD=`cygpath --unix "$JAVACMD"` - - # We build the pattern for arguments to be converted via cygpath - ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` - SEP="" - for dir in $ROOTDIRSRAW ; do - ROOTDIRS="$ROOTDIRS$SEP$dir" - SEP="|" - done - OURCYGPATTERN="(^($ROOTDIRS))" - # Add a user-defined pattern to the cygpath arguments - if [ "$GRADLE_CYGPATTERN" != "" ] ; then - OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" - fi +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + # Now convert the arguments - kludge to limit ourselves to /bin/sh - i=0 - for arg in "$@" ; do - CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` - CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option - - if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition - eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` - else - eval `echo args$i`="\"$arg\"" + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) fi - i=`expr $i + 1` + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg done - case $i in - 0) set -- ;; - 1) set -- "$args0" ;; - 2) set -- "$args0" "$args1" ;; - 3) set -- "$args0" "$args1" "$args2" ;; - 4) set -- "$args0" "$args1" "$args2" "$args3" ;; - 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; - esac fi -# Escape application args -save () { - for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done - echo " " -} -APP_ARGS=`save "$@"` +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# -# Collect all arguments for the java command, following the shell quoting and substitution rules -eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index 24467a1..6689b85 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -14,7 +14,7 @@ @rem limitations under the License. @rem -@if "%DEBUG%" == "" @echo off +@if "%DEBUG%"=="" @echo off @rem ########################################################################## @rem @rem Gradle startup script for Windows @@ -25,10 +25,14 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" @@ -37,7 +41,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init +if %ERRORLEVEL% equ 0 goto execute echo. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. @@ -51,7 +55,7 @@ goto fail set JAVA_HOME=%JAVA_HOME:"=% set JAVA_EXE=%JAVA_HOME%/bin/java.exe -if exist "%JAVA_EXE%" goto init +if exist "%JAVA_EXE%" goto execute echo. echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% @@ -61,38 +65,26 @@ echo location of your Java installation. goto fail -:init -@rem Get command-line arguments, handling Windows variants - -if not "%OS%" == "Windows_NT" goto win9xME_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* - :execute @rem Setup the command line set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + @rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* :end @rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd +if %ERRORLEVEL% equ 0 goto mainEnd :fail rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% :mainEnd if "%OS%"=="Windows_NT" endlocal diff --git a/src/main/scala/org/embulk/input/dynamodb/DeprecatedDynamodbInputPlugin.scala b/src/main/scala/org/embulk/input/dynamodb/DeprecatedDynamodbInputPlugin.scala deleted file mode 100644 index 17411f9..0000000 --- a/src/main/scala/org/embulk/input/dynamodb/DeprecatedDynamodbInputPlugin.scala +++ /dev/null @@ -1,73 +0,0 @@ -package org.embulk.input.dynamodb - -import java.util.{List => JList} - -import org.embulk.config.{ - ConfigDiff, - ConfigException, - ConfigSource, - TaskReport, - TaskSource -} -import org.embulk.input.dynamodb.aws.Aws -import org.embulk.input.dynamodb.deprecated.ope.{QueryOperation, ScanOperation} -import org.embulk.spi.{Exec, InputPlugin, PageOutput, Schema} - -@deprecated(since = "0.3.0") -object DeprecatedDynamodbInputPlugin extends InputPlugin { - - override def transaction( - config: ConfigSource, - control: InputPlugin.Control - ): ConfigDiff = { - val task: PluginTask = PluginTask.load(config) - val schema: Schema = task.getColumns.toSchema - if (schema.isEmpty) - throw new ConfigException("\"columns\" option must be set.") - val taskCount: Int = 1 - - control.run(task.dump(), schema, taskCount) - Exec.newConfigDiff() - } - - override def resume( - taskSource: TaskSource, - schema: Schema, - taskCount: Int, - control: InputPlugin.Control - ): ConfigDiff = { - throw new UnsupportedOperationException - } - - override def run( - taskSource: TaskSource, - schema: Schema, - taskIndex: Int, - output: PageOutput - ): TaskReport = { - val task: PluginTask = PluginTask.load(taskSource) - - Aws(task).withDynamodb { dynamodb => - task.getOperation.ifPresent { ope => - val o = ope.toLowerCase match { - case "scan" => new ScanOperation(dynamodb) - case "query" => new QueryOperation(dynamodb) - } - o.execute(task, schema, output) - } - } - - Exec.newTaskReport() - } - - override def cleanup( - taskSource: TaskSource, - schema: Schema, - taskCount: Int, - successTaskReports: JList[TaskReport] - ): Unit = {} - - override def guess(config: ConfigSource): ConfigDiff = { - throw new UnsupportedOperationException - } -} diff --git a/src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala b/src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala index 545a340..4f49f0d 100644 --- a/src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala +++ b/src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala @@ -15,14 +15,11 @@ class DynamodbInputPlugin extends InputPlugin { control: InputPlugin.Control ): ConfigDiff = { val task: PluginTask = PluginTask.load(config) - if (isDeprecatedOperationRequired(task)) - return DeprecatedDynamodbInputPlugin.transaction(config, control) - val schema: Schema = DynamodbItemSchema(task).getEmbulkSchema val taskCount: Int = DynamodbOperationProxy(task).getEmbulkTaskCount - control.run(task.dump(), schema, taskCount) - Exec.newConfigDiff() + control.run(task.toTaskSource(), schema, taskCount) + PluginTask.newConfigDiff() } override def resume( @@ -31,14 +28,6 @@ class DynamodbInputPlugin extends InputPlugin { taskCount: Int, control: InputPlugin.Control ): ConfigDiff = { - val task: PluginTask = PluginTask.load(taskSource) - if (isDeprecatedOperationRequired(task)) - return DeprecatedDynamodbInputPlugin.resume( - taskSource, - schema, - taskCount, - control - ) throw new UnsupportedOperationException } @@ -49,15 +38,7 @@ class DynamodbInputPlugin extends InputPlugin { output: PageOutput ): TaskReport = { val task: PluginTask = PluginTask.load(taskSource) - if (isDeprecatedOperationRequired(task)) - return DeprecatedDynamodbInputPlugin.run( - taskSource, - schema, - taskIndex, - output - ) - - val pageBuilder = new PageBuilder(task.getBufferAllocator, schema, output) + val pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output) Aws(task).withDynamodb { dynamodb => DynamodbOperationProxy(task).run( @@ -67,7 +48,7 @@ class DynamodbInputPlugin extends InputPlugin { ) } pageBuilder.finish() - Exec.newTaskReport() + PluginTask.newTaskReport() } override def cleanup( @@ -75,24 +56,9 @@ class DynamodbInputPlugin extends InputPlugin { schema: Schema, taskCount: Int, successTaskReports: JList[TaskReport] - ): Unit = { - val task: PluginTask = PluginTask.load(taskSource) - if (isDeprecatedOperationRequired(task)) - DeprecatedDynamodbInputPlugin.cleanup( - taskSource, - schema, - taskCount, - successTaskReports - ) - } + ): Unit = {} override def guess(config: ConfigSource): ConfigDiff = { - val task: PluginTask = PluginTask.load(config) - if (isDeprecatedOperationRequired(task)) - return DeprecatedDynamodbInputPlugin.guess(config) throw new UnsupportedOperationException } - - private def isDeprecatedOperationRequired(task: PluginTask): Boolean = - task.getOperation.isPresent } diff --git a/src/main/scala/org/embulk/input/dynamodb/PluginTask.scala b/src/main/scala/org/embulk/input/dynamodb/PluginTask.scala index e387369..c2862de 100644 --- a/src/main/scala/org/embulk/input/dynamodb/PluginTask.scala +++ b/src/main/scala/org/embulk/input/dynamodb/PluginTask.scala @@ -3,157 +3,45 @@ package org.embulk.input.dynamodb import java.util.Optional import org.embulk.config.{ - Config, - ConfigDefault, + ConfigDiff, ConfigException, - ConfigInject, ConfigSource, - Task, - TaskSource + TaskSource, + TaskReport +} +import org.embulk.util.config.{ + Config, + ConfigDefault, + ConfigMapperFactory, + Task => EmbulkTask } import org.embulk.input.dynamodb.aws.Aws -import org.embulk.input.dynamodb.deprecated.Filter import org.embulk.input.dynamodb.item.DynamodbItemSchema import org.embulk.input.dynamodb.operation.DynamodbOperationProxy import org.embulk.spi.BufferAllocator -import zio.macros.annotation.delegate import scala.util.chaining._ trait PluginTask - extends Task + extends EmbulkTask with Aws.Task with DynamodbItemSchema.Task - with DynamodbOperationProxy.Task { - - @deprecated( - message = "Use #getScan() or #getQuery() instead.", - since = "0.3.0" - ) - @Config("operation") - @ConfigDefault("null") - def getOperation: Optional[String] - - @deprecated( - message = - "Use DynamodbQueryOperation.Task#getBatchSize() or DynamodbScanOperation.Task#getBatchSize() instead.", - since = "0.3.0" - ) - @Config("limit") - @ConfigDefault("0") - def getLimit: Long - - @deprecated( - message = - "Use DynamodbQueryOperation.Task#getBatchSize() or DynamodbScanOperation.Task#getBatchSize() instead.", - since = "0.3.0" - ) - @Config("scan_limit") - @ConfigDefault("0") - def getScanLimit: Long - - @deprecated( - message = - "Use DynamodbQueryOperation.Task#getLimit() or DynamodbScanOperation.Task#getLimit() instead.", - since = "0.3.0" - ) - @Config("record_limit") - @ConfigDefault("0") - def getRecordLimit: Long - - @deprecated( - message = - "Use DynamodbQueryOperation.Task#getFilterExpression() or DynamodbScanOperation.Task#getFilterExpression() instead.", - since = "0.3.0" - ) - @Config("filters") - @ConfigDefault("null") - def getFilters: Optional[Filter] - - @ConfigInject - def getBufferAllocator: BufferAllocator -} - -case class PluginTaskCompat(@delegate task: PluginTask) extends PluginTask { - - override def getOperation: Optional[String] = { - task.getOperation.ifPresent { op => - logger.warn( - "[Deprecated] The option \"operation\" is deprecated. Use \"scan\" or \"query\" option instead." - ) - - op.toLowerCase match { - case "scan" | "query" => // do nothing - case x => - throw new ConfigException( - s"Operation '$x' is unsupported. Available values are 'scan' or 'query'." - ) - } - - if (getScan.isPresent || getQuery.isPresent) - throw new ConfigException( - "The option \"operation\" must not be used together with either \"scan\" or \"query\" options." - ) - } - task.getOperation - } - - override def getLimit: Long = { - logger.warn( - "[Deprecated] The option \"limit\" is deprecated. Use \"query.batch_size\" or \"scan.batch_size\" instead." - ) - task.getLimit - } - - override def getScanLimit: Long = { - logger.warn( - "[Deprecated] The option \"scan_limit\" is deprecated. Use \"query.batch_size\" or \"scan.batch_size\" instead." - ) - task.getScanLimit - } - - override def getRecordLimit: Long = { - logger.warn( - "[Deprecated] The option \"record_limit\" is deprecated. Use \"query.limit\" or \"scan.limit\" instead." - ) - task.getRecordLimit - } - - override def getFilters: Optional[Filter] = { - logger.warn( - "[Deprecated] The option \"filters\" is deprecated. Use \"query.filter_expression\" or \"scan.filter_expression\" instead." - ) - task.getFilters - } -} + with DynamodbOperationProxy.Task {} object PluginTask { + private val configMapperFactory: ConfigMapperFactory = + ConfigMapperFactory.builder().addDefaultModules().build() def load(configSource: ConfigSource): PluginTask = { - configSource - .loadConfig(classOf[PluginTask]) - .pipe(PluginTaskCompat) - .tap(configure) + configMapperFactory + .createConfigMapper() + .map(configSource, classOf[PluginTask]) } def load(taskSource: TaskSource): PluginTask = { - taskSource - .loadTask(classOf[PluginTask]) - .pipe(PluginTaskCompat) - .tap(configure) + configMapperFactory.createTaskMapper().map(taskSource, classOf[PluginTask]) } - private def configure(task: PluginTask): Unit = { - if (!task.getOperation.isPresent && !task.getScan.isPresent && !task.getQuery.isPresent) { - // NOTE: "operation" option is deprecated, so this is not shown the message. - throw new ConfigException( - "Either \"scan\" or \"query\" option is required." - ) - } - if (task.getOperation.isPresent && (task.getScan.isPresent || task.getQuery.isPresent)) { - throw new ConfigException( - "[Deprecated] You must not use \"scan\" or \"query\" option with \"operation\" option." - ) - } - } + def newConfigDiff(): ConfigDiff = configMapperFactory.newConfigDiff() + def newTaskReport(): TaskReport = configMapperFactory.newTaskReport() } diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala b/src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala index 9703410..6a9cca1 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/Aws.scala @@ -5,11 +5,13 @@ import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDB, AmazonDynamoDBClientBuilder } +import org.embulk.util.config.{Task => EmbulkTask} object Aws { trait Task - extends AwsCredentials.Task + extends EmbulkTask + with AwsCredentials.Task with AwsEndpointConfiguration.Task with AwsClientConfiguration.Task with AwsDynamodbConfiguration.Task diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/AwsClientConfiguration.scala b/src/main/scala/org/embulk/input/dynamodb/aws/AwsClientConfiguration.scala index a43bfdc..2e07a23 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/AwsClientConfiguration.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/AwsClientConfiguration.scala @@ -4,12 +4,12 @@ import java.util.Optional import com.amazonaws.ClientConfiguration import com.amazonaws.client.builder.AwsClientBuilder -import org.embulk.config.{Config, ConfigDefault} +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.input.dynamodb.aws.AwsClientConfiguration.Task object AwsClientConfiguration { - trait Task { + trait Task extends EmbulkTask { @Config("http_proxy") @ConfigDefault("null") diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/AwsCredentials.scala b/src/main/scala/org/embulk/input/dynamodb/aws/AwsCredentials.scala index d19b363..eab934c 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/AwsCredentials.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/AwsCredentials.scala @@ -19,34 +19,24 @@ import com.amazonaws.auth.profile.{ ProfileCredentialsProvider, ProfilesConfigFile } -import org.embulk.config.{Config, ConfigDefault, ConfigException} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.input.dynamodb.aws.AwsCredentials.Task import org.embulk.input.dynamodb.logger -import org.embulk.spi.unit.LocalFile -import zio.macros.annotation.delegate +import org.embulk.util.config.units.LocalFile object AwsCredentials { - trait Task { + trait Task extends EmbulkTask { @Config("auth_method") @ConfigDefault("\"default\"") def getAuthMethod: String - @deprecated(message = "Use #getAccessKeyId() instead.", since = "0.3.0") - @Config("access_key") - @ConfigDefault("null") - def getAccessKey: Optional[String] - @Config("access_key_id") @ConfigDefault("null") def getAccessKeyId: Optional[String] - @deprecated(message = "Use #getSecretAccessKey() instead.", since = "0.3.0") - @Config("secret_key") - @ConfigDefault("null") - def getSecretKey: Optional[String] - @Config("secret_access_key") @ConfigDefault("null") def getSecretAccessKey: Optional[String] @@ -89,61 +79,7 @@ object AwsCredentials { } def apply(task: Task): AwsCredentials = { - new AwsCredentials(AwsCredentialsTaskCompat(task)) - } -} - -case class AwsCredentialsTaskCompat(@delegate task: Task) extends Task { - - override def getAccessKey: Optional[String] = { - throw new NotImplementedError() - } - - override def getSecretKey: Optional[String] = { - throw new NotImplementedError() - } - - override def getAuthMethod: String = { - if (getAccessKeyId.isPresent && getSecretAccessKey.isPresent) { - if (task.getAuthMethod != "basic") { - logger.warn( - "[Deprecated] The default value of \"auth_method\" option is \"default\", " + - "but currently use \"basic\" auth_method for backward compatibility " + - "because you set \"access_key_id\" and \"secret_access_key\" options. " + - "Please set \"basic\" to \"auth_method\" option expressly." - ) - return "basic" - } - } - task.getAuthMethod - } - - override def getAccessKeyId: Optional[String] = { - if (task.getAccessKeyId.isPresent && task.getAccessKey.isPresent) - throw new ConfigException( - "You cannot use both \"access_key_id\" option and \"access_key\" option. Use \"access_key_id\" option." - ) - if (task.getAccessKey.isPresent) { - logger.warn( - "[Deprecated] \"access_key\" option is deprecated. Use \"access_key_id\" option instead." - ) - return task.getAccessKey - } - task.getAccessKeyId - } - - override def getSecretAccessKey: Optional[String] = { - if (task.getSecretAccessKey.isPresent && task.getSecretKey.isPresent) - throw new ConfigException( - "You cannot use both \"secret_access_key\" option and \"secret_key\" option. Use \"secret_access_key\" option." - ) - if (task.getSecretKey.isPresent) { - logger.warn( - "[Deprecated] \"secret_key\" option is deprecated. Use \"secret_access_key\" option instead." - ) - return task.getSecretKey - } - task.getSecretAccessKey + new AwsCredentials(task) } } diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/AwsDynamodbConfiguration.scala b/src/main/scala/org/embulk/input/dynamodb/aws/AwsDynamodbConfiguration.scala index ed91b52..7adcfd8 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/AwsDynamodbConfiguration.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/AwsDynamodbConfiguration.scala @@ -3,12 +3,12 @@ package org.embulk.input.dynamodb.aws import java.util.Optional import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder -import org.embulk.config.{Config, ConfigDefault} +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.input.dynamodb.aws.AwsDynamodbConfiguration.Task object AwsDynamodbConfiguration { - trait Task { + trait Task extends EmbulkTask { @Config("enable_endpoint_discovery") @ConfigDefault("null") diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/AwsEndpointConfiguration.scala b/src/main/scala/org/embulk/input/dynamodb/aws/AwsEndpointConfiguration.scala index cf68533..bef8f91 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/AwsEndpointConfiguration.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/AwsEndpointConfiguration.scala @@ -5,22 +5,16 @@ import java.util.Optional import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} -import org.embulk.config.{Config, ConfigDefault, ConfigException} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Task => EmbulkTask, Config, ConfigDefault} import org.embulk.input.dynamodb.aws.AwsEndpointConfiguration.Task import org.embulk.input.dynamodb.logger -import zio.macros.annotation.delegate import scala.util.Try object AwsEndpointConfiguration { - trait Task { - - @deprecated(message = "Use #getEndpoint() instead.", since = "0.3.0") - @Config("end_point") - @ConfigDefault("null") - def getEndPoint: Optional[String] - + trait Task extends EmbulkTask { @Config("endpoint") @ConfigDefault("null") def getEndpoint: Optional[String] @@ -31,26 +25,7 @@ object AwsEndpointConfiguration { } def apply(task: Task): AwsEndpointConfiguration = { - new AwsEndpointConfiguration(AwsEndpointConfigurationTaskCompat(task)) - } -} - -case class AwsEndpointConfigurationTaskCompat(@delegate task: Task) - extends Task { - override def getEndPoint: Optional[String] = throw new NotImplementedError() - - override def getEndpoint: Optional[String] = { - if (task.getEndpoint.isPresent && task.getEndPoint.isPresent) - throw new ConfigException( - "You cannot use both \"endpoint\" option and \"end_point\" option. Use \"endpoint\" option." - ) - if (task.getEndPoint.isPresent) { - logger.warn( - "[Deprecated] \"end_point\" option is deprecated. Use \"endpoint\" option instead." - ) - return task.getEndPoint - } - task.getEndpoint + new AwsEndpointConfiguration(task) } } diff --git a/src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala b/src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala index edd1fb0..c986f38 100644 --- a/src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala +++ b/src/main/scala/org/embulk/input/dynamodb/aws/HttpProxy.scala @@ -3,12 +3,13 @@ package org.embulk.input.dynamodb.aws import java.util.Optional import com.amazonaws.{ClientConfiguration, Protocol} -import org.embulk.config.{Config, ConfigDefault, ConfigException} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Task => EmbulkTask, Config, ConfigDefault} import org.embulk.input.dynamodb.aws.HttpProxy.Task object HttpProxy { - trait Task { + trait Task extends EmbulkTask { @Config("host") @ConfigDefault("null") @@ -50,8 +51,8 @@ class HttpProxy(task: Task) { case None => throw new ConfigException( s"'${task.getProtocol}' is unsupported: `protocol` must be one of [${Protocol.values - .map(v => s"'$v'") - .mkString(", ")}]." + .map(v => s"'$v'") + .mkString(", ")}]." ) } diff --git a/src/main/scala/org/embulk/input/dynamodb/deprecated/AttributeValueHelper.scala b/src/main/scala/org/embulk/input/dynamodb/deprecated/AttributeValueHelper.scala deleted file mode 100644 index 72bed20..0000000 --- a/src/main/scala/org/embulk/input/dynamodb/deprecated/AttributeValueHelper.scala +++ /dev/null @@ -1,72 +0,0 @@ -package org.embulk.input.dynamodb.deprecated - -import com.amazonaws.services.dynamodbv2.model.AttributeValue -import org.msgpack.value.{Value, ValueFactory} - -import scala.util.Try - -object AttributeValueHelper { - - // referring aws-scala - def decodeToValue(value: AttributeValue): Value = { - import scala.jdk.CollectionConverters._ - - // FIXME: Need Encode? - lazy val _bin = Option(value.getB).map(v => ValueFactory.newBinary(v.array)) - lazy val _bool = Option(value.getBOOL).map(v => ValueFactory.newBoolean(v)) - lazy val _num = Option(value.getN).map(v => - Try(v.toLong) - .map(ValueFactory.newInteger) - .getOrElse(ValueFactory.newFloat(v.toDouble)) - ) - lazy val _str = Option(value.getS).map(v => ValueFactory.newString(v)) - lazy val _nil = Option(value.getNULL).map(v => ValueFactory.newNil) - - lazy val _list = Option(value.getL).map(l => - ValueFactory.newArray(l.asScala.map(v => decodeToValue(v)).asJava) - ) - lazy val _ss = Option(value.getSS).map(l => - ValueFactory.newArray( - l.asScala.map(v => ValueFactory.newString(v)).asJava - ) - ) - lazy val _ns = Option(value.getNS).map(l => - ValueFactory.newArray( - l.asScala - .map(v => - Try(v.toLong) - .map(ValueFactory.newInteger) - .getOrElse(ValueFactory.newFloat(v.toDouble)) - ) - .asJava - ) - ) - // FIXME: Need Encode? - lazy val _bs = Option(value.getBS).map(l => - ValueFactory.newArray( - l.asScala.map(v => ValueFactory.newBinary(v.array)).asJava - ) - ) - lazy val _map = Option(value.getM).map(m => - ValueFactory.newMap( - m.asScala - .map(v => ValueFactory.newString(v._1) -> decodeToValue(v._2)) - .asJava - ) - ) - - _bin - .orElse(_bool) - .orElse(_num) - .orElse(_str) - .orElse(_nil) - .orElse(_list) - .orElse(_ss) - .orElse(_ns) - .orElse(_bs) - .orElse(_map) match { - case None => ValueFactory.newNil - case Some(j) => j - } - } -} diff --git a/src/main/scala/org/embulk/input/dynamodb/deprecated/Filter.scala b/src/main/scala/org/embulk/input/dynamodb/deprecated/Filter.scala deleted file mode 100644 index 32e9140..0000000 --- a/src/main/scala/org/embulk/input/dynamodb/deprecated/Filter.scala +++ /dev/null @@ -1,32 +0,0 @@ -package org.embulk.input.dynamodb.deprecated - -import java.util.{List => JList} - -import com.fasterxml.jackson.annotation.{JsonCreator, JsonValue} -import com.google.common.base.Objects - -class Filter { - private var filters: JList[FilterConfig] = _ - - @JsonCreator - def this(filters: JList[FilterConfig]) { - this() - this.filters = filters - } - - @JsonValue - def getFilters: JList[FilterConfig] = filters - - override def equals(obj: Any): Boolean = { - if (this == obj) return true - - if (!obj.isInstanceOf[Filter]) return false - - val other: Filter = obj.asInstanceOf[Filter] - Objects.equal(filters, other.filters) - } - - override def hashCode: Int = { - Objects.hashCode(filters) - } -} diff --git a/src/main/scala/org/embulk/input/dynamodb/deprecated/FilterConfig.scala b/src/main/scala/org/embulk/input/dynamodb/deprecated/FilterConfig.scala deleted file mode 100644 index ab20290..0000000 --- a/src/main/scala/org/embulk/input/dynamodb/deprecated/FilterConfig.scala +++ /dev/null @@ -1,55 +0,0 @@ -package org.embulk.input.dynamodb.deprecated - -import com.fasterxml.jackson.annotation.JsonProperty -import com.google.common.base.Objects - -class FilterConfig { - private var _name: String = _ - private var _type: String = _ - private var _condition: String = _ - private var _value: String = _ - private var _value2: String = _ - - def this( - @JsonProperty("name") _name: String, - @JsonProperty("type") _type: String, - @JsonProperty("condition") _condition: String, - @JsonProperty("value") _value: String, - @JsonProperty("value2") _value2: String - ) { - this() - this._name = _name - this._type = _type - this._condition = _condition - this._value = _value - this._value2 = _value2 - } - - @JsonProperty("name") - def getName = _name - - @JsonProperty("type") - def getType = _type - - @JsonProperty("condition") - def getCondition = _condition - - @JsonProperty("value") - def getValue = _value - - @JsonProperty("value2") - def getValue2 = _value2 - - override def equals(obj: Any): Boolean = { - if (this == obj) return true - - if (!obj.isInstanceOf[FilterConfig]) return false - - val other: FilterConfig = obj.asInstanceOf[FilterConfig] - Objects.equal(this._name, other._name) && - Objects.equal(this._type, other._type) && - Objects.equal(this._condition, other._condition) && - Objects.equal(this._value, other._value) && - Objects.equal(this._value2, other._value2) - } -} diff --git a/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/AbstractOperation.scala b/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/AbstractOperation.scala deleted file mode 100644 index ed3cd38..0000000 --- a/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/AbstractOperation.scala +++ /dev/null @@ -1,119 +0,0 @@ -package org.embulk.input.dynamodb.deprecated.ope - -import com.amazonaws.services.dynamodbv2.model.{AttributeValue, Condition} -import org.embulk.input.dynamodb.PluginTask -import org.embulk.input.dynamodb.deprecated.AttributeValueHelper -import org.embulk.spi._ -import org.embulk.spi.`type`.Types -import org.msgpack.value.{Value, ValueFactory} - -import scala.jdk.CollectionConverters._ - -abstract class AbstractOperation { - def execute(task: PluginTask, schema: Schema, output: PageOutput): Unit - - def getLimit(limit: Long, recordLimit: Long, recordCount: Long): Int = { - if (limit > 0 && recordLimit > 0) { - math.min(limit, recordLimit - recordCount).toInt - } - else if (limit > 0 || recordLimit > 0) { - math.max(limit, recordLimit).toInt - } - else { - 0 - } - } - - def createFilters(task: PluginTask): Map[String, Condition] = { - val filterMap = collection.mutable.HashMap[String, Condition]() - - Option(task.getFilters.orElse(null)).map { filters => - filters.getFilters.asScala.map { filter => - val attributeValueList = - collection.mutable.ArrayBuffer[AttributeValue]() - attributeValueList += createAttributeValue( - filter.getType, - filter.getValue - ) - Option(filter.getValue2).map { value2 => - attributeValueList += createAttributeValue(filter.getType, value2) - } - - filterMap += filter.getName -> new Condition() - .withComparisonOperator(filter.getCondition) - .withAttributeValueList(attributeValueList.asJava) - } - } - - filterMap.toMap - } - - def createAttributeValue(t: String, v: String): AttributeValue = { - t match { - case "string" => - new AttributeValue().withS(v) - case "long" | "double" => - new AttributeValue().withN(v) - case "boolean" => - new AttributeValue().withBOOL(v.toBoolean) - } - } - - def write( - pageBuilder: PageBuilder, - schema: Schema, - items: Seq[Map[String, AttributeValue]] - ): Long = { - var count = 0 - - items.foreach { item => - schema.getColumns.asScala.foreach { column => - val value = item.get(column.getName) - column.getType match { - case Types.STRING => - convert(column, value, pageBuilder.setString) - case Types.LONG => - convert(column, value, pageBuilder.setLong) - case Types.DOUBLE => - convert(column, value, pageBuilder.setDouble) - case Types.BOOLEAN => - convert(column, value, pageBuilder.setBoolean) - case Types.JSON => - convert(column, value, pageBuilder.setJson) - case _ => /* Do nothing */ - } - } - pageBuilder.addRecord() - count += 1 - } - - count - } - - def convert[A]( - column: Column, - value: Option[AttributeValue], - f: (Column, A) => Unit - )(implicit f1: Option[AttributeValue] => A): Unit = - f(column, f1(value)) - - implicit def StringConvert(value: Option[AttributeValue]): String = - value.map(_.getS).getOrElse("") - - implicit def LongConvert(value: Option[AttributeValue]): Long = - value.map(_.getN).flatMap(Option(_)).map(_.toLong).getOrElse(0L) - - implicit def DoubleConvert(value: Option[AttributeValue]): Double = - value.map(_.getN).flatMap(Option(_)).map(_.toDouble).getOrElse(0d) - - implicit def BooleanConvert(value: Option[AttributeValue]): Boolean = - value.exists(_.getBOOL) - - implicit def JsonConvert(value: Option[AttributeValue]): Value = { - value - .map { attr => - AttributeValueHelper.decodeToValue(attr) - } - .getOrElse(ValueFactory.newNil()) - } -} diff --git a/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/QueryOperation.scala b/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/QueryOperation.scala deleted file mode 100644 index 4dea80d..0000000 --- a/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/QueryOperation.scala +++ /dev/null @@ -1,58 +0,0 @@ -package org.embulk.input.dynamodb.deprecated.ope - -import java.util.{List => JList, Map => JMap} - -import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClient} -import com.amazonaws.services.dynamodbv2.model.{ - AttributeValue, - Condition, - QueryRequest, - QueryResult -} -import org.embulk.input.dynamodb.PluginTask -import org.embulk.spi.{BufferAllocator, PageBuilder, PageOutput, Schema} - -import scala.jdk.CollectionConverters._ - -class QueryOperation(client: AmazonDynamoDB) extends AbstractOperation { - - override def execute( - task: PluginTask, - schema: Schema, - output: PageOutput - ): Unit = { - val allocator: BufferAllocator = task.getBufferAllocator - val pageBuilder: PageBuilder = new PageBuilder(allocator, schema, output) - - val attributes: JList[String] = - schema.getColumns.asScala.map(_.getName).asJava - val conditions: JMap[String, Condition] = createFilters(task).asJava - var evaluateKey: JMap[String, AttributeValue] = null - - val limit: Long = math.max(task.getScanLimit, task.getLimit) - val recordLimit: Long = task.getRecordLimit - var recordCount: Long = 0 - - do { - val batchSize = getLimit(limit, recordLimit, recordCount) - - val request: QueryRequest = new QueryRequest() - .withTableName(task.getTable) - .withAttributesToGet(attributes) - .withKeyConditions(conditions) - .withExclusiveStartKey(evaluateKey) - - if (batchSize > 0) { - request.setLimit(batchSize) - } - - val result: QueryResult = client.query(request) - evaluateKey = result.getLastEvaluatedKey - - val items = result.getItems.asScala.map(_.asScala.toMap).toSeq - recordCount += write(pageBuilder, schema, items) - } while (evaluateKey != null && (recordLimit == 0 || recordLimit > recordCount)) - - pageBuilder.finish() - } -} diff --git a/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/ScanOperation.scala b/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/ScanOperation.scala deleted file mode 100644 index 383004c..0000000 --- a/src/main/scala/org/embulk/input/dynamodb/deprecated/ope/ScanOperation.scala +++ /dev/null @@ -1,58 +0,0 @@ -package org.embulk.input.dynamodb.deprecated.ope - -import java.util.{List => JList, Map => JMap} - -import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClient} -import com.amazonaws.services.dynamodbv2.model.{ - AttributeValue, - Condition, - ScanRequest, - ScanResult -} -import org.embulk.input.dynamodb.PluginTask -import org.embulk.spi.{BufferAllocator, PageBuilder, PageOutput, Schema} - -import scala.jdk.CollectionConverters._ - -class ScanOperation(client: AmazonDynamoDB) extends AbstractOperation { - - override def execute( - task: PluginTask, - schema: Schema, - output: PageOutput - ): Unit = { - val allocator: BufferAllocator = task.getBufferAllocator - val pageBuilder: PageBuilder = new PageBuilder(allocator, schema, output) - - val attributes: JList[String] = - schema.getColumns.asScala.map(_.getName).asJava - val scanFilter: JMap[String, Condition] = createFilters(task).asJava - var evaluateKey: JMap[String, AttributeValue] = null - - val scanLimit: Long = task.getScanLimit - val recordLimit: Long = task.getRecordLimit - var recordCount: Long = 0 - - do { - val batchSize = getLimit(scanLimit, recordLimit, recordCount) - - val request: ScanRequest = new ScanRequest() - .withTableName(task.getTable) - .withAttributesToGet(attributes) - .withScanFilter(scanFilter) - .withExclusiveStartKey(evaluateKey) - - if (batchSize > 0) { - request.setLimit(batchSize) - } - - val result: ScanResult = client.scan(request) - evaluateKey = result.getLastEvaluatedKey - - val items = result.getItems.asScala.map(_.asScala.toMap).toSeq - recordCount += write(pageBuilder, schema, items) - } while (evaluateKey != null && (recordLimit == 0 || recordLimit > recordCount)) - - pageBuilder.finish() - } -} diff --git a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValue.scala b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValue.scala index 8d4449a..2345022 100644 --- a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValue.scala +++ b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValue.scala @@ -5,15 +5,15 @@ import java.nio.charset.StandardCharsets import java.util.{Optional, List => JList, Map => JMap} import com.amazonaws.services.dynamodbv2.model.AttributeValue -import org.embulk.config.{Config, ConfigDefault, Task => EmbulkTask} +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import scala.jdk.CollectionConverters._ import scala.util.chaining._ -/** - * TODO: I want to bind directly `org.embulk.config.Config`` to `com.amazonaws.services.dynamodbv2.model.AttributeValue`. - * Should I implement `com.amazonaws.transform.JsonUnmarshallerContext`? - **/ +/** TODO: I want to bind directly `org.embulk.util.config.Config`` to + * `com.amazonaws.services.dynamodbv2.model.AttributeValue`. Should I implement + * `com.amazonaws.transform.JsonUnmarshallerContext`? + */ object DynamodbAttributeValue { trait Task extends EmbulkTask { diff --git a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValueEmbulkTypeTransformable.scala b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValueEmbulkTypeTransformable.scala index 67c44f1..188e497 100644 --- a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValueEmbulkTypeTransformable.scala +++ b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbAttributeValueEmbulkTypeTransformable.scala @@ -4,11 +4,13 @@ import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import org.embulk.input.dynamodb.logger -import org.embulk.spi.time.{Timestamp, TimestampParser} +import org.embulk.spi.time.Timestamp +import org.embulk.util.timestamp.TimestampFormatter import org.msgpack.value.{Value, ValueFactory} import scala.jdk.CollectionConverters._ import scala.util.chaining._ +import java.time.Instant object DynamodbAttributeValueEmbulkTypeTransformable { @@ -50,7 +52,7 @@ object DynamodbAttributeValueEmbulkTypeTransformable { case class DynamodbAttributeValueEmbulkTypeTransformable( attributeValue: DynamodbAttributeValue, typeEnforcer: Option[DynamodbAttributeValueType] = None, - timestampParser: Option[TimestampParser] = None + timestampFormatter: Option[TimestampFormatter] = None ) { private def fromAttributeValueType: DynamodbAttributeValueType = @@ -142,10 +144,14 @@ case class DynamodbAttributeValueEmbulkTypeTransformable( Option(fromAttributeValueType match { case DynamodbAttributeValueType.S => - if (DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS - .contains(attributeValue.getS)) true - else if (DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS - .contains(attributeValue.getS)) false + if ( + DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS + .contains(attributeValue.getS) + ) true + else if ( + DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS + .contains(attributeValue.getS) + ) false else return None case DynamodbAttributeValueType.N => convertNAsLongOrDouble(attributeValue.getN) match { @@ -154,11 +160,15 @@ case class DynamodbAttributeValueEmbulkTypeTransformable( } case DynamodbAttributeValueType.B => val s = convertBAsString(attributeValue.getB) - if (DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS - .contains(s)) + if ( + DynamodbAttributeValueEmbulkTypeTransformable.TRUTHY_STRINGS + .contains(s) + ) true - else if (DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS - .contains(s)) false + else if ( + DynamodbAttributeValueEmbulkTypeTransformable.FALSY_STRINGS + .contains(s) + ) false else return None case DynamodbAttributeValueType.BOOL => attributeValue.getBOOL case unsupported => @@ -221,9 +231,9 @@ case class DynamodbAttributeValueEmbulkTypeTransformable( if (attributeValue.isNull) return None Option(fromAttributeValueType match { - case DynamodbAttributeValueType.S => attributeValue.getS - case DynamodbAttributeValueType.N => attributeValue.getN - case DynamodbAttributeValueType.B => convertBAsString(attributeValue.getB) + case DynamodbAttributeValueType.S => attributeValue.getS + case DynamodbAttributeValueType.N => attributeValue.getN + case DynamodbAttributeValueType.B => convertBAsString(attributeValue.getB) case DynamodbAttributeValueType.SS => asMessagePack.map(_.toJson).get case DynamodbAttributeValueType.NS => asMessagePack.map(_.toJson).get case DynamodbAttributeValueType.BS => asMessagePack.map(_.toJson).get @@ -238,8 +248,8 @@ case class DynamodbAttributeValueEmbulkTypeTransformable( }) } - def asTimestamp: Option[Timestamp] = { - timestampParser.flatMap(p => asString.map(p.parse)) + def asTimestamp: Option[Instant] = { + timestampFormatter.flatMap(p => asString.map(p.parse)) } } diff --git a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemReader.scala b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemReader.scala index 96dd77f..34081fe 100644 --- a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemReader.scala +++ b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemReader.scala @@ -3,6 +3,7 @@ package org.embulk.input.dynamodb.item import org.embulk.spi.Column import org.embulk.spi.time.Timestamp import org.msgpack.value.Value +import java.time.Instant case class DynamodbItemReader( private val schema: DynamodbItemSchema, @@ -28,7 +29,7 @@ case class DynamodbItemReader( DynamodbAttributeValueEmbulkTypeTransformable( value, typeEnforcer = schema.getAttributeType(name), - timestampParser = schema.getTimestampParser(name) + timestampFormatter = schema.getTimestampFormatter(name) ) } @@ -52,7 +53,7 @@ case class DynamodbItemReader( .get(column.getName) .flatMap(v => getTransformable(column.getName, v).asDouble) - def getTimestamp(column: Column): Option[Timestamp] = + def getTimestamp(column: Column): Option[Instant] = currentItem .get(column.getName) .flatMap(v => getTransformable(column.getName, v).asTimestamp) diff --git a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemSchema.scala b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemSchema.scala index 624ec90..b994ad1 100644 --- a/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemSchema.scala +++ b/src/main/scala/org/embulk/input/dynamodb/item/DynamodbItemSchema.scala @@ -4,10 +4,10 @@ import java.util.{Optional, List => JList} import com.amazonaws.services.dynamodbv2.model.AttributeValue import com.fasterxml.jackson.annotation.{JsonCreator, JsonValue} -import org.embulk.config.{Config, ConfigDefault, Task => EmbulkTask} +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.spi.{Column, PageBuilder, Schema} import org.embulk.spi.`type`.{Type, Types} -import org.embulk.spi.time.TimestampParser +import org.embulk.util.timestamp.TimestampFormatter import scala.jdk.CollectionConverters._ import scala.util.chaining._ @@ -15,9 +15,7 @@ import scala.util.Try object DynamodbItemSchema { - trait ColumnTask - extends EmbulkTask - with TimestampParser.TimestampColumnOption { + trait ColumnTask extends EmbulkTask { @Config("name") def getName: String @@ -28,34 +26,21 @@ object DynamodbItemSchema { @Config("attribute_type") @ConfigDefault("null") def getAttributeType: Optional[String] - } - @deprecated( - message = "for DeprecatedDynamodbInputPlugin", - since = "0.3.0" - ) - case class SchemaConfigCompat(columnTasks: Seq[ColumnTask]) { - @JsonCreator - def this(columnTasks: JList[ColumnTask]) = - this(columnTasks.asScala.toSeq) - - @JsonValue - def getColumnTasks: JList[ColumnTask] = columnTasks.asJava - - def toSchema: Schema = - Schema - .builder() - .tap { b => - columnTasks.foreach { t => - b.add(t.getName, t.getType) - } - } - .build() + @Config("timezone") + @ConfigDefault("null") + def getTimeZoneId: Optional[String] - def isEmpty: Boolean = columnTasks.isEmpty + @Config("format") + @ConfigDefault("null") + def getFormat: Optional[String] + + @Config("date") + @ConfigDefault("null") + def getDate: Optional[String] } - trait Task extends EmbulkTask with TimestampParser.Task { + trait Task extends EmbulkTask { @Config("json_column_name") @ConfigDefault("\"record\"") @@ -63,29 +48,55 @@ object DynamodbItemSchema { @Config("columns") @ConfigDefault("[]") - def getColumns: SchemaConfigCompat + def getColumns: JList[ColumnTask] + + @Config("default_timezone") + @ConfigDefault("\"UTC\"") + def getDefaultTimeZoneId: String + + @Config("default_timestamp_format") + @ConfigDefault("\"%Y-%m-%d %H:%M:%S.%6N %z\"") + def getDefaultTimestampFormat: String + + @Config("default_date") + @ConfigDefault("\"1970-01-01\"") + def getDefaultDate: String } } case class DynamodbItemSchema(task: DynamodbItemSchema.Task) { - // TODO: build in this class after removing SchemaConfigCompat. private lazy val embulkSchema: Schema = - if (!isItemAsJson) task.getColumns.toSchema - else - Schema - .builder() - .add(task.getJsonColumnName, Types.JSON) + Schema + .builder() + .tap { b => + if (isItemAsJson) b.add(task.getJsonColumnName, Types.JSON) + else + task.getColumns.asScala.foreach { t => + b.add(t.getName, t.getType) + } + } + .build() + + private lazy val timestampFormatters: Map[String, TimestampFormatter] = + task.getColumns.asScala.map { columnTask => + columnTask.getName -> TimestampFormatter + .builder( + columnTask.getFormat.orElse(task.getDefaultTimestampFormat), + true + ) + .setDefaultZoneFromString( + columnTask.getTimeZoneId.orElse(task.getDefaultTimeZoneId) + ) + .setDefaultDateFromString( + columnTask.getDate.orElse(task.getDefaultDate) + ) .build() - - private lazy val timestampParsers: Map[String, TimestampParser] = - task.getColumns.columnTasks.map { columnTask => - columnTask.getName -> TimestampParser.of(task, columnTask) }.toMap private lazy val attributeTypes: Map[String, DynamodbAttributeValueType] = - task.getColumns.columnTasks + task.getColumns.asScala .filter(_.getAttributeType.isPresent) .map { columnTask => columnTask.getName -> DynamodbAttributeValueType( @@ -101,11 +112,11 @@ case class DynamodbItemSchema(task: DynamodbItemSchema.Task) { def getEmbulkSchema: Schema = embulkSchema - def getTimestampParser(column: Column): Option[TimestampParser] = - timestampParsers.get(column.getName) + def getTimestampFormatter(column: Column): Option[TimestampFormatter] = + timestampFormatters.get(column.getName) - def getTimestampParser(columnName: String): Option[TimestampParser] = - getEmbulkColumn(columnName).flatMap(getTimestampParser) + def getTimestampFormatter(columnName: String): Option[TimestampFormatter] = + getEmbulkColumn(columnName).flatMap(getTimestampFormatter) def getAttributeType(column: Column): Option[DynamodbAttributeValueType] = attributeTypes.get(column.getName) @@ -121,7 +132,7 @@ case class DynamodbItemSchema(task: DynamodbItemSchema.Task) { def getEmbulkColumn(columnIndex: Int): Option[Column] = Try(getEmbulkSchema.getColumn(columnIndex)).toOption - def isItemAsJson: Boolean = task.getColumns.isEmpty + def isItemAsJson: Boolean = task.getColumns.asScala.isEmpty def visitColumns(visitor: DynamodbItemColumnVisitor): Unit = getEmbulkSchema.visitColumns(visitor) diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/AbstractDynamodbOperation.scala b/src/main/scala/org/embulk/input/dynamodb/operation/AbstractDynamodbOperation.scala index 9a34352..9dcda78 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/AbstractDynamodbOperation.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/AbstractDynamodbOperation.scala @@ -13,12 +13,8 @@ import com.amazonaws.services.dynamodbv2.model.{ ReturnConsumedCapacity, Select } -import org.embulk.config.{ - Config, - ConfigDefault, - ConfigException, - Task => EmbulkTask -} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} import org.embulk.input.dynamodb.item.DynamodbAttributeValue import scala.jdk.CollectionConverters._ @@ -159,7 +155,9 @@ abstract class AbstractDynamodbOperation( throw new ConfigException( "\"batch_size\" must be greater than or equal to 1." ) - req.setLimit(JInteger.valueOf(v)) // Note: Use BatchSize for the limit per a request. + req.setLimit( + JInteger.valueOf(v) + ) // Note: Use BatchSize for the limit per a request. } task.getProjectionExpression.ifPresent(req.setProjectionExpression) task.getReturnConsumedCapacity.ifPresent(req.setReturnConsumedCapacity) diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbOperationProxy.scala b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbOperationProxy.scala index 7b5edb7..8863b27 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbOperationProxy.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbOperationProxy.scala @@ -4,11 +4,12 @@ import java.util.Optional import com.amazonaws.services.dynamodbv2.model.AttributeValue import com.amazonaws.services.dynamodbv2.AmazonDynamoDB -import org.embulk.config.{ - Config, - ConfigDefault, - ConfigException, - Task => EmbulkTask +import org.embulk.config.ConfigException +import org.embulk.util.config.{Config, ConfigDefault, Task => EmbulkTask} +import org.embulk.input.dynamodb.operation.{ + DynamodbQueryOperation, + DynamodbScanOperation, + EmbulkDynamodbOperation } object DynamodbOperationProxy { diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala index 3667880..b4c99b3 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbQueryOperation.scala @@ -2,7 +2,7 @@ package org.embulk.input.dynamodb.operation import com.amazonaws.services.dynamodbv2.model.{AttributeValue, QueryRequest} import com.amazonaws.services.dynamodbv2.AmazonDynamoDB -import org.embulk.config.{Config, ConfigDefault} +import org.embulk.util.config.{Config, ConfigDefault} import org.embulk.input.dynamodb.logger import scala.jdk.CollectionConverters._ diff --git a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala index ca57916..cc76d1d 100644 --- a/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala +++ b/src/main/scala/org/embulk/input/dynamodb/operation/DynamodbScanOperation.scala @@ -4,7 +4,8 @@ import java.util.Optional import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ScanRequest} import com.amazonaws.services.dynamodbv2.AmazonDynamoDB -import org.embulk.config.{Config, ConfigDefault, ConfigException} +import org.embulk.config.ConfigException +import org.embulk.util.config.{Config, ConfigDefault} import org.embulk.input.dynamodb.logger import scala.jdk.CollectionConverters._ diff --git a/src/test/scala/org/embulk/input/dynamodb/AttributeValueHelperTest.scala b/src/test/scala/org/embulk/input/dynamodb/AttributeValueHelperTest.scala deleted file mode 100644 index 75a6ba8..0000000 --- a/src/test/scala/org/embulk/input/dynamodb/AttributeValueHelperTest.scala +++ /dev/null @@ -1,342 +0,0 @@ -package org.embulk.input.dynamodb - -import java.io.File -import java.{util => JUtil} - -import com.amazonaws.services.dynamodbv2.model.AttributeValue -import com.fasterxml.jackson.databind.ObjectMapper -import org.embulk.input.dynamodb.deprecated.AttributeValueHelper._ -import org.hamcrest.CoreMatchers._ -import org.hamcrest.MatcherAssert.assertThat -import org.junit.Assert._ -import org.junit.Test -import org.msgpack.value.ValueFactory - -import scala.jdk.CollectionConverters._ - -class AttributeValueHelperTest { - - @Test - def decodeTest(): Unit = { - val stringValue = decodeToValue(new AttributeValue().withS("STR")) - assertEquals(stringValue.asStringValue.asString, "STR") - - val intValue = decodeToValue(new AttributeValue().withN("123456789")) - assertEquals(intValue.asNumberValue().toInt, 123456789) - - val doubleValue = decodeToValue( - new AttributeValue().withN("-98765432.00000001") - ) - assertEquals(doubleValue.asNumberValue().toDouble, -98765432.00000001, 0.0) - - val trueValue = decodeToValue(new AttributeValue().withBOOL(true)) - assertEquals(trueValue.asBooleanValue().getBoolean, true) - - val falseValue = decodeToValue(new AttributeValue().withBOOL(false)) - assertEquals(falseValue.asBooleanValue().getBoolean, false) - - val nilValue = decodeToValue(new AttributeValue().withNULL(true)) - assertEquals(nilValue.isNilValue, true) - } - - @Test - def listDecodeTest(): Unit = { - val stringListValue = decodeToValue( - new AttributeValue().withL( - new AttributeValue().withS("ValueA"), - new AttributeValue().withS("ValueB"), - new AttributeValue().withS("ValueC") - ) - ) - - assertTrue(stringListValue.isArrayValue) - assertEquals(stringListValue.asArrayValue().size(), 3) - - assertTrue(stringListValue.asArrayValue().get(0).isStringValue) - assertEquals( - stringListValue.asArrayValue().get(0).asStringValue().asString(), - "ValueA" - ) - assertEquals( - stringListValue.asArrayValue().get(1).asStringValue().asString(), - "ValueB" - ) - assertEquals( - stringListValue.asArrayValue().get(2).asStringValue().asString(), - "ValueC" - ) - - val numberListValue = decodeToValue( - new AttributeValue().withL( - new AttributeValue().withN("123"), - new AttributeValue().withN("-456"), - new AttributeValue().withN("0.0000045679"), - new AttributeValue().withN("-1234567890.123") - ) - ) - - assertTrue(numberListValue.isArrayValue) - assertEquals(numberListValue.asArrayValue().size(), 4) - - assertTrue(numberListValue.asArrayValue().get(0).isIntegerValue) - assertEquals( - numberListValue.asArrayValue().get(0).asNumberValue().toInt, - 123 - ) - assertEquals( - numberListValue.asArrayValue().get(1).asNumberValue().toInt, - -456 - ) - - assertTrue(numberListValue.asArrayValue().get(2).isFloatValue) - assertEquals( - numberListValue.asArrayValue().get(2).asNumberValue().toDouble, - 0.0000045679, - 0.0 - ) - assertEquals( - numberListValue.asArrayValue().get(3).asNumberValue().toDouble, - -1234567890.123, - 0.0 - ) - - val stringSetValue = decodeToValue( - new AttributeValue().withSS(new JUtil.HashSet[String]() { - add("ValueA") - add("ValueB") - add("ValueC") - }) - ) - - assertTrue(stringSetValue.isArrayValue) - assertEquals(stringSetValue.asArrayValue().size(), 3) - - assertThat( - List("ValueA", "ValueB", "ValueC").asJava, - hasItems( - equalTo(stringSetValue.asArrayValue().get(0).asStringValue().asString), - equalTo(stringSetValue.asArrayValue().get(1).asStringValue().asString), - equalTo(stringSetValue.asArrayValue().get(2).asStringValue().asString) - ) - ) - - val numberSetValue = decodeToValue( - new AttributeValue().withNS(new JUtil.HashSet[String]() { - add("123") - add("-456") - add("0.0000045679") - add("-1234567890.123") - }) - ) - - assertTrue(numberSetValue.isArrayValue) - assertEquals(numberSetValue.asArrayValue().size(), 4) - } - - @Test - def mapDecodeTest(): Unit = { - val stringMap = decodeToValue( - new AttributeValue().withM(new JUtil.HashMap[String, AttributeValue]() { - put("KeyA", new AttributeValue().withS("ValueA")) - put("KeyB", new AttributeValue().withS("ValueB")) - put("KeyC", new AttributeValue().withS("ValueC")) - }) - ) - - assertTrue(stringMap.isMapValue) - assertEquals(stringMap.asMapValue().size(), 3) - assertEquals( - stringMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyA")) - .asStringValue() - .asString(), - "ValueA" - ) - assertEquals( - stringMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyB")) - .asStringValue() - .asString(), - "ValueB" - ) - assertEquals( - stringMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyC")) - .asStringValue() - .asString(), - "ValueC" - ) - - val numberMap = decodeToValue( - new AttributeValue().withM(new JUtil.HashMap[String, AttributeValue]() { - put("KeyA", new AttributeValue().withN("123")) - put("KeyB", new AttributeValue().withN("-456")) - put("KeyC", new AttributeValue().withN("0.0000045679")) - put("KeyD", new AttributeValue().withN("-1234567890.123")) - }) - ) - - assertTrue(numberMap.isMapValue) - assertEquals(numberMap.asMapValue().size(), 4) - - assertTrue( - numberMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyA")) - .isIntegerValue - ) - assertEquals( - numberMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyA")) - .asNumberValue() - .toInt, - 123 - ) - assertEquals( - numberMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyB")) - .asNumberValue() - .toInt, - -456 - ) - - assertTrue( - numberMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyC")) - .isFloatValue - ) - assertEquals( - numberMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyC")) - .asFloatValue() - .toDouble, - 0.0000045679, - 0.0 - ) - assertEquals( - numberMap - .asMapValue() - .map() - .get(ValueFactory.newString("KeyD")) - .asFloatValue() - .toDouble, - -1234567890.123, - 0.0 - ) - } - - def attr[A](value: A)(implicit f: A => AttributeValue): AttributeValue = - f(value) - - implicit def StringAttributeValue(value: String): AttributeValue = - new AttributeValue().withS(value) - - implicit def IntegerAttributeValue(value: Int): AttributeValue = - new AttributeValue().withN(value.toString) - - implicit def LongAttributeValue(value: Long): AttributeValue = - new AttributeValue().withN(value.toString) - - implicit def FloatAttributeValue(value: Float): AttributeValue = - new AttributeValue().withN(value.toString) - - implicit def DoubleAttributeValue(value: Double): AttributeValue = - new AttributeValue().withN(value.toString) - - implicit def BooleanAttributeValue(value: Boolean): AttributeValue = - new AttributeValue().withBOOL(value) - - implicit def MapAttributeValue( - value: Map[String, AttributeValue] - ): AttributeValue = new AttributeValue().withM(value.asJava) - - implicit def ListAttributeValue(value: List[AttributeValue]): AttributeValue = - new AttributeValue().withL(value.asJava) - - @Test - def nestedDecodeTest(): Unit = { - // TODO: Json -> AttributeValue... - val testData = decodeToValue( - attr( - Map( - "_id" -> attr("56d8e1377a72374918f73bd2"), - "index" -> attr(0), - "guid" -> attr("5309640c-499a-43f6-801d-3076c810892b"), - "isActive" -> attr(true), - "age" -> attr(37), - "name" -> attr("Battle Lancaster"), - "email" -> attr("battlelancaster@zytrac.com"), - "registered" -> attr("2014-07-16T04:40:58 -09:00"), - "latitude" -> attr(45.574906), - "longitude" -> attr(36.596302), - "tags" -> attr( - List( - attr("veniam"), - attr("exercitation"), - attr("velit"), - attr("pariatur"), - attr("sit"), - attr("non"), - attr("dolore") - ) - ), - "friends" -> attr( - List( - attr( - Map( - "id" -> attr(0), - "name" -> attr("Mejia Montgomery"), - "tags" -> attr( - List(attr("duis"), attr("proident"), attr("et")) - ) - ) - ), - attr( - Map( - "id" -> attr(1), - "name" -> attr("Carpenter Reed"), - "tags" -> attr( - List(attr("labore"), attr("nisi"), attr("ipsum")) - ) - ) - ), - attr( - Map( - "id" -> attr(2), - "name" -> attr("Gamble Watts"), - "tags" -> attr( - List(attr("occaecat"), attr("voluptate"), attr("eu")) - ) - ) - ) - ) - ) - ) - ) - ) - - val testA = new ObjectMapper() - .readValue(testData.toJson, classOf[JUtil.Map[String, Any]]) - val testB = new ObjectMapper().readValue( - new File("src/test/resources/json/test.json"), - classOf[JUtil.Map[String, Any]] - ) - - assertThat(testA, is(testB)) - } -} diff --git a/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala b/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala index b9110b0..6bea047 100644 --- a/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/AwsCredentialsTest.scala @@ -1,7 +1,7 @@ package org.embulk.input.dynamodb import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException -import org.embulk.config.{ConfigException, ConfigSource} +import org.embulk.config.{ConfigSource, ConfigException} import org.embulk.input.dynamodb.aws.AwsCredentials import org.embulk.input.dynamodb.testutil.EmbulkTestBase import org.hamcrest.CoreMatchers._ @@ -36,7 +36,7 @@ class AwsCredentialsTest extends EmbulkTestBase { ) def doTest(inConfig: ConfigSource): Unit = { - val task: PluginTask = inConfig.loadConfig(classOf[PluginTask]) + val task: PluginTask = PluginTask.load(inConfig) val provider = AwsCredentials(task).createAwsCredentialsProvider val cred = provider.getCredentials assertThat(cred.getAWSAccessKeyId, notNullValue()) @@ -56,17 +56,6 @@ class AwsCredentialsTest extends EmbulkTestBase { |""".stripMargin) } - @deprecated(since = "0.3.0") - @Test - def notSetAuthMethod_SetCredentials_deprecated(): Unit = - if (runAwsCredentialsTest) { - val inConfig: ConfigSource = defaultInConfig - .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) - .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - - doTest(inConfig) - } - @Test def notSetAuthMethod_SetCredentials(): Unit = if (runAwsCredentialsTest) { val inConfig: ConfigSource = defaultInConfig @@ -76,17 +65,6 @@ class AwsCredentialsTest extends EmbulkTestBase { doTest(inConfig) } - @deprecated(since = "0.3.0") - @Test - def setAuthMethod_Basic_deprecated(): Unit = if (runAwsCredentialsTest) { - val inConfig: ConfigSource = defaultInConfig - .set("auth_method", "basic") - .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) - .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - - doTest(inConfig) - } - @Test def setAuthMethod_Basic(): Unit = if (runAwsCredentialsTest) { val inConfig: ConfigSource = defaultInConfig @@ -97,43 +75,17 @@ class AwsCredentialsTest extends EmbulkTestBase { doTest(inConfig) } - @deprecated(since = "0.3.0") - @Test - def throwIfSetAccessKeyAndAccessKeyId(): Unit = if (runAwsCredentialsTest) { - val inConfig: ConfigSource = defaultInConfig - .set("auth_method", "basic") - .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) - .set("access_key_id", EMBULK_DYNAMODB_TEST_ACCESS_KEY) - .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - - Assert.assertThrows(classOf[ConfigException], () => { - doTest(inConfig) - }) - } - - @deprecated(since = "0.3.0") - @Test - def throwIfSetSecretKeyAndSecretAccessKeyId(): Unit = - if (runAwsCredentialsTest) { - val inConfig: ConfigSource = defaultInConfig - .set("auth_method", "basic") - .set("access_key", EMBULK_DYNAMODB_TEST_ACCESS_KEY) - .set("secret_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - .set("secret_access_key", EMBULK_DYNAMODB_TEST_SECRET_KEY) - - Assert.assertThrows(classOf[ConfigException], () => { - doTest(inConfig) - }) - } - @Test def setAuthMethod_Basic_NotSet(): Unit = { val inConfig: ConfigSource = defaultInConfig .set("auth_method", "basic") - Assert.assertThrows(classOf[ConfigException], () => { - doTest(inConfig) - }) + Assert.assertThrows( + classOf[ConfigException], + () => { + doTest(inConfig) + } + ) } @Test @@ -161,9 +113,12 @@ class AwsCredentialsTest extends EmbulkTestBase { .set("auth_method", "profile") .set("profile_name", "DO_NOT_EXIST") - Assert.assertThrows(classOf[IllegalArgumentException], () => { - doTest(inConfig) - }) + Assert.assertThrows( + classOf[IllegalArgumentException], + () => { + doTest(inConfig) + } + ) } @Test @@ -184,8 +139,11 @@ class AwsCredentialsTest extends EmbulkTestBase { .set("role_arn", "DO_NOT_EXIST") .set("role_session_name", "dummy") - Assert.assertThrows(classOf[AWSSecurityTokenServiceException], () => { - doTest(inConfig) - }) + Assert.assertThrows( + classOf[AWSSecurityTokenServiceException], + () => { + doTest(inConfig) + } + ) } } diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala index 3452a10..6005112 100644 --- a/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbOperationTest.scala @@ -105,9 +105,12 @@ class DynamodbOperationTest extends EmbulkTestBase { |scan: | limit: 1 |""".stripMargin) - runInput(inScanConfig, { result => - assert(result.size.equals(1)) - }) + runInput( + inScanConfig, + { result => + assert(result.size.equals(1)) + } + ) val inQueryConfig: ConfigSource = loadConfigSourceFromYamlString(s""" |type: dynamodb @@ -124,8 +127,11 @@ class DynamodbOperationTest extends EmbulkTestBase { | ":v": {S: a} | limit: 1 |""".stripMargin) - runInput(inQueryConfig, { result => - assert(result.size.equals(1)) - }) + runInput( + inQueryConfig, + { result => + assert(result.size.equals(1)) + } + ) } } diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala index d911911..d314c0c 100644 --- a/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbQueryOperationBackwardCompatibilityTest.scala @@ -88,7 +88,8 @@ class DynamodbQueryOperationBackwardCompatibilityTest extends EmbulkTestBase { } runInput( - embulkInConfig, { result: Seq[Seq[AnyRef]] => + embulkInConfig, + { result: Seq[Seq[AnyRef]] => val head = result.head assertThat(head(0).toString, is("key-1")) assertThat(head(1).asInstanceOf[Long], is(0L)) @@ -128,54 +129,4 @@ class DynamodbQueryOperationBackwardCompatibilityTest extends EmbulkTestBase { ) } - @Test - def deprecatedQueryOperationTest(): Unit = { - val inConfig: ConfigSource = loadConfigSourceFromYamlString(s""" - |type: dynamodb - |end_point: http://${dynamoDBHost}:${dynamoDBPort}/ - |table: EMBULK_DYNAMODB_TEST_TABLE - |auth_method: basic - |access_key: dummy - |secret_key: dummy - |operation: query - |filters: - | - {name: pri-key, type: string, condition: EQ, value: key-1} - |columns: - | - {name: pri-key, type: string} - | - {name: sort-key, type: long} - | - {name: doubleValue, type: double} - | - {name: boolValue, type: boolean} - | - {name: listValue, type: json} - | - {name: mapValue, type: json} - |""".stripMargin) - - testBackwardCompatibility(inConfig) - } - - @Test - def keepTheSameBehaviourAsDeprecatedQueryOperationTest(): Unit = { - val inConfig: ConfigSource = loadConfigSourceFromYamlString(s""" - |type: dynamodb - |endpoint: http://${dynamoDBHost}:${dynamoDBPort}/ - |table: EMBULK_DYNAMODB_TEST_TABLE - |auth_method: basic - |access_key: dummy - |secret_key: dummy - |query: - | key_condition_expression: "#x = :v" - | expression_attribute_names: - | "#x": pri-key - | expression_attribute_values: - | ":v": {S: key-1} - |columns: - | - {name: pri-key, type: string} - | - {name: sort-key, type: long} - | - {name: doubleValue, type: double} - | - {name: boolValue, type: boolean} - | - {name: listValue, type: json} - | - {name: mapValue, type: json} - |""".stripMargin) - - testBackwardCompatibility(inConfig) - } } diff --git a/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala b/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala index a248f67..a007329 100644 --- a/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala +++ b/src/test/scala/org/embulk/input/dynamodb/DynamodbScanOperationBackwardCompatibilityTest.scala @@ -88,7 +88,8 @@ class DynamodbScanOperationBackwardCompatibilityTest extends EmbulkTestBase { } runInput( - embulkInConfig, { result: Seq[Seq[AnyRef]] => + embulkInConfig, + { result: Seq[Seq[AnyRef]] => val head = result.head assertThat(head(0).toString, is("key-1")) assertThat(head(1).asInstanceOf[Long], is(0L)) @@ -128,48 +129,4 @@ class DynamodbScanOperationBackwardCompatibilityTest extends EmbulkTestBase { ) } - @Test - def deprecatedScanOperationTest(): Unit = { - - val inConfig: ConfigSource = loadConfigSourceFromYamlString(s""" - |type: dynamodb - |end_point: http://${dynamoDBHost}:${dynamoDBPort}/ - |table: EMBULK_DYNAMODB_TEST_TABLE - |auth_method: basic - |access_key: dummy - |secret_key: dummy - |operation: scan - |columns: - | - {name: pri-key, type: string} - | - {name: sort-key, type: long} - | - {name: doubleValue, type: double} - | - {name: boolValue, type: boolean} - | - {name: listValue, type: json} - | - {name: mapValue, type: json} - |""".stripMargin) - - testBackwardCompatibility(inConfig) - } - - @Test - def keepTheSameBehaviourAsDeprecatedScanOperationTest(): Unit = { - val inConfig: ConfigSource = loadConfigSourceFromYamlString(s""" - |type: dynamodb - |endpoint: http://${dynamoDBHost}:${dynamoDBPort}/ - |table: EMBULK_DYNAMODB_TEST_TABLE - |auth_method: basic - |access_key: dummy - |secret_key: dummy - |scan: {} - |columns: - | - {name: pri-key, type: string} - | - {name: sort-key, type: long} - | - {name: doubleValue, type: double} - | - {name: boolValue, type: boolean} - | - {name: listValue, type: json} - | - {name: mapValue, type: json} - |""".stripMargin) - - testBackwardCompatibility(inConfig) - } } diff --git a/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala b/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala index 0355666..a45e031 100644 --- a/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala +++ b/src/test/scala/org/embulk/input/dynamodb/testutil/EmbulkTestBase.scala @@ -75,22 +75,21 @@ trait EmbulkTestBase { } def runInput(inConfig: ConfigSource, test: Seq[Seq[AnyRef]] => Unit): Unit = { - runtime.getInstance(classOf[DynamodbInputPlugin]).tap { plugin => - plugin.transaction( - inConfig, - (taskSource: TaskSource, schema: Schema, taskCount: Int) => { - val output: MockPageOutput = new MockPageOutput() - val reports: Seq[TaskReport] = 0.until(taskCount).map { taskIndex => - plugin.run(taskSource, schema, taskIndex, output) - } - output.finish() + val plugin = new DynamodbInputPlugin() + plugin.transaction( + inConfig, + (taskSource: TaskSource, schema: Schema, taskCount: Int) => { + val output: MockPageOutput = new MockPageOutput() + val reports: Seq[TaskReport] = 0.until(taskCount).map { taskIndex => + plugin.run(taskSource, schema, taskIndex, output) + } + output.finish() - test(Pages.toObjects(schema, output.pages).asScala.toSeq.map(_.toSeq)) + test(Pages.toObjects(schema, output.pages).asScala.toSeq.map(_.toSeq)) - reports.asJava - } - ) - } + reports.asJava + } + ) } def loadConfigSourceFromYamlString(yaml: String): ConfigSource = {