Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support setting a fixed write rate throttle for ConcurrentMergeScheduler #16

Open
wants to merge 3 commits into
base: bw_branch_6_6_2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
protected double targetMBPerSec = START_MB_PER_SEC;

/** true if we should rate-limit writes for each merge */
private boolean doAutoIOThrottle = false;
private boolean doAutoIOThrottle = true;

private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;

/** Write throttle rate when doAutoIOThrottle is false */
private double fixedMergeMBPerSec = Double.POSITIVE_INFINITY;

/** Sole constructor, with all settings set to default
* values. */
public ConcurrentMergeScheduler() {
Expand Down Expand Up @@ -224,13 +227,23 @@ public synchronized boolean getAutoIOThrottle() {
return doAutoIOThrottle;
}

/** Set the per-merge IO throttle rate when auto IO throttling is disabled (default: {@code Double.POSITIVE_INFINITY}). */
public synchronized void setFixedMergeMBPerSec(double v) {
fixedMergeMBPerSec = v;
}

/** Get the per-merge IO throttle rate when auto IO throttling is disabled. */
public synchronized double getFixedMergeMBPerSec() {
return fixedMergeMBPerSec;
}

/** Returns the currently set per-merge IO writes rate limit, if {@link #enableAutoIOThrottle}
* was called, else {@code Double.POSITIVE_INFINITY}. */
* was called, else {@link #getFixedMergeMBPerSec}. */
public synchronized double getIORateLimitMBPerSec() {
if (doAutoIOThrottle) {
return targetMBPerSec;
} else {
return Double.POSITIVE_INFINITY;
return fixedMergeMBPerSec;
}
}

Expand Down Expand Up @@ -354,10 +367,10 @@ protected synchronized void updateMergeThreads() {
} else if (merge.maxNumSegments != -1) {
newMBPerSec = forceMergeMBPerSec;
} else if (doAutoIOThrottle == false) {
newMBPerSec = Double.POSITIVE_INFINITY;
newMBPerSec = fixedMergeMBPerSec;
} else if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB*1024*1024) {
// Don't rate limit small merges:
newMBPerSec = Double.POSITIVE_INFINITY;
newMBPerSec = fixedMergeMBPerSec;
} else {
newMBPerSec = targetMBPerSec;
}
Expand Down Expand Up @@ -385,7 +398,7 @@ protected synchronized void updateMergeThreads() {
if (newMBPerSec == 0.0) {
message.append(" now stop");
} else if (curMBPerSec == 0.0) {
if (newMBPerSec == Double.POSITIVE_INFINITY) {
if (newMBPerSec == fixedMergeMBPerSec) {
message.append(" now resume");
} else {
message.append(String.format(Locale.ROOT, " now resume to %.1f MB/sec", newMBPerSec));
Expand Down Expand Up @@ -427,10 +440,10 @@ private synchronized void initDynamicDefaults(IndexWriter writer) throws IOExcep
}
}

private static String rateToString(double mbPerSec) {
private String rateToString(double mbPerSec) {
if (mbPerSec == 0.0) {
return "stopped";
} else if (mbPerSec == Double.POSITIVE_INFINITY) {
} else if (mbPerSec == fixedMergeMBPerSec) {
return "unlimited";
} else {
return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec);
Expand Down
11 changes: 10 additions & 1 deletion solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,16 @@ private MergeScheduler buildMergeScheduler(IndexSchema schema) {
if (maxThreadCount == null) {
maxThreadCount = ((ConcurrentMergeScheduler) scheduler).getMaxThreadCount();
}
((ConcurrentMergeScheduler)scheduler).setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
ConcurrentMergeScheduler cmScheduler = (ConcurrentMergeScheduler)scheduler;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Space before scheduler

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For their code style, yeah good point (personally I dislike that style, but not my choice!)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it might be code style, but 2 lines above it adds a space 🤷‍♂️

Actually now I've read that line again, I think you can move the variable up a bit and re-use it there too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh .. good idea, there's 2 other casts

cmScheduler.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
Boolean autoIOThrottle = (Boolean) args.remove("autoIOThrottle");
if (autoIOThrottle != null) {
if (autoIOThrottle) {
cmScheduler.enableAutoIOThrottle();
} else {
cmScheduler.disableAutoIOThrottle();
}
}
SolrPluginUtils.invokeSetters(scheduler, args);
} else {
SolrPluginUtils.invokeSetters(scheduler, mergeSchedulerInfo.initArgs);
Expand Down