Skip to content

Commit

Permalink
Fix failures in S3BlobStoreRepositoryTests
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Dec 16, 2024
1 parent 1b4270c commit 1c58299
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.repositories.RepositoryStats;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.utils.AwsRequestSigner;
import org.opensearch.snapshots.mockstore.BlobStoreWrapper;
import org.opensearch.test.BackgroundIndexer;
Expand Down Expand Up @@ -95,6 +96,7 @@ public class S3BlobStoreRepositoryTests extends OpenSearchMockAPIBasedRepository
public void setUp() throws Exception {
signerOverride = AwsRequestSigner.VERSION_FOUR_SIGNER.getName();
previousOpenSearchPathConf = SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", "config"));
logger.info("previousOpenSearchPathConf={}", previousOpenSearchPathConf);
super.setUp();
}

Expand Down Expand Up @@ -252,22 +254,27 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);

AsyncTransferManager asyncUploadUtils = new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(),
normalExecutorBuilder.getStreamReader(),
priorityExecutorBuilder.getStreamReader(),
urgentExecutorBuilder.getStreamReader(),
transferSemaphoresHolder
);
return new S3Repository(
metadata,
registry,
service,
clusterService,
recoverySettings,
null,
null,
null,
null,
null,
false,
null,
null,
asyncUploadUtils,
urgentExecutorBuilder,
priorityExecutorBuilder,
normalExecutorBuilder,
s3AsyncService,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()),
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ,
genericStatsMetricPublisher
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.profiles.ProfileFileSystemSetting;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
Expand Down Expand Up @@ -235,17 +234,17 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
}

static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) {
RetryPolicy retryPolicy = SocketAccess.doPrivileged(
() -> RetryPolicy.builder()
.numRetries(clientSettings.maxRetries)
.throttlingBackoffStrategy(
clientSettings.throttleRetries ? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD) : BackoffStrategy.none()
)
.build()
);

return ClientOverrideConfiguration.builder()
.retryPolicy(
RetryPolicy.builder()
.numRetries(clientSettings.maxRetries)
.throttlingBackoffStrategy(
clientSettings.throttleRetries
? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD)
: BackoffStrategy.none()
)
.build()
)
.retryPolicy(retryPolicy)
.apiCallAttemptTimeout(Duration.ofMillis(clientSettings.requestTimeoutMillis))
.build();
}
Expand Down Expand Up @@ -346,12 +345,7 @@ static AwsCredentialsProvider buildCredentials(Logger logger, S3ClientSettings c
// valid paths.
@SuppressForbidden(reason = "Need to provide this override to v2 SDK so that path does not default to home path")
private static void setDefaultAwsProfilePath() {
if (ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.getStringValue().isEmpty()) {
System.setProperty(ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.property(), System.getProperty("opensearch.path.conf"));
}
if (ProfileFileSystemSetting.AWS_CONFIG_FILE.getStringValue().isEmpty()) {
System.setProperty(ProfileFileSystemSetting.AWS_CONFIG_FILE.property(), System.getProperty("opensearch.path.conf"));
}
S3Service.setDefaultAwsProfilePath();
}

private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,11 @@ public void deleteAsync(ActionListener<DeleteResult> completionListener) {
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();
ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
.bucket(blobStore.bucket())
.prefix(keyPath)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher))
.build();
ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

AtomicLong deletedBlobs = new AtomicLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,19 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private static final String NORMAL_TRANSFER_QUEUE_CONSUMER = "normal_transfer_queue_consumer";

protected final S3Service service;
private final S3AsyncService s3AsyncService;
protected final S3AsyncService s3AsyncService;

private final Path configPath;

private AsyncExecutorContainer urgentExecutorBuilder;
private AsyncExecutorContainer priorityExecutorBuilder;
private AsyncExecutorContainer normalExecutorBuilder;
protected AsyncExecutorContainer urgentExecutorBuilder;
protected AsyncExecutorContainer priorityExecutorBuilder;
protected AsyncExecutorContainer normalExecutorBuilder;
private ExecutorService lowTransferQConsumerService;
private ExecutorService normalTransferQConsumerService;
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private TransferSemaphoresHolder transferSemaphoresHolder;
private GenericStatsMetricPublisher genericStatsMetricPublisher;
protected SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
protected SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
protected TransferSemaphoresHolder transferSemaphoresHolder;
protected GenericStatsMetricPublisher genericStatsMetricPublisher;

public S3RepositoryPlugin(final Settings settings, final Path configPath) {
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
Expand Down

0 comments on commit 1c58299

Please sign in to comment.