diff --git a/embulk-output-redshift/README.md b/embulk-output-redshift/README.md index 70db3cd3..d8a0d584 100644 --- a/embulk-output-redshift/README.md +++ b/embulk-output-redshift/README.md @@ -82,6 +82,7 @@ Redshift output plugin for Embulk loads records to Redshift. - **s3_bucket**: S3 bucket name for temporary files - **s3_key_prefix**: S3 key prefix for temporary files (string, default: "") - **delete_s3_temp_file**: whether to delete temporary files uploaded on S3 (boolean, default: true) +- **max_s3_upload_threads_per_task**: The maximum number of threads per task which upload and copy data to Redshift via S3 (integer, optional). For example, if this option is 5 and the number of tasks is 8, 40 threads are created. If this option is increased, it may shorten the transfer time, but cause too many connections error. If this option is not specified, create as many new threads as needed, by default. - **copy_iam_role_name**: IAM Role for COPY credential(https://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-access-permissions.html), if this is set, IAM Role is used instead of aws access key and aws secret access key(string, optional) - **copy_aws_account_id**: IAM Role's account ID for multi account COPY. If this is set, the ID is used instead of authenticated user's account ID. This is enabled only if copy_iam_role_name is set.(string, optional) - **options**: extra connection properties (hash, default: {}) diff --git a/embulk-output-redshift/src/main/java/org/embulk/output/RedshiftOutputPlugin.java b/embulk-output-redshift/src/main/java/org/embulk/output/RedshiftOutputPlugin.java index 1d018869..08bd775a 100644 --- a/embulk-output-redshift/src/main/java/org/embulk/output/RedshiftOutputPlugin.java +++ b/embulk-output-redshift/src/main/java/org/embulk/output/RedshiftOutputPlugin.java @@ -83,6 +83,10 @@ public interface RedshiftPluginTask extends AwsCredentialsTaskWithPrefix, Plugin @ConfigDefault("true") public boolean getDeleteS3TempFile(); + @Config("max_s3_upload_threads_per_task") + @ConfigDefault("null") + public Optional getMaxS3UploadThreadsPerTask(); + @Config("ssl") @ConfigDefault("\"disable\"") public Ssl getSsl(); @@ -205,6 +209,8 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional merg RedshiftPluginTask t = (RedshiftPluginTask) task; setAWSCredentialsBackwardCompatibility(t); return new RedshiftCopyBatchInsert(getConnector(task, true), - getAWSCredentialsProvider(t), t.getS3Bucket(), t.getS3KeyPrefix(), t.getIamUserName(), t.getDeleteS3TempFile(), t.getCopyIamRoleName().orElse(null), t.getCopyAwsAccountId().orElse(null)); + getAWSCredentialsProvider(t), t.getS3Bucket(), t.getS3KeyPrefix(), + t.getIamUserName(), t.getDeleteS3TempFile(), t.getMaxS3UploadThreadsPerTask().orElse(null), + t.getCopyIamRoleName().orElse(null), t.getCopyAwsAccountId().orElse(null)); } } diff --git a/embulk-output-redshift/src/main/java/org/embulk/output/redshift/RedshiftCopyBatchInsert.java b/embulk-output-redshift/src/main/java/org/embulk/output/redshift/RedshiftCopyBatchInsert.java index 5b160bf6..5bd3d61b 100644 --- a/embulk-output-redshift/src/main/java/org/embulk/output/redshift/RedshiftCopyBatchInsert.java +++ b/embulk-output-redshift/src/main/java/org/embulk/output/redshift/RedshiftCopyBatchInsert.java @@ -67,7 +67,8 @@ public class RedshiftCopyBatchInsert public RedshiftCopyBatchInsert(JdbcOutputConnector connector, AWSCredentialsProvider credentialsProvider, String s3BucketName, String s3KeyPrefix, - String iamReaderUserName, boolean deleteS3TempFile, String copyIamRoleName, String copyAwsAccountId) throws IOException, SQLException + String iamReaderUserName, boolean deleteS3TempFile, Integer maxS3UploadThreadsPerTask, + String copyIamRoleName, String copyAwsAccountId) throws IOException, SQLException { super(); this.connector = connector; @@ -82,7 +83,9 @@ public RedshiftCopyBatchInsert(JdbcOutputConnector connector, this.credentialsProvider = credentialsProvider; this.s3 = new AmazonS3Client(credentialsProvider); // TODO options this.sts = new AWSSecurityTokenServiceClient(credentialsProvider); // options - this.executorService = Executors.newCachedThreadPool(); + this.executorService = maxS3UploadThreadsPerTask != null + ? Executors.newFixedThreadPool(maxS3UploadThreadsPerTask) + : Executors.newCachedThreadPool(); this.uploadAndCopyFutures = new ArrayList>(); String s3RegionName = null;