From fd88815539c982b3031b5011bc319e4195408fb2 Mon Sep 17 00:00:00 2001 From: Bert Massop Date: Sun, 13 Oct 2024 13:50:09 +0200 Subject: [PATCH 1/4] RunningAverage: replace synchronization with immutable state Simplify the implementations of the RunningAverage interface by separating the data being kept into an immutable object. This way we no longer have to synchronize on everything, but rather update the data object using compare-and-set. MedianMeanRunningAverage is not included in this effort because that performance would be hurt due to the large amount of data kept. --- .../BootstrappingDecayingRunningAverage.java | 223 ++++---- .../support/math/DecayingKeyspaceAverage.java | 241 +++------ src/freenet/support/math/RunningAverage.java | 64 ++- .../math/TimeDecayingRunningAverage.java | 508 ++++++++---------- .../support/math/TrivialRunningAverage.java | 136 +++-- ...otstrappingDecayingRunningAverageTest.java | 93 ++++ .../math/DecayingKeyspaceAverageTest.java | 36 ++ .../math/TimeDecayingRunningAverageTest.java | 179 ++++++ 8 files changed, 804 insertions(+), 676 deletions(-) create mode 100644 test/freenet/support/math/BootstrappingDecayingRunningAverageTest.java create mode 100644 test/freenet/support/math/DecayingKeyspaceAverageTest.java create mode 100644 test/freenet/support/math/TimeDecayingRunningAverageTest.java diff --git a/src/freenet/support/math/BootstrappingDecayingRunningAverage.java b/src/freenet/support/math/BootstrappingDecayingRunningAverage.java index 8017edf65f0..ba2dc60a5dc 100644 --- a/src/freenet/support/math/BootstrappingDecayingRunningAverage.java +++ b/src/freenet/support/math/BootstrappingDecayingRunningAverage.java @@ -3,10 +3,10 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; -import freenet.support.LogThresholdCallback; -import freenet.support.Logger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; + import freenet.support.SimpleFieldSet; -import freenet.support.Logger.LogLevel; /** * Exponential decay "running average". @@ -23,30 +23,11 @@ * */ public final class BootstrappingDecayingRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = -1; - @Override - public final BootstrappingDecayingRunningAverage clone() { - // Override clone() for locking; BDRAs are self-synchronized. - // Implement Cloneable to shut up findbugs. - return new BootstrappingDecayingRunningAverage(this); - } - + + private final AtomicReference data = new AtomicReference<>(); private final double min; private final double max; - private double currentValue; - private long reports; - private int maxReports; - - private static volatile boolean logDEBUG; - static { - Logger.registerLogThresholdCallback(new LogThresholdCallback(){ - @Override - public void shouldUpdate(){ - logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); - } - }); - } - + /** * Constructor * @@ -62,128 +43,83 @@ public void shouldUpdate(){ * {@link SimpleFieldSet} parameter for this object. Will * override other parameters. */ - public BootstrappingDecayingRunningAverage(double defaultValue, double min, - double max, int maxReports, SimpleFieldSet fs) { + public BootstrappingDecayingRunningAverage( + double defaultValue, + double min, + double max, + int maxReports, + SimpleFieldSet fs + ) { this.min = min; this.max = max; - reports = 0; - currentValue = defaultValue; - this.maxReports = maxReports; - assert(maxReports > 0); - if(fs != null) { + long reports = 0; + double currentValue = defaultValue; + if (fs != null) { double d = fs.getDouble("CurrentValue", currentValue); - if(!(Double.isNaN(d) || d < min || d > max)) { + if (!isInvalid(d)) { + reports = Math.max(0, fs.getLong("Reports", reports)); currentValue = d; - reports = fs.getLong("Reports", reports); } } + data.set(new Data(maxReports, reports, currentValue)); } - - /** - * {@inheritDoc} - * - * @return - */ - @Override - public synchronized double currentValue() { - return currentValue; + + private BootstrappingDecayingRunningAverage(BootstrappingDecayingRunningAverage other) { + this.min = other.min; + this.max = other.max; + this.data.set(new Data(other.data.get())); } - - /** - * Not a public method. Changes the internally stored currentValue and return the old one. - * - * Used by {@link DecayingKeyspaceAverage} to normalize the stored averages. Calling this function - * may (purposefully) destroy the utility of the average being kept. - * - * @param d - * @return - * @see DecayingKeyspaceAverage - */ - protected synchronized double setCurrentValue(double d) { - double old=currentValue; - currentValue=d; - return old; + + @Override + public double currentValue() { + return data.get().currentValue; } - /** - * {@inheritDoc} - * - * @param d - */ @Override - public synchronized void report(double d) { - if(d < min) { - if(logDEBUG) - Logger.debug(this, "Too low: "+d, new Exception("debug")); - d = min; - } - if(d > max) { - if(logDEBUG) - Logger.debug(this, "Too high: "+d, new Exception("debug")); - d = max; - } - reports++; - double decayFactor = 1.0 / (Math.min(reports, maxReports)); - currentValue = (d * decayFactor) + (currentValue * (1-decayFactor)); + public void report(double d) { + data.updateAndGet(data -> data.updated(d)); } - /** - * {@inheritDoc} - * - * @param d - */ @Override public void report(long d) { report((double)d); } - /** - * {@inheritDoc} - * - * @param d - */ @Override - public synchronized double valueIfReported(double d) { - if(d < min) { - Logger.error(this, "Too low: "+d, new Exception("debug")); - d = min; - } - if(d > max) { - Logger.error(this, "Too high: "+d, new Exception("debug")); - d = max; - } - double decayFactor = 1.0 / (Math.min(reports + 1, maxReports)); - return (d * decayFactor) + (currentValue * (1-decayFactor)); + public double valueIfReported(double d) { + return data.get().updated(d).currentValue; + } + + @Override + public long countReports() { + return data.get().reports; } - + + @Override + public BootstrappingDecayingRunningAverage clone() { + return new BootstrappingDecayingRunningAverage(this); + } + /** * Change maxReports. - * + * * @param maxReports */ - public synchronized void changeMaxReports(int maxReports) { - this.maxReports=maxReports; + public void changeMaxReports(int maxReports) { + data.updateAndGet(data -> data.withMaxReports(maxReports)); } /** - * Copy constructor. + * Reports a new value while allowing for normalization of the resulting currentValue. + * Used by {@link DecayingKeyspaceAverage} to normalize the stored averages. Calling this function + * may (purposefully) destroy the utility of the average being kept. */ - private BootstrappingDecayingRunningAverage(BootstrappingDecayingRunningAverage a) { - synchronized (a) { - this.currentValue = a.currentValue; - this.max = a.max; - this.maxReports = a.maxReports; - this.min = a.min; - this.reports = a.reports; - } + void report(UnaryOperator updateFunction) { + data.updateAndGet(updateFunction); } - /** - * {@inheritDoc} - */ - @Override - public synchronized long countReports() { - return reports; + double valueIfReported(UnaryOperator updateFunction) { + return updateFunction.apply(data.get()).currentValue; } /** @@ -193,11 +129,58 @@ public synchronized long countReports() { * See {@link SimpleFieldSet#SimpleFieldSet(boolean)}. * @return */ - public synchronized SimpleFieldSet exportFieldSet(boolean shortLived) { + public SimpleFieldSet exportFieldSet(boolean shortLived) { + Data data = this.data.get(); SimpleFieldSet fs = new SimpleFieldSet(shortLived); fs.putSingle("Type", "BootstrappingDecayingRunningAverage"); - fs.put("CurrentValue", currentValue); - fs.put("Reports", reports); + fs.put("CurrentValue", data.currentValue); + fs.put("Reports", data.reports); return fs; } + + private boolean isInvalid(double d) { + return d < min || d > max || Double.isInfinite(d) || Double.isNaN(d); + } + + class Data { + private final long maxReports; + private final long reports; + private final double currentValue; + + private Data(long maxReports, long reports, double currentValue) { + this.maxReports = maxReports; + this.reports = reports; + this.currentValue = currentValue; + } + + private Data(Data other) { + this.maxReports = other.maxReports; + this.reports = other.reports; + this.currentValue = other.currentValue; + } + + double currentValue() { + return currentValue; + } + + Data updated(double d) { + if (isInvalid(d)) { + return this; + } + double decayFactor = 1d / Math.min(reports + 1, maxReports); + double newValue = (d * decayFactor) + (currentValue * (1 - decayFactor)); + return new Data(maxReports, reports + 1, newValue); + } + + Data withCurrentValue(double currentValue) { + if (isInvalid(currentValue)) { + return this; + } + return new Data(maxReports, reports, currentValue); + } + + Data withMaxReports(long maxReports) { + return new Data(maxReports, reports, currentValue); + } + } } diff --git a/src/freenet/support/math/DecayingKeyspaceAverage.java b/src/freenet/support/math/DecayingKeyspaceAverage.java index 4199b959a8c..4a6920ceffc 100644 --- a/src/freenet/support/math/DecayingKeyspaceAverage.java +++ b/src/freenet/support/math/DecayingKeyspaceAverage.java @@ -3,183 +3,90 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; +import java.util.function.UnaryOperator; + import freenet.node.Location; import freenet.support.SimpleFieldSet; /** * @author robert - * + *

* A filter on BootstrappingDecayingRunningAverage which makes it aware of the circular keyspace. */ public final class DecayingKeyspaceAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = 5129429614949179428L; - /** - 'avg' is the normalized average location, note that the the reporting bounds are (-2.0, 2.0) however. - */ - BootstrappingDecayingRunningAverage avg; - - /** - * - * @param defaultValue - * @param maxReports - * @param fs - */ - public DecayingKeyspaceAverage(double defaultValue, int maxReports, SimpleFieldSet fs) { - avg = new BootstrappingDecayingRunningAverage(defaultValue, -2.0, 2.0, maxReports, fs); - } - - /** - * - * @param a - */ - public DecayingKeyspaceAverage(BootstrappingDecayingRunningAverage a) { - //check the max/min values? ignore them? - avg = a.clone(); - } - - @Override - public synchronized DecayingKeyspaceAverage clone() { - // Override clone() for deep copy. - // Implement Cloneable to shut up findbugs. - return new DecayingKeyspaceAverage(avg); - } - - /** - * - * @return - */ - @Override - public synchronized double currentValue() { - return avg.currentValue(); - } - - /** - * - * @param d - */ - @Override - public synchronized void report(double d) { - if((d < 0.0) || (d > 1.0)) - //Just because we use non-normalized locations doesn't mean we can accept them. - throw new IllegalArgumentException("Not a valid normalized key: " + d); - double superValue = avg.currentValue(); - double thisValue = Location.normalize(superValue); - double diff = Location.change(thisValue, d); - double toAverage = (superValue + diff); + /** + * 'avg' is the normalized average location, note that the reporting bounds are (-0.5, 1.5) however. + */ + private final BootstrappingDecayingRunningAverage avg; + + public DecayingKeyspaceAverage(double defaultValue, int maxReports, SimpleFieldSet fs) { + avg = new BootstrappingDecayingRunningAverage(defaultValue, -0.5, 1.5, maxReports, fs); + } + + public DecayingKeyspaceAverage(DecayingKeyspaceAverage other) { + avg = other.avg.clone(); + } + + @Override + public DecayingKeyspaceAverage clone() { + return new DecayingKeyspaceAverage(this); + } + + @Override + public double currentValue() { + return avg.currentValue(); + } + + @Override + public void report(double d) { + avg.report(locationUpdateFunction(d)); + } + + @Override + public double valueIfReported(double d) { + return avg.valueIfReported(locationUpdateFunction(d)); + } + + @Override + public long countReports() { + return avg.countReports(); + } + + @Override + public void report(long d) { + throw new IllegalArgumentException("KeyspaceAverage does not like longs"); + } + + public void changeMaxReports(int maxReports) { + avg.changeMaxReports(maxReports); + } + + public SimpleFieldSet exportFieldSet(boolean shortLived) { + return avg.exportFieldSet(shortLived); + } + + private UnaryOperator locationUpdateFunction(double d) { + if (!Location.isValid(d)) { + throw new IllegalArgumentException("Not a valid normalized key: " + d); + } + /* To gracefully wrap around the 1.0/0.0 threshold we average over (or under) it, and simply normalize the result when reporting a currentValue ---example--- - d=0.2; //being reported - superValue=1.9; //already wrapped once, but at 0.9 - thisValue=0.9; //the normalized value of where we are in the keyspace - diff = +0.3; //the diff from the normalized values; Location.change(0.9, 0.2); - avg.report(2.2);//to successfully move the average towards the closest route to the given value. - */ - avg.report(toAverage); - double newValue = avg.currentValue(); - if(newValue < 0.0 || newValue > 1.0) - avg.setCurrentValue(Location.normalize(newValue)); - } - - @Override - public synchronized double valueIfReported(double d) { - if((d < 0.0) || (d > 1.0)) - throw new IllegalArgumentException("Not a valid normalized key: " + d); - double superValue = avg.currentValue(); - double thisValue = Location.normalize(superValue); - double diff = Location.change(thisValue, d); - return Location.normalize(avg.valueIfReported(superValue + diff)); - } - - @Override - public synchronized long countReports() { - return avg.countReports(); - } - - /** - * - * @param d - */ - @Override - public void report(long d) { - throw new IllegalArgumentException("KeyspaceAverage does not like longs"); - } - - /** - * - * @param maxReports - */ - public synchronized void changeMaxReports(int maxReports) { - avg.changeMaxReports(maxReports); - } - - /** - * - * @param shortLived - * @return - */ - public synchronized SimpleFieldSet exportFieldSet(boolean shortLived) { - return avg.exportFieldSet(shortLived); - } - - ///@todo: make this a junit test - /** - * - * @param args - */ - public static void main(String[] args) { - DecayingKeyspaceAverage a = new DecayingKeyspaceAverage(0.9, 10, null); - a.report(0.9); - for(int i = 10; i != 0; i--) { - a.report(0.2); - System.out.println("<-0.2-- current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.8); - System.out.println("--0.8-> current=" + a.currentValue()); - } - System.out.println("--- positive wrap test ---"); - for(int wrap = 3; wrap != 0; wrap--) { - System.out.println("wrap test #" + wrap); - for(int i = 10; i != 0; i--) { - a.report(0.25); - System.out.println("<-0.25- current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.5); - System.out.println("--0.5-> current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.75); - System.out.println("-0.75-> current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(1.0); - System.out.println("<-1.0-- current=" + a.currentValue()); - } - } - System.out.println("--- negative wrap test ---"); - a = new DecayingKeyspaceAverage(0.2, 10, null); - a.report(0.2); - for(int wrap = 3; wrap != 0; wrap--) { - System.out.println("negwrap test #" + wrap); - for(int i = 10; i != 0; i--) { - a.report(0.75); - System.out.println("-0.75-> current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.5); - System.out.println("<-0.5-- current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.25); - System.out.println("<-0.25- current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(1.0); - System.out.println("--1.0-> current=" + a.currentValue()); - } - } - } + d = 0.2; //being reported + currentValue = 0.9; //the normalized value of where we are in the keyspace + change = 0.3; //the diff from the normalized values; Location.change(0.9, 0.2); + report(1.2); //to successfully move the average towards the closest route to the given value. + */ + return data -> { + double currentValue = data.currentValue(); + double change = Location.change(currentValue, d); + return normalize(data.updated(currentValue + change)); + }; + } + + private BootstrappingDecayingRunningAverage.Data normalize(BootstrappingDecayingRunningAverage.Data data) { + return data.withCurrentValue(Location.normalize(data.currentValue())); + } } diff --git a/src/freenet/support/math/RunningAverage.java b/src/freenet/support/math/RunningAverage.java index 2ee88c6fbb0..cf467eda1cd 100644 --- a/src/freenet/support/math/RunningAverage.java +++ b/src/freenet/support/math/RunningAverage.java @@ -3,46 +3,42 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; -import java.io.Serializable; - -/** A running average. That is, something that takes reports as numbers and generates a current value. - * Synchronized class, including clone(). */ -public interface RunningAverage extends Serializable { +/** + * A running average. That is, something that takes reports as numbers and generates a current value. + */ +public interface RunningAverage { /** - * Copy the RunningAverage (create a snapshot). Deep copy, the new RA won't change when the first one - * does. Will synchronize on the original during the copy process. + * Copy the RunningAverage (create a snapshot). Deep copy, the new RA won't change when the first one does. */ - public RunningAverage clone(); + RunningAverage clone(); - /** - * - * @return - */ - public double currentValue(); + /** + * @return + */ + double currentValue(); - /** - * - * @param d - */ - public void report(double d); + /** + * @param d + */ + void report(double d); - /** - * - * @param d - */ - public void report(long d); + /** + * @param d + */ + void report(long d); - /** - * Get what currentValue() would be if we reported some given value - * @param r the value to mimic reporting - * @return the output of currentValue() if we were to report r - */ - public double valueIfReported(double r); + /** + * Get what currentValue() would be if we reported some given value + * + * @param r the value to mimic reporting + * @return the output of currentValue() if we were to report r + */ + double valueIfReported(double r); - /** - * @return the total number of reports on this RunningAverage so far. - * Used for weighted averages, confidence/newbieness estimation etc. - */ - public long countReports(); + /** + * @return the total number of reports on this RunningAverage so far. + * Used for weighted averages, confidence/newbieness estimation etc. + */ + long countReports(); } diff --git a/src/freenet/support/math/TimeDecayingRunningAverage.java b/src/freenet/support/math/TimeDecayingRunningAverage.java index 2daf14e9117..4ea5dbdb094 100644 --- a/src/freenet/support/math/TimeDecayingRunningAverage.java +++ b/src/freenet/support/math/TimeDecayingRunningAverage.java @@ -3,330 +3,272 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import freenet.node.TimeSkewDetectorCallback; import freenet.support.Logger; import freenet.support.SimpleFieldSet; -import freenet.support.Logger.LogLevel; /** * Time decaying running average. - * + *

* Decay factor = 0.5 ^ (interval / halflife). - * + *

* So if the interval is exactly the half-life then reporting 0 will halve the value. - * - * Note that the older version has a half life on the influence of any given report without taking - * into account the fact that reports persist and accumulate. :) - * */ public final class TimeDecayingRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = -1; - static final int MAGIC = 0x5ff4ac94; - - @Override - public final TimeDecayingRunningAverage clone() { - // Override clone to synchronize, as per comments in RunningAverage. - // Implement Cloneable to shut up findbugs. - synchronized(this) { - return new TimeDecayingRunningAverage(this); - } - } - - double curValue; - final double halfLife; - long lastReportTime; - long createdTime; - long totalReports; - boolean started; - double defaultValue; - double minReport; - double maxReport; - boolean logDEBUG; - private final TimeSkewDetectorCallback timeSkewCallback; - - @Override - public String toString() { - long now = System.currentTimeMillis(); - synchronized(this) { - return super.toString() + ": currentValue="+curValue+", halfLife="+halfLife+ - ", lastReportTime="+(now - lastReportTime)+ - "ms ago, createdTime="+(now - createdTime)+ - "ms ago, totalReports="+totalReports+", started="+started+ - ", defaultValue="+defaultValue+", min="+minReport+", max="+maxReport; - } - } - + private final AtomicReference data = new AtomicReference<>(); + private final TimeTracker timeTracker; + /** - * - * @param defaultValue - * @param halfLife - * @param min - * @param max - * @param callback + * Monotonic time source in nanosecond resolution (but usually with lower accuracy). + * This time source should increase monotonically with elapsed time - usually `System::nanoTime`. */ - public TimeDecayingRunningAverage(double defaultValue, long halfLife, - double min, double max, TimeSkewDetectorCallback callback) { - curValue = defaultValue; - this.defaultValue = defaultValue; - started = false; - this.halfLife = halfLife; - createdTime = lastReportTime = System.currentTimeMillis(); - this.minReport = min; - this.maxReport = max; - totalReports = 0; - logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); - if(logDEBUG) - Logger.debug(this, "Created "+this, - new Exception("debug")); - this.timeSkewCallback = callback; - } - + private final LongSupplier monotonicTimeSourceNanos; + /** - * - * @param defaultValue - * @param halfLife - * @param min - * @param max - * @param fs - * @param callback + * Special timestamp set to 1 nanosecond prior to creation of this instance. + * Used as a sentinel value for "not initialized" timestamps. */ - public TimeDecayingRunningAverage(double defaultValue, long halfLife, - double min, double max, SimpleFieldSet fs, TimeSkewDetectorCallback callback) { - curValue = defaultValue; - this.defaultValue = defaultValue; - started = false; - this.halfLife = halfLife; - createdTime = System.currentTimeMillis(); - this.lastReportTime = -1; // long warm-up may skew results, so lets wait for the first report - this.minReport = min; - this.maxReport = max; - totalReports = 0; - logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); - if(logDEBUG) - Logger.debug(this, "Created "+this, - new Exception("debug")); - if(fs != null) { - started = fs.getBoolean("Started", false); - if(started) { - curValue = fs.getDouble("CurrentValue", curValue); - if(curValue > maxReport || curValue < minReport || Double.isNaN(curValue)) { - curValue = defaultValue; - totalReports = 0; - createdTime = System.currentTimeMillis(); - } else { - totalReports = fs.getLong("TotalReports", 0); - long uptime = fs.getLong("Uptime", 0); - createdTime = System.currentTimeMillis() - uptime; - } - } - } - this.timeSkewCallback = callback; - } - + private final long notInitializedSentinel; + /** - * - * @param defaultValue - * @param halfLife - * @param min - * @param max - * @param dis - * @param callback - * @throws IOException + * Creation time of this instance in wall-clock time. Used for uptime calculation. */ - public TimeDecayingRunningAverage(double defaultValue, double halfLife, double min, double max, DataInputStream dis, TimeSkewDetectorCallback callback) throws IOException { - int m = dis.readInt(); - if(m != MAGIC) throw new IOException("Invalid magic "+m); - int v = dis.readInt(); - if(v != 1) throw new IOException("Invalid version "+v); - curValue = dis.readDouble(); - if(Double.isInfinite(curValue) || Double.isNaN(curValue)) - throw new IOException("Invalid weightedTotal: "+curValue); - if((curValue < min) || (curValue > max)) - throw new IOException("Out of range: curValue = "+curValue); - started = dis.readBoolean(); - long priorExperienceTime = dis.readLong(); - this.halfLife = halfLife; - this.minReport = min; - this.maxReport = max; - this.defaultValue = defaultValue; - logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); - lastReportTime = -1; - createdTime = System.currentTimeMillis() - priorExperienceTime; - totalReports = dis.readLong(); - this.timeSkewCallback = callback; - } + private final long createdTimeMillis; /** - * - * @param a + * Half-life time in nanoseconds. */ - public TimeDecayingRunningAverage(TimeDecayingRunningAverage a) { - this.createdTime = a.createdTime; - this.defaultValue = a.defaultValue; - this.halfLife = a.halfLife; - this.lastReportTime = a.lastReportTime; - this.maxReport = a.maxReport; - this.minReport = a.minReport; - this.started = a.started; - this.totalReports = a.totalReports; - this.curValue = a.curValue; - this.timeSkewCallback = a.timeSkewCallback; - } + private final double halfLifeNanos; /** - * - * @return + * Minimum allowed input value. Smaller reported values are silently ignored. */ - @Override - public synchronized double currentValue() { - return curValue; - } + private final double min; /** - * - * @param d + * Maximum allowed input value. Larger reported values are silently ignored. */ + private final double max; + + public TimeDecayingRunningAverage( + double defaultValue, + long halfLife, + double min, + double max, + TimeSkewDetectorCallback callback + ) { + this(defaultValue, halfLife, min, max, null, callback); + } + + public TimeDecayingRunningAverage( + double defaultValue, + long halfLife, + double min, + double max, + SimpleFieldSet fs, + TimeSkewDetectorCallback callback + ) { + this(defaultValue, halfLife, min, max, fs, callback, System::currentTimeMillis, System::nanoTime); + } + + TimeDecayingRunningAverage( + double defaultValue, + long halfLife, + double min, + double max, + SimpleFieldSet fs, + TimeSkewDetectorCallback callback, + LongSupplier wallClockTimeSourceMillis, + LongSupplier monotonicTimeSourceNanos + ) { + this.halfLifeNanos = Math.max(1, halfLife) * 1e6; + this.min = min; + this.max = max; + long createdTime = wallClockTimeSourceMillis.getAsLong(); + long reports = 0; + boolean started = false; + double currentValue = defaultValue; + if (fs != null) { + started = fs.getBoolean("Started", false); + if (started) { + double d = fs.getDouble("CurrentValue", currentValue); + if (!isInvalid(d)) { + reports = Math.max(0, fs.getLong("TotalReports", 0)); + createdTime = createdTime - Math.max(0, fs.getLong("Uptime", 0)); + currentValue = d; + } + } + } + this.timeTracker = new TimeTracker(callback, wallClockTimeSourceMillis); + this.monotonicTimeSourceNanos = monotonicTimeSourceNanos; + this.notInitializedSentinel = monotonicTimeSourceNanos.getAsLong() - 1; + this.createdTimeMillis = createdTime; + this.data.set(new Data(reports, notInitializedSentinel, started, currentValue)); + } + + public TimeDecayingRunningAverage(TimeDecayingRunningAverage other) { + this.timeTracker = new TimeTracker(other.timeTracker); + this.monotonicTimeSourceNanos = other.monotonicTimeSourceNanos; + this.notInitializedSentinel = other.notInitializedSentinel; + this.createdTimeMillis = other.createdTimeMillis; + this.halfLifeNanos = other.halfLifeNanos; + this.max = other.max; + this.min = other.min; + this.data.set(new Data(other.data.get())); + } + + @Override + public double currentValue() { + return data.get().currentValue; + } + @Override public void report(double d) { - synchronized(this) { - // Must synchronize first to achieve serialization. - long now = System.currentTimeMillis(); - if(d < minReport) { - Logger.error(this, "Impossible: "+d+" on "+this, new Exception("error")); - return; - } - if(d > maxReport) { - Logger.error(this, "Impossible: "+d+" on "+this, new Exception("error")); - return; - } - if(Double.isInfinite(d) || Double.isNaN(d)) { - Logger.error(this, "Reported infinity or NaN to "+this+" : "+d, new Exception("error")); - return; - } - totalReports++; - if(!started) { - curValue = d; - started = true; - if(logDEBUG) - Logger.debug(this, "Reported "+d+" on "+this+" when just started"); - } else if(lastReportTime != -1) { // might be just serialized in - long thisInterval = - now - lastReportTime; - long uptime = now - createdTime; - if(thisInterval < 0) { - Logger.error(this, "Clock (reporting) went back in time, ignoring report: "+now+" was "+lastReportTime+" (back "+(-thisInterval)+"ms)"); - lastReportTime = now; - if(timeSkewCallback != null) - timeSkewCallback.setTimeSkewDetectedUserAlert(); - return; - } - double thisHalfLife = halfLife; - if(uptime < 0) { - Logger.error(this, "Clock (uptime) went back in time, ignoring report: "+now+" was "+createdTime+" (back "+(-uptime)+"ms)"); - if(timeSkewCallback != null) - timeSkewCallback.setTimeSkewDetectedUserAlert(); - return; - // Disable sensitivity hack. - // Excessive sensitivity at start isn't necessarily a good thing. - // In particular it makes the average inconsistent - 20 reports of 0 at 1s intervals have a *different* effect to 10 reports of 0 at 2s intervals! - // Also it increases the impact of startup spikes, which then take a long time to recover from. - //} else { - //double oneFourthOfUptime = uptime / 4D; - //if(oneFourthOfUptime < thisHalfLife) thisHalfLife = oneFourthOfUptime; - } - - if(thisHalfLife == 0) thisHalfLife = 1; - double changeFactor = - Math.pow(0.5, (thisInterval) / thisHalfLife); - double oldCurValue = curValue; - curValue = curValue * changeFactor /* close to 1.0 if short interval, close to 0.0 if long interval */ - + (1.0 - changeFactor) * d; - // FIXME remove when stop getting reports of wierd output values - if(curValue < minReport || curValue > maxReport) { - Logger.error(this, "curValue="+curValue+" was "+oldCurValue+" - out of range"); - curValue = oldCurValue; - } - if(logDEBUG) - Logger.debug(this, "Reported "+d+" on "+this+": thisInterval="+thisInterval+ - ", halfLife="+halfLife+", uptime="+uptime+", thisHalfLife="+thisHalfLife+ - ", changeFactor="+changeFactor+", oldCurValue="+oldCurValue+ - ", currentValue="+currentValue()+ - ", thisInterval="+thisInterval+", thisHalfLife="+thisHalfLife+ - ", uptime="+uptime+", changeFactor="+changeFactor); - } - lastReportTime = now; - } - } + data.updateAndGet(data -> data.updated(d)); + timeTracker.report(); + } - /** - * - * @param d - */ - @Override - public void report(long d) { - report((double)d); + @Override + public void report(long d) { + report((double) d); } @Override public double valueIfReported(double r) { - throw new UnsupportedOperationException(); + return data.get().updated(r).currentValue; } - /** - * - * @param out - * @throws IOException - */ - public void writeDataTo(DataOutputStream out) throws IOException { - long now = System.currentTimeMillis(); - synchronized(this) { - out.writeInt(MAGIC); - out.writeInt(1); - out.writeDouble(curValue); - out.writeBoolean(started); - out.writeLong(totalReports); - out.writeLong(now - createdTime); - } - } + @Override + public long countReports() { + return data.get().reports; + } - /** - * - * @return - */ - public int getDataLength() { - return 4 + 4 + 8 + 8 + 1 + 8 + 8; + public long lastReportTime() { + return timeTracker.lastReportMillis; + } + + public SimpleFieldSet exportFieldSet(boolean shortLived) { + Data data = this.data.get(); + long now = timeTracker.wallClockTimeSourceMillis.getAsLong(); + SimpleFieldSet fs = new SimpleFieldSet(shortLived); + fs.putSingle("Type", "TimeDecayingRunningAverage"); + fs.put("CurrentValue", data.currentValue); + fs.put("Started", data.started); + fs.put("TotalReports", data.reports); + fs.put("Uptime", now - createdTimeMillis); + return fs; } @Override - public synchronized long countReports() { - return totalReports; + public TimeDecayingRunningAverage clone() { + return new TimeDecayingRunningAverage(this); } - /** - * - * @return - */ - public synchronized long lastReportTime() { - return lastReportTime; - } + @Override + public String toString() { + Data data = this.data.get(); + long now = timeTracker.wallClockTimeSourceMillis.getAsLong(); + return super.toString() + + ": currentValue=" + data.currentValue + ", " + + ", halfLife=" + (long) (halfLifeNanos / 1e6) + "ms" + + ", lastReportTime=" + (now - lastReportTime()) + "ms ago" + + ", createdTime=" + (now - createdTimeMillis) + "ms ago" + + ", reports=" + data.reports + + ", started=" + data.started + + ", min=" + min + + ", max=" + max; + } - /** - * - * @param shortLived - * @return - */ - public synchronized SimpleFieldSet exportFieldSet(boolean shortLived) { - SimpleFieldSet fs = new SimpleFieldSet(shortLived); - fs.putSingle("Type", "TimeDecayingRunningAverage"); - fs.put("CurrentValue", curValue); - fs.put("Started", started); - fs.put("TotalReports", totalReports); - fs.put("Uptime", System.currentTimeMillis() - createdTime); - return fs; - } + private boolean isInvalid(double d) { + return d < min || d > max || Double.isInfinite(d) || Double.isNaN(d); + } + + private class Data { + private final long reports; + private final long lastUpdatedNanos; + private final boolean started; + private final double currentValue; + + private Data(long reports, long lastUpdatedNanos, boolean started, double currentValue) { + this.reports = reports; + this.lastUpdatedNanos = lastUpdatedNanos; + this.started = started; + this.currentValue = Math.max(min, Math.min(max, currentValue)); + } + + private Data(Data other) { + this.reports = other.reports; + this.lastUpdatedNanos = other.lastUpdatedNanos; + this.started = other.started; + this.currentValue = other.currentValue; + } + + private Data updated(double d) { + if (isInvalid(d)) { + return this; + } + long now = monotonicTimeSourceNanos.getAsLong(); + if (!started) { + // A fresh average instantly jumps to the first reported value + return new Data(reports + 1, now, true, d); + } + if (lastUpdatedNanos == notInitializedSentinel) { + // For a restored average, the first data point is ignored + return new Data(reports + 1, now, true, currentValue); + } + double timeSinceLastUpdated = now - lastUpdatedNanos; + /* close to 1.0 if short interval, close to 0.0 if long interval */ + double changeFactor = Math.pow(0.5, timeSinceLastUpdated / halfLifeNanos); + double newValue = currentValue * changeFactor + (1.0 - changeFactor) * d; + return new Data(reports + 1, now, true, newValue); + } + } + + private static class TimeTracker { + /** + * Callback to invoke when a time skew is detected. + */ + private final TimeSkewDetectorCallback timeSkewDetectorCallback; + + /** + * Time source reporting the wall-clock time in milliseconds since the UNIX epoch. + * This clock should represent the system time which may drift due to (network) time synchronization. + * Usually set to `System::currentTimeMillis`. + */ + private final LongSupplier wallClockTimeSourceMillis; + + /** + * Timestamp when `report` was last invoked, in milliseconds since UNIX epoch. + */ + private volatile long lastReportMillis = -1; + + private TimeTracker(TimeSkewDetectorCallback timeSkewDetectorCallback, LongSupplier wallClockTimeSourceMillis) { + this.timeSkewDetectorCallback = timeSkewDetectorCallback; + this.wallClockTimeSourceMillis = wallClockTimeSourceMillis; + } + + private TimeTracker(TimeTracker other) { + this.timeSkewDetectorCallback = other.timeSkewDetectorCallback; + this.wallClockTimeSourceMillis = other.wallClockTimeSourceMillis; + this.lastReportMillis = other.lastReportMillis; + } + + private void report() { + if (timeSkewDetectorCallback == null) { + this.lastReportMillis = wallClockTimeSourceMillis.getAsLong(); + return; + } + long lastReportTime = this.lastReportMillis; + long now = wallClockTimeSourceMillis.getAsLong(); + this.lastReportMillis = now; + if (now < lastReportTime) { + Logger.error(this, "Clock went back in time: " + now + " was " + lastReportTime + " (back " + (lastReportTime - now) + "ms)"); + timeSkewDetectorCallback.setTimeSkewDetectedUserAlert(); + } + } + } } diff --git a/src/freenet/support/math/TrivialRunningAverage.java b/src/freenet/support/math/TrivialRunningAverage.java index ffaad5ab83c..d9b41671674 100644 --- a/src/freenet/support/math/TrivialRunningAverage.java +++ b/src/freenet/support/math/TrivialRunningAverage.java @@ -1,77 +1,69 @@ package freenet.support.math; +import java.util.concurrent.atomic.AtomicReference; + public final class TrivialRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = 1L; - private long reports; - private double total; - - /** - * - * @param average - */ - public TrivialRunningAverage(TrivialRunningAverage average) { - this.reports = average.reports; - this.total = average.total; - } - - /** - * - */ - public TrivialRunningAverage() { - reports = 0; - total = 0.0; - } - - @Override - public synchronized long countReports() { - return reports; - } - - public synchronized double totalValue() { - return total; - } - - /** - * - * @return - */ - @Override - public synchronized double currentValue() { - return total / reports; - } - - /** - * - * @param d - */ - @Override - public synchronized void report(double d) { - total += d; - reports++; - // TODO Auto-generated method stub - } - - /** - * - * @param d - */ - @Override - public void report(long d) { - report((double)d); - } - - @Override - public synchronized double valueIfReported(double r) { - return (total + r) / (reports + 1); - } - - @Override - public TrivialRunningAverage clone() { - // Override clone() for synchronization. - // Implement Cloneable to shut up findbugs. - synchronized (this) { - return new TrivialRunningAverage(this); - } - } + private final AtomicReference data = new AtomicReference<>(); + + public TrivialRunningAverage(TrivialRunningAverage other) { + data.set(other.data.get()); + } + + public TrivialRunningAverage() { + data.set(new Data(0, 0)); + } + + @Override + public long countReports() { + return data.get().reports; + } + + public double totalValue() { + return data.get().total; + } + + @Override + public double currentValue() { + return data.get().getRunningAverage(); + } + + @Override + public void report(double d) { + data.updateAndGet(data -> data.updated(d)); + } + + @Override + public void report(long d) { + report((double) d); + } + + @Override + public double valueIfReported(double r) { + return data.get().updated(r).getRunningAverage(); + } + + @Override + public TrivialRunningAverage clone() { + return new TrivialRunningAverage(this); + } + + private static class Data { + private final long reports; + private final double total; + + Data(long reports, double total) { + this.reports = reports; + this.total = total; + } + + Data updated(double d) { + return new Data(reports + 1, d + total); + } + + double getRunningAverage() { + return total / reports; + } + } + } diff --git a/test/freenet/support/math/BootstrappingDecayingRunningAverageTest.java b/test/freenet/support/math/BootstrappingDecayingRunningAverageTest.java new file mode 100644 index 00000000000..000b18a3783 --- /dev/null +++ b/test/freenet/support/math/BootstrappingDecayingRunningAverageTest.java @@ -0,0 +1,93 @@ +package freenet.support.math; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; + +import freenet.support.SimpleFieldSet; +import org.junit.Test; + +public class BootstrappingDecayingRunningAverageTest { + @Test + public void decaysOverSubsequentReports() { + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, null); + average.report(1); + assertThat(average.currentValue(), equalTo(1.0)); + average.report(0); + assertThat(average.currentValue(), equalTo(0.5)); + average.report(0); + assertThat(average.currentValue(), equalTo(0.25)); + average.report(0); + assertThat(average.currentValue(), equalTo(0.125)); + } + + @Test + public void newInstanceHasDefaultValue() { + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(1, 0, 1, 2, null); + assertThat(average.currentValue(), equalTo(1.0)); + } + + @Test + public void countsNumberOfValidReports() { + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, null); + assertThat(average.countReports(), equalTo(0L)); + + // Valid report + average.report(0); + assertThat(average.countReports(), equalTo(1L)); + + // Invalid reports + average.report(-1000); + average.report(1000); + average.report(Double.NEGATIVE_INFINITY); + average.report(Double.POSITIVE_INFINITY); + average.report(Double.NaN); + assertThat(average.countReports(), equalTo(1L)); + } + + @Test + public void writesStateToSimpleFieldSet() { + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, null); + average.report(0.5); + + SimpleFieldSet sfs = average.exportFieldSet(true); + assertThat(sfs.directKeyValues(), allOf( + hasEntry("Type", "BootstrappingDecayingRunningAverage"), + hasEntry("Reports", "1"), + hasEntry("CurrentValue", "0.5") + )); + } + + @Test + public void canBeRestoredFromSimpleFieldSet() { + SimpleFieldSet sfs = new SimpleFieldSet(true); + sfs.putSingle("Type", "BootstrappingDecayingRunningAverage"); + sfs.putSingle("Reports", "1"); + sfs.putSingle("CurrentValue", "0.5"); + + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, sfs); + assertThat(average.currentValue(), equalTo(0.5)); + assertThat(average.countReports(), equalTo(1L)); + + average.report(0); + assertThat(average.currentValue(), equalTo(0.25)); + assertThat(average.countReports(), equalTo(2L)); + } + + @Test + public void cloneCreatesIndependentInstance() { + BootstrappingDecayingRunningAverage first = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, null); + BootstrappingDecayingRunningAverage second = first.clone(); + second.report(0); + second.report(1); + + // Cloned instance should remain untouched + assertThat(first.currentValue(), equalTo(0.0)); + assertThat(first.countReports(), equalTo(0L)); + + // New instance should be updated + assertThat(second.currentValue(), equalTo(0.5)); + assertThat(second.countReports(), equalTo(2L)); + } +} diff --git a/test/freenet/support/math/DecayingKeyspaceAverageTest.java b/test/freenet/support/math/DecayingKeyspaceAverageTest.java new file mode 100644 index 00000000000..ee5edc64bdb --- /dev/null +++ b/test/freenet/support/math/DecayingKeyspaceAverageTest.java @@ -0,0 +1,36 @@ +package freenet.support.math; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThrows; + +import org.junit.Test; + +public class DecayingKeyspaceAverageTest { + private final DecayingKeyspaceAverage average = new DecayingKeyspaceAverage(0.5, 2, null); + + @Test + public void wrapsAround() { + average.report(0.5); + assertThat(average.currentValue(), equalTo(0.5)); + average.report(1.0); + assertThat(average.currentValue(), equalTo(0.75)); + average.report(0.25); + assertThat(average.currentValue(), equalTo(0.0)); + average.report(0.25); + assertThat(average.currentValue(), equalTo(0.125)); + average.report(0.875); + assertThat(average.currentValue(), equalTo(0.0)); + average.report(0.75); + assertThat(average.currentValue(), equalTo(0.875)); + } + + @Test + public void rejectsInvalidInput() { + assertThrows(IllegalArgumentException.class, () -> average.report(1.1)); + assertThrows(IllegalArgumentException.class, () -> average.report(-0.1)); + assertThrows(IllegalArgumentException.class, () -> average.report(Double.NaN)); + assertThrows(IllegalArgumentException.class, () -> average.report(Double.POSITIVE_INFINITY)); + assertThrows(IllegalArgumentException.class, () -> average.report(Double.NEGATIVE_INFINITY)); + } +} diff --git a/test/freenet/support/math/TimeDecayingRunningAverageTest.java b/test/freenet/support/math/TimeDecayingRunningAverageTest.java new file mode 100644 index 00000000000..88db6184b9f --- /dev/null +++ b/test/freenet/support/math/TimeDecayingRunningAverageTest.java @@ -0,0 +1,179 @@ +package freenet.support.math; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +import java.util.Random; + +import freenet.node.TimeSkewDetectorCallback; +import freenet.support.SimpleFieldSet; +import org.junit.Test; + +public class TimeDecayingRunningAverageTest { + private final Clock clock = new Clock(); + + @Test + public void decaysOverTime() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + average.report(0); + assertThat(average.currentValue(), equalTo(0.0)); + + // 1 half-live passes, should take 50% old value (0.0) and 50% new value (1.0) + clock.tick(1000); + average.report(1); + assertThat(average.currentValue(), equalTo(0.5)); + + // 2 half-lives pass, should take 25% old value (0.5) and 75% new value (1.0) + clock.tick(2000); + average.report(1); + assertThat(average.currentValue(), equalTo(0.875)); + } + + @Test + public void newInstanceJumpsToFirstReported() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + assertThat(average.currentValue(), equalTo(0.0)); + + average.report(1); + assertThat(average.currentValue(), equalTo(1.0)); + } + + @Test + public void isNotAffectedByClockDrift() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + average.report(0); + assertThat(average.currentValue(), equalTo(0.0)); + + // Wall-clock time drifts backwards + clock.drift(-12345); + + // 1 half-live passes, should take 50% old value (0.0) and 50% new value (1.0) + clock.tick(1000); + average.report(1); + assertThat(average.currentValue(), equalTo(0.5)); + } + + @Test + public void reportsNegativeClockDrift() { + TimeSkewDetectorCallback callback = mock(TimeSkewDetectorCallback.class); + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, callback, clock::getWallClockMillis, clock::getMonotonicNanos); + + // Wall-clock time drifts forward should not be reported + clock.drift(12345); + average.report(0); + verifyZeroInteractions(callback); + + // Wall-clock time drifts backwards should be reported + clock.drift(-12345); + average.report(0); + verify(callback).setTimeSkewDetectedUserAlert(); + } + + @Test + public void countsNumberOfValidReports() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + assertThat(average.countReports(), equalTo(0L)); + + // Valid report + average.report(0); + assertThat(average.countReports(), equalTo(1L)); + + // Invalid reports + average.report(-1000); + average.report(1000); + average.report(Double.NEGATIVE_INFINITY); + average.report(Double.POSITIVE_INFINITY); + average.report(Double.NaN); + assertThat(average.countReports(), equalTo(1L)); + } + + @Test + public void writesStateToSimpleFieldSet() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + clock.tick(1000); + average.report(0.5); + + SimpleFieldSet sfs = average.exportFieldSet(true); + assertThat(sfs.directKeyValues(), allOf( + hasEntry("Type", "TimeDecayingRunningAverage"), + hasEntry("Started", "true"), + hasEntry("Uptime", "1000"), + hasEntry("TotalReports", "1"), + hasEntry("CurrentValue", "0.5") + )); + } + + @Test + public void ignoresFirstValueWhenRestoredInStartedState() { + SimpleFieldSet sfs = new SimpleFieldSet(true); + sfs.putSingle("Type", "TimeDecayingRunningAverage"); + sfs.putSingle("Started", "true"); + sfs.putSingle("Uptime", "1000"); + sfs.putSingle("TotalReports", "1"); + sfs.putSingle("CurrentValue", "0.5"); + + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, sfs, null, clock::getWallClockMillis, clock::getMonotonicNanos); + assertThat(average.currentValue(), equalTo(0.5)); + assertThat(average.countReports(), equalTo(1L)); + assertThat(average.toString(), containsString("createdTime=1000ms ago")); + + // First reported value should be ignored (but the remaining fields keep counting) + clock.tick(1000); + average.report(0); + assertThat(average.currentValue(), equalTo(0.5)); + assertThat(average.countReports(), equalTo(2L)); + assertThat(average.toString(), containsString("createdTime=2000ms ago")); + + // Subsequent values should be handled normally + clock.tick(1000); + average.report(0); + assertThat(average.currentValue(), equalTo(0.25)); + assertThat(average.countReports(), equalTo(3L)); + assertThat(average.toString(), containsString("createdTime=3000ms ago")); + } + + @Test + public void cloneCreatesIndependentInstance() { + TimeDecayingRunningAverage first = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + TimeDecayingRunningAverage second = first.clone(); + second.report(0); + clock.tick(1000); + second.report(1); + + // Cloned instance should remain untouched + assertThat(first.currentValue(), equalTo(0.0)); + assertThat(first.countReports(), equalTo(0L)); + + // New instance should be updated + assertThat(second.currentValue(), equalTo(0.5)); + assertThat(second.countReports(), equalTo(2L)); + } + + static class Clock { + private long wallClockMillis = System.currentTimeMillis(); + private long monotonicMillis = new Random().nextLong(); + + void tick(long millis) { + wallClockMillis += millis; + monotonicMillis += millis; + } + + void drift(long millis) { + wallClockMillis += millis; + } + + long getWallClockMillis() { + return wallClockMillis; + } + + long getMonotonicNanos() { + return monotonicMillis * 1_000_000; + } + } +} From 9a09bb35c2ea1f1961ef45db4c57f8d44dd59296 Mon Sep 17 00:00:00 2001 From: Bert Massop Date: Sun, 13 Oct 2024 20:23:18 +0200 Subject: [PATCH 2/4] RunningAverage: reduce bookkeeping in SimpleRunningAverage The variables nextSlotPtr and curLen can trivially be derived from totalReports, remove the former variables. Additionally, synchronize on the internal array rather than on the instance itself to prevent potential locking issues. --- .../support/math/SimpleRunningAverage.java | 186 +++++------------- .../math/SimpleRunningAverageTest.java | 47 +++++ 2 files changed, 101 insertions(+), 132 deletions(-) create mode 100644 test/freenet/support/math/SimpleRunningAverageTest.java diff --git a/src/freenet/support/math/SimpleRunningAverage.java b/src/freenet/support/math/SimpleRunningAverage.java index b2cd26b431f..a5c396fc736 100644 --- a/src/freenet/support/math/SimpleRunningAverage.java +++ b/src/freenet/support/math/SimpleRunningAverage.java @@ -3,174 +3,96 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; -import java.io.DataOutputStream; - -import freenet.support.Logger; -import freenet.support.Logger.LogLevel; +import java.util.Arrays; /** * Simple running average: linear mean of the last N reports. + * * @author amphibian */ public final class SimpleRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = -1; - final double[] refs; - int nextSlotPtr=0; - int curLen=0; - double total=0; - int totalReports = 0; - final double initValue; - private boolean logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); + private final double[] refs; + private final double initValue; + + private double sum; + private long totalReports; - @Override - public final SimpleRunningAverage clone() { - // Deep copy needed. Implement Cloneable to shut up findbugs. - return new SimpleRunningAverage(this); - } - - /** - * Clear the SRA - */ - public synchronized void clear() { - nextSlotPtr = 0; - curLen = 0; - totalReports = 0; - total = 0; - for(int i=0;i= refs.length) nextSlotPtr = 0; + public void report(double d) { + synchronized (refs) { + int index = (int) (totalReports % refs.length); + sum = sum - refs[index] + d; + refs[index] = d; + totalReports++; + } } - /** - * - * @return - */ - protected synchronized double popValue(){ - return refs[nextSlotPtr]; - } - @Override - public synchronized String toString() { - return super.toString() + ": curLen="+curLen+", ptr="+nextSlotPtr+", total="+ - total+", average="+total/curLen; + public String toString() { + synchronized (refs) { + return super.toString() + + ", total=" + sum + + ", average=" + currentValue(); + } } - - /** - * - * @param d - */ + @Override public void report(long d) { - report((double)d); + report((double) d); } - /** - * - * @param out - */ - public void writeDataTo(DataOutputStream out) { - throw new UnsupportedOperationException(); + @Override + public long countReports() { + synchronized (refs) { + return totalReports; + } } @Override - public synchronized long countReports() { - return totalReports; + public SimpleRunningAverage clone() { + return new SimpleRunningAverage(this); } /** - * - * @param targetValue - * @return + * Clear the SRA */ - public synchronized double minReportForValue(double targetValue) { - if(curLen < refs.length) { - /** Don't need to remove any values before reporting, - * so is slightly simpler. - * (total + report) / (curLen + 1) >= targetValue => - * report / (curLen + 1) >= targetValue - total/(curLen+1) - * => report >= (targetValue - total/(curLen + 1)) * (curLen+1) - * => report >= targetValue * (curLen + 1) - total - * EXAMPLE: - * Mean (5, 5, 5, 5, 5, X) = 10 - * X = 10 * 6 - 25 = 35 - * => Mean = (25 + 35) / 6 = 60/6 = 10 - */ - return targetValue * (curLen + 1) - total; - } else { - /** Essentially the same, but: - * 1) Length will be curLen, not curLen+1, because is full. - * 2) Take off the value that will be taken off first. - */ - return targetValue * curLen - (total - refs[nextSlotPtr]); + public void clear() { + synchronized (refs) { + sum = 0; + totalReports = 0; + Arrays.fill(refs, 0.0); } } } diff --git a/test/freenet/support/math/SimpleRunningAverageTest.java b/test/freenet/support/math/SimpleRunningAverageTest.java new file mode 100644 index 00000000000..cf63b1cfcfc --- /dev/null +++ b/test/freenet/support/math/SimpleRunningAverageTest.java @@ -0,0 +1,47 @@ +package freenet.support.math; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.junit.Test; + +public class SimpleRunningAverageTest { + private final SimpleRunningAverage average = new SimpleRunningAverage(4, 100.0); + + @Test + public void returnsInitialValue() { + assertThat(average.currentValue(), equalTo(100.0)); + } + + @Test + public void returnsLinearMeanOfLastReports() { + average.report(10); + assertThat(average.currentValue(), equalTo(10.0)); + average.report(40); + assertThat(average.currentValue(), equalTo(25.0)); + average.report(40); + assertThat(average.currentValue(), equalTo(30.0)); + average.report(110); + assertThat(average.currentValue(), equalTo(50.0)); + + // Values should start dropping out now + average.report(40); + assertThat(average.currentValue(), equalTo(57.5)); + average.report(10); + assertThat(average.currentValue(), equalTo(50.0)); + } + + @Test + public void clear() { + for (int i = 0; i < 4; i++) { + average.report(12345); + } + assertThat(average.currentValue(), equalTo(12345.0)); + assertThat(average.countReports(), equalTo(4L)); + + // Clear should reset to initial value = 100 + average.clear(); + assertThat(average.currentValue(), equalTo(100.0)); + assertThat(average.countReports(), equalTo(0L)); + } +} From 8b78902c1ea766b90304497efa1bed86aac68cf3 Mon Sep 17 00:00:00 2001 From: Bert Massop Date: Sun, 13 Oct 2024 20:37:58 +0200 Subject: [PATCH 3/4] RunningAverage: basic cleanup of MedianMeanRunningAverage This fixes lack of synchronization in the copy constructor, removes all the empty javadoc and generally improves code style. --- .../math/MedianMeanRunningAverage.java | 134 ++++++++---------- 1 file changed, 58 insertions(+), 76 deletions(-) diff --git a/src/freenet/support/math/MedianMeanRunningAverage.java b/src/freenet/support/math/MedianMeanRunningAverage.java index 7de1ee50a60..7b38553762f 100644 --- a/src/freenet/support/math/MedianMeanRunningAverage.java +++ b/src/freenet/support/math/MedianMeanRunningAverage.java @@ -1,98 +1,80 @@ package freenet.support.math; import java.util.ArrayList; +import java.util.Collections; /** * A RunningAverage that tracks both the median and mean of a series of values. * WARNING: Uses memory and proportional to the number of reports! Only for debugging! * (Also uses CPU time O(N log N) with the number of reports in currentValue()). + * * @author Matthew Toseland (0xE43DA450) */ public final class MedianMeanRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = 1L; - final ArrayList reports; - final TrivialRunningAverage mean; + private final ArrayList reports = new ArrayList<>(); + private final TrivialRunningAverage mean; - /** - * - */ - public MedianMeanRunningAverage() { - reports = new ArrayList(); - mean = new TrivialRunningAverage(); - } + public MedianMeanRunningAverage() { + mean = new TrivialRunningAverage(); + } - /** - * - * @param average - */ - public MedianMeanRunningAverage(MedianMeanRunningAverage average) { - this.mean = new TrivialRunningAverage(average.mean); - this.reports = new ArrayList(); - reports.addAll(average.reports); - } + public MedianMeanRunningAverage(MedianMeanRunningAverage other) { + synchronized (other.reports) { + this.mean = new TrivialRunningAverage(other.mean); + this.reports.addAll(other.reports); + } + } - @Override - public MedianMeanRunningAverage clone() { - // Override clone() for synchronization. - // Implement Cloneable to shut up findbugs. - synchronized (this) { - return new MedianMeanRunningAverage(this); - } - } + @Override + public MedianMeanRunningAverage clone() { + return new MedianMeanRunningAverage(this); + } - @Override - public synchronized long countReports() { - return reports.size(); - } + @Override + public long countReports() { + synchronized (reports) { + return reports.size(); + } + } - /** - * - * @return - */ - @Override - public synchronized double currentValue() { - int size = reports.size(); - int middle = size / 2; - java.util.Collections.sort(reports); - return reports.get(middle); - } + @Override + public double currentValue() { + synchronized (reports) { + int size = reports.size(); + int middle = size / 2; + Collections.sort(reports); + return reports.get(middle); + } + } - /** - * - * @param d - */ - @Override - public synchronized void report(double d) { - mean.report(d); - reports.add(d); - } + @Override + public void report(double d) { + synchronized (reports) { + mean.report(d); + reports.add(d); + } + } - /** - * - * @param d - */ - @Override - public void report(long d) { - report((double)d); - } + @Override + public void report(long d) { + report((double) d); + } - @Override - public double valueIfReported(double r) { - throw new UnsupportedOperationException(); - } - - @Override - public synchronized String toString() { - return "Median "+currentValue()+" mean "+mean.currentValue(); - } - - /** - * - * @return - */ - public synchronized double meanValue() { - return mean.currentValue(); - } + @Override + public double valueIfReported(double r) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + synchronized (reports) { + return "Median " + currentValue() + " mean " + meanValue(); + } + } + + public double meanValue() { + return mean.currentValue(); + } } From ea3b8e49c33407e382f75418605f1ffdb84fbc98 Mon Sep 17 00:00:00 2001 From: Bert Massop Date: Sun, 13 Oct 2024 20:48:59 +0200 Subject: [PATCH 4/4] RunningAverage: deprecate MedianMeanRunningAverage and replace its uses Replace all uses of MedianMeanRunningAverage with TrivialRunningAverage which does not eat an unbounded amount of RAM. This class was only used for debug purposes under the logMINOR flag. Having the median value in addition to mean value does likely not warrant the additional RAM and CPU consumption. Deprecate the class so that it can eventually be removed. --- src/freenet/io/xfer/BlockReceiver.java | 11 +++++------ src/freenet/io/xfer/BlockTransmitter.java | 17 ++++++++--------- src/freenet/node/RequestSender.java | 19 +++++++++---------- .../math/MedianMeanRunningAverage.java | 2 ++ 4 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/freenet/io/xfer/BlockReceiver.java b/src/freenet/io/xfer/BlockReceiver.java index 176edc00028..3ebfb66e1fc 100644 --- a/src/freenet/io/xfer/BlockReceiver.java +++ b/src/freenet/io/xfer/BlockReceiver.java @@ -41,7 +41,8 @@ import freenet.support.Ticker; import freenet.support.TimeUtil; import freenet.support.io.NativeThread; -import freenet.support.math.MedianMeanRunningAverage; +import freenet.support.math.RunningAverage; +import freenet.support.math.TrivialRunningAverage; /** * IMPORTANT: The receiver can cancel the incoming transfer. This may or may not, @@ -306,10 +307,8 @@ public void onMatched(Message m1) { long endTime = System.currentTimeMillis(); long transferTime = (endTime - startTime); if(logMINOR) { - synchronized(avgTimeTaken) { - avgTimeTaken.report(transferTime); - Logger.minor(this, "Block transfer took "+transferTime+"ms - average is "+avgTimeTaken); - } + avgTimeTaken.report(transferTime); + Logger.minor(this, "Block transfer took "+transferTime+"ms - average is "+avgTimeTaken.currentValue()); } complete(_prb.getBlock()); return; @@ -534,7 +533,7 @@ public void receiveAborted(int reason, String description) { } } - private static MedianMeanRunningAverage avgTimeTaken = new MedianMeanRunningAverage(); + private static final RunningAverage avgTimeTaken = new TrivialRunningAverage(); private void maybeResetDiscardFilter() { long timeleft=discardEndTime-System.currentTimeMillis(); diff --git a/src/freenet/io/xfer/BlockTransmitter.java b/src/freenet/io/xfer/BlockTransmitter.java index e3631023127..2ff0d2344bf 100644 --- a/src/freenet/io/xfer/BlockTransmitter.java +++ b/src/freenet/io/xfer/BlockTransmitter.java @@ -18,8 +18,8 @@ */ package freenet.io.xfer; -import java.util.HashSet; import java.util.Deque; +import java.util.HashSet; import freenet.io.comm.AsyncMessageCallback; import freenet.io.comm.AsyncMessageFilterCallback; @@ -32,18 +32,19 @@ import freenet.io.comm.NotConnectedException; import freenet.io.comm.PeerContext; import freenet.io.comm.RetrievalException; -import freenet.node.MessageItem; import freenet.io.comm.SlowAsyncMessageFilterCallback; +import freenet.node.MessageItem; import freenet.node.PrioRunnable; import freenet.support.BitArray; import freenet.support.Executor; import freenet.support.LogThresholdCallback; import freenet.support.Logger; +import freenet.support.Logger.LogLevel; import freenet.support.Ticker; import freenet.support.TimeUtil; -import freenet.support.Logger.LogLevel; import freenet.support.io.NativeThread; -import freenet.support.math.MedianMeanRunningAverage; +import freenet.support.math.RunningAverage; +import freenet.support.math.TrivialRunningAverage; /** * @author ian @@ -471,10 +472,8 @@ public void onMatched(Message m) { if(logMINOR) { long endTime = System.currentTimeMillis(); long transferTime = (endTime - startTime); - synchronized(avgTimeTaken) { - avgTimeTaken.report(transferTime); - Logger.minor(this, "Block send took "+transferTime+" : "+avgTimeTaken+" on "+BlockTransmitter.this); - } + avgTimeTaken.report(transferTime); + Logger.minor(this, "Block send took "+transferTime+" : average "+avgTimeTaken.currentValue()+" on "+BlockTransmitter.this); } synchronized(_senderThread) { _receivedSendCompletion = true; @@ -764,7 +763,7 @@ else if(logMINOR) private long lastSentPacket = -1; - private static MedianMeanRunningAverage avgTimeTaken = new MedianMeanRunningAverage(); + private static final RunningAverage avgTimeTaken = new TrivialRunningAverage(); /** LOCKING: Must be called with _senderThread held. */ private int getNumSent() { diff --git a/src/freenet/node/RequestSender.java b/src/freenet/node/RequestSender.java index 2a52a0a80ef..48a3fd88bc1 100644 --- a/src/freenet/node/RequestSender.java +++ b/src/freenet/node/RequestSender.java @@ -44,7 +44,8 @@ import freenet.support.SimpleFieldSet; import freenet.support.TimeUtil; import freenet.support.io.NativeThread; -import freenet.support.math.MedianMeanRunningAverage; +import freenet.support.math.RunningAverage; +import freenet.support.math.TrivialRunningAverage; /** * @author amphibian @@ -1542,9 +1543,9 @@ public synchronized short waitUntilStatusChange(short mask) { } } - private static MedianMeanRunningAverage avgTimeTaken = new MedianMeanRunningAverage(); + private static final RunningAverage avgTimeTaken = new TrivialRunningAverage(); - private static MedianMeanRunningAverage avgTimeTakenTransfer = new MedianMeanRunningAverage(); + private static final RunningAverage avgTimeTakenTransfer = new TrivialRunningAverage(); private long transferTime; @@ -1583,13 +1584,11 @@ private void finish(int code, PeerNode next, boolean fromOfferedKey) { if(status == SUCCESS) { if((!isSSK) && transferTime > 0 && logMINOR) { long timeTaken = System.currentTimeMillis() - startTime; - synchronized(avgTimeTaken) { - avgTimeTaken.report(timeTaken); - avgTimeTakenTransfer.report(transferTime); - if(logMINOR) Logger.minor(this, "Successful CHK request took "+timeTaken+" average "+avgTimeTaken); - if(logMINOR) Logger.minor(this, "Successful CHK request transfer "+transferTime+" average "+avgTimeTakenTransfer); - if(logMINOR) Logger.minor(this, "Search phase: median "+(avgTimeTaken.currentValue() - avgTimeTakenTransfer.currentValue())+"ms, mean "+(avgTimeTaken.meanValue() - avgTimeTakenTransfer.meanValue())+"ms"); - } + avgTimeTaken.report(timeTaken); + avgTimeTakenTransfer.report(transferTime); + Logger.minor(this, "Successful CHK request took "+timeTaken+" average "+avgTimeTaken.currentValue()); + Logger.minor(this, "Successful CHK request transfer "+transferTime+" average "+avgTimeTakenTransfer.currentValue()); + Logger.minor(this, "Search phase: mean "+(avgTimeTaken.currentValue() - avgTimeTakenTransfer.currentValue())+"ms"); } if(next != null) { next.onSuccess(false, isSSK); diff --git a/src/freenet/support/math/MedianMeanRunningAverage.java b/src/freenet/support/math/MedianMeanRunningAverage.java index 7b38553762f..0c440a36eb9 100644 --- a/src/freenet/support/math/MedianMeanRunningAverage.java +++ b/src/freenet/support/math/MedianMeanRunningAverage.java @@ -9,7 +9,9 @@ * (Also uses CPU time O(N log N) with the number of reports in currentValue()). * * @author Matthew Toseland (0xE43DA450) + * @deprecated may use excessive RAM and CPU, see warning */ +@Deprecated public final class MedianMeanRunningAverage implements RunningAverage, Cloneable { private final ArrayList reports = new ArrayList<>();