Skip to content

Commit

Permalink
Renamed other priority resources to normal priority resources
Browse files Browse the repository at this point in the history
Signed-off-by: vikasvb90 <[email protected]>
  • Loading branch information
vikasvb90 authored and Bhumika Saini committed May 2, 2024
1 parent bcaa6ac commit 2ad91c7
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
// Therefore, redirecting it to slow client.
if ((uploadRequest.getWritePriority() == WritePriority.LOW
&& blobStore.getLowPrioritySizeBasedBlockingQ().canProduce(uploadRequest.getContentLength()) == false)
&& blobStore.getLowPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false)
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
&& uploadRequest.getWritePriority() != WritePriority.URGENT
&& blobStore.getOtherPrioritySizeBasedBlockingQ().canProduce(uploadRequest.getContentLength()) == false)) {
&& blobStore.getNormalPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false)) {
StreamContext streamContext = SocketAccess.doPrivileged(
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
);
Expand Down Expand Up @@ -276,14 +276,16 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
)
);
} else {
blobStore.getOtherPrioritySizeBasedBlockingQ()
} else if (writeContext.getWritePriority() == WritePriority.NORMAL) {
blobStore.getNormalPrioritySizeBasedBlockingQ()
.produce(
new SizeBasedBlockingQ.Item(
writeContext.getFileSize(),
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
)
);
} else {
throw new IllegalStateException("Cannot perform upload for other priority types.");
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class S3BlobStore implements BlobStore {
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final boolean multipartUploadEnabled;
private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;

S3BlobStore(
Expand All @@ -113,7 +113,7 @@ class S3BlobStore implements BlobStore {
AsyncExecutorContainer urgentExecutorBuilder,
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder,
SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
) {
this.service = service;
Expand All @@ -133,7 +133,7 @@ class S3BlobStore implements BlobStore {
// Settings to initialize blobstore with.
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ;
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
}

Expand Down Expand Up @@ -191,8 +191,8 @@ public int getBulkDeletesSize() {
return bulkDeletesSize;
}

public SizeBasedBlockingQ getOtherPrioritySizeBasedBlockingQ() {
return otherPrioritySizeBasedBlockingQ;
public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() {
return normalPrioritySizeBasedBlockingQ;
}

public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class S3Repository extends MeteredBlobStoreRepository {
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final Path pluginConfigPath;
private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;

private volatile int bulkDeletesSize;
Expand All @@ -303,7 +303,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final AsyncExecutorContainer normalExecutorBuilder,
final S3AsyncService s3AsyncService,
final boolean multipartUploadEnabled,
final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
) {
this(
Expand All @@ -319,7 +319,7 @@ class S3Repository extends MeteredBlobStoreRepository {
s3AsyncService,
multipartUploadEnabled,
Path.of(""),
otherPrioritySizeBasedBlockingQ,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
);
}
Expand All @@ -340,7 +340,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final S3AsyncService s3AsyncService,
final boolean multipartUploadEnabled,
Path pluginConfigPath,
final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
) {
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
Expand All @@ -352,7 +352,7 @@ class S3Repository extends MeteredBlobStoreRepository {
this.urgentExecutorBuilder = urgentExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.normalExecutorBuilder = normalExecutorBuilder;
this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ;
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;

validateRepositoryMetadata(metadata);
Expand Down Expand Up @@ -417,7 +417,7 @@ protected S3BlobStore createBlobStore() {
urgentExecutorBuilder,
priorityExecutorBuilder,
normalExecutorBuilder,
otherPrioritySizeBasedBlockingQ,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private static final String FUTURE_COMPLETION = "future_completion";
private static final String STREAM_READER = "stream_reader";
private static final String LOW_TRANSFER_QUEUE_CONSUMER = "low_transfer_queue_consumer";
private static final String OTHER_TRANSFER_QUEUE_CONSUMER = "other_transfer_queue_consumer";
private static final String NORMAL_TRANSFER_QUEUE_CONSUMER = "normal_transfer_queue_consumer";

protected final S3Service service;
private final S3AsyncService s3AsyncService;
Expand All @@ -101,8 +101,8 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private AsyncExecutorContainer priorityExecutorBuilder;
private AsyncExecutorContainer normalExecutorBuilder;
private ExecutorService lowTransferQConsumerService;
private ExecutorService otherTransferQConsumerService;
private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
private ExecutorService normalTransferQConsumerService;
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private TransferSemaphoresHolder transferSemaphoresHolder;

Expand Down Expand Up @@ -146,10 +146,10 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
executorBuilders.add(
new FixedExecutorBuilder(
settings,
OTHER_TRANSFER_QUEUE_CONSUMER,
otherPriorityTransferQConsumers(settings),
NORMAL_TRANSFER_QUEUE_CONSUMER,
normalPriorityTransferQConsumers(settings),
10,
"thread_pool." + OTHER_TRANSFER_QUEUE_CONSUMER
"thread_pool." + NORMAL_TRANSFER_QUEUE_CONSUMER
)
);
return executorBuilders;
Expand All @@ -160,7 +160,7 @@ private int lowPriorityTransferQConsumers(Settings settings) {
return Math.max(2, (int) (lowPriorityAllocation * S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings)));
}

private int otherPriorityTransferQConsumers(Settings settings) {
private int normalPriorityTransferQConsumers(Settings settings) {
return S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings);
}

Expand Down Expand Up @@ -231,12 +231,12 @@ public Collection<Object> createComponents(
new AsyncTransferEventLoopGroup(normalEventLoopThreads)
);
this.lowTransferQConsumerService = threadPool.executor(LOW_TRANSFER_QUEUE_CONSUMER);
this.otherTransferQConsumerService = threadPool.executor(OTHER_TRANSFER_QUEUE_CONSUMER);
int otherPriorityConsumers = otherPriorityTransferQConsumers(clusterService.getSettings());
this.otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(otherPriorityConsumers * 10L, ByteSizeUnit.GB),
otherTransferQConsumerService,
otherPriorityConsumers
this.normalTransferQConsumerService = threadPool.executor(NORMAL_TRANSFER_QUEUE_CONSUMER);
int normalPriorityConsumers = normalPriorityTransferQConsumers(clusterService.getSettings());
this.normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB),
normalTransferQConsumerService,
normalPriorityConsumers
);
int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings());
LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ(
Expand All @@ -253,7 +253,7 @@ public Collection<Object> createComponents(
TimeUnit.MINUTES
);

return CollectionUtils.arrayAsArrayList(this.otherPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ);
return CollectionUtils.arrayAsArrayList(this.normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ);
}

// New class because in core, components are injected via guice only by instance creation due to which
Expand Down Expand Up @@ -292,7 +292,7 @@ protected S3Repository createRepository(
s3AsyncService,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()),
configPath,
otherPrioritySizeBasedBlockingQ,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void produce(Item item) throws InterruptedException {
if (item == null || item.size <= 0) {
throw new IllegalStateException("Invalid item input to produce.");
}
log.debug(() -> "Transfer queue event received of size: " + item.size + ". Current queue utilisation: " + currentSize.get());

if (currentSize.get() + item.size >= capacity.getBytes()) {
throw new S3TransferRejectedException("S3 Transfer queue capacity reached");
Expand Down Expand Up @@ -96,8 +97,8 @@ public int getSize() {
return queue.size();
}

public boolean canProduce(long contentLength) {
return (currentSize.get() + contentLength) < capacity.getBytes();
public boolean isBelowCapacity(long contentLength) {
return contentLength < capacity.getBytes();
}

protected static class Consumer extends Thread {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.repositories.s3.async;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.util.Objects;
Expand All @@ -18,7 +20,7 @@
* Transfer semaphore holder for controlled transfer of data to remote.
*/
public class TransferSemaphoresHolder {

private static final Logger log = LogManager.getLogger(TransferSemaphoresHolder.class);
// For tests
protected TypeSemaphore lowPrioritySemaphore;
protected TypeSemaphore highPrioritySemaphore;
Expand Down Expand Up @@ -88,6 +90,14 @@ public RequestContext createRequestContext() {
* transfers.
*/
public TypeSemaphore acquirePermit(WritePriority writePriority, RequestContext requestContext) throws InterruptedException {
log.debug(
() -> "Acquire permit request for transfer type: "
+ writePriority
+ ". Available high priority permits: "
+ highPrioritySemaphore.availablePermits()
+ " and low priority permits: "
+ lowPrioritySemaphore.availablePermits()
);
// Try acquiring low priority permit or high priority permit immediately if available.
// Otherwise, we wait for low priority permit.
if (Objects.requireNonNull(writePriority) == WritePriority.LOW) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class S3BlobContainerMockClientTests extends OpenSearchTestCase implement
private ScheduledExecutorService scheduler;
private AsyncTransferEventLoopGroup transferNIOGroup;
private S3BlobContainer blobContainer;
private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;

static class MockS3AsyncService extends S3AsyncService {
Expand Down Expand Up @@ -377,7 +377,7 @@ public void setUp() throws Exception {
transferQueueConsumerService = Executors.newFixedThreadPool(20);
scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1);
transferNIOGroup = new AsyncTransferEventLoopGroup(1);
otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 10L, ByteSizeUnit.GB),
transferQueueConsumerService,
10
Expand All @@ -387,7 +387,7 @@ public void setUp() throws Exception {
transferQueueConsumerService,
5
);
otherPrioritySizeBasedBlockingQ.start();
normalPrioritySizeBasedBlockingQ.start();
lowPrioritySizeBasedBlockingQ.start();
blobContainer = createBlobContainer();
super.setUp();
Expand All @@ -401,7 +401,7 @@ public void tearDown() throws Exception {
streamReaderService.shutdown();
remoteTransferRetry.shutdown();
transferQueueConsumerService.shutdown();
otherPrioritySizeBasedBlockingQ.close();
normalPrioritySizeBasedBlockingQ.close();
lowPrioritySizeBasedBlockingQ.close();
scheduler.shutdown();
transferNIOGroup.close();
Expand Down Expand Up @@ -448,7 +448,7 @@ private S3BlobStore createBlobStore() {
asyncExecutorContainer,
asyncExecutorContainer,
asyncExecutorContainer,
otherPrioritySizeBasedBlockingQ,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
);
}
Expand Down Expand Up @@ -651,7 +651,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);

when(blobStore.getLowPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ);
when(blobStore.getOtherPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ);
when(blobStore.getNormalPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ);

final boolean serverSideEncryption = randomBoolean();
when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
private ExecutorService transferQueueConsumerService;
private ScheduledExecutorService scheduler;
private AsyncTransferEventLoopGroup transferNIOGroup;
private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;

@Before
Expand All @@ -137,7 +137,7 @@ public void setUp() throws Exception {
remoteTransferRetry = Executors.newFixedThreadPool(20);
transferQueueConsumerService = Executors.newFixedThreadPool(2);
scheduler = new ScheduledThreadPoolExecutor(1);
otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB),
transferQueueConsumerService,
2
Expand All @@ -147,7 +147,7 @@ public void setUp() throws Exception {
transferQueueConsumerService,
2
);
otherPrioritySizeBasedBlockingQ.start();
normalPrioritySizeBasedBlockingQ.start();
lowPrioritySizeBasedBlockingQ.start();
// needed by S3AsyncService
SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", configPath().toString()));
Expand All @@ -163,7 +163,7 @@ public void tearDown() throws Exception {
remoteTransferRetry.shutdown();
transferQueueConsumerService.shutdown();
scheduler.shutdown();
otherPrioritySizeBasedBlockingQ.close();
normalPrioritySizeBasedBlockingQ.close();
lowPrioritySizeBasedBlockingQ.close();
IOUtils.close(transferNIOGroup);

Expand Down Expand Up @@ -257,7 +257,7 @@ protected AsyncMultiStreamBlobContainer createBlobContainer(
asyncExecutorContainer,
asyncExecutorContainer,
asyncExecutorContainer,
otherPrioritySizeBasedBlockingQ,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
* @opensearch.internal
*/
public enum WritePriority {
// Used for segment transfers during refresh, flush or merges
NORMAL,
// Used for transfer of translog or ckp files.
HIGH,
// Used for transfer of remote cluster state
URGENT,
// All other background transfers such as in snapshot recovery, recovery from local store or index etc.
LOW
}

0 comments on commit 2ad91c7

Please sign in to comment.