Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 14, 2024
1 parent 3bad790 commit 4212363
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ public void pin(Long timestamp, String pinningEntity) {
*/
public void unpin(Long timestamp, String pinningEntity) {
logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity);
pinnedTimestampPinningEntityMap.computeIfPresent(timestamp, (k, v) -> {
if (pinnedTimestampPinningEntityMap.containsKey(timestamp) == false
|| pinnedTimestampPinningEntityMap.get(timestamp).contains(pinningEntity) == false) {
logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity);
}
pinnedTimestampPinningEntityMap.compute(timestamp, (k, v) -> {
v.remove(pinningEntity);
return v.isEmpty() ? null : v;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -50,7 +52,6 @@ public class RemoteStorePinnedTimestampService implements Closeable {
private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class);
private static Tuple<Long, Set<Long>> pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
public static final int PINNED_TIMESTAMP_FILES_TO_KEEP = 5;
public static final int TIMESTAMP_PINNING_PAST_BUFFER_IN_MILLIS = 10000;

private final Supplier<RepositoriesService> repositoriesService;
private final Settings settings;
Expand All @@ -61,6 +62,7 @@ public class RemoteStorePinnedTimestampService implements Closeable {
private RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore;
private AsyncUpdatePinnedTimestampTask asyncUpdatePinnedTimestampTask;
private volatile TimeValue pinnedTimestampsSchedulerInterval;
private final Semaphore updateTimetampPinningSemaphore = new Semaphore(1);

/**
* Controls pinned timestamp scheduler interval
Expand All @@ -72,6 +74,17 @@ public class RemoteStorePinnedTimestampService implements Closeable {
Setting.Property.NodeScope
);

/**
* Controls allowed timestamp values to be pinned from past
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL = Setting.timeSetting(
"cluster.remote_store.pinned_timestamps.lookback_interval",
TimeValue.timeValueMinutes(1),
TimeValue.timeValueMinutes(1),
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope
);

public RemoteStorePinnedTimestampService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
Expand Down Expand Up @@ -101,7 +114,7 @@ private void validateRemoteStoreConfiguration() {
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
assert remoteStoreRepo != null : "Remote Segment Store repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
Expand Down Expand Up @@ -134,8 +147,11 @@ private void startAsyncUpdateTask() {
public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Void> listener) {
// If a caller uses current system time to pin the timestamp, following check will almost always fail.
// So, we allow pinning timestamp in the past upto some buffer
if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - TIMESTAMP_PINNING_PAST_BUFFER_IN_MILLIS) {
throw new IllegalArgumentException("Timestamp to be pinned is less than current timestamp");
long lookbackIntervalInMills = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL.get(settings).millis();
if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - lookbackIntervalInMills) {
throw new IllegalArgumentException(
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
}
updatePinning(pinnedTimestamps -> pinnedTimestamps.pin(timestamp, pinningEntity), listener);
}
Expand All @@ -157,39 +173,72 @@ private void updatePinning(Consumer<PinnedTimestamps> updateConsumer, ActionList
blobStoreRepository.getCompressor()
);
BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps);
blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), Integer.MAX_VALUE, new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
PinnedTimestamps pinnedTimestamps = new PinnedTimestamps(new HashMap<>());
if (blobMetadata.isEmpty() == false) {
pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps);
}
updateConsumer.accept(pinnedTimestamps);
remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps);
pinnedTimestampsBlobStore.writeAsync(remotePinnedTimestamps, listener);
try {
if (updateTimetampPinningSemaphore.tryAcquire(10, TimeUnit.MINUTES)) {
ActionListener<Void> semaphoreAwareListener = ActionListener.runBefore(listener, updateTimetampPinningSemaphore::release);
ActionListener<List<BlobMetadata>> listCallResponseListener = getListenerForListCallResponse(
remotePinnedTimestamps,
updateConsumer,
semaphoreAwareListener
);
blobStoreTransferService.listAllInSortedOrder(
path,
remotePinnedTimestamps.getType(),
Integer.MAX_VALUE,
listCallResponseListener
);
} else {
throw new TimeoutException("Timed out while waiting to acquire lock in updatePinning");
}
} catch (InterruptedException | TimeoutException e) {
listener.onFailure(e);
}
}

// Delete older pinnedTimestamp files
if (blobMetadata.size() > PINNED_TIMESTAMP_FILES_TO_KEEP) {
List<String> oldFilesToBeDeleted = blobMetadata.subList(PINNED_TIMESTAMP_FILES_TO_KEEP, blobMetadata.size())
.stream()
.map(BlobMetadata::name)
.collect(Collectors.toList());
try {
blobStoreTransferService.deleteBlobs(
pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps),
oldFilesToBeDeleted
);
} catch (IOException e) {
logger.error("Exception while deleting stale pinned timestamps", e);
}
}
private ActionListener<List<BlobMetadata>> getListenerForListCallResponse(
RemotePinnedTimestamps remotePinnedTimestamps,
Consumer<PinnedTimestamps> updateConsumer,
ActionListener<Void> listener
) {
return ActionListener.wrap(blobMetadata -> {
PinnedTimestamps pinnedTimestamps = new PinnedTimestamps(new HashMap<>());
if (blobMetadata.isEmpty() == false) {
pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps);
}
updateConsumer.accept(pinnedTimestamps);
remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps);
ActionListener<Void> writeCallResponseListener = getListenerForWriteCallResponse(
remotePinnedTimestamps,
blobMetadata,
listener
);
pinnedTimestampsBlobStore.writeAsync(remotePinnedTimestamps, writeCallResponseListener);
}, listener::onFailure);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
private ActionListener<Void> getListenerForWriteCallResponse(
RemotePinnedTimestamps remotePinnedTimestamps,
List<BlobMetadata> blobMetadata,
ActionListener<Void> listener
) {
return ActionListener.wrap(unused -> {
// Delete older pinnedTimestamp files
if (blobMetadata.size() > PINNED_TIMESTAMP_FILES_TO_KEEP) {
List<String> oldFilesToBeDeleted = blobMetadata.subList(PINNED_TIMESTAMP_FILES_TO_KEEP, blobMetadata.size())
.stream()
.map(BlobMetadata::name)
.collect(Collectors.toList());
try {
blobStoreTransferService.deleteBlobs(
pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps),
oldFilesToBeDeleted
);
} catch (IOException e) {
logger.error("Exception while deleting stale pinned timestamps", e);
}
}
});
listener.onResponse(null);
}, listener::onFailure);
}

private PinnedTimestamps readExistingPinnedTimestamps(String blobFilename, RemotePinnedTimestamps remotePinnedTimestamps) {
Expand Down

0 comments on commit 4212363

Please sign in to comment.