Skip to content

Commit

Permalink
Merge branch 'probabilistic-queue-priorization' into next
Browse files Browse the repository at this point in the history
  • Loading branch information
ArneBab committed Nov 29, 2024
2 parents 2b13590 + 02129a6 commit c873c45
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 29 deletions.
4 changes: 3 additions & 1 deletion src/freenet/io/comm/UdpSocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ private static int getFd(DatagramSocket s) {
f.setAccessible(true);
ret = f.getInt(fdi);
} catch (Exception e) {
Logger.error(UdpSocketHandler.class, e.getMessage(), e);
if (logMINOR) { // TODO: Known Java 21 problem.
Logger.warning(UdpSocketHandler.class, e.getMessage(), e);
}
}
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion src/freenet/node/OpennetManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class OpennetManager {
* (around 20%), and (b) it ensures that nodes with 10 connections still have 3 long links, so
* long links cannot form chains and the routing still scales if the short routing is broken.
*
* See USK@ZLwcSLwqpM1527Tw1YmnSiXgzITU0neHQ11Cyl0iLmk,f6FLo3TvsEijIcJq-X3BTjjtm0ErVZwAPO7AUd9V7lY,AQACAAE/fix-link-length/7/
* See USK@ZLwcSLwqpM1527Tw1YmnSiXgzITU0neHQ11Cyl0iLmk,f6FLo3TvsEijIcJq-X3BTjjtm0ErVZwAPO7AUd9V7lY,AQACAAE/fix-link-length/22/
* (FIXME move to wiki or other permanent storage)
*/
/** Peers with more than this distance are considered "long links". */
Expand Down
23 changes: 12 additions & 11 deletions src/freenet/node/PeerMessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.Random;

import freenet.io.comm.DMT;
import freenet.support.DoublyLinkedList;
Expand Down Expand Up @@ -710,7 +711,9 @@ public boolean isEmpty() {

}

PeerMessageQueue() {
final private Random fastWeakRandom;
PeerMessageQueue(Random fastWeakRandom) {
this.fastWeakRandom = fastWeakRandom;
queuesByPriority = new PrioQueue[DMT.NUM_PRIORITIES];
for(int i=0;i<queuesByPriority.length;i++) {
if(i == DMT.PRIORITY_BULK_DATA)
Expand Down Expand Up @@ -873,25 +876,23 @@ public synchronized MessageItem grabQueuedMessageItem(int minPriority) {
if(ret != null) return ret;
}

// Include bulk or realtime, whichever is more urgent.

boolean tryRealtimeFirst = true;

// If one is empty, try the other.
// Otherwise try whichever is more urgent, favouring realtime if there is a draw.
// Realtime is supposed to be bursty.


// Include bulk or realtime, whichever is more urgent.
boolean tryRealtimeFirst;
if(queuesByPriority[DMT.PRIORITY_REALTIME_DATA].isEmpty()) {
tryRealtimeFirst = false;
} else if(queuesByPriority[DMT.PRIORITY_BULK_DATA].isEmpty()) {
tryRealtimeFirst = true;
} else if(queuesByPriority[DMT.PRIORITY_BULK_DATA].getNextUrgentTime(Long.MAX_VALUE, 0) >= queuesByPriority[DMT.PRIORITY_REALTIME_DATA].getNextUrgentTime(Long.MAX_VALUE, 0)) {
} else if(queuesByPriority[DMT.PRIORITY_BULK_DATA].getNextUrgentTime(Long.MAX_VALUE, 0)
>= queuesByPriority[DMT.PRIORITY_REALTIME_DATA].getNextUrgentTime(Long.MAX_VALUE, 0)) {
tryRealtimeFirst = true;
} else {
tryRealtimeFirst = false;
// 10% chance to use bulk in case of a draw to avoid starving the bulk queue.
tryRealtimeFirst = this.fastWeakRandom.nextInt(10) > 0;
}

// FIXME token bucket?

if(tryRealtimeFirst) {
// Try realtime first
if(logMINOR) Logger.minor(this, "Trying realtime first");
Expand Down
2 changes: 1 addition & 1 deletion src/freenet/node/PeerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ public PeerNode(SimpleFieldSet fs, Node node2, NodeCrypto crypto, boolean fromLo
swapRequestsInterval = new SimpleRunningAverage(50, Node.MIN_INTERVAL_BETWEEN_INCOMING_SWAP_REQUESTS);
probeRequestsInterval = new SimpleRunningAverage(50, Node.MIN_INTERVAL_BETWEEN_INCOMING_PROBE_REQUESTS);

messageQueue = new PeerMessageQueue();
messageQueue = new PeerMessageQueue(node.getFastWeakRandom());

decrementHTLAtMaximum = node.getRandom().nextFloat() < Node.DECREMENT_AT_MAX_PROB;
decrementHTLAtMinimum = node.getRandom().nextFloat() < Node.DECREMENT_AT_MIN_PROB;
Expand Down
23 changes: 12 additions & 11 deletions test/freenet/node/NewPacketFormatTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.Test;

import freenet.crypt.BlockCipher;
import freenet.crypt.DummyRandomSource;
import freenet.crypt.ciphers.Rijndael;
import freenet.io.comm.DMT;
import freenet.io.comm.FreenetInetAddress;
Expand All @@ -33,7 +34,7 @@ public void setUp() {
@Test
public void testEmptyCreation() throws BlockedTooLongException {
NewPacketFormat npf = new NewPacketFormat(null, 0, 0);
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));
SessionKey s = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);

NPFPacket p = npf.createPacket(1400, pmq, s, false);
Expand All @@ -44,7 +45,7 @@ public void testEmptyCreation() throws BlockedTooLongException {
public void testAckOnlyCreation() throws BlockedTooLongException, InterruptedException {
BasePeerNode pn = new NullBasePeerNode();
NewPacketFormat npf = new NewPacketFormat(pn, 0, 0);
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));
SessionKey s = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);

NPFPacket p = null;
Expand All @@ -65,10 +66,10 @@ public void testAckOnlyCreation() throws BlockedTooLongException, InterruptedExc
public void testLostLastAck() throws BlockedTooLongException, InterruptedException {
NullBasePeerNode senderNode = new NullBasePeerNode();
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode();
NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0);
PeerMessageQueue receiverQueue = new PeerMessageQueue();
PeerMessageQueue receiverQueue = new PeerMessageQueue(new DummyRandomSource(1234));
SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
senderNode.currentKey = senderKey;
SessionKey receiverKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testLostLastAck() throws BlockedTooLongException, InterruptedExcepti
public void testOutOfOrderDelivery() throws BlockedTooLongException {
NullBasePeerNode senderNode = new NullBasePeerNode();
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode();
NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0);
SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
Expand All @@ -142,7 +143,7 @@ public void testOutOfOrderDelivery() throws BlockedTooLongException {
public void testReceiveUnknownMessageLength() throws BlockedTooLongException {
NullBasePeerNode senderNode = new NullBasePeerNode();
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode();
NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0);
SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
Expand All @@ -166,7 +167,7 @@ public void testReceiveUnknownMessageLength() throws BlockedTooLongException {
public void testResendAlreadyCompleted() throws BlockedTooLongException, InterruptedException {
NullBasePeerNode senderNode = new NullBasePeerNode();
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode();
NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0);
SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1);
Expand Down Expand Up @@ -219,7 +220,7 @@ public SessionKey getCurrentKeyTracker() {

};
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode() {

@Override
Expand Down Expand Up @@ -284,7 +285,7 @@ public SessionKey getCurrentKeyTracker() {

};
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));

senderQueue.queueAndEstimateSize(new MessageItem(new byte[128], null, false, null, (short) 0, false, true), 1024);

Expand Down Expand Up @@ -319,7 +320,7 @@ public void handleMessage(Message msg) {

};
NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0);
PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));
NullBasePeerNode receiverNode = new NullBasePeerNode() {

@Override
Expand Down Expand Up @@ -434,7 +435,7 @@ public void testEncryption()
receiverNPF =
new NewPacketFormat(receiverNode, receiverStartSeq, senderStartSeq);

PeerMessageQueue senderQueue = new PeerMessageQueue();
PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234));

byte[] message = new byte[1024];
random.nextBytes(message);
Expand Down
10 changes: 6 additions & 4 deletions test/freenet/node/PeerMessageQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@

import org.junit.Test;

import freenet.crypt.DummyRandomSource;

public class PeerMessageQueueTest {
@Test
public void testUrgentTimeEmpty() {
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));
assertEquals(Long.MAX_VALUE, pmq.getNextUrgentTime(Long.MAX_VALUE, System.currentTimeMillis()));
}

@Test
public void testUrgentTime() {
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));

//Constructor might take some time, so grab a range
long start = System.currentTimeMillis();
Expand All @@ -37,7 +39,7 @@ public void testUrgentTime() {
* it. */
@Test
public void testUrgentTimeQueuedWrong() {
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));

//Constructor might take some time, so grab a range
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -67,7 +69,7 @@ public void testUrgentTimeQueuedWrong() {

@Test
public void testGrabQueuedMessageItem() {
PeerMessageQueue pmq = new PeerMessageQueue();
PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234));

MessageItem itemUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0, false, false);

Expand Down

0 comments on commit c873c45

Please sign in to comment.