Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Create a separate thread pool for Segment Replication events #8669

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void onFailure(Exception e) {

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
return ThreadPool.Names.SEGMENT_REPLICATION;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This component is used during ingest to determine if pressure should be applied, not to perform segrep activities. I don't think we should use the pool here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes makes sense, will update it

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,19 @@ protected SegmentReplicationSourceService(
this.ongoingSegmentReplications = ongoingSegmentReplications;
transportService.registerRequestHandler(
Actions.GET_CHECKPOINT_INFO,
ThreadPool.Names.GENERIC,
ThreadPool.Names.SEGMENT_REPLICATION,
CheckpointInfoRequest::new,
new CheckpointInfoRequestHandler()
);
transportService.registerRequestHandler(
Actions.GET_SEGMENT_FILES,
ThreadPool.Names.GENERIC,
ThreadPool.Names.SEGMENT_REPLICATION,
GetSegmentFilesRequest::new,
new GetSegmentFilesRequestHandler()
);
transportService.registerRequestHandler(
Actions.UPDATE_VISIBLE_CHECKPOINT,
ThreadPool.Names.GENERIC,
ThreadPool.Names.SEGMENT_REPLICATION,
UpdateVisibleCheckpointRequest::new,
new UpdateVisibleCheckpointRequestHandler()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ public SegmentReplicationTargetService(

transportService.registerRequestHandler(
Actions.FILE_CHUNK,
ThreadPool.Names.GENERIC,
ThreadPool.Names.SEGMENT_REPLICATION,
FileChunkRequest::new,
new FileChunkTransportRequestHandler()
);
transportService.registerRequestHandler(
Actions.FORCE_SYNC,
ThreadPool.Names.GENERIC,
ThreadPool.Names.SEGMENT_REPLICATION,
ForceSyncRequest::new,
new ForceSyncTransportRequestHandler()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public static class Names {
public static final String TRANSLOG_SYNC = "translog_sync";
public static final String REMOTE_PURGE = "remote_purge";
public static final String REMOTE_REFRESH = "remote_refresh";

public static final String SEGMENT_REPLICATION = "segment_replication";
public static final String INDEX_SEARCHER = "index_searcher";
}

Expand Down Expand Up @@ -182,6 +184,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED);
map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING);
map.put(Names.REMOTE_REFRESH, ThreadPoolType.SCALING);
map.put(Names.SEGMENT_REPLICATION, ThreadPoolType.SCALING);
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
}
Expand Down Expand Up @@ -267,6 +270,10 @@ public ThreadPool(
Names.REMOTE_REFRESH,
new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
);
builders.put(
Names.SEGMENT_REPLICATION,
new ScalingExecutorBuilder(Names.SEGMENT_REPLICATION, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we deciding on these values? Should we be trying out a few different values to see what works best with most common segment replication setups?

Copy link
Member

@dreamer-89 dreamer-89 Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
I think bounding SEGMENT_REPLICATION threadpool to max 10 threads is not correct and is problematic for cluster having high number of indices. I think with benchmark with multiple indices, problem will surface in segment replication stats (higher replication lag) and thread pool stats.

I think for segment replication, the thread queue should not be bounded or at least should be a fairly large value

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Rishikesh1159 : Identifying ideal maximum thread count is tricky and will depend on cluster load & usage. I think we need to set this to a number sufficient to handle the busiest of traffic/ingestion pattern without bottleneck on threads in pool. One way to get more insights on this number is to run an experiment

  1. Max thread count to a very high value (512?)
  2. Create X indices (start with 1 ?)
  3. Simulate traffic on all X indices
  4. Repeat 1-3 for higher values of X

The count of max/avg thread usage will give us idea on this number.
Please note, we need to have ingestion happening on all indices simulateneously to parallel rounds of segment replication and an increased threads usage.

);
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
builders.put(
Names.INDEX_SEARCHER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n);
sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.REMOTE_REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
sizes.put(ThreadPool.Names.SEGMENT_REPLICATION, ThreadPool::halfAllocatedProcessorsMaxTen);
return sizes.get(threadPoolName).apply(numberOfProcessors);
}

Expand Down
Loading