diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 79186deeeaf0f..17d9964f6a320 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -81,19 +81,19 @@ protected SegmentReplicationSourceService( this.ongoingSegmentReplications = ongoingSegmentReplications; transportService.registerRequestHandler( Actions.GET_CHECKPOINT_INFO, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, CheckpointInfoRequest::new, new CheckpointInfoRequestHandler() ); transportService.registerRequestHandler( Actions.GET_SEGMENT_FILES, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, GetSegmentFilesRequest::new, new GetSegmentFilesRequestHandler() ); transportService.registerRequestHandler( Actions.UPDATE_VISIBLE_CHECKPOINT, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, UpdateVisibleCheckpointRequest::new, new UpdateVisibleCheckpointRequestHandler() ); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 467f499056345..af05025c285db 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -126,13 +126,13 @@ public SegmentReplicationTargetService( transportService.registerRequestHandler( Actions.FILE_CHUNK, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, FileChunkRequest::new, new FileChunkTransportRequestHandler() ); transportService.registerRequestHandler( Actions.FORCE_SYNC, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, ForceSyncRequest::new, new ForceSyncTransportRequestHandler() ); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index d9f73a9b41658..dccab764aac1e 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -113,6 +113,8 @@ public static class Names { public static final String TRANSLOG_SYNC = "translog_sync"; public static final String REMOTE_PURGE = "remote_purge"; public static final String REMOTE_REFRESH = "remote_refresh"; + + public static final String SEGMENT_REPLICATION = "segment_replication"; public static final String INDEX_SEARCHER = "index_searcher"; } @@ -182,6 +184,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED); map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING); map.put(Names.REMOTE_REFRESH, ThreadPoolType.SCALING); + map.put(Names.SEGMENT_REPLICATION, ThreadPoolType.SCALING); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE); } @@ -267,6 +270,10 @@ public ThreadPool( Names.REMOTE_REFRESH, new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) ); + builders.put( + Names.SEGMENT_REPLICATION, + new ScalingExecutorBuilder(Names.SEGMENT_REPLICATION, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) + ); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { builders.put( Names.INDEX_SEARCHER, diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index 33ad845ea647e..6169513d412a7 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -136,6 +136,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive); sizes.put(ThreadPool.Names.REMOTE_REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen); + sizes.put(ThreadPool.Names.SEGMENT_REPLICATION, ThreadPool::halfAllocatedProcessorsMaxTen); return sizes.get(threadPoolName).apply(numberOfProcessors); }