From ae42debe44c5310b975efde20930973f4712bf63 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 31 Aug 2023 12:34:52 +0530 Subject: [PATCH] [Remote Store] Add tracker factory to manage remote store stats trackers (#9546) (#9654) --------- (cherry picked from commit c294c918c7415a451e6494b9f8a4d75a629dccd0) Signed-off-by: Bhumika Saini Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../TransportRemoteStoreStatsAction.java | 10 +- .../common/settings/ClusterSettings.java | 7 +- .../org/opensearch/index/IndexService.java | 6 +- .../remote/RemoteSegmentTransferTracker.java | 44 +- .../remote/RemoteStorePressureService.java | 76 +-- .../remote/RemoteStorePressureSettings.java | 86 +-- .../RemoteStoreStatsTrackerFactory.java | 100 +++ .../opensearch/index/shard/IndexShard.java | 16 +- .../org/opensearch/indices/IndicesModule.java | 2 + .../opensearch/indices/IndicesService.java | 6 +- .../cluster/IndicesClusterStateService.java | 19 +- .../TransportRemoteStoreStatsActionTests.java | 10 +- .../RemoteSegmentTransferTrackerTests.java | 594 ++++++++---------- .../RemoteStorePressureServiceTests.java | 57 +- .../RemoteStorePressureSettingsTests.java | 97 --- .../RemoteStoreStatsTrackerFactoryTests.java | 119 ++++ .../index/remote/RemoteStoreTestsHelper.java | 40 ++ .../index/shard/IndexShardTests.java | 4 +- .../RemoteStoreRefreshListenerTests.java | 49 +- ...actIndicesClusterStateServiceTestCase.java | 4 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../index/shard/IndexShardTestCase.java | 12 +- 22 files changed, 630 insertions(+), 731 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactory.java create mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactoryTests.java create mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java index d05879aa1ae78..2595f783b4cf3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java @@ -24,7 +24,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.index.IndexService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.indices.IndicesService; @@ -50,7 +50,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct private final IndicesService indicesService; - private final RemoteStorePressureService remoteStorePressureService; + private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; @Inject public TransportRemoteStoreStatsAction( @@ -59,7 +59,7 @@ public TransportRemoteStoreStatsAction( IndicesService indicesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - RemoteStorePressureService remoteStorePressureService + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) { super( RemoteStoreStatsAction.NAME, @@ -71,7 +71,7 @@ public TransportRemoteStoreStatsAction( ThreadPool.Names.MANAGEMENT ); this.indicesService = indicesService; - this.remoteStorePressureService = remoteStorePressureService; + this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; } /** @@ -153,7 +153,7 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard throw new ShardNotFoundException(indexShard.shardId()); } - RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker( + RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker( indexShard.shardId() ); assert Objects.nonNull(remoteSegmentTransferTracker); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 85f5ae2e93ca4..1e127f27eff7b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -106,6 +106,7 @@ import org.opensearch.index.ShardIndexingPressureSettings; import org.opensearch.index.ShardIndexingPressureStore; import org.opensearch.index.remote.RemoteStorePressureSettings; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.indices.IndexingMemoryController; import org.opensearch.indices.IndicesQueryCache; @@ -657,9 +658,9 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR, RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR, RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT, - RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, - RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, - RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, + + // Settings related to Remote Store stats + RemoteStoreStatsTrackerFactory.MOVING_AVERAGE_WINDOW_SIZE, // Related to monitoring of task cancellation TaskCancellationMonitoringSettings.IS_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 811768fc1540e..e5028ff2ecff9 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -78,7 +78,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.SearchIndexNameMatcher; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -443,7 +443,7 @@ public synchronized IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteStorePressureService remoteStorePressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -512,7 +512,7 @@ public synchronized IndexShard createShard( translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, remoteStore, - remoteStorePressureService + remoteStoreStatsTrackerFactory ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 95902fd375145..564fdfbc0761d 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -194,9 +194,7 @@ public class RemoteSegmentTransferTracker { public RemoteSegmentTransferTracker( ShardId shardId, DirectoryFileTransferTracker directoryFileTransferTracker, - int uploadBytesMovingAverageWindowSize, - int uploadBytesPerSecMovingAverageWindowSize, - int uploadTimeMsMovingAverageWindowSize + int movingAverageWindowSize ) { logger = Loggers.getLogger(getClass(), shardId); this.shardId = shardId; @@ -207,9 +205,9 @@ public RemoteSegmentTransferTracker( remoteRefreshTimeMs = currentTimeMs; localRefreshClockTimeMs = currentClockTimeMs; remoteRefreshClockTimeMs = currentClockTimeMs; - uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize)); - uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize)); - uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize)); + uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(movingAverageWindowSize)); + uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(movingAverageWindowSize)); + uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(movingAverageWindowSize)); this.directoryFileTransferTracker = directoryFileTransferTracker; } @@ -470,14 +468,22 @@ public void addUploadBytes(long size) { } /** - * Updates the window size for data collection of upload bytes. This also resets any data collected so far. + * Updates the window size for data collection. This also resets any data collected so far. * * @param updatedSize the updated size */ - void updateUploadBytesMovingAverageWindowSize(int updatedSize) { + void updateMovingAverageWindowSize(int updatedSize) { synchronized (uploadBytesMutex) { this.uploadBytesMovingAverageReference.set(this.uploadBytesMovingAverageReference.get().copyWithSize(updatedSize)); } + + synchronized (uploadBytesPerSecMutex) { + this.uploadBytesPerSecMovingAverageReference.set(this.uploadBytesPerSecMovingAverageReference.get().copyWithSize(updatedSize)); + } + + synchronized (uploadTimeMsMutex) { + this.uploadTimeMsMovingAverageReference.set(this.uploadTimeMsMovingAverageReference.get().copyWithSize(updatedSize)); + } } boolean isUploadBytesPerSecAverageReady() { @@ -494,17 +500,6 @@ public void addUploadBytesPerSec(long bytesPerSec) { } } - /** - * Updates the window size for data collection of upload bytes per second. This also resets any data collected so far. - * - * @param updatedSize the updated size - */ - void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) { - synchronized (uploadBytesPerSecMutex) { - this.uploadBytesPerSecMovingAverageReference.set(this.uploadBytesPerSecMovingAverageReference.get().copyWithSize(updatedSize)); - } - } - boolean isUploadTimeMsAverageReady() { return uploadTimeMsMovingAverageReference.get().isReady(); } @@ -519,17 +514,6 @@ public void addTimeForCompletedUploadSync(long timeMs) { } } - /** - * Updates the window size for data collection of upload time (ms). This also resets any data collected so far. - * - * @param updatedSize the updated size - */ - void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { - synchronized (uploadTimeMsMutex) { - this.uploadTimeMsMovingAverageReference.set(this.uploadTimeMsMovingAverageReference.get().copyWithSize(updatedSize)); - } - } - public void addTotalUploadTimeInMs(long fileUploadTimeInMs) { this.totalUploadTimeInMs.addAndGet(fileUploadTimeInMs); } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index 427304935259b..3722be7dadbfb 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -13,32 +13,22 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.index.shard.IndexEventListener; -import org.opensearch.index.shard.IndexShard; import java.util.Arrays; import java.util.List; import java.util.Locale; -import java.util.Map; -import java.util.function.BiConsumer; /** * Service used to validate if the incoming indexing request should be rejected based on the {@link RemoteSegmentTransferTracker}. * * @opensearch.internal */ -public class RemoteStorePressureService implements IndexEventListener { +public class RemoteStorePressureService { private static final Logger logger = LogManager.getLogger(RemoteStorePressureService.class); - /** - * Keeps map of remote-backed index shards and their corresponding backpressure tracker. - */ - private final Map trackerMap = ConcurrentCollections.newConcurrentMap(); - /** * Remote refresh segment pressure settings which is used for creation of the backpressure tracker and as well as rejection. */ @@ -46,51 +36,21 @@ public class RemoteStorePressureService implements IndexEventListener { private final List lagValidators; + private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; + @Inject - public RemoteStorePressureService(ClusterService clusterService, Settings settings) { + public RemoteStorePressureService( + ClusterService clusterService, + Settings settings, + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + ) { pressureSettings = new RemoteStorePressureSettings(clusterService, settings, this); lagValidators = Arrays.asList( new ConsecutiveFailureValidator(pressureSettings), new BytesLagValidator(pressureSettings), new TimeLagValidator(pressureSettings) ); - } - - /** - * Get {@code RemoteSegmentTransferTracker} only if the underlying Index has remote segments integration enabled. - * - * @param shardId shard id - * @return the tracker if index is remote-backed, else null. - */ - public RemoteSegmentTransferTracker getRemoteRefreshSegmentTracker(ShardId shardId) { - return trackerMap.get(shardId); - } - - @Override - public void afterIndexShardCreated(IndexShard indexShard) { - if (indexShard.indexSettings().isRemoteStoreEnabled() == false) { - return; - } - ShardId shardId = indexShard.shardId(); - trackerMap.put( - shardId, - new RemoteSegmentTransferTracker( - shardId, - indexShard.store().getDirectoryFileTransferTracker(), - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() - ) - ); - logger.trace("Created tracker for shardId={}", shardId); - } - - @Override - public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) { - RemoteSegmentTransferTracker remoteSegmentTransferTracker = trackerMap.remove(shardId); - if (remoteSegmentTransferTracker != null) { - logger.trace("Deleted tracker for shardId={}", shardId); - } + this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; } /** @@ -108,7 +68,7 @@ public boolean isSegmentsUploadBackpressureEnabled() { * @param shardId shardId for which the validation needs to be done. */ public void validateSegmentsUploadLag(ShardId shardId) { - RemoteSegmentTransferTracker remoteSegmentTransferTracker = getRemoteRefreshSegmentTracker(shardId); + RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId); // condition 1 - This will be null for non-remote backed indexes // condition 2 - This will be zero if the remote store is if (remoteSegmentTransferTracker == null || remoteSegmentTransferTracker.getRefreshSeqNoLag() == 0) { @@ -123,22 +83,6 @@ public void validateSegmentsUploadLag(ShardId shardId) { } } - void updateUploadBytesMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadBytesMovingAverageWindowSize, updatedSize); - } - - void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize); - } - - void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize); - } - - void updateMovingAverageWindowSize(BiConsumer biConsumer, int updatedSize) { - trackerMap.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize)); - } - /** * Abstract class for validating if lag is acceptable or not. * diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java index 3f665890d43e9..af2e453f8107d 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java @@ -20,16 +20,12 @@ */ public class RemoteStorePressureSettings { - private static class Defaults { + static class Defaults { private static final double BYTES_LAG_VARIANCE_FACTOR = 10.0; private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 10.0; private static final double VARIANCE_FACTOR_MIN_VALUE = 1.0; private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 5; private static final int MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE = 1; - private static final int UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE = 20; - private static final int UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE = 20; - private static final int UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE = 20; - private static final int MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE = 5; } public static final Setting REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED = Setting.boolSetting( @@ -63,30 +59,6 @@ private static class Defaults { Setting.Property.NodeScope ); - public static final Setting UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( - "remote_store.segment.pressure.upload_bytes_moving_average_window_size", - Defaults.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, - Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public static final Setting UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( - "remote_store.segment.pressure.upload_bytes_per_sec_moving_average_window_size", - Defaults.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, - Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public static final Setting UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( - "remote_store.segment.pressure.upload_time_moving_average_window_size", - Defaults.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, - Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - private volatile boolean remoteRefreshSegmentPressureEnabled; private volatile long minRefreshSeqNoLagLimit; @@ -97,16 +69,10 @@ private static class Defaults { private volatile int minConsecutiveFailuresLimit; - private volatile int uploadBytesMovingAverageWindowSize; - - private volatile int uploadBytesPerSecMovingAverageWindowSize; - - private volatile int uploadTimeMovingAverageWindowSize; - public RemoteStorePressureSettings( ClusterService clusterService, Settings settings, - RemoteStorePressureService remoteUploadPressureService + RemoteStorePressureService remoteStorePressureService ) { ClusterSettings clusterSettings = clusterService.getClusterSettings(); @@ -121,30 +87,6 @@ public RemoteStorePressureSettings( this.minConsecutiveFailuresLimit = MIN_CONSECUTIVE_FAILURES_LIMIT.get(settings); clusterSettings.addSettingsUpdateConsumer(MIN_CONSECUTIVE_FAILURES_LIMIT, this::setMinConsecutiveFailuresLimit); - - this.uploadBytesMovingAverageWindowSize = UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.get(settings); - clusterSettings.addSettingsUpdateConsumer( - UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, - remoteUploadPressureService::updateUploadBytesMovingAverageWindowSize - ); - clusterSettings.addSettingsUpdateConsumer(UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, this::setUploadBytesMovingAverageWindowSize); - - this.uploadBytesPerSecMovingAverageWindowSize = UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.get(settings); - clusterSettings.addSettingsUpdateConsumer( - UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, - remoteUploadPressureService::updateUploadBytesPerSecMovingAverageWindowSize - ); - clusterSettings.addSettingsUpdateConsumer( - UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, - this::setUploadBytesPerSecMovingAverageWindowSize - ); - - this.uploadTimeMovingAverageWindowSize = UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.get(settings); - clusterSettings.addSettingsUpdateConsumer( - UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, - remoteUploadPressureService::updateUploadTimeMsMovingAverageWindowSize - ); - clusterSettings.addSettingsUpdateConsumer(UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, this::setUploadTimeMovingAverageWindowSize); } public boolean isRemoteRefreshSegmentPressureEnabled() { @@ -186,28 +128,4 @@ public int getMinConsecutiveFailuresLimit() { public void setMinConsecutiveFailuresLimit(int minConsecutiveFailuresLimit) { this.minConsecutiveFailuresLimit = minConsecutiveFailuresLimit; } - - public int getUploadBytesMovingAverageWindowSize() { - return uploadBytesMovingAverageWindowSize; - } - - public void setUploadBytesMovingAverageWindowSize(int uploadBytesMovingAverageWindowSize) { - this.uploadBytesMovingAverageWindowSize = uploadBytesMovingAverageWindowSize; - } - - public int getUploadBytesPerSecMovingAverageWindowSize() { - return uploadBytesPerSecMovingAverageWindowSize; - } - - public void setUploadBytesPerSecMovingAverageWindowSize(int uploadBytesPerSecMovingAverageWindowSize) { - this.uploadBytesPerSecMovingAverageWindowSize = uploadBytesPerSecMovingAverageWindowSize; - } - - public int getUploadTimeMovingAverageWindowSize() { - return uploadTimeMovingAverageWindowSize; - } - - public void setUploadTimeMovingAverageWindowSize(int uploadTimeMovingAverageWindowSize) { - this.uploadTimeMovingAverageWindowSize = uploadTimeMovingAverageWindowSize; - } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactory.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactory.java new file mode 100644 index 0000000000000..783f4195be156 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactory.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexEventListener; +import org.opensearch.index.shard.IndexShard; + +import java.util.Map; + +/** + * Factory to manage stats trackers for Remote Store operations + * + * @opensearch.internal + */ +public class RemoteStoreStatsTrackerFactory implements IndexEventListener { + static class Defaults { + static final int MOVING_AVERAGE_WINDOW_SIZE = 20; + static final int MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE = 5; + } + + public static final Setting MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "remote_store.moving_average_window_size", + Defaults.MOVING_AVERAGE_WINDOW_SIZE, + Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private static final Logger logger = LogManager.getLogger(RemoteStoreStatsTrackerFactory.class); + + /** + * Number of data points to consider for a moving average statistic + */ + private volatile int movingAverageWindowSize; + + /** + * Keeps map of remote-backed index shards and their corresponding stats tracker. + */ + private final Map remoteSegmentTrackerMap = ConcurrentCollections.newConcurrentMap(); + + @Inject + public RemoteStoreStatsTrackerFactory(ClusterService clusterService, Settings settings) { + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + + this.movingAverageWindowSize = MOVING_AVERAGE_WINDOW_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer(MOVING_AVERAGE_WINDOW_SIZE, this::updateMovingAverageWindowSize); + } + + @Override + public void afterIndexShardCreated(IndexShard indexShard) { + if (indexShard.indexSettings().isRemoteStoreEnabled() == false) { + return; + } + ShardId shardId = indexShard.shardId(); + remoteSegmentTrackerMap.put( + shardId, + new RemoteSegmentTransferTracker(shardId, indexShard.store().getDirectoryFileTransferTracker(), movingAverageWindowSize) + ); + logger.trace("Created RemoteSegmentTransferTracker for shardId={}", shardId); + } + + @Override + public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) { + RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteSegmentTrackerMap.remove(shardId); + if (remoteSegmentTransferTracker != null) { + logger.trace("Deleted RemoteSegmentTransferTracker for shardId={}", shardId); + } + } + + void updateMovingAverageWindowSize(int updatedSize) { + remoteSegmentTrackerMap.values().forEach(tracker -> tracker.updateMovingAverageWindowSize(updatedSize)); + + // Update movingAverageWindowSize only if the trackers were successfully updated + movingAverageWindowSize = updatedSize; + } + + public RemoteSegmentTransferTracker getRemoteSegmentTransferTracker(ShardId shardId) { + return remoteSegmentTrackerMap.get(shardId); + } + + // visible for testing + int getMovingAverageWindowSize() { + return movingAverageWindowSize; + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b2e918faab8a2..63ccce7d3a53a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -149,7 +149,7 @@ import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; import org.opensearch.index.remote.RemoteSegmentStats; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -335,7 +335,7 @@ Runnable getGlobalCheckpointSyncer() { private final Store remoteStore; private final BiFunction translogFactorySupplier; private final boolean isTimeSeriesIndex; - private final RemoteStorePressureService remoteStorePressureService; + private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; private final List internalRefreshListener = new ArrayList<>(); @@ -363,7 +363,7 @@ public IndexShard( final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore, - final RemoteStorePressureService remoteStorePressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -458,7 +458,7 @@ public boolean shouldCache(Query query) { this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null) ? false : mapperService.documentMapper().mappers().containsTimeStampField(); - this.remoteStorePressureService = remoteStorePressureService; + this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; } public ThreadPool getThreadPool() { @@ -548,8 +548,8 @@ public QueryCachingPolicy getQueryCachingPolicy() { } /** Only used for testing **/ - protected RemoteStorePressureService getRemoteStorePressureService() { - return remoteStorePressureService; + protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() { + return remoteStoreStatsTrackerFactory; } @Override @@ -1391,7 +1391,7 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu // Populate remote_store stats only if the index is remote store backed if (indexSettings.isRemoteStoreEnabled()) { segmentsStats.addRemoteSegmentStats( - new RemoteSegmentStats(remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId).stats()) + new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats()) ); } return segmentsStats; @@ -3707,7 +3707,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro this, // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY, - remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId()) + remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId()) ) ); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index c6ae8b988aed0..d902323401c07 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -71,6 +71,7 @@ import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.index.mapper.VersionFieldMapper; import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncAction; @@ -289,6 +290,7 @@ protected void configure() { bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); bind(SegmentReplicationPressureService.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + bind(RemoteStoreStatsTrackerFactory.class).asEagerSingleton(); bind(RemoteStorePressureService.class).asEagerSingleton(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 4f057c3f7881a..69ba42655a92f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -123,7 +123,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -981,7 +981,7 @@ public IndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStorePressureService remoteStorePressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); @@ -993,7 +993,7 @@ public IndexShard createShard( globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, - remoteStorePressureService + remoteStoreStatsTrackerFactory ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index f94098ddb7c88..a1a5fc0c222c7 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -66,7 +66,7 @@ import org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -149,7 +149,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final RemoteStorePressureService remoteStorePressureService; + private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; @Inject public IndicesClusterStateService( @@ -170,7 +170,7 @@ public IndicesClusterStateService( final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteStorePressureService remoteStorePressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) { this( settings, @@ -190,7 +190,7 @@ public IndicesClusterStateService( primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, retentionLeaseSyncer, - remoteStorePressureService + remoteStoreStatsTrackerFactory ); } @@ -213,7 +213,7 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final RemoteStorePressureService remoteStorePressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -225,7 +225,7 @@ public IndicesClusterStateService( indexEventListeners.add(segmentReplicationSourceService); // if remote store feature is not enabled, do not wire the remote upload pressure service as an IndexEventListener. if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { - indexEventListeners.add(remoteStorePressureService); + indexEventListeners.add(remoteStoreStatsTrackerFactory); } this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); @@ -240,7 +240,7 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); - this.remoteStorePressureService = remoteStorePressureService; + this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; } @Override @@ -683,7 +683,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR retentionLeaseSyncer, nodes.getLocalNode(), sourceNode, - remoteStorePressureService + remoteStoreStatsTrackerFactory ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1028,6 +1028,7 @@ U createIndex(IndexMetadata indexMetadata, List builtInIndex * @param retentionLeaseSyncer a callback when this shard syncs retention leases * @param targetNode the node where this shard will be recovered * @param sourceNode the source node to recover this shard from (it might be null) + * @param remoteStoreStatsTrackerFactory factory for remote store stats trackers * @return a new shard * @throws IOException if an I/O exception occurs when creating the shard */ @@ -1042,7 +1043,7 @@ T createShard( RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, - RemoteStorePressureService remoteStorePressureService + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java index c34f55d62fe89..75707f2a7853a 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java @@ -28,7 +28,7 @@ import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.remote.RemoteSegmentTransferTracker; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; @@ -53,7 +53,7 @@ public class TransportRemoteStoreStatsActionTests extends IndexShardTestCase { private IndicesService indicesService; - private RemoteStorePressureService pressureService; + private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; private IndexMetadata remoteStoreIndexMetadata; private TransportService transportService; private ClusterService clusterService; @@ -67,7 +67,7 @@ public void setUp() throws Exception { indicesService = mock(IndicesService.class); IndexService indexService = mock(IndexService.class); clusterService = mock(ClusterService.class); - pressureService = mock(RemoteStorePressureService.class); + remoteStoreStatsTrackerFactory = mock(RemoteStoreStatsTrackerFactory.class); MockTransport mockTransport = new MockTransport(); localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); remoteStoreIndexMetadata = IndexMetadata.builder(INDEX.getName()) @@ -90,7 +90,7 @@ public void setUp() throws Exception { Collections.emptySet() ); - when(pressureService.getRemoteRefreshSegmentTracker(any())).thenReturn(mock(RemoteSegmentTransferTracker.class)); + when(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(any())).thenReturn(mock(RemoteSegmentTransferTracker.class)); when(indicesService.indexService(INDEX)).thenReturn(indexService); when(indexService.getIndexSettings()).thenReturn(new IndexSettings(remoteStoreIndexMetadata, Settings.EMPTY)); statsAction = new TransportRemoteStoreStatsAction( @@ -99,7 +99,7 @@ public void setUp() throws Exception { indicesService, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), - pressureService + remoteStoreStatsTrackerFactory ); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java index 10fe3f95ab47c..e0a05e8d6b49e 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java @@ -23,19 +23,15 @@ import java.util.HashMap; import java.util.Map; -import static org.mockito.Mockito.mock; - public class RemoteSegmentTransferTrackerTests extends OpenSearchTestCase { - - private RemoteStorePressureSettings pressureSettings; - + private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; private ClusterService clusterService; private ThreadPool threadPool; private ShardId shardId; - private RemoteSegmentTransferTracker pressureTracker; + private RemoteSegmentTransferTracker transferTracker; private DirectoryFileTransferTracker directoryFileTransferTracker; @@ -48,7 +44,7 @@ public void setUp() throws Exception { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, mock(RemoteStorePressureService.class)); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY); shardId = new ShardId("index", "uuid", 0); directoryFileTransferTracker = new DirectoryFileTransferTracker(); } @@ -60,545 +56,475 @@ public void tearDown() throws Exception { } public void testGetShardId() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - assertEquals(shardId, pressureTracker.getShardId()); + assertEquals(shardId, transferTracker.getShardId()); } public void testUpdateLocalRefreshSeqNo() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long refreshSeqNo = 2; - pressureTracker.updateLocalRefreshSeqNo(refreshSeqNo); - assertEquals(refreshSeqNo, pressureTracker.getLocalRefreshSeqNo()); + transferTracker.updateLocalRefreshSeqNo(refreshSeqNo); + assertEquals(refreshSeqNo, transferTracker.getLocalRefreshSeqNo()); } public void testUpdateRemoteRefreshSeqNo() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long refreshSeqNo = 4; - pressureTracker.updateRemoteRefreshSeqNo(refreshSeqNo); - assertEquals(refreshSeqNo, pressureTracker.getRemoteRefreshSeqNo()); + transferTracker.updateRemoteRefreshSeqNo(refreshSeqNo); + assertEquals(refreshSeqNo, transferTracker.getRemoteRefreshSeqNo()); } public void testUpdateLocalRefreshTimeMs() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long refreshTimeMs = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100); - pressureTracker.updateLocalRefreshTimeMs(refreshTimeMs); - assertEquals(refreshTimeMs, pressureTracker.getLocalRefreshTimeMs()); + transferTracker.updateLocalRefreshTimeMs(refreshTimeMs); + assertEquals(refreshTimeMs, transferTracker.getLocalRefreshTimeMs()); } public void testUpdateRemoteRefreshTimeMs() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long refreshTimeMs = System.nanoTime() / 1_000_000 + randomIntBetween(10, 100); - pressureTracker.updateRemoteRefreshTimeMs(refreshTimeMs); - assertEquals(refreshTimeMs, pressureTracker.getRemoteRefreshTimeMs()); + transferTracker.updateRemoteRefreshTimeMs(refreshTimeMs); + assertEquals(refreshTimeMs, transferTracker.getRemoteRefreshTimeMs()); } public void testLastDownloadTimestampMs() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long currentTimeInMs = System.currentTimeMillis(); - pressureTracker.getDirectoryFileTransferTracker().updateLastTransferTimestampMs(currentTimeInMs); - assertEquals(currentTimeInMs, pressureTracker.getDirectoryFileTransferTracker().getLastTransferTimestampMs()); + transferTracker.getDirectoryFileTransferTracker().updateLastTransferTimestampMs(currentTimeInMs); + assertEquals(currentTimeInMs, transferTracker.getDirectoryFileTransferTracker().getLastTransferTimestampMs()); } public void testComputeSeqNoLagOnUpdate() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); int localRefreshSeqNo = randomIntBetween(50, 100); int remoteRefreshSeqNo = randomIntBetween(20, 50); - pressureTracker.updateLocalRefreshSeqNo(localRefreshSeqNo); - assertEquals(localRefreshSeqNo, pressureTracker.getRefreshSeqNoLag()); - pressureTracker.updateRemoteRefreshSeqNo(remoteRefreshSeqNo); - assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, pressureTracker.getRefreshSeqNoLag()); + transferTracker.updateLocalRefreshSeqNo(localRefreshSeqNo); + assertEquals(localRefreshSeqNo, transferTracker.getRefreshSeqNoLag()); + transferTracker.updateRemoteRefreshSeqNo(remoteRefreshSeqNo); + assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, transferTracker.getRefreshSeqNoLag()); } public void testComputeTimeLagOnUpdate() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - long currentLocalRefreshTimeMs = pressureTracker.getLocalRefreshTimeMs(); + long currentLocalRefreshTimeMs = transferTracker.getLocalRefreshTimeMs(); long currentTimeMs = System.nanoTime() / 1_000_000L; long localRefreshTimeMs = currentTimeMs + randomIntBetween(100, 500); long remoteRefreshTimeMs = currentTimeMs + randomIntBetween(50, 99); - pressureTracker.updateLocalRefreshTimeMs(localRefreshTimeMs); - assertEquals(localRefreshTimeMs - currentLocalRefreshTimeMs, pressureTracker.getTimeMsLag()); - pressureTracker.updateRemoteRefreshTimeMs(remoteRefreshTimeMs); - assertEquals(localRefreshTimeMs - remoteRefreshTimeMs, pressureTracker.getTimeMsLag()); + transferTracker.updateLocalRefreshTimeMs(localRefreshTimeMs); + assertEquals(localRefreshTimeMs - currentLocalRefreshTimeMs, transferTracker.getTimeMsLag()); + transferTracker.updateRemoteRefreshTimeMs(remoteRefreshTimeMs); + assertEquals(localRefreshTimeMs - remoteRefreshTimeMs, transferTracker.getTimeMsLag()); } public void testAddUploadBytesStarted() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long bytesToAdd = randomLongBetween(1000, 1000000); - pressureTracker.addUploadBytesStarted(bytesToAdd); - assertEquals(bytesToAdd, pressureTracker.getUploadBytesStarted()); + transferTracker.addUploadBytesStarted(bytesToAdd); + assertEquals(bytesToAdd, transferTracker.getUploadBytesStarted()); long moreBytesToAdd = randomLongBetween(1000, 10000); - pressureTracker.addUploadBytesStarted(moreBytesToAdd); - assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getUploadBytesStarted()); + transferTracker.addUploadBytesStarted(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, transferTracker.getUploadBytesStarted()); } public void testAddUploadBytesFailed() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long bytesToAdd = randomLongBetween(1000, 1000000); - pressureTracker.addUploadBytesFailed(bytesToAdd); - assertEquals(bytesToAdd, pressureTracker.getUploadBytesFailed()); + transferTracker.addUploadBytesFailed(bytesToAdd); + assertEquals(bytesToAdd, transferTracker.getUploadBytesFailed()); long moreBytesToAdd = randomLongBetween(1000, 10000); - pressureTracker.addUploadBytesFailed(moreBytesToAdd); - assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getUploadBytesFailed()); + transferTracker.addUploadBytesFailed(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, transferTracker.getUploadBytesFailed()); } public void testAddUploadBytesSucceeded() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long bytesToAdd = randomLongBetween(1000, 1000000); - pressureTracker.addUploadBytesSucceeded(bytesToAdd); - assertEquals(bytesToAdd, pressureTracker.getUploadBytesSucceeded()); + transferTracker.addUploadBytesSucceeded(bytesToAdd); + assertEquals(bytesToAdd, transferTracker.getUploadBytesSucceeded()); long moreBytesToAdd = randomLongBetween(1000, 10000); - pressureTracker.addUploadBytesSucceeded(moreBytesToAdd); - assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getUploadBytesSucceeded()); + transferTracker.addUploadBytesSucceeded(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, transferTracker.getUploadBytesSucceeded()); } public void testAddDownloadBytesStarted() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long bytesToAdd = randomLongBetween(1000, 1000000); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(bytesToAdd); - assertEquals(bytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesStarted()); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(bytesToAdd); + assertEquals(bytesToAdd, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesStarted()); long moreBytesToAdd = randomLongBetween(1000, 10000); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(moreBytesToAdd); - assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesStarted()); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesStarted()); } public void testAddDownloadBytesFailed() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long bytesToAdd = randomLongBetween(1000, 1000000); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(bytesToAdd, System.currentTimeMillis()); - assertEquals(bytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesFailed()); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(bytesToAdd, System.currentTimeMillis()); + assertEquals(bytesToAdd, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesFailed()); long moreBytesToAdd = randomLongBetween(1000, 10000); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(moreBytesToAdd, System.currentTimeMillis()); - assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesFailed()); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(moreBytesToAdd, System.currentTimeMillis()); + assertEquals(bytesToAdd + moreBytesToAdd, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesFailed()); } public void testAddDownloadBytesSucceeded() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long bytesToAdd = randomLongBetween(1000, 1000000); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(bytesToAdd, System.currentTimeMillis()); - assertEquals(bytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesSucceeded()); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(bytesToAdd, System.currentTimeMillis()); + assertEquals(bytesToAdd, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesSucceeded()); long moreBytesToAdd = randomLongBetween(1000, 10000); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(moreBytesToAdd, System.currentTimeMillis()); - assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesSucceeded()); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(moreBytesToAdd, System.currentTimeMillis()); + assertEquals(bytesToAdd + moreBytesToAdd, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesSucceeded()); } public void testGetInflightUploadBytes() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long bytesStarted = randomLongBetween(10000, 100000); long bytesSucceeded = randomLongBetween(1000, 10000); long bytesFailed = randomLongBetween(100, 1000); - pressureTracker.addUploadBytesStarted(bytesStarted); - pressureTracker.addUploadBytesSucceeded(bytesSucceeded); - pressureTracker.addUploadBytesFailed(bytesFailed); - assertEquals(bytesStarted - bytesSucceeded - bytesFailed, pressureTracker.getInflightUploadBytes()); + transferTracker.addUploadBytesStarted(bytesStarted); + transferTracker.addUploadBytesSucceeded(bytesSucceeded); + transferTracker.addUploadBytesFailed(bytesFailed); + assertEquals(bytesStarted - bytesSucceeded - bytesFailed, transferTracker.getInflightUploadBytes()); } public void testIncrementTotalUploadsStarted() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - pressureTracker.incrementTotalUploadsStarted(); - assertEquals(1, pressureTracker.getTotalUploadsStarted()); - pressureTracker.incrementTotalUploadsStarted(); - assertEquals(2, pressureTracker.getTotalUploadsStarted()); + transferTracker.incrementTotalUploadsStarted(); + assertEquals(1, transferTracker.getTotalUploadsStarted()); + transferTracker.incrementTotalUploadsStarted(); + assertEquals(2, transferTracker.getTotalUploadsStarted()); } public void testIncrementTotalUploadsFailed() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - pressureTracker.incrementTotalUploadsFailed(); - assertEquals(1, pressureTracker.getTotalUploadsFailed()); - pressureTracker.incrementTotalUploadsFailed(); - assertEquals(2, pressureTracker.getTotalUploadsFailed()); + transferTracker.incrementTotalUploadsFailed(); + assertEquals(1, transferTracker.getTotalUploadsFailed()); + transferTracker.incrementTotalUploadsFailed(); + assertEquals(2, transferTracker.getTotalUploadsFailed()); } public void testIncrementTotalUploadSucceeded() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - pressureTracker.incrementTotalUploadsSucceeded(); - assertEquals(1, pressureTracker.getTotalUploadsSucceeded()); - pressureTracker.incrementTotalUploadsSucceeded(); - assertEquals(2, pressureTracker.getTotalUploadsSucceeded()); + transferTracker.incrementTotalUploadsSucceeded(); + assertEquals(1, transferTracker.getTotalUploadsSucceeded()); + transferTracker.incrementTotalUploadsSucceeded(); + assertEquals(2, transferTracker.getTotalUploadsSucceeded()); } public void testGetInflightUploads() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - pressureTracker.incrementTotalUploadsStarted(); - assertEquals(1, pressureTracker.getInflightUploads()); - pressureTracker.incrementTotalUploadsStarted(); - assertEquals(2, pressureTracker.getInflightUploads()); - pressureTracker.incrementTotalUploadsSucceeded(); - assertEquals(1, pressureTracker.getInflightUploads()); - pressureTracker.incrementTotalUploadsFailed(); - assertEquals(0, pressureTracker.getInflightUploads()); + transferTracker.incrementTotalUploadsStarted(); + assertEquals(1, transferTracker.getInflightUploads()); + transferTracker.incrementTotalUploadsStarted(); + assertEquals(2, transferTracker.getInflightUploads()); + transferTracker.incrementTotalUploadsSucceeded(); + assertEquals(1, transferTracker.getInflightUploads()); + transferTracker.incrementTotalUploadsFailed(); + assertEquals(0, transferTracker.getInflightUploads()); } public void testIncrementRejectionCount() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - pressureTracker.incrementRejectionCount(); - assertEquals(1, pressureTracker.getRejectionCount()); - pressureTracker.incrementRejectionCount(); - assertEquals(2, pressureTracker.getRejectionCount()); + transferTracker.incrementRejectionCount(); + assertEquals(1, transferTracker.getRejectionCount()); + transferTracker.incrementRejectionCount(); + assertEquals(2, transferTracker.getRejectionCount()); } public void testGetConsecutiveFailureCount() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - pressureTracker.incrementTotalUploadsFailed(); - assertEquals(1, pressureTracker.getConsecutiveFailureCount()); - pressureTracker.incrementTotalUploadsFailed(); - assertEquals(2, pressureTracker.getConsecutiveFailureCount()); - pressureTracker.incrementTotalUploadsSucceeded(); - assertEquals(0, pressureTracker.getConsecutiveFailureCount()); + transferTracker.incrementTotalUploadsFailed(); + assertEquals(1, transferTracker.getConsecutiveFailureCount()); + transferTracker.incrementTotalUploadsFailed(); + assertEquals(2, transferTracker.getConsecutiveFailureCount()); + transferTracker.incrementTotalUploadsSucceeded(); + assertEquals(0, transferTracker.getConsecutiveFailureCount()); } public void testComputeBytesLag() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); // Create local file size map Map fileSizeMap = new HashMap<>(); fileSizeMap.put("a", 100L); fileSizeMap.put("b", 105L); - pressureTracker.updateLatestLocalFileNameLengthMap(fileSizeMap.keySet(), fileSizeMap::get); - assertEquals(205L, pressureTracker.getBytesLag()); + transferTracker.updateLatestLocalFileNameLengthMap(fileSizeMap.keySet(), fileSizeMap::get); + assertEquals(205L, transferTracker.getBytesLag()); - pressureTracker.addToLatestUploadedFiles("a"); - assertEquals(105L, pressureTracker.getBytesLag()); + transferTracker.addToLatestUploadedFiles("a"); + assertEquals(105L, transferTracker.getBytesLag()); fileSizeMap.put("c", 115L); - pressureTracker.updateLatestLocalFileNameLengthMap(fileSizeMap.keySet(), fileSizeMap::get); - assertEquals(220L, pressureTracker.getBytesLag()); + transferTracker.updateLatestLocalFileNameLengthMap(fileSizeMap.keySet(), fileSizeMap::get); + assertEquals(220L, transferTracker.getBytesLag()); - pressureTracker.addToLatestUploadedFiles("b"); - assertEquals(115L, pressureTracker.getBytesLag()); + transferTracker.addToLatestUploadedFiles("b"); + assertEquals(115L, transferTracker.getBytesLag()); - pressureTracker.addToLatestUploadedFiles("c"); - assertEquals(0L, pressureTracker.getBytesLag()); + transferTracker.addToLatestUploadedFiles("c"); + assertEquals(0L, transferTracker.getBytesLag()); } public void testIsUploadBytesAverageReady() { - int uploadBytesMovingAverageWindowSize = pressureSettings.getUploadBytesMovingAverageWindowSize(); - pressureTracker = new RemoteSegmentTransferTracker( - shardId, - directoryFileTransferTracker, - uploadBytesMovingAverageWindowSize, - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() - ); - assertFalse(pressureTracker.isUploadBytesAverageReady()); + int movingAverageWindowSize = remoteStoreStatsTrackerFactory.getMovingAverageWindowSize(); + transferTracker = new RemoteSegmentTransferTracker(shardId, directoryFileTransferTracker, movingAverageWindowSize); + assertFalse(transferTracker.isUploadBytesAverageReady()); long sum = 0; - for (int i = 1; i < uploadBytesMovingAverageWindowSize; i++) { - pressureTracker.addUploadBytes(i); + for (int i = 1; i < movingAverageWindowSize; i++) { + transferTracker.addUploadBytes(i); sum += i; - assertFalse(pressureTracker.isUploadBytesAverageReady()); - assertEquals((double) sum / i, pressureTracker.getUploadBytesAverage(), 0.0d); + assertFalse(transferTracker.isUploadBytesAverageReady()); + assertEquals((double) sum / i, transferTracker.getUploadBytesAverage(), 0.0d); } - pressureTracker.addUploadBytes(uploadBytesMovingAverageWindowSize); - sum += uploadBytesMovingAverageWindowSize; - assertTrue(pressureTracker.isUploadBytesAverageReady()); - assertEquals((double) sum / uploadBytesMovingAverageWindowSize, pressureTracker.getUploadBytesAverage(), 0.0d); + transferTracker.addUploadBytes(movingAverageWindowSize); + sum += movingAverageWindowSize; + assertTrue(transferTracker.isUploadBytesAverageReady()); + assertEquals((double) sum / movingAverageWindowSize, transferTracker.getUploadBytesAverage(), 0.0d); - pressureTracker.addUploadBytes(100); + transferTracker.addUploadBytes(100); sum = sum + 100 - 1; - assertEquals((double) sum / uploadBytesMovingAverageWindowSize, pressureTracker.getUploadBytesAverage(), 0.0d); + assertEquals((double) sum / movingAverageWindowSize, transferTracker.getUploadBytesAverage(), 0.0d); } public void testIsUploadBytesPerSecAverageReady() { - int uploadBytesPerSecMovingAverageWindowSize = pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(); - pressureTracker = new RemoteSegmentTransferTracker( - shardId, - directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - uploadBytesPerSecMovingAverageWindowSize, - pressureSettings.getUploadTimeMovingAverageWindowSize() - ); - assertFalse(pressureTracker.isUploadBytesPerSecAverageReady()); + int movingAverageWindowSize = remoteStoreStatsTrackerFactory.getMovingAverageWindowSize(); + transferTracker = new RemoteSegmentTransferTracker(shardId, directoryFileTransferTracker, movingAverageWindowSize); + assertFalse(transferTracker.isUploadBytesPerSecAverageReady()); long sum = 0; - for (int i = 1; i < uploadBytesPerSecMovingAverageWindowSize; i++) { - pressureTracker.addUploadBytesPerSec(i); + for (int i = 1; i < movingAverageWindowSize; i++) { + transferTracker.addUploadBytesPerSec(i); sum += i; - assertFalse(pressureTracker.isUploadBytesPerSecAverageReady()); - assertEquals((double) sum / i, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + assertFalse(transferTracker.isUploadBytesPerSecAverageReady()); + assertEquals((double) sum / i, transferTracker.getUploadBytesPerSecAverage(), 0.0d); } - pressureTracker.addUploadBytesPerSec(uploadBytesPerSecMovingAverageWindowSize); - sum += uploadBytesPerSecMovingAverageWindowSize; - assertTrue(pressureTracker.isUploadBytesPerSecAverageReady()); - assertEquals((double) sum / uploadBytesPerSecMovingAverageWindowSize, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + transferTracker.addUploadBytesPerSec(movingAverageWindowSize); + sum += movingAverageWindowSize; + assertTrue(transferTracker.isUploadBytesPerSecAverageReady()); + assertEquals((double) sum / movingAverageWindowSize, transferTracker.getUploadBytesPerSecAverage(), 0.0d); - pressureTracker.addUploadBytesPerSec(100); + transferTracker.addUploadBytesPerSec(100); sum = sum + 100 - 1; - assertEquals((double) sum / uploadBytesPerSecMovingAverageWindowSize, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + assertEquals((double) sum / movingAverageWindowSize, transferTracker.getUploadBytesPerSecAverage(), 0.0d); } public void testIsUploadTimeMsAverageReady() { - int uploadTimeMovingAverageWindowSize = pressureSettings.getUploadTimeMovingAverageWindowSize(); - pressureTracker = new RemoteSegmentTransferTracker( - shardId, - directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - uploadTimeMovingAverageWindowSize - ); - assertFalse(pressureTracker.isUploadTimeMsAverageReady()); + int movingAverageWindowSize = remoteStoreStatsTrackerFactory.getMovingAverageWindowSize(); + transferTracker = new RemoteSegmentTransferTracker(shardId, directoryFileTransferTracker, movingAverageWindowSize); + assertFalse(transferTracker.isUploadTimeMsAverageReady()); long sum = 0; - for (int i = 1; i < uploadTimeMovingAverageWindowSize; i++) { - pressureTracker.addTimeForCompletedUploadSync(i); + for (int i = 1; i < movingAverageWindowSize; i++) { + transferTracker.addTimeForCompletedUploadSync(i); sum += i; - assertFalse(pressureTracker.isUploadTimeMsAverageReady()); - assertEquals((double) sum / i, pressureTracker.getUploadTimeMsAverage(), 0.0d); + assertFalse(transferTracker.isUploadTimeMsAverageReady()); + assertEquals((double) sum / i, transferTracker.getUploadTimeMsAverage(), 0.0d); } - pressureTracker.addTimeForCompletedUploadSync(uploadTimeMovingAverageWindowSize); - sum += uploadTimeMovingAverageWindowSize; - assertTrue(pressureTracker.isUploadTimeMsAverageReady()); - assertEquals((double) sum / uploadTimeMovingAverageWindowSize, pressureTracker.getUploadTimeMsAverage(), 0.0d); + transferTracker.addTimeForCompletedUploadSync(movingAverageWindowSize); + sum += movingAverageWindowSize; + assertTrue(transferTracker.isUploadTimeMsAverageReady()); + assertEquals((double) sum / movingAverageWindowSize, transferTracker.getUploadTimeMsAverage(), 0.0d); - pressureTracker.addTimeForCompletedUploadSync(100); + transferTracker.addTimeForCompletedUploadSync(100); sum = sum + 100 - 1; - assertEquals((double) sum / uploadTimeMovingAverageWindowSize, pressureTracker.getUploadTimeMsAverage(), 0.0d); + assertEquals((double) sum / movingAverageWindowSize, transferTracker.getUploadTimeMsAverage(), 0.0d); } public void testIsDownloadBytesAverageReady() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); + assertFalse(transferTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); long sum = 0; for (int i = 1; i < 20; i++) { - pressureTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(i); + transferTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(i); sum += i; - assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); - assertEquals((double) sum / i, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); + assertFalse(transferTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); + assertEquals((double) sum / i, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); } - pressureTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(20); + transferTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(20); sum += 20; - assertTrue(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); - assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); + assertTrue(transferTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); + assertEquals((double) sum / 20, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); - pressureTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(100); + transferTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(100); sum = sum + 100 - 1; - assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); + assertEquals((double) sum / 20, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); } public void testIsDownloadBytesPerSecAverageReady() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); + assertFalse(transferTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); long sum = 0; for (int i = 1; i < 20; i++) { - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(i); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(i); sum += i; - assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); - assertEquals((double) sum / i, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); + assertFalse(transferTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); + assertEquals((double) sum / i, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); } - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(20); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(20); sum += 20; - assertTrue(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); - assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); + assertTrue(transferTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); + assertEquals((double) sum / 20, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(100); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(100); sum = sum + 100 - 1; - assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); + assertEquals((double) sum / 20, transferTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); } public void testAddTotalUploadTimeInMs() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long timeToAdd = randomLongBetween(100, 200); - pressureTracker.addTotalUploadTimeInMs(timeToAdd); - assertEquals(timeToAdd, pressureTracker.getTotalUploadTimeInMs()); + transferTracker.addTotalUploadTimeInMs(timeToAdd); + assertEquals(timeToAdd, transferTracker.getTotalUploadTimeInMs()); long moreTimeToAdd = randomLongBetween(100, 200); - pressureTracker.addTotalUploadTimeInMs(moreTimeToAdd); - assertEquals(timeToAdd + moreTimeToAdd, pressureTracker.getTotalUploadTimeInMs()); + transferTracker.addTotalUploadTimeInMs(moreTimeToAdd); + assertEquals(timeToAdd + moreTimeToAdd, transferTracker.getTotalUploadTimeInMs()); } public void testAddTotalTransferTimeMs() { - pressureTracker = new RemoteSegmentTransferTracker( + transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); long timeToAdd = randomLongBetween(100, 200); - pressureTracker.getDirectoryFileTransferTracker().addTotalTransferTimeInMs(timeToAdd); - assertEquals(timeToAdd, pressureTracker.getDirectoryFileTransferTracker().getTotalTransferTimeInMs()); + transferTracker.getDirectoryFileTransferTracker().addTotalTransferTimeInMs(timeToAdd); + assertEquals(timeToAdd, transferTracker.getDirectoryFileTransferTracker().getTotalTransferTimeInMs()); long moreTimeToAdd = randomLongBetween(100, 200); - pressureTracker.getDirectoryFileTransferTracker().addTotalTransferTimeInMs(moreTimeToAdd); - assertEquals(timeToAdd + moreTimeToAdd, pressureTracker.getDirectoryFileTransferTracker().getTotalTransferTimeInMs()); + transferTracker.getDirectoryFileTransferTracker().addTotalTransferTimeInMs(moreTimeToAdd); + assertEquals(timeToAdd + moreTimeToAdd, transferTracker.getDirectoryFileTransferTracker().getTotalTransferTimeInMs()); } /** * Tests whether RemoteSegmentTransferTracker.Stats object generated correctly from RemoteSegmentTransferTracker. * */ public void testStatsObjectCreation() { - pressureTracker = constructTracker(); - RemoteSegmentTransferTracker.Stats pressureTrackerStats = pressureTracker.stats(); - assertEquals(pressureTracker.getShardId(), pressureTrackerStats.shardId); - assertEquals(pressureTracker.getTimeMsLag(), (int) pressureTrackerStats.refreshTimeLagMs); - assertEquals(pressureTracker.getLocalRefreshSeqNo(), (int) pressureTrackerStats.localRefreshNumber); - assertEquals(pressureTracker.getRemoteRefreshSeqNo(), (int) pressureTrackerStats.remoteRefreshNumber); - assertEquals(pressureTracker.getBytesLag(), (int) pressureTrackerStats.bytesLag); - assertEquals(pressureTracker.getRejectionCount(), (int) pressureTrackerStats.rejectionCount); - assertEquals(pressureTracker.getConsecutiveFailureCount(), (int) pressureTrackerStats.consecutiveFailuresCount); - assertEquals(pressureTracker.getUploadBytesStarted(), (int) pressureTrackerStats.uploadBytesStarted); - assertEquals(pressureTracker.getUploadBytesSucceeded(), (int) pressureTrackerStats.uploadBytesSucceeded); - assertEquals(pressureTracker.getUploadBytesFailed(), (int) pressureTrackerStats.uploadBytesFailed); - assertEquals(pressureTracker.getUploadBytesAverage(), pressureTrackerStats.uploadBytesMovingAverage, 0); - assertEquals(pressureTracker.getUploadBytesPerSecAverage(), pressureTrackerStats.uploadBytesPerSecMovingAverage, 0); - assertEquals(pressureTracker.getUploadTimeMsAverage(), pressureTrackerStats.uploadTimeMovingAverage, 0); - assertEquals(pressureTracker.getTotalUploadsStarted(), (int) pressureTrackerStats.totalUploadsStarted); - assertEquals(pressureTracker.getTotalUploadsSucceeded(), (int) pressureTrackerStats.totalUploadsSucceeded); - assertEquals(pressureTracker.getTotalUploadsFailed(), (int) pressureTrackerStats.totalUploadsFailed); + transferTracker = constructTracker(); + RemoteSegmentTransferTracker.Stats transferTrackerStats = transferTracker.stats(); + assertEquals(transferTracker.getShardId(), transferTrackerStats.shardId); + assertEquals(transferTracker.getTimeMsLag(), (int) transferTrackerStats.refreshTimeLagMs); + assertEquals(transferTracker.getLocalRefreshSeqNo(), (int) transferTrackerStats.localRefreshNumber); + assertEquals(transferTracker.getRemoteRefreshSeqNo(), (int) transferTrackerStats.remoteRefreshNumber); + assertEquals(transferTracker.getBytesLag(), (int) transferTrackerStats.bytesLag); + assertEquals(transferTracker.getRejectionCount(), (int) transferTrackerStats.rejectionCount); + assertEquals(transferTracker.getConsecutiveFailureCount(), (int) transferTrackerStats.consecutiveFailuresCount); + assertEquals(transferTracker.getUploadBytesStarted(), (int) transferTrackerStats.uploadBytesStarted); + assertEquals(transferTracker.getUploadBytesSucceeded(), (int) transferTrackerStats.uploadBytesSucceeded); + assertEquals(transferTracker.getUploadBytesFailed(), (int) transferTrackerStats.uploadBytesFailed); + assertEquals(transferTracker.getUploadBytesAverage(), transferTrackerStats.uploadBytesMovingAverage, 0); + assertEquals(transferTracker.getUploadBytesPerSecAverage(), transferTrackerStats.uploadBytesPerSecMovingAverage, 0); + assertEquals(transferTracker.getUploadTimeMsAverage(), transferTrackerStats.uploadTimeMovingAverage, 0); + assertEquals(transferTracker.getTotalUploadsStarted(), (int) transferTrackerStats.totalUploadsStarted); + assertEquals(transferTracker.getTotalUploadsSucceeded(), (int) transferTrackerStats.totalUploadsSucceeded); + assertEquals(transferTracker.getTotalUploadsFailed(), (int) transferTrackerStats.totalUploadsFailed); } /** @@ -606,64 +532,62 @@ public void testStatsObjectCreation() { * This comes into play during internode data transfer. */ public void testStatsObjectCreationViaStream() throws IOException { - pressureTracker = constructTracker(); - RemoteSegmentTransferTracker.Stats pressureTrackerStats = pressureTracker.stats(); + transferTracker = constructTracker(); + RemoteSegmentTransferTracker.Stats transferTrackerStats = transferTracker.stats(); try (BytesStreamOutput out = new BytesStreamOutput()) { - pressureTrackerStats.writeTo(out); + transferTrackerStats.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { RemoteSegmentTransferTracker.Stats deserializedStats = new RemoteSegmentTransferTracker.Stats(in); - assertEquals(deserializedStats.shardId, pressureTrackerStats.shardId); - assertEquals((int) deserializedStats.refreshTimeLagMs, (int) pressureTrackerStats.refreshTimeLagMs); - assertEquals((int) deserializedStats.localRefreshNumber, (int) pressureTrackerStats.localRefreshNumber); - assertEquals((int) deserializedStats.remoteRefreshNumber, (int) pressureTrackerStats.remoteRefreshNumber); - assertEquals((int) deserializedStats.bytesLag, (int) pressureTrackerStats.bytesLag); - assertEquals((int) deserializedStats.rejectionCount, (int) pressureTrackerStats.rejectionCount); - assertEquals((int) deserializedStats.consecutiveFailuresCount, (int) pressureTrackerStats.consecutiveFailuresCount); - assertEquals((int) deserializedStats.uploadBytesStarted, (int) pressureTrackerStats.uploadBytesStarted); - assertEquals((int) deserializedStats.uploadBytesSucceeded, (int) pressureTrackerStats.uploadBytesSucceeded); - assertEquals((int) deserializedStats.uploadBytesFailed, (int) pressureTrackerStats.uploadBytesFailed); - assertEquals((int) deserializedStats.uploadBytesMovingAverage, pressureTrackerStats.uploadBytesMovingAverage, 0); + assertEquals(deserializedStats.shardId, transferTrackerStats.shardId); + assertEquals((int) deserializedStats.refreshTimeLagMs, (int) transferTrackerStats.refreshTimeLagMs); + assertEquals((int) deserializedStats.localRefreshNumber, (int) transferTrackerStats.localRefreshNumber); + assertEquals((int) deserializedStats.remoteRefreshNumber, (int) transferTrackerStats.remoteRefreshNumber); + assertEquals((int) deserializedStats.bytesLag, (int) transferTrackerStats.bytesLag); + assertEquals((int) deserializedStats.rejectionCount, (int) transferTrackerStats.rejectionCount); + assertEquals((int) deserializedStats.consecutiveFailuresCount, (int) transferTrackerStats.consecutiveFailuresCount); + assertEquals((int) deserializedStats.uploadBytesStarted, (int) transferTrackerStats.uploadBytesStarted); + assertEquals((int) deserializedStats.uploadBytesSucceeded, (int) transferTrackerStats.uploadBytesSucceeded); + assertEquals((int) deserializedStats.uploadBytesFailed, (int) transferTrackerStats.uploadBytesFailed); + assertEquals((int) deserializedStats.uploadBytesMovingAverage, transferTrackerStats.uploadBytesMovingAverage, 0); assertEquals( (int) deserializedStats.uploadBytesPerSecMovingAverage, - pressureTrackerStats.uploadBytesPerSecMovingAverage, + transferTrackerStats.uploadBytesPerSecMovingAverage, 0 ); - assertEquals((int) deserializedStats.uploadTimeMovingAverage, pressureTrackerStats.uploadTimeMovingAverage, 0); - assertEquals((int) deserializedStats.totalUploadsStarted, (int) pressureTrackerStats.totalUploadsStarted); - assertEquals((int) deserializedStats.totalUploadsSucceeded, (int) pressureTrackerStats.totalUploadsSucceeded); - assertEquals((int) deserializedStats.totalUploadsFailed, (int) pressureTrackerStats.totalUploadsFailed); + assertEquals((int) deserializedStats.uploadTimeMovingAverage, transferTrackerStats.uploadTimeMovingAverage, 0); + assertEquals((int) deserializedStats.totalUploadsStarted, (int) transferTrackerStats.totalUploadsStarted); + assertEquals((int) deserializedStats.totalUploadsSucceeded, (int) transferTrackerStats.totalUploadsSucceeded); + assertEquals((int) deserializedStats.totalUploadsFailed, (int) transferTrackerStats.totalUploadsFailed); assertEquals( (int) deserializedStats.directoryFileTransferTrackerStats.transferredBytesStarted, - (int) pressureTrackerStats.directoryFileTransferTrackerStats.transferredBytesStarted + (int) transferTrackerStats.directoryFileTransferTrackerStats.transferredBytesStarted ); assertEquals( (int) deserializedStats.directoryFileTransferTrackerStats.transferredBytesSucceeded, - (int) pressureTrackerStats.directoryFileTransferTrackerStats.transferredBytesSucceeded + (int) transferTrackerStats.directoryFileTransferTrackerStats.transferredBytesSucceeded ); assertEquals( (int) deserializedStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, - (int) pressureTrackerStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage + (int) transferTrackerStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage ); } } } private RemoteSegmentTransferTracker constructTracker() { - RemoteSegmentTransferTracker segmentPressureTracker = new RemoteSegmentTransferTracker( + RemoteSegmentTransferTracker transferTracker = new RemoteSegmentTransferTracker( shardId, new DirectoryFileTransferTracker(), - pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - segmentPressureTracker.incrementTotalUploadsFailed(); - segmentPressureTracker.addTimeForCompletedUploadSync(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); - segmentPressureTracker.addUploadBytes(99); - segmentPressureTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); - segmentPressureTracker.incrementRejectionCount(); - segmentPressureTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(10); - segmentPressureTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(10, System.currentTimeMillis()); - segmentPressureTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(5); - return segmentPressureTracker; + transferTracker.incrementTotalUploadsFailed(); + transferTracker.addTimeForCompletedUploadSync(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); + transferTracker.addUploadBytes(99); + transferTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); + transferTracker.incrementRejectionCount(); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(10); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(10, System.currentTimeMillis()); + transferTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(5); + return transferTracker; } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index e164269d96a3d..355333e74f826 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -8,17 +8,12 @@ package org.opensearch.index.remote; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.store.Store; -import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -28,8 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexShard; public class RemoteStorePressureServiceTests extends OpenSearchTestCase { @@ -41,6 +35,8 @@ public class RemoteStorePressureServiceTests extends OpenSearchTestCase { private RemoteStorePressureService pressureService; + private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; + @Override public void setUp() throws Exception { super.setUp(); @@ -60,7 +56,8 @@ public void tearDown() throws Exception { } public void testIsSegmentsUploadBackpressureEnabled() { - pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY, remoteStoreStatsTrackerFactory); assertFalse(pressureService.isSegmentsUploadBackpressureEnabled()); Settings newSettings = Settings.builder() @@ -71,37 +68,14 @@ public void testIsSegmentsUploadBackpressureEnabled() { assertTrue(pressureService.isSegmentsUploadBackpressureEnabled()); } - public void testAfterIndexShardCreatedForRemoteBackedIndex() { - IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); - pressureService.afterIndexShardCreated(indexShard); - assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); - } - - public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { - IndexShard indexShard = createIndexShard(shardId, false); - pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); - pressureService.afterIndexShardCreated(indexShard); - assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); - } - - public void testAfterIndexShardClosed() { - IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); - pressureService.afterIndexShardCreated(indexShard); - assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); - - pressureService.afterIndexShardClosed(shardId, indexShard, indexShard.indexSettings().getSettings()); - assertNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); - } - public void testValidateSegmentUploadLag() { // Create the pressure tracker IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); - pressureService.afterIndexShardCreated(indexShard); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY, remoteStoreStatsTrackerFactory); + remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard); - RemoteSegmentTransferTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); + RemoteSegmentTransferTracker pressureTracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId); pressureTracker.updateLocalRefreshSeqNo(6); // 1. time lag more than dynamic threshold @@ -152,17 +126,4 @@ public void testValidateSegmentUploadLag() { pressureService.validateSegmentsUploadLag(shardId); } - private static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) { - Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, String.valueOf(remoteStoreEnabled)) - .build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); - Store store = mock(Store.class); - IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(indexSettings); - when(indexShard.shardId()).thenReturn(shardId); - when(indexShard.store()).thenReturn(store); - return indexShard; - } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java index 9c5ec69cf6be9..f5514b8936a2f 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java @@ -15,10 +15,6 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; public class RemoteStorePressureSettingsTests extends OpenSearchTestCase { @@ -62,15 +58,6 @@ public void testGetDefaultSettings() { // Check minimum consecutive failures limit default value assertEquals(5, pressureSettings.getMinConsecutiveFailuresLimit()); - - // Check upload bytes moving average window size default value - assertEquals(20, pressureSettings.getUploadBytesMovingAverageWindowSize()); - - // Check upload bytes per sec moving average window size default value - assertEquals(20, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); - - // Check upload time moving average window size default value - assertEquals(20, pressureSettings.getUploadTimeMovingAverageWindowSize()); } public void testGetConfiguredSettings() { @@ -79,9 +66,6 @@ public void testGetConfiguredSettings() { .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, @@ -100,15 +84,6 @@ public void testGetConfiguredSettings() { // Check minimum consecutive failures limit configured value assertEquals(121, pressureSettings.getMinConsecutiveFailuresLimit()); - - // Check upload bytes moving average window size configured value - assertEquals(102, pressureSettings.getUploadBytesMovingAverageWindowSize()); - - // Check upload bytes per sec moving average window size configured value - assertEquals(103, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); - - // Check upload time moving average window size configured value - assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize()); } public void testUpdateAfterGetDefaultSettings() { @@ -123,9 +98,6 @@ public void testUpdateAfterGetDefaultSettings() { .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -140,15 +112,6 @@ public void testUpdateAfterGetDefaultSettings() { // Check minimum consecutive failures limit updated assertEquals(121, pressureSettings.getMinConsecutiveFailuresLimit()); - - // Check upload bytes moving average window size updated - assertEquals(102, pressureSettings.getUploadBytesMovingAverageWindowSize()); - - // Check upload bytes per sec moving average window size updated - assertEquals(103, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); - - // Check upload time moving average window size updated - assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize()); } public void testUpdateAfterGetConfiguredSettings() { @@ -157,9 +120,6 @@ public void testUpdateAfterGetConfiguredSettings() { .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, @@ -171,9 +131,6 @@ public void testUpdateAfterGetConfiguredSettings() { .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 40.0) .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113) - .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -189,59 +146,5 @@ public void testUpdateAfterGetConfiguredSettings() { // Check minimum consecutive failures limit updated assertEquals(111, pressureSettings.getMinConsecutiveFailuresLimit()); - - // Check upload bytes moving average window size updated - assertEquals(112, pressureSettings.getUploadBytesMovingAverageWindowSize()); - - // Check upload bytes per sec moving average window size updated - assertEquals(113, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); - - // Check upload time moving average window size updated - assertEquals(114, pressureSettings.getUploadTimeMovingAverageWindowSize()); - } - - public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { - - int toUpdateVal1 = 1121, toUpdateVal2 = 1123, toUpdateVal3 = 1125; - - AtomicInteger updatedUploadBytesWindowSize = new AtomicInteger(); - AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger(); - AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger(); - - RemoteStorePressureService pressureService = mock(RemoteStorePressureService.class); - - // Upload bytes - doAnswer(invocation -> { - updatedUploadBytesWindowSize.set(invocation.getArgument(0)); - return null; - }).when(pressureService).updateUploadBytesMovingAverageWindowSize(anyInt()); - - // Upload bytes per sec - doAnswer(invocation -> { - updatedUploadBytesPerSecWindowSize.set(invocation.getArgument(0)); - return null; - }).when(pressureService).updateUploadBytesPerSecMovingAverageWindowSize(anyInt()); - - // Upload time - doAnswer(invocation -> { - updatedUploadTimeWindowSize.set(invocation.getArgument(0)); - return null; - }).when(pressureService).updateUploadTimeMsMovingAverageWindowSize(anyInt()); - - RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, pressureService); - Settings newSettings = Settings.builder() - .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1) - .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2) - .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3) - .build(); - clusterService.getClusterSettings().applySettings(newSettings); - - // Assertions - assertEquals(toUpdateVal1, pressureSettings.getUploadBytesMovingAverageWindowSize()); - assertEquals(toUpdateVal1, updatedUploadBytesWindowSize.get()); - assertEquals(toUpdateVal2, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); - assertEquals(toUpdateVal2, updatedUploadBytesPerSecWindowSize.get()); - assertEquals(toUpdateVal3, pressureSettings.getUploadTimeMovingAverageWindowSize()); - assertEquals(toUpdateVal3, updatedUploadTimeWindowSize.get()); } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactoryTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactoryTests.java new file mode 100644 index 0000000000000..c300f316ac633 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactoryTests.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexShard; + +public class RemoteStoreStatsTrackerFactoryTests extends OpenSearchTestCase { + private ThreadPool threadPool; + private ClusterService clusterService; + private Settings settings; + private ShardId shardId; + private IndexShard indexShard; + private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; + + @Override + public void setUp() throws Exception { + super.setUp(); + shardId = new ShardId("index", "uuid", 0); + indexShard = createIndexShard(shardId, true); + threadPool = new TestThreadPool(getTestName()); + settings = Settings.builder() + .put( + RemoteStoreStatsTrackerFactory.MOVING_AVERAGE_WINDOW_SIZE.getKey(), + RemoteStoreStatsTrackerFactory.Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE + ) + .build(); + clusterService = new ClusterService(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testAfterIndexShardCreatedForRemoteBackedIndex() { + remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard); + assertNotNull(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId())); + } + + public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { + indexShard = createIndexShard(shardId, false); + remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard); + assertNull(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId())); + } + + public void testAfterIndexShardClosed() { + remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard); + assertNotNull(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId)); + remoteStoreStatsTrackerFactory.afterIndexShardClosed(shardId, indexShard, indexShard.indexSettings().getSettings()); + assertNull(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId)); + } + + public void testGetConfiguredSettings() { + assertEquals( + RemoteStoreStatsTrackerFactory.Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() + ); + } + + public void testInvalidMovingAverageWindowSize() { + Settings settings = Settings.builder() + .put( + RemoteStoreStatsTrackerFactory.MOVING_AVERAGE_WINDOW_SIZE.getKey(), + RemoteStoreStatsTrackerFactory.Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE - 1 + ) + .build(); + assertThrows( + "Failed to parse value", + IllegalArgumentException.class, + () -> new RemoteStoreStatsTrackerFactory( + new ClusterService(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool), + settings + ) + ); + } + + public void testUpdateAfterGetConfiguredSettings() { + assertEquals( + RemoteStoreStatsTrackerFactory.Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() + ); + + Settings newSettings = Settings.builder().put(RemoteStoreStatsTrackerFactory.MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102).build(); + + clusterService.getClusterSettings().applySettings(newSettings); + + // Check moving average window size updated + assertEquals(102, remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()); + } + + public void testGetDefaultSettings() { + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory( + new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool), + Settings.EMPTY + ); + // Check moving average window size updated + assertEquals( + RemoteStoreStatsTrackerFactory.Defaults.MOVING_AVERAGE_WINDOW_SIZE, + remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() + ); + } +} diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java new file mode 100644 index 0000000000000..e072d3037caad --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.IndexSettingsModule; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Helper functions for Remote Store tests + */ +public class RemoteStoreTestsHelper { + static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, String.valueOf(remoteStoreEnabled)) + .build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); + Store store = mock(Store.class); + IndexShard indexShard = mock(IndexShard.class); + when(indexShard.indexSettings()).thenReturn(indexSettings); + when(indexShard.shardId()).thenReturn(shardId); + when(indexShard.store()).thenReturn(store); + return indexShard; + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 9a7fefb78a06b..0b5d66cea84fa 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -1759,8 +1759,8 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .build() ); - RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteStorePressureService() - .getRemoteRefreshSegmentTracker(shard.shardId); + RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteStoreStatsTrackerFactory() + .getRemoteSegmentTransferTracker(shard.shardId); populateSampleRemoteStoreStats(remoteRefreshSegmentTracker); ShardStats shardStats = new ShardStats( shard.routingEntry(), diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index e05f8dc6e4e57..896bbffb10d09 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -27,7 +27,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.remote.RemoteSegmentTransferTracker; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.store.RemoteDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; @@ -60,7 +60,7 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; private ClusterService clusterService; private RemoteStoreRefreshListener remoteStoreRefreshListener; - private RemoteStorePressureService remoteStorePressureService; + private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( @@ -84,9 +84,9 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); - remoteStorePressureService.afterIndexShardCreated(indexShard); - RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY); + remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard); + RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker); } @@ -317,15 +317,15 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteStorePressureService pressureService = tuple.v2(); - RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2(); + RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 0); } @@ -338,15 +338,15 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteStorePressureService pressureService = tuple.v2(); - RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2(); + RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 1); } @@ -384,15 +384,15 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteStorePressureService pressureService = tuple.v2(); - RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2(); + RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 2); } @@ -406,10 +406,10 @@ private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segme } public void testTrackerData() throws Exception { - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); RemoteStoreRefreshListener listener = tuple.v1(); - RemoteStorePressureService pressureService = tuple.v2(); - RemoteSegmentTransferTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2(); + RemoteSegmentTransferTracker tracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); assertNoLag(tracker); indexDocs(100, randomIntBetween(100, 200)); indexShard.refresh("test"); @@ -431,12 +431,13 @@ private void assertNoLag(RemoteSegmentTransferTracker tracker) { assertEquals(0, tracker.getTotalUploadsFailed()); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh(int succeedOnAttempt) - throws IOException { + private Tuple mockIndexShardWithRetryAndScheduleRefresh( + int succeedOnAttempt + ) throws IOException { return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, null, null); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch @@ -445,7 +446,7 @@ private Tuple mockIndexS return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch, @@ -539,13 +540,13 @@ private Tuple mockIndexS new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - RemoteStorePressureService remoteStorePressureService = indexShard.getRemoteStorePressureService(); + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = indexShard.getRemoteStoreStatsTrackerFactory(); when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); when(shard.shardId()).thenReturn(indexShard.shardId()); - RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); refreshListener.afterRefresh(true); - return Tuple.tuple(refreshListener, remoteStorePressureService); + return Tuple.tuple(refreshListener, remoteStoreStatsTrackerFactory); } public static class TestFilterDirectory extends FilterDirectory { diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 405903c005a84..c455101ff4549 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,7 +46,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -264,7 +264,7 @@ public MockIndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStorePressureService remoteStorePressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 4f7697660096e..a1cedbb6e24c0 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -178,6 +178,7 @@ import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; @@ -2126,7 +2127,7 @@ public void onFailure(final Exception e) { ), RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY, - mock(RemoteStorePressureService.class) + mock(RemoteStoreStatsTrackerFactory.class) ); final SystemIndices systemIndices = new SystemIndices(emptyMap()); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 8348584379f9c..b14a42e1e78ae 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -97,7 +97,7 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; -import org.opensearch.index.remote.RemoteStorePressureService; +import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -640,7 +640,7 @@ protected IndexShard newShard( clusterSettings ); Store remoteStore = null; - RemoteStorePressureService remoteStorePressureService = null; + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = null; RepositoriesService mockRepoSvc = mock(RepositoriesService.class); if (indexSettings.isRemoteStoreEnabled()) { @@ -655,7 +655,7 @@ protected IndexShard newShard( remoteStore = createRemoteStore(remotePath, routing, indexMetadata); - remoteStorePressureService = new RemoteStorePressureService(clusterService, indexSettings.getSettings()); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, indexSettings.getSettings()); BlobStoreRepository repo = createRepository(remotePath); when(mockRepoSvc.repository(any())).thenAnswer(invocationOnMock -> repo); } @@ -695,11 +695,11 @@ protected IndexShard newShard( translogFactorySupplier, checkpointPublisher, remoteStore, - remoteStorePressureService + remoteStoreStatsTrackerFactory ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); - if (remoteStorePressureService != null) { - remoteStorePressureService.afterIndexShardCreated(indexShard); + if (remoteStoreStatsTrackerFactory != null) { + remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard); } success = true; } finally {