Skip to content

Commit

Permalink
[Remote Store] Add tracker factory to manage remote store stats track…
Browse files Browse the repository at this point in the history
…ers (opensearch-project#9546)

---------

Signed-off-by: Bhumika Saini <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
Bhumika Saini authored and shiv0408 committed Apr 25, 2024
1 parent 7657817 commit c333e6d
Show file tree
Hide file tree
Showing 22 changed files with 630 additions and 731 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.indices.IndicesService;
Expand All @@ -50,7 +50,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct

private final IndicesService indicesService;

private final RemoteStorePressureService remoteStorePressureService;
private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;

@Inject
public TransportRemoteStoreStatsAction(
Expand All @@ -59,7 +59,7 @@ public TransportRemoteStoreStatsAction(
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteStorePressureService remoteStorePressureService
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
) {
super(
RemoteStoreStatsAction.NAME,
Expand All @@ -71,7 +71,7 @@ public TransportRemoteStoreStatsAction(
ThreadPool.Names.MANAGEMENT
);
this.indicesService = indicesService;
this.remoteStorePressureService = remoteStorePressureService;
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
}

/**
Expand Down Expand Up @@ -153,7 +153,7 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard
throw new ShardNotFoundException(indexShard.shardId());
}

RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(
RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(
indexShard.shardId()
);
assert Objects.nonNull(remoteSegmentTransferTracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.index.remote.RemoteStorePressureSettings;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
Expand Down Expand Up @@ -656,9 +657,9 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR,
RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR,
RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,

// Settings related to Remote Store stats
RemoteStoreStatsTrackerFactory.MOVING_AVERAGE_WINDOW_SIZE,

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.SearchIndexNameMatcher;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -443,7 +443,7 @@ public synchronized IndexShard createShard(
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteStorePressureService remoteStorePressureService
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -512,7 +512,7 @@ public synchronized IndexShard createShard(
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteStorePressureService
remoteStoreStatsTrackerFactory
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ public class RemoteSegmentTransferTracker {
public RemoteSegmentTransferTracker(
ShardId shardId,
DirectoryFileTransferTracker directoryFileTransferTracker,
int uploadBytesMovingAverageWindowSize,
int uploadBytesPerSecMovingAverageWindowSize,
int uploadTimeMsMovingAverageWindowSize
int movingAverageWindowSize
) {
logger = Loggers.getLogger(getClass(), shardId);
this.shardId = shardId;
Expand All @@ -207,9 +205,9 @@ public RemoteSegmentTransferTracker(
remoteRefreshTimeMs = currentTimeMs;
localRefreshClockTimeMs = currentClockTimeMs;
remoteRefreshClockTimeMs = currentClockTimeMs;
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(movingAverageWindowSize));
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(movingAverageWindowSize));
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(movingAverageWindowSize));
this.directoryFileTransferTracker = directoryFileTransferTracker;
}

Expand Down Expand Up @@ -470,14 +468,22 @@ public void addUploadBytes(long size) {
}

/**
* Updates the window size for data collection of upload bytes. This also resets any data collected so far.
* Updates the window size for data collection. This also resets any data collected so far.
*
* @param updatedSize the updated size
*/
void updateUploadBytesMovingAverageWindowSize(int updatedSize) {
void updateMovingAverageWindowSize(int updatedSize) {
synchronized (uploadBytesMutex) {
this.uploadBytesMovingAverageReference.set(this.uploadBytesMovingAverageReference.get().copyWithSize(updatedSize));
}

synchronized (uploadBytesPerSecMutex) {
this.uploadBytesPerSecMovingAverageReference.set(this.uploadBytesPerSecMovingAverageReference.get().copyWithSize(updatedSize));
}

synchronized (uploadTimeMsMutex) {
this.uploadTimeMsMovingAverageReference.set(this.uploadTimeMsMovingAverageReference.get().copyWithSize(updatedSize));
}
}

boolean isUploadBytesPerSecAverageReady() {
Expand All @@ -494,17 +500,6 @@ public void addUploadBytesPerSec(long bytesPerSec) {
}
}

/**
* Updates the window size for data collection of upload bytes per second. This also resets any data collected so far.
*
* @param updatedSize the updated size
*/
void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) {
synchronized (uploadBytesPerSecMutex) {
this.uploadBytesPerSecMovingAverageReference.set(this.uploadBytesPerSecMovingAverageReference.get().copyWithSize(updatedSize));
}
}

boolean isUploadTimeMsAverageReady() {
return uploadTimeMsMovingAverageReference.get().isReady();
}
Expand All @@ -519,17 +514,6 @@ public void addTimeForCompletedUploadSync(long timeMs) {
}
}

/**
* Updates the window size for data collection of upload time (ms). This also resets any data collected so far.
*
* @param updatedSize the updated size
*/
void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) {
synchronized (uploadTimeMsMutex) {
this.uploadTimeMsMovingAverageReference.set(this.uploadTimeMsMovingAverageReference.get().copyWithSize(updatedSize));
}
}

public void addTotalUploadTimeInMs(long fileUploadTimeInMs) {
this.totalUploadTimeInMs.addAndGet(fileUploadTimeInMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,84 +13,44 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;

/**
* Service used to validate if the incoming indexing request should be rejected based on the {@link RemoteSegmentTransferTracker}.
*
* @opensearch.internal
*/
public class RemoteStorePressureService implements IndexEventListener {
public class RemoteStorePressureService {

private static final Logger logger = LogManager.getLogger(RemoteStorePressureService.class);

/**
* Keeps map of remote-backed index shards and their corresponding backpressure tracker.
*/
private final Map<ShardId, RemoteSegmentTransferTracker> trackerMap = ConcurrentCollections.newConcurrentMap();

/**
* Remote refresh segment pressure settings which is used for creation of the backpressure tracker and as well as rejection.
*/
private final RemoteStorePressureSettings pressureSettings;

private final List<LagValidator> lagValidators;

private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;

@Inject
public RemoteStorePressureService(ClusterService clusterService, Settings settings) {
public RemoteStorePressureService(
ClusterService clusterService,
Settings settings,
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
) {
pressureSettings = new RemoteStorePressureSettings(clusterService, settings, this);
lagValidators = Arrays.asList(
new ConsecutiveFailureValidator(pressureSettings),
new BytesLagValidator(pressureSettings),
new TimeLagValidator(pressureSettings)
);
}

/**
* Get {@code RemoteSegmentTransferTracker} only if the underlying Index has remote segments integration enabled.
*
* @param shardId shard id
* @return the tracker if index is remote-backed, else null.
*/
public RemoteSegmentTransferTracker getRemoteRefreshSegmentTracker(ShardId shardId) {
return trackerMap.get(shardId);
}

@Override
public void afterIndexShardCreated(IndexShard indexShard) {
if (indexShard.indexSettings().isRemoteStoreEnabled() == false) {
return;
}
ShardId shardId = indexShard.shardId();
trackerMap.put(
shardId,
new RemoteSegmentTransferTracker(
shardId,
indexShard.store().getDirectoryFileTransferTracker(),
pressureSettings.getUploadBytesMovingAverageWindowSize(),
pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(),
pressureSettings.getUploadTimeMovingAverageWindowSize()
)
);
logger.trace("Created tracker for shardId={}", shardId);
}

@Override
public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
RemoteSegmentTransferTracker remoteSegmentTransferTracker = trackerMap.remove(shardId);
if (remoteSegmentTransferTracker != null) {
logger.trace("Deleted tracker for shardId={}", shardId);
}
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
}

/**
Expand All @@ -108,7 +68,7 @@ public boolean isSegmentsUploadBackpressureEnabled() {
* @param shardId shardId for which the validation needs to be done.
*/
public void validateSegmentsUploadLag(ShardId shardId) {
RemoteSegmentTransferTracker remoteSegmentTransferTracker = getRemoteRefreshSegmentTracker(shardId);
RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId);
// condition 1 - This will be null for non-remote backed indexes
// condition 2 - This will be zero if the remote store is
if (remoteSegmentTransferTracker == null || remoteSegmentTransferTracker.getRefreshSeqNoLag() == 0) {
Expand All @@ -123,22 +83,6 @@ public void validateSegmentsUploadLag(ShardId shardId) {
}
}

void updateUploadBytesMovingAverageWindowSize(int updatedSize) {
updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadBytesMovingAverageWindowSize, updatedSize);
}

void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) {
updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize);
}

void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) {
updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize);
}

void updateMovingAverageWindowSize(BiConsumer<RemoteSegmentTransferTracker, Integer> biConsumer, int updatedSize) {
trackerMap.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize));
}

/**
* Abstract class for validating if lag is acceptable or not.
*
Expand Down
Loading

0 comments on commit c333e6d

Please sign in to comment.