Added max_s3_upload_threads_per_task option to Redshift
t3t5u committed Jan 17, 2024
1 parent 6aba538 commit 24cd4e4
Showing 3 changed files with 13 additions and 3 deletions.
1 change: 1 addition & 0 deletions embulk-output-redshift/README.md
@@ -82,6 +82,7 @@ Redshift output plugin for Embulk loads records to Redshift.
- **s3_bucket**: S3 bucket name for temporary files
- **s3_key_prefix**: S3 key prefix for temporary files (string, default: "")
- **delete_s3_temp_file**: whether to delete temporary files uploaded on S3 (boolean, default: true)
- **max_s3_upload_threads_per_task**: maximum number of threads per task that upload and copy data to Redshift via S3 (integer, optional). For example, if this option is 5 and the number of tasks is 8, up to 40 threads are created. Increasing this value may shorten the transfer time, but can also cause "too many connections" errors. If this option is not specified, new threads are created as needed; see the example configuration below.
- **copy_iam_role_name**: IAM Role for COPY credentials (https://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-access-permissions.html). If this is set, the IAM Role is used instead of the AWS access key and AWS secret access key (string, optional)
- **copy_aws_account_id**: IAM Role's account ID for multi-account COPY. If this is set, the ID is used instead of the authenticated user's account ID. This is enabled only if copy_iam_role_name is set (string, optional)
- **options**: extra connection properties (hash, default: {})
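To show where the new option fits, here is a minimal sketch of an Embulk `out:` configuration, assuming the plugin's standard JDBC connection options; hostnames, credentials, and bucket names are placeholders, AWS credential options are omitted, and only `max_s3_upload_threads_per_task` is specific to this commit:

```yaml
out:
  type: redshift
  host: redshift.example.com       # placeholder connection settings
  user: embulk_user
  password: "secret"
  database: analytics
  table: events
  s3_bucket: my-embulk-temp-bucket # S3 bucket for temporary files
  s3_key_prefix: temp/redshift
  delete_s3_temp_file: true
  # New option: cap upload/copy work at 5 threads per task;
  # omit it to create threads as needed (the previous behavior)
  max_s3_upload_threads_per_task: 5
```

With this setting and, for example, 8 Embulk tasks, at most 40 upload/copy threads run concurrently across the job.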
@@ -83,6 +83,10 @@ public interface RedshiftPluginTask extends AwsCredentialsTaskWithPrefix, Plugin
@ConfigDefault("true")
public boolean getDeleteS3TempFile();

@Config("max_s3_upload_threads_per_task")
@ConfigDefault("null")
public Optional<Integer> getMaxS3UploadThreadsPerTask();

@Config("ssl")
@ConfigDefault("\"disable\"")
public Ssl getSsl();
@@ -205,6 +209,8 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
RedshiftPluginTask t = (RedshiftPluginTask) task;
setAWSCredentialsBackwardCompatibility(t);
return new RedshiftCopyBatchInsert(getConnector(task, true),
getAWSCredentialsProvider(t), t.getS3Bucket(), t.getS3KeyPrefix(), t.getIamUserName(), t.getDeleteS3TempFile(), t.getCopyIamRoleName().orElse(null), t.getCopyAwsAccountId().orElse(null));
getAWSCredentialsProvider(t), t.getS3Bucket(), t.getS3KeyPrefix(),
t.getIamUserName(), t.getDeleteS3TempFile(), t.getMaxS3UploadThreadsPerTask().orElse(null),
t.getCopyIamRoleName().orElse(null), t.getCopyAwsAccountId().orElse(null));
}
}
@@ -67,7 +67,8 @@ public class RedshiftCopyBatchInsert

public RedshiftCopyBatchInsert(JdbcOutputConnector connector,
AWSCredentialsProvider credentialsProvider, String s3BucketName, String s3KeyPrefix,
String iamReaderUserName, boolean deleteS3TempFile, String copyIamRoleName, String copyAwsAccountId) throws IOException, SQLException
String iamReaderUserName, boolean deleteS3TempFile, Integer maxS3UploadThreadsPerTask,
String copyIamRoleName, String copyAwsAccountId) throws IOException, SQLException
{
super();
this.connector = connector;
@@ -82,7 +83,9 @@ public RedshiftCopyBatchInsert(JdbcOutputConnector connector,
this.credentialsProvider = credentialsProvider;
this.s3 = new AmazonS3Client(credentialsProvider); // TODO options
this.sts = new AWSSecurityTokenServiceClient(credentialsProvider); // options
this.executorService = Executors.newCachedThreadPool();
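// If max_s3_upload_threads_per_task is set, use a bounded pool so that at most that many
// upload/copy threads run concurrently per task; otherwise keep the previous behavior of
// an unbounded cached thread pool.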
this.executorService = maxS3UploadThreadsPerTask != null
? Executors.newFixedThreadPool(maxS3UploadThreadsPerTask)
: Executors.newCachedThreadPool();
this.uploadAndCopyFutures = new ArrayList<Future<Void>>();

String s3RegionName = null;
