Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate Limiter integration for remote transfer #9448

Merged
merged 19 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
protected final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -26,7 +30,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.greaterThan;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -453,4 +459,42 @@ public void testRTSRestoreNoData() throws IOException {
}

// TODO: Restore flow - index aliases

public void testRateLimitedRemoteDownloads() throws Exception {
assertAcked(
client().admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType("fs")
.setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("max_remote_download_bytes_per_sec", "2kb")
.put("chunk_size", 100, ByteSizeUnit.BYTES)

)
);
int shardCount = randomIntBetween(1, 5);
int numberOfIteration = randomIntBetween(2, 5);
testRestoreFlow(numberOfIteration, false, shardCount);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(numberOfIteration, false, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);
restore(INDEX_NAME);
assertBusy(() -> {
long downloadPauseTime = 0L;
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
downloadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteDownloadThrottleTimeInNanos();
}
assertThat(downloadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(10, 30)).nanos()));
}, 30, TimeUnit.SECONDS);
ensureGreen(INDEX_NAME);
// This is required to get updated number from already active shards which were not restored
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
assertEquals(0, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, true, INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class RemoteStoreThrottlingIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Before
public void setup() {
setupRepo();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@

package org.opensearch.remotestore.multipart;

import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.RepositoriesService;

import java.nio.file.Path;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class RemoteStoreMultipartIT extends RemoteStoreIT {

Expand All @@ -35,4 +42,43 @@ protected void putRepository(Path path) {
.setSettings(Settings.builder().put("location", path))
);
}

public void testRateLimitedRemoteUploads() throws Exception {
internalCluster().startDataOnlyNodes(1);
Client client = client();
logger.info("--> updating repository");
Path repositoryLocation = randomRepoPath();
assertAcked(
client.admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(
Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("max_remote_upload_bytes_per_sec", "200b")
.put("chunk_size", 100, ByteSizeUnit.BYTES)
)
);

createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 10; i++) {
index(INDEX_NAME, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
// check if throttling is active
assertBusy(() -> {
long uploadPauseTime = 0L;
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteUploadThrottleTimeInNanos();
}
assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(10, 30)).nanos()));
}, 30, TimeUnit.SECONDS);

assertThat(client.prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value, equalTo(10L));
}
}
58 changes: 58 additions & 0 deletions server/src/main/java/org/opensearch/common/StreamLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;

import java.io.IOException;
import java.util.function.Supplier;

/**
* The stream limiter that limits the transfer of bytes
*/
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
public class StreamLimiter {

private final Supplier<RateLimiter> rateLimiterSupplier;

private final StreamLimiter.Listener listener;

private int bytesSinceLastRateLimit;

private static final Logger logger = LogManager.getLogger(StreamLimiter.class);
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

public StreamLimiter(Supplier<RateLimiter> rateLimiterSupplier, Listener listener) {
this.rateLimiterSupplier = rateLimiterSupplier;
this.listener = listener;
}

public void maybePause(int bytes) throws IOException {
bytesSinceLastRateLimit += bytes;
final RateLimiter rateLimiter = rateLimiterSupplier.get();
if (rateLimiter != null) {
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
bytesSinceLastRateLimit = 0;
if (pause > 0) {
listener.onPause(pause);
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

/**
* Internal listener
*
* @opensearch.internal
*/
public interface Listener {
void onPause(long nanos);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.transfer.stream;

import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.StreamLimiter;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Rate Limits an {@link OffsetRangeInputStream}
*/
public class RateLimitingOffsetRangeInputStream extends OffsetRangeInputStream {

private final StreamLimiter streamLimiter;

private final OffsetRangeInputStream delegate;

/**
* The ctor for RateLimitingOffsetRangeInputStream
* @param delegate the underlying {@link OffsetRangeInputStream}
* @param rateLimiterSupplier the supplier for {@link RateLimiter}
* @param listener the listener to be invoked on rate limits
*/
public RateLimitingOffsetRangeInputStream(
OffsetRangeInputStream delegate,
Supplier<RateLimiter> rateLimiterSupplier,
StreamLimiter.Listener listener
) {
this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener);
this.delegate = delegate;
}

@Override
public int read() throws IOException {
int b = delegate.read();
streamLimiter.maybePause(1);
return b;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int n = delegate.read(b, off, len);
if (n > 0) {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
streamLimiter.maybePause(n);
}
return n;
}

@Override
public synchronized void mark(int readlimit) {
delegate.mark(readlimit);
}

@Override
public boolean markSupported() {
return delegate.markSupported();
}

@Override
public long getFilePointer() throws IOException {
return delegate.getFilePointer();
}

@Override
public synchronized void reset() throws IOException {
delegate.reset();
}

@Override
public void close() throws IOException {
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.snapshots.blobstore;

import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.StreamLimiter;

import java.io.FilterInputStream;
import java.io.IOException;
Expand All @@ -46,53 +47,25 @@
*/
public class RateLimitingInputStream extends FilterInputStream {

private final Supplier<RateLimiter> rateLimiterSupplier;
private StreamLimiter streamLimiter;

private final Listener listener;

private long bytesSinceLastRateLimit;

/**
* Internal listener
*
* @opensearch.internal
*/
public interface Listener {
void onPause(long nanos);
}

public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, Listener listener) {
public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, StreamLimiter.Listener listener) {
super(delegate);
this.rateLimiterSupplier = rateLimiterSupplier;
this.listener = listener;
}

private void maybePause(int bytes) throws IOException {
bytesSinceLastRateLimit += bytes;
final RateLimiter rateLimiter = rateLimiterSupplier.get();
if (rateLimiter != null) {
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
bytesSinceLastRateLimit = 0;
if (pause > 0) {
listener.onPause(pause);
}
}
}
this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener);
}

@Override
public int read() throws IOException {
int b = super.read();
maybePause(1);
streamLimiter.maybePause(1);
return b;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int n = super.read(b, off, len);
if (n > 0) {
maybePause(n);
streamLimiter.maybePause(n);
}
return n;
}
Expand Down
Loading