From 3433c26b8dddff5847a5923a82a19197b287a761 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 12 Jul 2023 21:04:29 +0000 Subject: [PATCH 1/2] Create a new threadpool for segment replication. Signed-off-by: Rishikesh1159 --- .../index/SegmentReplicationPressureService.java | 2 +- .../replication/SegmentReplicationSourceService.java | 6 +++--- .../replication/SegmentReplicationTargetService.java | 4 ++-- .../main/java/org/opensearch/threadpool/ThreadPool.java | 7 +++++++ .../org/opensearch/threadpool/ScalingThreadPoolTests.java | 1 + 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 2740baa8ad166..cb31a9276765f 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -272,7 +272,7 @@ public void onFailure(Exception e) { @Override protected String getThreadPool() { - return ThreadPool.Names.GENERIC; + return ThreadPool.Names.SEGMENT_REPLICATION; } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 79186deeeaf0f..17d9964f6a320 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -81,19 +81,19 @@ protected SegmentReplicationSourceService( this.ongoingSegmentReplications = ongoingSegmentReplications; transportService.registerRequestHandler( Actions.GET_CHECKPOINT_INFO, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, CheckpointInfoRequest::new, new CheckpointInfoRequestHandler() ); transportService.registerRequestHandler( Actions.GET_SEGMENT_FILES, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, GetSegmentFilesRequest::new, new GetSegmentFilesRequestHandler() ); transportService.registerRequestHandler( Actions.UPDATE_VISIBLE_CHECKPOINT, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, UpdateVisibleCheckpointRequest::new, new UpdateVisibleCheckpointRequestHandler() ); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 467f499056345..af05025c285db 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -126,13 +126,13 @@ public SegmentReplicationTargetService( transportService.registerRequestHandler( Actions.FILE_CHUNK, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, FileChunkRequest::new, new FileChunkTransportRequestHandler() ); transportService.registerRequestHandler( Actions.FORCE_SYNC, - ThreadPool.Names.GENERIC, + ThreadPool.Names.SEGMENT_REPLICATION, ForceSyncRequest::new, new ForceSyncTransportRequestHandler() ); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index d9f73a9b41658..dccab764aac1e 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -113,6 +113,8 @@ public static class Names { public static final String TRANSLOG_SYNC = "translog_sync"; public static final String REMOTE_PURGE = "remote_purge"; public static final String REMOTE_REFRESH = "remote_refresh"; + + public static final String SEGMENT_REPLICATION = "segment_replication"; public static final String INDEX_SEARCHER = "index_searcher"; } @@ -182,6 +184,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED); map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING); map.put(Names.REMOTE_REFRESH, ThreadPoolType.SCALING); + map.put(Names.SEGMENT_REPLICATION, ThreadPoolType.SCALING); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE); } @@ -267,6 +270,10 @@ public ThreadPool( Names.REMOTE_REFRESH, new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) ); + builders.put( + Names.SEGMENT_REPLICATION, + new ScalingExecutorBuilder(Names.SEGMENT_REPLICATION, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) + ); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { builders.put( Names.INDEX_SEARCHER, diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index 33ad845ea647e..6169513d412a7 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -136,6 +136,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive); sizes.put(ThreadPool.Names.REMOTE_REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen); + sizes.put(ThreadPool.Names.SEGMENT_REPLICATION, ThreadPool::halfAllocatedProcessorsMaxTen); return sizes.get(threadPoolName).apply(numberOfProcessors); } From f0a5b000caa0d21a4bea2be7d2d44c00f01d587f Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 12 Jul 2023 22:05:00 +0000 Subject: [PATCH 2/2] Address comments on PR. Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/SegmentReplicationPressureService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index cb31a9276765f..2740baa8ad166 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -272,7 +272,7 @@ public void onFailure(Exception e) { @Override protected String getThreadPool() { - return ThreadPool.Names.SEGMENT_REPLICATION; + return ThreadPool.Names.GENERIC; } @Override