Skip to content

Commit

Permalink
Change pinned timestamp file format stored in remote store (opensearc…
Browse files Browse the repository at this point in the history
…h-project#15526)

---------

Signed-off-by: Sachin Kale <[email protected]>
Co-authored-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale and Sachin Kale authored Sep 3, 2024
1 parent 2e9db40 commit cfcfe21
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 508 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -17,6 +18,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Set;
import java.util.concurrent.CountDownLatch;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -75,10 +77,25 @@ public void testTimestampPinUnpin() throws Exception {

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// This should be a no-op as pinning entity is different
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener);
// Unpinning already pinned entity
remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener);

// This should fail as timestamp is not pinned by pinning entity
CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", new LatchedActionListener<>(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
// onResponse should not get called.
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}, latch));
latch.await();

// Adding different entity to already pinned timestamp
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);

Expand All @@ -93,4 +110,74 @@ public void testTimestampPinUnpin() throws Exception {

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}

public void testPinnedTimestampClone() throws Exception {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

long timestamp1 = System.currentTimeMillis() + 30000L;
long timestamp2 = System.currentTimeMillis() + 60000L;
long timestamp3 = System.currentTimeMillis() + 900000L;
remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener);

// Clone timestamp1
remoteStorePinnedTimestampService.cloneTimestamp(timestamp1, "ss2", "ss2-2", noOpActionListener);

// With clone, set of pinned timestamp will not change
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Clone timestamp1 but provide invalid existing entity
CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.cloneTimestamp(
timestamp1,
"ss3",
"ss2-3",
new LatchedActionListener<>(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
// onResponse should not get called.
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}, latch)
);
latch.await();

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Now we have timestamp1 pinned by 2 entities, unpin 1, this should not change set of pinned timestamps
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "ss2", noOpActionListener);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Now unpin second entity as well, set of pinned timestamp should be reduced by 1
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "ss2-2", noOpActionListener);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2()));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,11 @@ public static List<String> filterOutMetadataFilesBasedOnAge(
if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
return new ArrayList<>(metadataFiles);
}
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - RemoteStoreSettings.getPinnedTimestampsLookbackInterval()
.getMillis();
// We allow now() - loopback interval to be pinned. Also, the actual pinning can take at most loopback interval
// This means the pinned timestamp can be available for read after at most (2 * loopback interval)
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - (2 * RemoteStoreSettings
.getPinnedTimestampsLookbackInterval()
.getMillis());
List<String> metadataFilesWithMinAge = new ArrayList<>();
for (String metadataFileName : metadataFiles) {
long metadataTimestamp = getTimestampFunction.apply(metadataFileName);
Expand Down
Loading

0 comments on commit cfcfe21

Please sign in to comment.