diff --git a/src/freenet/io/xfer/BlockReceiver.java b/src/freenet/io/xfer/BlockReceiver.java index 176edc0002..3ebfb66e1f 100644 --- a/src/freenet/io/xfer/BlockReceiver.java +++ b/src/freenet/io/xfer/BlockReceiver.java @@ -41,7 +41,8 @@ import freenet.support.Ticker; import freenet.support.TimeUtil; import freenet.support.io.NativeThread; -import freenet.support.math.MedianMeanRunningAverage; +import freenet.support.math.RunningAverage; +import freenet.support.math.TrivialRunningAverage; /** * IMPORTANT: The receiver can cancel the incoming transfer. This may or may not, @@ -306,10 +307,8 @@ public void onMatched(Message m1) { long endTime = System.currentTimeMillis(); long transferTime = (endTime - startTime); if(logMINOR) { - synchronized(avgTimeTaken) { - avgTimeTaken.report(transferTime); - Logger.minor(this, "Block transfer took "+transferTime+"ms - average is "+avgTimeTaken); - } + avgTimeTaken.report(transferTime); + Logger.minor(this, "Block transfer took "+transferTime+"ms - average is "+avgTimeTaken.currentValue()); } complete(_prb.getBlock()); return; @@ -534,7 +533,7 @@ public void receiveAborted(int reason, String description) { } } - private static MedianMeanRunningAverage avgTimeTaken = new MedianMeanRunningAverage(); + private static final RunningAverage avgTimeTaken = new TrivialRunningAverage(); private void maybeResetDiscardFilter() { long timeleft=discardEndTime-System.currentTimeMillis(); diff --git a/src/freenet/io/xfer/BlockTransmitter.java b/src/freenet/io/xfer/BlockTransmitter.java index e363102312..2ff0d2344b 100644 --- a/src/freenet/io/xfer/BlockTransmitter.java +++ b/src/freenet/io/xfer/BlockTransmitter.java @@ -18,8 +18,8 @@ */ package freenet.io.xfer; -import java.util.HashSet; import java.util.Deque; +import java.util.HashSet; import freenet.io.comm.AsyncMessageCallback; import freenet.io.comm.AsyncMessageFilterCallback; @@ -32,18 +32,19 @@ import freenet.io.comm.NotConnectedException; import freenet.io.comm.PeerContext; import freenet.io.comm.RetrievalException; -import freenet.node.MessageItem; import freenet.io.comm.SlowAsyncMessageFilterCallback; +import freenet.node.MessageItem; import freenet.node.PrioRunnable; import freenet.support.BitArray; import freenet.support.Executor; import freenet.support.LogThresholdCallback; import freenet.support.Logger; +import freenet.support.Logger.LogLevel; import freenet.support.Ticker; import freenet.support.TimeUtil; -import freenet.support.Logger.LogLevel; import freenet.support.io.NativeThread; -import freenet.support.math.MedianMeanRunningAverage; +import freenet.support.math.RunningAverage; +import freenet.support.math.TrivialRunningAverage; /** * @author ian @@ -471,10 +472,8 @@ public void onMatched(Message m) { if(logMINOR) { long endTime = System.currentTimeMillis(); long transferTime = (endTime - startTime); - synchronized(avgTimeTaken) { - avgTimeTaken.report(transferTime); - Logger.minor(this, "Block send took "+transferTime+" : "+avgTimeTaken+" on "+BlockTransmitter.this); - } + avgTimeTaken.report(transferTime); + Logger.minor(this, "Block send took "+transferTime+" : average "+avgTimeTaken.currentValue()+" on "+BlockTransmitter.this); } synchronized(_senderThread) { _receivedSendCompletion = true; @@ -764,7 +763,7 @@ else if(logMINOR) private long lastSentPacket = -1; - private static MedianMeanRunningAverage avgTimeTaken = new MedianMeanRunningAverage(); + private static final RunningAverage avgTimeTaken = new TrivialRunningAverage(); /** LOCKING: Must be called with _senderThread held. */ private int getNumSent() { diff --git a/src/freenet/node/RequestSender.java b/src/freenet/node/RequestSender.java index 2a52a0a80e..48a3fd88bc 100644 --- a/src/freenet/node/RequestSender.java +++ b/src/freenet/node/RequestSender.java @@ -44,7 +44,8 @@ import freenet.support.SimpleFieldSet; import freenet.support.TimeUtil; import freenet.support.io.NativeThread; -import freenet.support.math.MedianMeanRunningAverage; +import freenet.support.math.RunningAverage; +import freenet.support.math.TrivialRunningAverage; /** * @author amphibian @@ -1542,9 +1543,9 @@ public synchronized short waitUntilStatusChange(short mask) { } } - private static MedianMeanRunningAverage avgTimeTaken = new MedianMeanRunningAverage(); + private static final RunningAverage avgTimeTaken = new TrivialRunningAverage(); - private static MedianMeanRunningAverage avgTimeTakenTransfer = new MedianMeanRunningAverage(); + private static final RunningAverage avgTimeTakenTransfer = new TrivialRunningAverage(); private long transferTime; @@ -1583,13 +1584,11 @@ private void finish(int code, PeerNode next, boolean fromOfferedKey) { if(status == SUCCESS) { if((!isSSK) && transferTime > 0 && logMINOR) { long timeTaken = System.currentTimeMillis() - startTime; - synchronized(avgTimeTaken) { - avgTimeTaken.report(timeTaken); - avgTimeTakenTransfer.report(transferTime); - if(logMINOR) Logger.minor(this, "Successful CHK request took "+timeTaken+" average "+avgTimeTaken); - if(logMINOR) Logger.minor(this, "Successful CHK request transfer "+transferTime+" average "+avgTimeTakenTransfer); - if(logMINOR) Logger.minor(this, "Search phase: median "+(avgTimeTaken.currentValue() - avgTimeTakenTransfer.currentValue())+"ms, mean "+(avgTimeTaken.meanValue() - avgTimeTakenTransfer.meanValue())+"ms"); - } + avgTimeTaken.report(timeTaken); + avgTimeTakenTransfer.report(transferTime); + Logger.minor(this, "Successful CHK request took "+timeTaken+" average "+avgTimeTaken.currentValue()); + Logger.minor(this, "Successful CHK request transfer "+transferTime+" average "+avgTimeTakenTransfer.currentValue()); + Logger.minor(this, "Search phase: mean "+(avgTimeTaken.currentValue() - avgTimeTakenTransfer.currentValue())+"ms"); } if(next != null) { next.onSuccess(false, isSSK); diff --git a/src/freenet/support/math/BootstrappingDecayingRunningAverage.java b/src/freenet/support/math/BootstrappingDecayingRunningAverage.java index 8017edf65f..ba2dc60a5d 100644 --- a/src/freenet/support/math/BootstrappingDecayingRunningAverage.java +++ b/src/freenet/support/math/BootstrappingDecayingRunningAverage.java @@ -3,10 +3,10 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; -import freenet.support.LogThresholdCallback; -import freenet.support.Logger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; + import freenet.support.SimpleFieldSet; -import freenet.support.Logger.LogLevel; /** * Exponential decay "running average". @@ -23,30 +23,11 @@ * */ public final class BootstrappingDecayingRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = -1; - @Override - public final BootstrappingDecayingRunningAverage clone() { - // Override clone() for locking; BDRAs are self-synchronized. - // Implement Cloneable to shut up findbugs. - return new BootstrappingDecayingRunningAverage(this); - } - + + private final AtomicReference data = new AtomicReference<>(); private final double min; private final double max; - private double currentValue; - private long reports; - private int maxReports; - - private static volatile boolean logDEBUG; - static { - Logger.registerLogThresholdCallback(new LogThresholdCallback(){ - @Override - public void shouldUpdate(){ - logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); - } - }); - } - + /** * Constructor * @@ -62,128 +43,83 @@ public void shouldUpdate(){ * {@link SimpleFieldSet} parameter for this object. Will * override other parameters. */ - public BootstrappingDecayingRunningAverage(double defaultValue, double min, - double max, int maxReports, SimpleFieldSet fs) { + public BootstrappingDecayingRunningAverage( + double defaultValue, + double min, + double max, + int maxReports, + SimpleFieldSet fs + ) { this.min = min; this.max = max; - reports = 0; - currentValue = defaultValue; - this.maxReports = maxReports; - assert(maxReports > 0); - if(fs != null) { + long reports = 0; + double currentValue = defaultValue; + if (fs != null) { double d = fs.getDouble("CurrentValue", currentValue); - if(!(Double.isNaN(d) || d < min || d > max)) { + if (!isInvalid(d)) { + reports = Math.max(0, fs.getLong("Reports", reports)); currentValue = d; - reports = fs.getLong("Reports", reports); } } + data.set(new Data(maxReports, reports, currentValue)); } - - /** - * {@inheritDoc} - * - * @return - */ - @Override - public synchronized double currentValue() { - return currentValue; + + private BootstrappingDecayingRunningAverage(BootstrappingDecayingRunningAverage other) { + this.min = other.min; + this.max = other.max; + this.data.set(new Data(other.data.get())); } - - /** - * Not a public method. Changes the internally stored currentValue and return the old one. - * - * Used by {@link DecayingKeyspaceAverage} to normalize the stored averages. Calling this function - * may (purposefully) destroy the utility of the average being kept. - * - * @param d - * @return - * @see DecayingKeyspaceAverage - */ - protected synchronized double setCurrentValue(double d) { - double old=currentValue; - currentValue=d; - return old; + + @Override + public double currentValue() { + return data.get().currentValue; } - /** - * {@inheritDoc} - * - * @param d - */ @Override - public synchronized void report(double d) { - if(d < min) { - if(logDEBUG) - Logger.debug(this, "Too low: "+d, new Exception("debug")); - d = min; - } - if(d > max) { - if(logDEBUG) - Logger.debug(this, "Too high: "+d, new Exception("debug")); - d = max; - } - reports++; - double decayFactor = 1.0 / (Math.min(reports, maxReports)); - currentValue = (d * decayFactor) + (currentValue * (1-decayFactor)); + public void report(double d) { + data.updateAndGet(data -> data.updated(d)); } - /** - * {@inheritDoc} - * - * @param d - */ @Override public void report(long d) { report((double)d); } - /** - * {@inheritDoc} - * - * @param d - */ @Override - public synchronized double valueIfReported(double d) { - if(d < min) { - Logger.error(this, "Too low: "+d, new Exception("debug")); - d = min; - } - if(d > max) { - Logger.error(this, "Too high: "+d, new Exception("debug")); - d = max; - } - double decayFactor = 1.0 / (Math.min(reports + 1, maxReports)); - return (d * decayFactor) + (currentValue * (1-decayFactor)); + public double valueIfReported(double d) { + return data.get().updated(d).currentValue; + } + + @Override + public long countReports() { + return data.get().reports; } - + + @Override + public BootstrappingDecayingRunningAverage clone() { + return new BootstrappingDecayingRunningAverage(this); + } + /** * Change maxReports. - * + * * @param maxReports */ - public synchronized void changeMaxReports(int maxReports) { - this.maxReports=maxReports; + public void changeMaxReports(int maxReports) { + data.updateAndGet(data -> data.withMaxReports(maxReports)); } /** - * Copy constructor. + * Reports a new value while allowing for normalization of the resulting currentValue. + * Used by {@link DecayingKeyspaceAverage} to normalize the stored averages. Calling this function + * may (purposefully) destroy the utility of the average being kept. */ - private BootstrappingDecayingRunningAverage(BootstrappingDecayingRunningAverage a) { - synchronized (a) { - this.currentValue = a.currentValue; - this.max = a.max; - this.maxReports = a.maxReports; - this.min = a.min; - this.reports = a.reports; - } + void report(UnaryOperator updateFunction) { + data.updateAndGet(updateFunction); } - /** - * {@inheritDoc} - */ - @Override - public synchronized long countReports() { - return reports; + double valueIfReported(UnaryOperator updateFunction) { + return updateFunction.apply(data.get()).currentValue; } /** @@ -193,11 +129,58 @@ public synchronized long countReports() { * See {@link SimpleFieldSet#SimpleFieldSet(boolean)}. * @return */ - public synchronized SimpleFieldSet exportFieldSet(boolean shortLived) { + public SimpleFieldSet exportFieldSet(boolean shortLived) { + Data data = this.data.get(); SimpleFieldSet fs = new SimpleFieldSet(shortLived); fs.putSingle("Type", "BootstrappingDecayingRunningAverage"); - fs.put("CurrentValue", currentValue); - fs.put("Reports", reports); + fs.put("CurrentValue", data.currentValue); + fs.put("Reports", data.reports); return fs; } + + private boolean isInvalid(double d) { + return d < min || d > max || Double.isInfinite(d) || Double.isNaN(d); + } + + class Data { + private final long maxReports; + private final long reports; + private final double currentValue; + + private Data(long maxReports, long reports, double currentValue) { + this.maxReports = maxReports; + this.reports = reports; + this.currentValue = currentValue; + } + + private Data(Data other) { + this.maxReports = other.maxReports; + this.reports = other.reports; + this.currentValue = other.currentValue; + } + + double currentValue() { + return currentValue; + } + + Data updated(double d) { + if (isInvalid(d)) { + return this; + } + double decayFactor = 1d / Math.min(reports + 1, maxReports); + double newValue = (d * decayFactor) + (currentValue * (1 - decayFactor)); + return new Data(maxReports, reports + 1, newValue); + } + + Data withCurrentValue(double currentValue) { + if (isInvalid(currentValue)) { + return this; + } + return new Data(maxReports, reports, currentValue); + } + + Data withMaxReports(long maxReports) { + return new Data(maxReports, reports, currentValue); + } + } } diff --git a/src/freenet/support/math/DecayingKeyspaceAverage.java b/src/freenet/support/math/DecayingKeyspaceAverage.java index 4199b959a8..4a6920ceff 100644 --- a/src/freenet/support/math/DecayingKeyspaceAverage.java +++ b/src/freenet/support/math/DecayingKeyspaceAverage.java @@ -3,183 +3,90 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; +import java.util.function.UnaryOperator; + import freenet.node.Location; import freenet.support.SimpleFieldSet; /** * @author robert - * + *

* A filter on BootstrappingDecayingRunningAverage which makes it aware of the circular keyspace. */ public final class DecayingKeyspaceAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = 5129429614949179428L; - /** - 'avg' is the normalized average location, note that the the reporting bounds are (-2.0, 2.0) however. - */ - BootstrappingDecayingRunningAverage avg; - - /** - * - * @param defaultValue - * @param maxReports - * @param fs - */ - public DecayingKeyspaceAverage(double defaultValue, int maxReports, SimpleFieldSet fs) { - avg = new BootstrappingDecayingRunningAverage(defaultValue, -2.0, 2.0, maxReports, fs); - } - - /** - * - * @param a - */ - public DecayingKeyspaceAverage(BootstrappingDecayingRunningAverage a) { - //check the max/min values? ignore them? - avg = a.clone(); - } - - @Override - public synchronized DecayingKeyspaceAverage clone() { - // Override clone() for deep copy. - // Implement Cloneable to shut up findbugs. - return new DecayingKeyspaceAverage(avg); - } - - /** - * - * @return - */ - @Override - public synchronized double currentValue() { - return avg.currentValue(); - } - - /** - * - * @param d - */ - @Override - public synchronized void report(double d) { - if((d < 0.0) || (d > 1.0)) - //Just because we use non-normalized locations doesn't mean we can accept them. - throw new IllegalArgumentException("Not a valid normalized key: " + d); - double superValue = avg.currentValue(); - double thisValue = Location.normalize(superValue); - double diff = Location.change(thisValue, d); - double toAverage = (superValue + diff); + /** + * 'avg' is the normalized average location, note that the reporting bounds are (-0.5, 1.5) however. + */ + private final BootstrappingDecayingRunningAverage avg; + + public DecayingKeyspaceAverage(double defaultValue, int maxReports, SimpleFieldSet fs) { + avg = new BootstrappingDecayingRunningAverage(defaultValue, -0.5, 1.5, maxReports, fs); + } + + public DecayingKeyspaceAverage(DecayingKeyspaceAverage other) { + avg = other.avg.clone(); + } + + @Override + public DecayingKeyspaceAverage clone() { + return new DecayingKeyspaceAverage(this); + } + + @Override + public double currentValue() { + return avg.currentValue(); + } + + @Override + public void report(double d) { + avg.report(locationUpdateFunction(d)); + } + + @Override + public double valueIfReported(double d) { + return avg.valueIfReported(locationUpdateFunction(d)); + } + + @Override + public long countReports() { + return avg.countReports(); + } + + @Override + public void report(long d) { + throw new IllegalArgumentException("KeyspaceAverage does not like longs"); + } + + public void changeMaxReports(int maxReports) { + avg.changeMaxReports(maxReports); + } + + public SimpleFieldSet exportFieldSet(boolean shortLived) { + return avg.exportFieldSet(shortLived); + } + + private UnaryOperator locationUpdateFunction(double d) { + if (!Location.isValid(d)) { + throw new IllegalArgumentException("Not a valid normalized key: " + d); + } + /* To gracefully wrap around the 1.0/0.0 threshold we average over (or under) it, and simply normalize the result when reporting a currentValue ---example--- - d=0.2; //being reported - superValue=1.9; //already wrapped once, but at 0.9 - thisValue=0.9; //the normalized value of where we are in the keyspace - diff = +0.3; //the diff from the normalized values; Location.change(0.9, 0.2); - avg.report(2.2);//to successfully move the average towards the closest route to the given value. - */ - avg.report(toAverage); - double newValue = avg.currentValue(); - if(newValue < 0.0 || newValue > 1.0) - avg.setCurrentValue(Location.normalize(newValue)); - } - - @Override - public synchronized double valueIfReported(double d) { - if((d < 0.0) || (d > 1.0)) - throw new IllegalArgumentException("Not a valid normalized key: " + d); - double superValue = avg.currentValue(); - double thisValue = Location.normalize(superValue); - double diff = Location.change(thisValue, d); - return Location.normalize(avg.valueIfReported(superValue + diff)); - } - - @Override - public synchronized long countReports() { - return avg.countReports(); - } - - /** - * - * @param d - */ - @Override - public void report(long d) { - throw new IllegalArgumentException("KeyspaceAverage does not like longs"); - } - - /** - * - * @param maxReports - */ - public synchronized void changeMaxReports(int maxReports) { - avg.changeMaxReports(maxReports); - } - - /** - * - * @param shortLived - * @return - */ - public synchronized SimpleFieldSet exportFieldSet(boolean shortLived) { - return avg.exportFieldSet(shortLived); - } - - ///@todo: make this a junit test - /** - * - * @param args - */ - public static void main(String[] args) { - DecayingKeyspaceAverage a = new DecayingKeyspaceAverage(0.9, 10, null); - a.report(0.9); - for(int i = 10; i != 0; i--) { - a.report(0.2); - System.out.println("<-0.2-- current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.8); - System.out.println("--0.8-> current=" + a.currentValue()); - } - System.out.println("--- positive wrap test ---"); - for(int wrap = 3; wrap != 0; wrap--) { - System.out.println("wrap test #" + wrap); - for(int i = 10; i != 0; i--) { - a.report(0.25); - System.out.println("<-0.25- current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.5); - System.out.println("--0.5-> current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.75); - System.out.println("-0.75-> current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(1.0); - System.out.println("<-1.0-- current=" + a.currentValue()); - } - } - System.out.println("--- negative wrap test ---"); - a = new DecayingKeyspaceAverage(0.2, 10, null); - a.report(0.2); - for(int wrap = 3; wrap != 0; wrap--) { - System.out.println("negwrap test #" + wrap); - for(int i = 10; i != 0; i--) { - a.report(0.75); - System.out.println("-0.75-> current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.5); - System.out.println("<-0.5-- current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(0.25); - System.out.println("<-0.25- current=" + a.currentValue()); - } - for(int i = 10; i != 0; i--) { - a.report(1.0); - System.out.println("--1.0-> current=" + a.currentValue()); - } - } - } + d = 0.2; //being reported + currentValue = 0.9; //the normalized value of where we are in the keyspace + change = 0.3; //the diff from the normalized values; Location.change(0.9, 0.2); + report(1.2); //to successfully move the average towards the closest route to the given value. + */ + return data -> { + double currentValue = data.currentValue(); + double change = Location.change(currentValue, d); + return normalize(data.updated(currentValue + change)); + }; + } + + private BootstrappingDecayingRunningAverage.Data normalize(BootstrappingDecayingRunningAverage.Data data) { + return data.withCurrentValue(Location.normalize(data.currentValue())); + } } diff --git a/src/freenet/support/math/MedianMeanRunningAverage.java b/src/freenet/support/math/MedianMeanRunningAverage.java index 7de1ee50a6..0c440a36eb 100644 --- a/src/freenet/support/math/MedianMeanRunningAverage.java +++ b/src/freenet/support/math/MedianMeanRunningAverage.java @@ -1,98 +1,82 @@ package freenet.support.math; import java.util.ArrayList; +import java.util.Collections; /** * A RunningAverage that tracks both the median and mean of a series of values. * WARNING: Uses memory and proportional to the number of reports! Only for debugging! * (Also uses CPU time O(N log N) with the number of reports in currentValue()). + * * @author Matthew Toseland (0xE43DA450) + * @deprecated may use excessive RAM and CPU, see warning */ +@Deprecated public final class MedianMeanRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = 1L; - final ArrayList reports; - final TrivialRunningAverage mean; + private final ArrayList reports = new ArrayList<>(); + private final TrivialRunningAverage mean; - /** - * - */ - public MedianMeanRunningAverage() { - reports = new ArrayList(); - mean = new TrivialRunningAverage(); - } + public MedianMeanRunningAverage() { + mean = new TrivialRunningAverage(); + } - /** - * - * @param average - */ - public MedianMeanRunningAverage(MedianMeanRunningAverage average) { - this.mean = new TrivialRunningAverage(average.mean); - this.reports = new ArrayList(); - reports.addAll(average.reports); - } + public MedianMeanRunningAverage(MedianMeanRunningAverage other) { + synchronized (other.reports) { + this.mean = new TrivialRunningAverage(other.mean); + this.reports.addAll(other.reports); + } + } - @Override - public MedianMeanRunningAverage clone() { - // Override clone() for synchronization. - // Implement Cloneable to shut up findbugs. - synchronized (this) { - return new MedianMeanRunningAverage(this); - } - } + @Override + public MedianMeanRunningAverage clone() { + return new MedianMeanRunningAverage(this); + } - @Override - public synchronized long countReports() { - return reports.size(); - } + @Override + public long countReports() { + synchronized (reports) { + return reports.size(); + } + } - /** - * - * @return - */ - @Override - public synchronized double currentValue() { - int size = reports.size(); - int middle = size / 2; - java.util.Collections.sort(reports); - return reports.get(middle); - } + @Override + public double currentValue() { + synchronized (reports) { + int size = reports.size(); + int middle = size / 2; + Collections.sort(reports); + return reports.get(middle); + } + } - /** - * - * @param d - */ - @Override - public synchronized void report(double d) { - mean.report(d); - reports.add(d); - } + @Override + public void report(double d) { + synchronized (reports) { + mean.report(d); + reports.add(d); + } + } - /** - * - * @param d - */ - @Override - public void report(long d) { - report((double)d); - } + @Override + public void report(long d) { + report((double) d); + } - @Override - public double valueIfReported(double r) { - throw new UnsupportedOperationException(); - } - - @Override - public synchronized String toString() { - return "Median "+currentValue()+" mean "+mean.currentValue(); - } - - /** - * - * @return - */ - public synchronized double meanValue() { - return mean.currentValue(); - } + @Override + public double valueIfReported(double r) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + synchronized (reports) { + return "Median " + currentValue() + " mean " + meanValue(); + } + } + + public double meanValue() { + return mean.currentValue(); + } } diff --git a/src/freenet/support/math/RunningAverage.java b/src/freenet/support/math/RunningAverage.java index 2ee88c6fbb..cf467eda1c 100644 --- a/src/freenet/support/math/RunningAverage.java +++ b/src/freenet/support/math/RunningAverage.java @@ -3,46 +3,42 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; -import java.io.Serializable; - -/** A running average. That is, something that takes reports as numbers and generates a current value. - * Synchronized class, including clone(). */ -public interface RunningAverage extends Serializable { +/** + * A running average. That is, something that takes reports as numbers and generates a current value. + */ +public interface RunningAverage { /** - * Copy the RunningAverage (create a snapshot). Deep copy, the new RA won't change when the first one - * does. Will synchronize on the original during the copy process. + * Copy the RunningAverage (create a snapshot). Deep copy, the new RA won't change when the first one does. */ - public RunningAverage clone(); + RunningAverage clone(); - /** - * - * @return - */ - public double currentValue(); + /** + * @return + */ + double currentValue(); - /** - * - * @param d - */ - public void report(double d); + /** + * @param d + */ + void report(double d); - /** - * - * @param d - */ - public void report(long d); + /** + * @param d + */ + void report(long d); - /** - * Get what currentValue() would be if we reported some given value - * @param r the value to mimic reporting - * @return the output of currentValue() if we were to report r - */ - public double valueIfReported(double r); + /** + * Get what currentValue() would be if we reported some given value + * + * @param r the value to mimic reporting + * @return the output of currentValue() if we were to report r + */ + double valueIfReported(double r); - /** - * @return the total number of reports on this RunningAverage so far. - * Used for weighted averages, confidence/newbieness estimation etc. - */ - public long countReports(); + /** + * @return the total number of reports on this RunningAverage so far. + * Used for weighted averages, confidence/newbieness estimation etc. + */ + long countReports(); } diff --git a/src/freenet/support/math/SimpleRunningAverage.java b/src/freenet/support/math/SimpleRunningAverage.java index b2cd26b431..a5c396fc73 100644 --- a/src/freenet/support/math/SimpleRunningAverage.java +++ b/src/freenet/support/math/SimpleRunningAverage.java @@ -3,174 +3,96 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; -import java.io.DataOutputStream; - -import freenet.support.Logger; -import freenet.support.Logger.LogLevel; +import java.util.Arrays; /** * Simple running average: linear mean of the last N reports. + * * @author amphibian */ public final class SimpleRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = -1; - final double[] refs; - int nextSlotPtr=0; - int curLen=0; - double total=0; - int totalReports = 0; - final double initValue; - private boolean logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); + private final double[] refs; + private final double initValue; + + private double sum; + private long totalReports; - @Override - public final SimpleRunningAverage clone() { - // Deep copy needed. Implement Cloneable to shut up findbugs. - return new SimpleRunningAverage(this); - } - - /** - * Clear the SRA - */ - public synchronized void clear() { - nextSlotPtr = 0; - curLen = 0; - totalReports = 0; - total = 0; - for(int i=0;i= refs.length) nextSlotPtr = 0; + public void report(double d) { + synchronized (refs) { + int index = (int) (totalReports % refs.length); + sum = sum - refs[index] + d; + refs[index] = d; + totalReports++; + } } - /** - * - * @return - */ - protected synchronized double popValue(){ - return refs[nextSlotPtr]; - } - @Override - public synchronized String toString() { - return super.toString() + ": curLen="+curLen+", ptr="+nextSlotPtr+", total="+ - total+", average="+total/curLen; + public String toString() { + synchronized (refs) { + return super.toString() + + ", total=" + sum + + ", average=" + currentValue(); + } } - - /** - * - * @param d - */ + @Override public void report(long d) { - report((double)d); + report((double) d); } - /** - * - * @param out - */ - public void writeDataTo(DataOutputStream out) { - throw new UnsupportedOperationException(); + @Override + public long countReports() { + synchronized (refs) { + return totalReports; + } } @Override - public synchronized long countReports() { - return totalReports; + public SimpleRunningAverage clone() { + return new SimpleRunningAverage(this); } /** - * - * @param targetValue - * @return + * Clear the SRA */ - public synchronized double minReportForValue(double targetValue) { - if(curLen < refs.length) { - /** Don't need to remove any values before reporting, - * so is slightly simpler. - * (total + report) / (curLen + 1) >= targetValue => - * report / (curLen + 1) >= targetValue - total/(curLen+1) - * => report >= (targetValue - total/(curLen + 1)) * (curLen+1) - * => report >= targetValue * (curLen + 1) - total - * EXAMPLE: - * Mean (5, 5, 5, 5, 5, X) = 10 - * X = 10 * 6 - 25 = 35 - * => Mean = (25 + 35) / 6 = 60/6 = 10 - */ - return targetValue * (curLen + 1) - total; - } else { - /** Essentially the same, but: - * 1) Length will be curLen, not curLen+1, because is full. - * 2) Take off the value that will be taken off first. - */ - return targetValue * curLen - (total - refs[nextSlotPtr]); + public void clear() { + synchronized (refs) { + sum = 0; + totalReports = 0; + Arrays.fill(refs, 0.0); } } } diff --git a/src/freenet/support/math/TimeDecayingRunningAverage.java b/src/freenet/support/math/TimeDecayingRunningAverage.java index 2daf14e911..4ea5dbdb09 100644 --- a/src/freenet/support/math/TimeDecayingRunningAverage.java +++ b/src/freenet/support/math/TimeDecayingRunningAverage.java @@ -3,330 +3,272 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.support.math; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import freenet.node.TimeSkewDetectorCallback; import freenet.support.Logger; import freenet.support.SimpleFieldSet; -import freenet.support.Logger.LogLevel; /** * Time decaying running average. - * + *

* Decay factor = 0.5 ^ (interval / halflife). - * + *

* So if the interval is exactly the half-life then reporting 0 will halve the value. - * - * Note that the older version has a half life on the influence of any given report without taking - * into account the fact that reports persist and accumulate. :) - * */ public final class TimeDecayingRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = -1; - static final int MAGIC = 0x5ff4ac94; - - @Override - public final TimeDecayingRunningAverage clone() { - // Override clone to synchronize, as per comments in RunningAverage. - // Implement Cloneable to shut up findbugs. - synchronized(this) { - return new TimeDecayingRunningAverage(this); - } - } - - double curValue; - final double halfLife; - long lastReportTime; - long createdTime; - long totalReports; - boolean started; - double defaultValue; - double minReport; - double maxReport; - boolean logDEBUG; - private final TimeSkewDetectorCallback timeSkewCallback; - - @Override - public String toString() { - long now = System.currentTimeMillis(); - synchronized(this) { - return super.toString() + ": currentValue="+curValue+", halfLife="+halfLife+ - ", lastReportTime="+(now - lastReportTime)+ - "ms ago, createdTime="+(now - createdTime)+ - "ms ago, totalReports="+totalReports+", started="+started+ - ", defaultValue="+defaultValue+", min="+minReport+", max="+maxReport; - } - } - + private final AtomicReference data = new AtomicReference<>(); + private final TimeTracker timeTracker; + /** - * - * @param defaultValue - * @param halfLife - * @param min - * @param max - * @param callback + * Monotonic time source in nanosecond resolution (but usually with lower accuracy). + * This time source should increase monotonically with elapsed time - usually `System::nanoTime`. */ - public TimeDecayingRunningAverage(double defaultValue, long halfLife, - double min, double max, TimeSkewDetectorCallback callback) { - curValue = defaultValue; - this.defaultValue = defaultValue; - started = false; - this.halfLife = halfLife; - createdTime = lastReportTime = System.currentTimeMillis(); - this.minReport = min; - this.maxReport = max; - totalReports = 0; - logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); - if(logDEBUG) - Logger.debug(this, "Created "+this, - new Exception("debug")); - this.timeSkewCallback = callback; - } - + private final LongSupplier monotonicTimeSourceNanos; + /** - * - * @param defaultValue - * @param halfLife - * @param min - * @param max - * @param fs - * @param callback + * Special timestamp set to 1 nanosecond prior to creation of this instance. + * Used as a sentinel value for "not initialized" timestamps. */ - public TimeDecayingRunningAverage(double defaultValue, long halfLife, - double min, double max, SimpleFieldSet fs, TimeSkewDetectorCallback callback) { - curValue = defaultValue; - this.defaultValue = defaultValue; - started = false; - this.halfLife = halfLife; - createdTime = System.currentTimeMillis(); - this.lastReportTime = -1; // long warm-up may skew results, so lets wait for the first report - this.minReport = min; - this.maxReport = max; - totalReports = 0; - logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); - if(logDEBUG) - Logger.debug(this, "Created "+this, - new Exception("debug")); - if(fs != null) { - started = fs.getBoolean("Started", false); - if(started) { - curValue = fs.getDouble("CurrentValue", curValue); - if(curValue > maxReport || curValue < minReport || Double.isNaN(curValue)) { - curValue = defaultValue; - totalReports = 0; - createdTime = System.currentTimeMillis(); - } else { - totalReports = fs.getLong("TotalReports", 0); - long uptime = fs.getLong("Uptime", 0); - createdTime = System.currentTimeMillis() - uptime; - } - } - } - this.timeSkewCallback = callback; - } - + private final long notInitializedSentinel; + /** - * - * @param defaultValue - * @param halfLife - * @param min - * @param max - * @param dis - * @param callback - * @throws IOException + * Creation time of this instance in wall-clock time. Used for uptime calculation. */ - public TimeDecayingRunningAverage(double defaultValue, double halfLife, double min, double max, DataInputStream dis, TimeSkewDetectorCallback callback) throws IOException { - int m = dis.readInt(); - if(m != MAGIC) throw new IOException("Invalid magic "+m); - int v = dis.readInt(); - if(v != 1) throw new IOException("Invalid version "+v); - curValue = dis.readDouble(); - if(Double.isInfinite(curValue) || Double.isNaN(curValue)) - throw new IOException("Invalid weightedTotal: "+curValue); - if((curValue < min) || (curValue > max)) - throw new IOException("Out of range: curValue = "+curValue); - started = dis.readBoolean(); - long priorExperienceTime = dis.readLong(); - this.halfLife = halfLife; - this.minReport = min; - this.maxReport = max; - this.defaultValue = defaultValue; - logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); - lastReportTime = -1; - createdTime = System.currentTimeMillis() - priorExperienceTime; - totalReports = dis.readLong(); - this.timeSkewCallback = callback; - } + private final long createdTimeMillis; /** - * - * @param a + * Half-life time in nanoseconds. */ - public TimeDecayingRunningAverage(TimeDecayingRunningAverage a) { - this.createdTime = a.createdTime; - this.defaultValue = a.defaultValue; - this.halfLife = a.halfLife; - this.lastReportTime = a.lastReportTime; - this.maxReport = a.maxReport; - this.minReport = a.minReport; - this.started = a.started; - this.totalReports = a.totalReports; - this.curValue = a.curValue; - this.timeSkewCallback = a.timeSkewCallback; - } + private final double halfLifeNanos; /** - * - * @return + * Minimum allowed input value. Smaller reported values are silently ignored. */ - @Override - public synchronized double currentValue() { - return curValue; - } + private final double min; /** - * - * @param d + * Maximum allowed input value. Larger reported values are silently ignored. */ + private final double max; + + public TimeDecayingRunningAverage( + double defaultValue, + long halfLife, + double min, + double max, + TimeSkewDetectorCallback callback + ) { + this(defaultValue, halfLife, min, max, null, callback); + } + + public TimeDecayingRunningAverage( + double defaultValue, + long halfLife, + double min, + double max, + SimpleFieldSet fs, + TimeSkewDetectorCallback callback + ) { + this(defaultValue, halfLife, min, max, fs, callback, System::currentTimeMillis, System::nanoTime); + } + + TimeDecayingRunningAverage( + double defaultValue, + long halfLife, + double min, + double max, + SimpleFieldSet fs, + TimeSkewDetectorCallback callback, + LongSupplier wallClockTimeSourceMillis, + LongSupplier monotonicTimeSourceNanos + ) { + this.halfLifeNanos = Math.max(1, halfLife) * 1e6; + this.min = min; + this.max = max; + long createdTime = wallClockTimeSourceMillis.getAsLong(); + long reports = 0; + boolean started = false; + double currentValue = defaultValue; + if (fs != null) { + started = fs.getBoolean("Started", false); + if (started) { + double d = fs.getDouble("CurrentValue", currentValue); + if (!isInvalid(d)) { + reports = Math.max(0, fs.getLong("TotalReports", 0)); + createdTime = createdTime - Math.max(0, fs.getLong("Uptime", 0)); + currentValue = d; + } + } + } + this.timeTracker = new TimeTracker(callback, wallClockTimeSourceMillis); + this.monotonicTimeSourceNanos = monotonicTimeSourceNanos; + this.notInitializedSentinel = monotonicTimeSourceNanos.getAsLong() - 1; + this.createdTimeMillis = createdTime; + this.data.set(new Data(reports, notInitializedSentinel, started, currentValue)); + } + + public TimeDecayingRunningAverage(TimeDecayingRunningAverage other) { + this.timeTracker = new TimeTracker(other.timeTracker); + this.monotonicTimeSourceNanos = other.monotonicTimeSourceNanos; + this.notInitializedSentinel = other.notInitializedSentinel; + this.createdTimeMillis = other.createdTimeMillis; + this.halfLifeNanos = other.halfLifeNanos; + this.max = other.max; + this.min = other.min; + this.data.set(new Data(other.data.get())); + } + + @Override + public double currentValue() { + return data.get().currentValue; + } + @Override public void report(double d) { - synchronized(this) { - // Must synchronize first to achieve serialization. - long now = System.currentTimeMillis(); - if(d < minReport) { - Logger.error(this, "Impossible: "+d+" on "+this, new Exception("error")); - return; - } - if(d > maxReport) { - Logger.error(this, "Impossible: "+d+" on "+this, new Exception("error")); - return; - } - if(Double.isInfinite(d) || Double.isNaN(d)) { - Logger.error(this, "Reported infinity or NaN to "+this+" : "+d, new Exception("error")); - return; - } - totalReports++; - if(!started) { - curValue = d; - started = true; - if(logDEBUG) - Logger.debug(this, "Reported "+d+" on "+this+" when just started"); - } else if(lastReportTime != -1) { // might be just serialized in - long thisInterval = - now - lastReportTime; - long uptime = now - createdTime; - if(thisInterval < 0) { - Logger.error(this, "Clock (reporting) went back in time, ignoring report: "+now+" was "+lastReportTime+" (back "+(-thisInterval)+"ms)"); - lastReportTime = now; - if(timeSkewCallback != null) - timeSkewCallback.setTimeSkewDetectedUserAlert(); - return; - } - double thisHalfLife = halfLife; - if(uptime < 0) { - Logger.error(this, "Clock (uptime) went back in time, ignoring report: "+now+" was "+createdTime+" (back "+(-uptime)+"ms)"); - if(timeSkewCallback != null) - timeSkewCallback.setTimeSkewDetectedUserAlert(); - return; - // Disable sensitivity hack. - // Excessive sensitivity at start isn't necessarily a good thing. - // In particular it makes the average inconsistent - 20 reports of 0 at 1s intervals have a *different* effect to 10 reports of 0 at 2s intervals! - // Also it increases the impact of startup spikes, which then take a long time to recover from. - //} else { - //double oneFourthOfUptime = uptime / 4D; - //if(oneFourthOfUptime < thisHalfLife) thisHalfLife = oneFourthOfUptime; - } - - if(thisHalfLife == 0) thisHalfLife = 1; - double changeFactor = - Math.pow(0.5, (thisInterval) / thisHalfLife); - double oldCurValue = curValue; - curValue = curValue * changeFactor /* close to 1.0 if short interval, close to 0.0 if long interval */ - + (1.0 - changeFactor) * d; - // FIXME remove when stop getting reports of wierd output values - if(curValue < minReport || curValue > maxReport) { - Logger.error(this, "curValue="+curValue+" was "+oldCurValue+" - out of range"); - curValue = oldCurValue; - } - if(logDEBUG) - Logger.debug(this, "Reported "+d+" on "+this+": thisInterval="+thisInterval+ - ", halfLife="+halfLife+", uptime="+uptime+", thisHalfLife="+thisHalfLife+ - ", changeFactor="+changeFactor+", oldCurValue="+oldCurValue+ - ", currentValue="+currentValue()+ - ", thisInterval="+thisInterval+", thisHalfLife="+thisHalfLife+ - ", uptime="+uptime+", changeFactor="+changeFactor); - } - lastReportTime = now; - } - } + data.updateAndGet(data -> data.updated(d)); + timeTracker.report(); + } - /** - * - * @param d - */ - @Override - public void report(long d) { - report((double)d); + @Override + public void report(long d) { + report((double) d); } @Override public double valueIfReported(double r) { - throw new UnsupportedOperationException(); + return data.get().updated(r).currentValue; } - /** - * - * @param out - * @throws IOException - */ - public void writeDataTo(DataOutputStream out) throws IOException { - long now = System.currentTimeMillis(); - synchronized(this) { - out.writeInt(MAGIC); - out.writeInt(1); - out.writeDouble(curValue); - out.writeBoolean(started); - out.writeLong(totalReports); - out.writeLong(now - createdTime); - } - } + @Override + public long countReports() { + return data.get().reports; + } - /** - * - * @return - */ - public int getDataLength() { - return 4 + 4 + 8 + 8 + 1 + 8 + 8; + public long lastReportTime() { + return timeTracker.lastReportMillis; + } + + public SimpleFieldSet exportFieldSet(boolean shortLived) { + Data data = this.data.get(); + long now = timeTracker.wallClockTimeSourceMillis.getAsLong(); + SimpleFieldSet fs = new SimpleFieldSet(shortLived); + fs.putSingle("Type", "TimeDecayingRunningAverage"); + fs.put("CurrentValue", data.currentValue); + fs.put("Started", data.started); + fs.put("TotalReports", data.reports); + fs.put("Uptime", now - createdTimeMillis); + return fs; } @Override - public synchronized long countReports() { - return totalReports; + public TimeDecayingRunningAverage clone() { + return new TimeDecayingRunningAverage(this); } - /** - * - * @return - */ - public synchronized long lastReportTime() { - return lastReportTime; - } + @Override + public String toString() { + Data data = this.data.get(); + long now = timeTracker.wallClockTimeSourceMillis.getAsLong(); + return super.toString() + + ": currentValue=" + data.currentValue + ", " + + ", halfLife=" + (long) (halfLifeNanos / 1e6) + "ms" + + ", lastReportTime=" + (now - lastReportTime()) + "ms ago" + + ", createdTime=" + (now - createdTimeMillis) + "ms ago" + + ", reports=" + data.reports + + ", started=" + data.started + + ", min=" + min + + ", max=" + max; + } - /** - * - * @param shortLived - * @return - */ - public synchronized SimpleFieldSet exportFieldSet(boolean shortLived) { - SimpleFieldSet fs = new SimpleFieldSet(shortLived); - fs.putSingle("Type", "TimeDecayingRunningAverage"); - fs.put("CurrentValue", curValue); - fs.put("Started", started); - fs.put("TotalReports", totalReports); - fs.put("Uptime", System.currentTimeMillis() - createdTime); - return fs; - } + private boolean isInvalid(double d) { + return d < min || d > max || Double.isInfinite(d) || Double.isNaN(d); + } + + private class Data { + private final long reports; + private final long lastUpdatedNanos; + private final boolean started; + private final double currentValue; + + private Data(long reports, long lastUpdatedNanos, boolean started, double currentValue) { + this.reports = reports; + this.lastUpdatedNanos = lastUpdatedNanos; + this.started = started; + this.currentValue = Math.max(min, Math.min(max, currentValue)); + } + + private Data(Data other) { + this.reports = other.reports; + this.lastUpdatedNanos = other.lastUpdatedNanos; + this.started = other.started; + this.currentValue = other.currentValue; + } + + private Data updated(double d) { + if (isInvalid(d)) { + return this; + } + long now = monotonicTimeSourceNanos.getAsLong(); + if (!started) { + // A fresh average instantly jumps to the first reported value + return new Data(reports + 1, now, true, d); + } + if (lastUpdatedNanos == notInitializedSentinel) { + // For a restored average, the first data point is ignored + return new Data(reports + 1, now, true, currentValue); + } + double timeSinceLastUpdated = now - lastUpdatedNanos; + /* close to 1.0 if short interval, close to 0.0 if long interval */ + double changeFactor = Math.pow(0.5, timeSinceLastUpdated / halfLifeNanos); + double newValue = currentValue * changeFactor + (1.0 - changeFactor) * d; + return new Data(reports + 1, now, true, newValue); + } + } + + private static class TimeTracker { + /** + * Callback to invoke when a time skew is detected. + */ + private final TimeSkewDetectorCallback timeSkewDetectorCallback; + + /** + * Time source reporting the wall-clock time in milliseconds since the UNIX epoch. + * This clock should represent the system time which may drift due to (network) time synchronization. + * Usually set to `System::currentTimeMillis`. + */ + private final LongSupplier wallClockTimeSourceMillis; + + /** + * Timestamp when `report` was last invoked, in milliseconds since UNIX epoch. + */ + private volatile long lastReportMillis = -1; + + private TimeTracker(TimeSkewDetectorCallback timeSkewDetectorCallback, LongSupplier wallClockTimeSourceMillis) { + this.timeSkewDetectorCallback = timeSkewDetectorCallback; + this.wallClockTimeSourceMillis = wallClockTimeSourceMillis; + } + + private TimeTracker(TimeTracker other) { + this.timeSkewDetectorCallback = other.timeSkewDetectorCallback; + this.wallClockTimeSourceMillis = other.wallClockTimeSourceMillis; + this.lastReportMillis = other.lastReportMillis; + } + + private void report() { + if (timeSkewDetectorCallback == null) { + this.lastReportMillis = wallClockTimeSourceMillis.getAsLong(); + return; + } + long lastReportTime = this.lastReportMillis; + long now = wallClockTimeSourceMillis.getAsLong(); + this.lastReportMillis = now; + if (now < lastReportTime) { + Logger.error(this, "Clock went back in time: " + now + " was " + lastReportTime + " (back " + (lastReportTime - now) + "ms)"); + timeSkewDetectorCallback.setTimeSkewDetectedUserAlert(); + } + } + } } diff --git a/src/freenet/support/math/TrivialRunningAverage.java b/src/freenet/support/math/TrivialRunningAverage.java index ffaad5ab83..d9b4167167 100644 --- a/src/freenet/support/math/TrivialRunningAverage.java +++ b/src/freenet/support/math/TrivialRunningAverage.java @@ -1,77 +1,69 @@ package freenet.support.math; +import java.util.concurrent.atomic.AtomicReference; + public final class TrivialRunningAverage implements RunningAverage, Cloneable { - private static final long serialVersionUID = 1L; - private long reports; - private double total; - - /** - * - * @param average - */ - public TrivialRunningAverage(TrivialRunningAverage average) { - this.reports = average.reports; - this.total = average.total; - } - - /** - * - */ - public TrivialRunningAverage() { - reports = 0; - total = 0.0; - } - - @Override - public synchronized long countReports() { - return reports; - } - - public synchronized double totalValue() { - return total; - } - - /** - * - * @return - */ - @Override - public synchronized double currentValue() { - return total / reports; - } - - /** - * - * @param d - */ - @Override - public synchronized void report(double d) { - total += d; - reports++; - // TODO Auto-generated method stub - } - - /** - * - * @param d - */ - @Override - public void report(long d) { - report((double)d); - } - - @Override - public synchronized double valueIfReported(double r) { - return (total + r) / (reports + 1); - } - - @Override - public TrivialRunningAverage clone() { - // Override clone() for synchronization. - // Implement Cloneable to shut up findbugs. - synchronized (this) { - return new TrivialRunningAverage(this); - } - } + private final AtomicReference data = new AtomicReference<>(); + + public TrivialRunningAverage(TrivialRunningAverage other) { + data.set(other.data.get()); + } + + public TrivialRunningAverage() { + data.set(new Data(0, 0)); + } + + @Override + public long countReports() { + return data.get().reports; + } + + public double totalValue() { + return data.get().total; + } + + @Override + public double currentValue() { + return data.get().getRunningAverage(); + } + + @Override + public void report(double d) { + data.updateAndGet(data -> data.updated(d)); + } + + @Override + public void report(long d) { + report((double) d); + } + + @Override + public double valueIfReported(double r) { + return data.get().updated(r).getRunningAverage(); + } + + @Override + public TrivialRunningAverage clone() { + return new TrivialRunningAverage(this); + } + + private static class Data { + private final long reports; + private final double total; + + Data(long reports, double total) { + this.reports = reports; + this.total = total; + } + + Data updated(double d) { + return new Data(reports + 1, d + total); + } + + double getRunningAverage() { + return total / reports; + } + } + } diff --git a/test/freenet/support/math/BootstrappingDecayingRunningAverageTest.java b/test/freenet/support/math/BootstrappingDecayingRunningAverageTest.java new file mode 100644 index 0000000000..000b18a378 --- /dev/null +++ b/test/freenet/support/math/BootstrappingDecayingRunningAverageTest.java @@ -0,0 +1,93 @@ +package freenet.support.math; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; + +import freenet.support.SimpleFieldSet; +import org.junit.Test; + +public class BootstrappingDecayingRunningAverageTest { + @Test + public void decaysOverSubsequentReports() { + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, null); + average.report(1); + assertThat(average.currentValue(), equalTo(1.0)); + average.report(0); + assertThat(average.currentValue(), equalTo(0.5)); + average.report(0); + assertThat(average.currentValue(), equalTo(0.25)); + average.report(0); + assertThat(average.currentValue(), equalTo(0.125)); + } + + @Test + public void newInstanceHasDefaultValue() { + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(1, 0, 1, 2, null); + assertThat(average.currentValue(), equalTo(1.0)); + } + + @Test + public void countsNumberOfValidReports() { + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, null); + assertThat(average.countReports(), equalTo(0L)); + + // Valid report + average.report(0); + assertThat(average.countReports(), equalTo(1L)); + + // Invalid reports + average.report(-1000); + average.report(1000); + average.report(Double.NEGATIVE_INFINITY); + average.report(Double.POSITIVE_INFINITY); + average.report(Double.NaN); + assertThat(average.countReports(), equalTo(1L)); + } + + @Test + public void writesStateToSimpleFieldSet() { + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, null); + average.report(0.5); + + SimpleFieldSet sfs = average.exportFieldSet(true); + assertThat(sfs.directKeyValues(), allOf( + hasEntry("Type", "BootstrappingDecayingRunningAverage"), + hasEntry("Reports", "1"), + hasEntry("CurrentValue", "0.5") + )); + } + + @Test + public void canBeRestoredFromSimpleFieldSet() { + SimpleFieldSet sfs = new SimpleFieldSet(true); + sfs.putSingle("Type", "BootstrappingDecayingRunningAverage"); + sfs.putSingle("Reports", "1"); + sfs.putSingle("CurrentValue", "0.5"); + + BootstrappingDecayingRunningAverage average = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, sfs); + assertThat(average.currentValue(), equalTo(0.5)); + assertThat(average.countReports(), equalTo(1L)); + + average.report(0); + assertThat(average.currentValue(), equalTo(0.25)); + assertThat(average.countReports(), equalTo(2L)); + } + + @Test + public void cloneCreatesIndependentInstance() { + BootstrappingDecayingRunningAverage first = new BootstrappingDecayingRunningAverage(0, 0, 1, 2, null); + BootstrappingDecayingRunningAverage second = first.clone(); + second.report(0); + second.report(1); + + // Cloned instance should remain untouched + assertThat(first.currentValue(), equalTo(0.0)); + assertThat(first.countReports(), equalTo(0L)); + + // New instance should be updated + assertThat(second.currentValue(), equalTo(0.5)); + assertThat(second.countReports(), equalTo(2L)); + } +} diff --git a/test/freenet/support/math/DecayingKeyspaceAverageTest.java b/test/freenet/support/math/DecayingKeyspaceAverageTest.java new file mode 100644 index 0000000000..ee5edc64bd --- /dev/null +++ b/test/freenet/support/math/DecayingKeyspaceAverageTest.java @@ -0,0 +1,36 @@ +package freenet.support.math; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThrows; + +import org.junit.Test; + +public class DecayingKeyspaceAverageTest { + private final DecayingKeyspaceAverage average = new DecayingKeyspaceAverage(0.5, 2, null); + + @Test + public void wrapsAround() { + average.report(0.5); + assertThat(average.currentValue(), equalTo(0.5)); + average.report(1.0); + assertThat(average.currentValue(), equalTo(0.75)); + average.report(0.25); + assertThat(average.currentValue(), equalTo(0.0)); + average.report(0.25); + assertThat(average.currentValue(), equalTo(0.125)); + average.report(0.875); + assertThat(average.currentValue(), equalTo(0.0)); + average.report(0.75); + assertThat(average.currentValue(), equalTo(0.875)); + } + + @Test + public void rejectsInvalidInput() { + assertThrows(IllegalArgumentException.class, () -> average.report(1.1)); + assertThrows(IllegalArgumentException.class, () -> average.report(-0.1)); + assertThrows(IllegalArgumentException.class, () -> average.report(Double.NaN)); + assertThrows(IllegalArgumentException.class, () -> average.report(Double.POSITIVE_INFINITY)); + assertThrows(IllegalArgumentException.class, () -> average.report(Double.NEGATIVE_INFINITY)); + } +} diff --git a/test/freenet/support/math/SimpleRunningAverageTest.java b/test/freenet/support/math/SimpleRunningAverageTest.java new file mode 100644 index 0000000000..cf63b1cfcf --- /dev/null +++ b/test/freenet/support/math/SimpleRunningAverageTest.java @@ -0,0 +1,47 @@ +package freenet.support.math; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.junit.Test; + +public class SimpleRunningAverageTest { + private final SimpleRunningAverage average = new SimpleRunningAverage(4, 100.0); + + @Test + public void returnsInitialValue() { + assertThat(average.currentValue(), equalTo(100.0)); + } + + @Test + public void returnsLinearMeanOfLastReports() { + average.report(10); + assertThat(average.currentValue(), equalTo(10.0)); + average.report(40); + assertThat(average.currentValue(), equalTo(25.0)); + average.report(40); + assertThat(average.currentValue(), equalTo(30.0)); + average.report(110); + assertThat(average.currentValue(), equalTo(50.0)); + + // Values should start dropping out now + average.report(40); + assertThat(average.currentValue(), equalTo(57.5)); + average.report(10); + assertThat(average.currentValue(), equalTo(50.0)); + } + + @Test + public void clear() { + for (int i = 0; i < 4; i++) { + average.report(12345); + } + assertThat(average.currentValue(), equalTo(12345.0)); + assertThat(average.countReports(), equalTo(4L)); + + // Clear should reset to initial value = 100 + average.clear(); + assertThat(average.currentValue(), equalTo(100.0)); + assertThat(average.countReports(), equalTo(0L)); + } +} diff --git a/test/freenet/support/math/TimeDecayingRunningAverageTest.java b/test/freenet/support/math/TimeDecayingRunningAverageTest.java new file mode 100644 index 0000000000..88db6184b9 --- /dev/null +++ b/test/freenet/support/math/TimeDecayingRunningAverageTest.java @@ -0,0 +1,179 @@ +package freenet.support.math; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +import java.util.Random; + +import freenet.node.TimeSkewDetectorCallback; +import freenet.support.SimpleFieldSet; +import org.junit.Test; + +public class TimeDecayingRunningAverageTest { + private final Clock clock = new Clock(); + + @Test + public void decaysOverTime() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + average.report(0); + assertThat(average.currentValue(), equalTo(0.0)); + + // 1 half-live passes, should take 50% old value (0.0) and 50% new value (1.0) + clock.tick(1000); + average.report(1); + assertThat(average.currentValue(), equalTo(0.5)); + + // 2 half-lives pass, should take 25% old value (0.5) and 75% new value (1.0) + clock.tick(2000); + average.report(1); + assertThat(average.currentValue(), equalTo(0.875)); + } + + @Test + public void newInstanceJumpsToFirstReported() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + assertThat(average.currentValue(), equalTo(0.0)); + + average.report(1); + assertThat(average.currentValue(), equalTo(1.0)); + } + + @Test + public void isNotAffectedByClockDrift() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + average.report(0); + assertThat(average.currentValue(), equalTo(0.0)); + + // Wall-clock time drifts backwards + clock.drift(-12345); + + // 1 half-live passes, should take 50% old value (0.0) and 50% new value (1.0) + clock.tick(1000); + average.report(1); + assertThat(average.currentValue(), equalTo(0.5)); + } + + @Test + public void reportsNegativeClockDrift() { + TimeSkewDetectorCallback callback = mock(TimeSkewDetectorCallback.class); + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, callback, clock::getWallClockMillis, clock::getMonotonicNanos); + + // Wall-clock time drifts forward should not be reported + clock.drift(12345); + average.report(0); + verifyZeroInteractions(callback); + + // Wall-clock time drifts backwards should be reported + clock.drift(-12345); + average.report(0); + verify(callback).setTimeSkewDetectedUserAlert(); + } + + @Test + public void countsNumberOfValidReports() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + assertThat(average.countReports(), equalTo(0L)); + + // Valid report + average.report(0); + assertThat(average.countReports(), equalTo(1L)); + + // Invalid reports + average.report(-1000); + average.report(1000); + average.report(Double.NEGATIVE_INFINITY); + average.report(Double.POSITIVE_INFINITY); + average.report(Double.NaN); + assertThat(average.countReports(), equalTo(1L)); + } + + @Test + public void writesStateToSimpleFieldSet() { + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + clock.tick(1000); + average.report(0.5); + + SimpleFieldSet sfs = average.exportFieldSet(true); + assertThat(sfs.directKeyValues(), allOf( + hasEntry("Type", "TimeDecayingRunningAverage"), + hasEntry("Started", "true"), + hasEntry("Uptime", "1000"), + hasEntry("TotalReports", "1"), + hasEntry("CurrentValue", "0.5") + )); + } + + @Test + public void ignoresFirstValueWhenRestoredInStartedState() { + SimpleFieldSet sfs = new SimpleFieldSet(true); + sfs.putSingle("Type", "TimeDecayingRunningAverage"); + sfs.putSingle("Started", "true"); + sfs.putSingle("Uptime", "1000"); + sfs.putSingle("TotalReports", "1"); + sfs.putSingle("CurrentValue", "0.5"); + + TimeDecayingRunningAverage average = new TimeDecayingRunningAverage(0, 1000, 0, 1, sfs, null, clock::getWallClockMillis, clock::getMonotonicNanos); + assertThat(average.currentValue(), equalTo(0.5)); + assertThat(average.countReports(), equalTo(1L)); + assertThat(average.toString(), containsString("createdTime=1000ms ago")); + + // First reported value should be ignored (but the remaining fields keep counting) + clock.tick(1000); + average.report(0); + assertThat(average.currentValue(), equalTo(0.5)); + assertThat(average.countReports(), equalTo(2L)); + assertThat(average.toString(), containsString("createdTime=2000ms ago")); + + // Subsequent values should be handled normally + clock.tick(1000); + average.report(0); + assertThat(average.currentValue(), equalTo(0.25)); + assertThat(average.countReports(), equalTo(3L)); + assertThat(average.toString(), containsString("createdTime=3000ms ago")); + } + + @Test + public void cloneCreatesIndependentInstance() { + TimeDecayingRunningAverage first = new TimeDecayingRunningAverage(0, 1000, 0, 1, null, null, clock::getWallClockMillis, clock::getMonotonicNanos); + TimeDecayingRunningAverage second = first.clone(); + second.report(0); + clock.tick(1000); + second.report(1); + + // Cloned instance should remain untouched + assertThat(first.currentValue(), equalTo(0.0)); + assertThat(first.countReports(), equalTo(0L)); + + // New instance should be updated + assertThat(second.currentValue(), equalTo(0.5)); + assertThat(second.countReports(), equalTo(2L)); + } + + static class Clock { + private long wallClockMillis = System.currentTimeMillis(); + private long monotonicMillis = new Random().nextLong(); + + void tick(long millis) { + wallClockMillis += millis; + monotonicMillis += millis; + } + + void drift(long millis) { + wallClockMillis += millis; + } + + long getWallClockMillis() { + return wallClockMillis; + } + + long getMonotonicNanos() { + return monotonicMillis * 1_000_000; + } + } +}