Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support setting a fixed write rate throttle for ConcurrentMergeScheduler #16

Open
wants to merge 3 commits into
base: bw_branch_6_6_2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
protected double targetMBPerSec = START_MB_PER_SEC;

/** true if we should rate-limit writes for each merge */
private boolean doAutoIOThrottle = false;
private boolean doAutoIOThrottle = true;

private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;

/** Write throttle rate when doAutoIOThrottle is false */
private double fixedMergeMBPerSec = Double.POSITIVE_INFINITY;

/** Sole constructor, with all settings set to default
* values. */
public ConcurrentMergeScheduler() {
Expand Down Expand Up @@ -224,13 +227,23 @@ public synchronized boolean getAutoIOThrottle() {
return doAutoIOThrottle;
}

/** Set the per-merge IO throttle rate when auto IO throttling is disabled (default: {@code Double.POSITIVE_INFINITY}). */
public synchronized void setFixedMergeMBPerSec(double v) {
fixedMergeMBPerSec = v;
}

/** Get the per-merge IO throttle rate when auto IO throttling is disabled. */
public synchronized double getFixedMergeMBPerSec() {
return fixedMergeMBPerSec;
}

/** Returns the currently set per-merge IO writes rate limit, if {@link #enableAutoIOThrottle}
* was called, else {@code Double.POSITIVE_INFINITY}. */
* was called, else {@link #getFixedMergeMBPerSec}. */
public synchronized double getIORateLimitMBPerSec() {
if (doAutoIOThrottle) {
return targetMBPerSec;
} else {
return Double.POSITIVE_INFINITY;
return fixedMergeMBPerSec;
}
}

Expand Down Expand Up @@ -354,10 +367,10 @@ protected synchronized void updateMergeThreads() {
} else if (merge.maxNumSegments != -1) {
newMBPerSec = forceMergeMBPerSec;
} else if (doAutoIOThrottle == false) {
newMBPerSec = Double.POSITIVE_INFINITY;
newMBPerSec = fixedMergeMBPerSec;
} else if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB*1024*1024) {
// Don't rate limit small merges:
newMBPerSec = Double.POSITIVE_INFINITY;
newMBPerSec = fixedMergeMBPerSec;
} else {
newMBPerSec = targetMBPerSec;
}
Expand Down Expand Up @@ -385,7 +398,7 @@ protected synchronized void updateMergeThreads() {
if (newMBPerSec == 0.0) {
message.append(" now stop");
} else if (curMBPerSec == 0.0) {
if (newMBPerSec == Double.POSITIVE_INFINITY) {
if (newMBPerSec == fixedMergeMBPerSec) {
message.append(" now resume");
} else {
message.append(String.format(Locale.ROOT, " now resume to %.1f MB/sec", newMBPerSec));
Expand Down Expand Up @@ -427,10 +440,10 @@ private synchronized void initDynamicDefaults(IndexWriter writer) throws IOExcep
}
}

private static String rateToString(double mbPerSec) {
private String rateToString(double mbPerSec) {
if (mbPerSec == 0.0) {
return "stopped";
} else if (mbPerSec == Double.POSITIVE_INFINITY) {
} else if (mbPerSec == fixedMergeMBPerSec) {
return "unlimited";
} else {
return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec);
Expand Down
15 changes: 12 additions & 3 deletions solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,25 @@ private MergeScheduler buildMergeScheduler(IndexSchema schema) {
// LUCENE-5080: these two setters are removed, so we have to invoke setMaxMergesAndThreads
// if someone has them configured.
if (scheduler instanceof ConcurrentMergeScheduler) {
ConcurrentMergeScheduler cmScheduler = (ConcurrentMergeScheduler) scheduler;
NamedList args = mergeSchedulerInfo.initArgs.clone();
Integer maxMergeCount = (Integer) args.remove("maxMergeCount");
if (maxMergeCount == null) {
maxMergeCount = ((ConcurrentMergeScheduler) scheduler).getMaxMergeCount();
maxMergeCount = cmScheduler.getMaxMergeCount();
}
Integer maxThreadCount = (Integer) args.remove("maxThreadCount");
if (maxThreadCount == null) {
maxThreadCount = ((ConcurrentMergeScheduler) scheduler).getMaxThreadCount();
maxThreadCount = cmScheduler.getMaxThreadCount();
}
cmScheduler.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
Boolean autoIOThrottle = (Boolean) args.remove("autoIOThrottle");
if (autoIOThrottle != null) {
if (autoIOThrottle) {
cmScheduler.enableAutoIOThrottle();
} else {
cmScheduler.disableAutoIOThrottle();
}
}
((ConcurrentMergeScheduler)scheduler).setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
SolrPluginUtils.invokeSetters(scheduler, args);
} else {
SolrPluginUtils.invokeSetters(scheduler, mergeSchedulerInfo.initArgs);
Expand Down