Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Oct 9, 2024
1 parent 8854340 commit d627609
Showing 1 changed file with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,45 @@
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_SLOWDOWN_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
String segmentRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
REPOSITORY_NAME
);

return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(segmentRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
}
Expand Down Expand Up @@ -222,10 +237,7 @@ public void onFailure(Exception e) {
latch.await();
}

// This test fails as we can't control actual upload of pinned timestamp file. We ideally need a BlobStoreRepository
// which can control the speed of upload.
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/16246")
public void testPinExceptionsRemoteStoreCallTakeTime() throws InterruptedException {
public void testPinExceptionsRemoteStoreCallTakeTime() throws InterruptedException, ExecutionException {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

Expand All @@ -234,10 +246,10 @@ public void testPinExceptionsRemoteStoreCallTakeTime() throws InterruptedExcepti
primaryNodeName(INDEX_NAME)
);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueNanos(50000));

CountDownLatch latch = new CountDownLatch(1);
long timestampToBePinned = System.currentTimeMillis();
slowDownRepo(REPOSITORY_NAME, 10);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(1));
long timestampToBePinned = System.currentTimeMillis() + 600000;
remoteStorePinnedTimestampService.pinTimestamp(timestampToBePinned, "ss1", new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
Expand All @@ -260,6 +272,16 @@ public void onFailure(Exception e) {
latch.await();
}

protected void slowDownRepo(String repoName, int value) throws ExecutionException, InterruptedException {
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
RepositoryMetadata rmd = res.repositories().get(0);
Settings.Builder settings = Settings.builder()
.put("location", rmd.settings().get("location"))
.put(REPOSITORIES_SLOWDOWN_SETTING.getKey(), value);
createRepository(repoName, rmd.type(), settings);
}

public void testUnpinException() throws InterruptedException {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);
Expand Down

0 comments on commit d627609

Please sign in to comment.