Skip to content

Commit

Permalink
Merge pull request #334 from trocco-io/add_max_s3_upload_threads_per_…
Browse files Browse the repository at this point in the history
…task_for_redshift

Added max_s3_upload_threads_per_task option to Redshift
  • Loading branch information
dmikurube authored Feb 22, 2024
2 parents 6aba538 + 9121b60 commit feae8a0
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
1 change: 1 addition & 0 deletions embulk-output-redshift/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Redshift output plugin for Embulk loads records to Redshift.
- **s3_bucket**: S3 bucket name for temporary files
- **s3_key_prefix**: S3 key prefix for temporary files (string, default: "")
- **delete_s3_temp_file**: whether to delete temporary files uploaded on S3 (boolean, default: true)
- **max_s3_upload_threads_per_task**: The maximum number of threads per task that upload and copy data to Redshift via S3 (integer, optional). For example, if this option is 5 and the number of tasks is 8, up to 40 threads are created. Increasing this option may shorten the transfer time, but may also cause a "too many connections" error. If this option is not specified, as many new threads as needed are created by default.
- **copy_iam_role_name**: IAM Role for the COPY credential (https://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-access-permissions.html). If this is set, the IAM Role is used instead of the AWS access key and AWS secret access key (string, optional)
- **copy_aws_account_id**: The IAM Role's account ID for multi-account COPY. If this is set, this ID is used instead of the authenticated user's account ID. This is enabled only if copy_iam_role_name is set. (string, optional)
- **options**: extra connection properties (hash, default: {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public interface RedshiftPluginTask extends AwsCredentialsTaskWithPrefix, Plugin
@ConfigDefault("true")
public boolean getDeleteS3TempFile();

// Maximum number of threads per task used to upload temporary files to S3 and
// COPY them into Redshift. When absent (the default), an unbounded cached
// thread pool is used instead of a fixed-size pool (see RedshiftCopyBatchInsert).
@Config("max_s3_upload_threads_per_task")
@ConfigDefault("null")
public Optional<Integer> getMaxS3UploadThreadsPerTask();

// SSL setting for the Redshift connection; defaults to "disable".
// NOTE(review): the accepted values are defined by the Ssl enum elsewhere in
// the project — confirm against that type's declaration.
@Config("ssl")
@ConfigDefault("\"disable\"")
public Ssl getSsl();
Expand Down Expand Up @@ -205,6 +209,8 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
RedshiftPluginTask t = (RedshiftPluginTask) task;
setAWSCredentialsBackwardCompatibility(t);
return new RedshiftCopyBatchInsert(getConnector(task, true),
getAWSCredentialsProvider(t), t.getS3Bucket(), t.getS3KeyPrefix(), t.getIamUserName(), t.getDeleteS3TempFile(), t.getCopyIamRoleName().orElse(null), t.getCopyAwsAccountId().orElse(null));
getAWSCredentialsProvider(t), t.getS3Bucket(), t.getS3KeyPrefix(),
t.getIamUserName(), t.getDeleteS3TempFile(), t.getMaxS3UploadThreadsPerTask().orElse(null),
t.getCopyIamRoleName().orElse(null), t.getCopyAwsAccountId().orElse(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public class RedshiftCopyBatchInsert

public RedshiftCopyBatchInsert(JdbcOutputConnector connector,
AWSCredentialsProvider credentialsProvider, String s3BucketName, String s3KeyPrefix,
String iamReaderUserName, boolean deleteS3TempFile, String copyIamRoleName, String copyAwsAccountId) throws IOException, SQLException
String iamReaderUserName, boolean deleteS3TempFile, Integer maxS3UploadThreadsPerTask,
String copyIamRoleName, String copyAwsAccountId) throws IOException, SQLException
{
super();
this.connector = connector;
Expand All @@ -82,7 +83,9 @@ public RedshiftCopyBatchInsert(JdbcOutputConnector connector,
this.credentialsProvider = credentialsProvider;
this.s3 = new AmazonS3Client(credentialsProvider); // TODO options
this.sts = new AWSSecurityTokenServiceClient(credentialsProvider); // options
this.executorService = Executors.newCachedThreadPool();
this.executorService = maxS3UploadThreadsPerTask != null
? Executors.newFixedThreadPool(maxS3UploadThreadsPerTask)
: Executors.newCachedThreadPool();
this.uploadAndCopyFutures = new ArrayList<Future<Void>>();

String s3RegionName = null;
Expand Down

0 comments on commit feae8a0

Please sign in to comment.