Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support IPV6_ADDR_PREFERENCES until Java 24 and beyond #1007

Open
wants to merge 1 commit into
base: next
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 98 additions & 105 deletions src/freenet/io/comm/UdpSocketHandler.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package freenet.io.comm;

import java.io.FileDescriptor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.DatagramSocketImpl;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.UnsupportedAddressTypeException;
import java.util.Random;

Expand All @@ -26,11 +24,13 @@
import freenet.support.Logger;
import freenet.support.io.NativeThread;
import freenet.support.transport.ip.IPUtil;
import sun.misc.Unsafe;

public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, PortForwardSensitiveSocketHandler {

private final DatagramSocket _sock;
private final InetAddress _bindTo;
private final ByteBuffer receiveBuffer = ByteBuffer.allocate(MAX_RECEIVE_SIZE);
private final DatagramChannel datagramChannel;
private final InetSocketAddress localAddress;
private final AddressTracker tracker;
private IncomingPacketFilter lowLevelFilter;
/** RNG for debugging, used with _dropProbability.
Expand All @@ -45,7 +45,6 @@ public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, Port
private static volatile boolean logDEBUG;
private boolean _isDone;
private volatile boolean _active = true;
private final int listenPort;
private final String title;
private boolean _started;
private long startTime;
Expand Down Expand Up @@ -94,68 +93,58 @@ public enum SOCKET_ADDR_PREFERENCE {
}
}

private static int getFd(DatagramSocket s) {
int ret = -1;
private static int getFd(DatagramChannel channel) {
try {
Method m = s.getClass().getDeclaredMethod("getImpl");
m.setAccessible(true);
DatagramSocketImpl impl = (DatagramSocketImpl)m.invoke(s);
Field f = DatagramSocketImpl.class.getDeclaredField("fd");
f.setAccessible(true);
FileDescriptor fdi = (FileDescriptor)f.get(impl);
f = FileDescriptor.class.getDeclaredField("fd");
f.setAccessible(true);
ret = f.getInt(fdi);
Field unsafe = Unsafe.class.getDeclaredField("theUnsafe");
unsafe.setAccessible(true);
Unsafe theUnsafe = (Unsafe) unsafe.get(null);
Field fdVal = channel.getClass().getDeclaredField("fdVal");
return theUnsafe.getInt(channel, theUnsafe.objectFieldOffset(fdVal));
} catch (Exception e) {
Logger.error(UdpSocketHandler.class, e.getMessage(), e);
return -1;
}
return ret;
}

public static boolean setAddressPreference(DatagramSocket s, SOCKET_ADDR_PREFERENCE p) {
if(!Platform.isLinux())
return false;
int fd = getFd(s);
if(fd <= 2)
return false;
int ret = -1;
public static boolean setAddressPreference(DatagramChannel channel, SOCKET_ADDR_PREFERENCE p) {
if (!Platform.isLinux()) {
return false;
}
int fd = getFd(channel);
if (fd <= 2) {
return false;
}
try {
ret = socketOptionsHolder.setsockopt(fd, SOCKET_level.IPPROTO_IPV6.linux, p.option_name.linux, new IntByReference(p.linux).getPointer(), Native.POINTER_SIZE);
} catch(Exception e) { Logger.normal(UdpSocketHandler.class, e.getMessage(),e); } //if it fails that's fine
return (ret == 0 ? true : false);
int ret = socketOptionsHolder.setsockopt(fd, SOCKET_level.IPPROTO_IPV6.linux, p.option_name.linux, new IntByReference(p.linux).getPointer(), Native.POINTER_SIZE);
return ret == 0;
} catch (Exception e) {
Logger.normal(UdpSocketHandler.class, e.getMessage(), e);
return false;
}
}
}

public UdpSocketHandler(int listenPort, InetAddress bindto, Node node, long startupTime, String title, IOStatisticCollector collector) throws SocketException {
public UdpSocketHandler(int listenPort, InetAddress bindToAddress, Node node, long startupTime, String title, IOStatisticCollector collector) throws IOException {
this.node = node;
this.collector = collector;
this.title = title;
_bindTo = bindto;
// Keep the Updater code in, just commented out, for now
// We may want to be able to do on-line updates.
// if (Updater.hasResource()) {
// _sock = (DatagramSocket) Updater.getResource();
// } else {
this.listenPort = listenPort;
_sock = new DatagramSocket(listenPort, bindto);
int sz = _sock.getReceiveBufferSize();
if(sz < 65536) {
_sock.setReceiveBufferSize(65536);
}
localAddress = new InetSocketAddress(bindToAddress, listenPort);
datagramChannel = DatagramChannel.open()
.bind(localAddress)
.setOption(StandardSocketOptions.SO_RCVBUF, 65536)
.setOption(StandardSocketOptions.SO_REUSEADDR, true);

try {
// Exit reasonably quickly
_sock.setReuseAddress(true);
} catch (SocketException e) {
throw new RuntimeException(e);
datagramChannel.setOption(StandardSocketOptions.IP_TOS, node.getTrafficClass().value);
} catch (UnsupportedOperationException e) {
Logger.error(this, "Failed to set IP_TOS socket option", e);
}
try {
_sock.setTrafficClass(node.getTrafficClass().value);
} catch (SocketException e) {
Logger.error(this, "Failed to setTrafficClass with "+node.getTrafficClass().value,e);

boolean r = socketOptions.setAddressPreference(datagramChannel, socketOptions.SOCKET_ADDR_PREFERENCE.IPV6_PREFER_SRC_PUBLIC);
if(logMINOR) {
Logger.minor(this, "Setting IPV6_PREFER_SRC_PUBLIC for port " + listenPort + " is a " + (r ? "success" : "failure"));
}
boolean r = socketOptions.setAddressPreference(_sock, socketOptions.SOCKET_ADDR_PREFERENCE.IPV6_PREFER_SRC_PUBLIC);
if(logMINOR) Logger.minor(this, "Setting IPV6_PREFER_SRC_PUBLIC for port "+ listenPort + " is a "+(r ? "success" : "failure"));
// }

// Only used for debugging, no need to seed from Yarrow
dropRandom = node.getFastWeakRandom();
tracker = AddressTracker.create(node.getLastBootId(), node.runDir(), listenPort);
Expand All @@ -169,7 +158,7 @@ public void setLowLevelFilter(IncomingPacketFilter f) {
}

public InetAddress getBindTo() {
return _bindTo;
return localAddress.getAddress();
}

public String getTitle() {
Expand Down Expand Up @@ -209,8 +198,8 @@ public void run() { // Listen for packets
t.printStackTrace();
} catch (Throwable tt) {}
} finally {
System.err.println("run() exiting for UdpSocketHandler on port "+_sock.getLocalPort());
Logger.error(this, "run() exiting for UdpSocketHandler on port "+_sock.getLocalPort());
System.err.println("run() exiting for UdpSocketHandler on port " + localAddress.getPort());
Logger.error(this, "run() exiting for UdpSocketHandler on port " + localAddress.getPort());
synchronized (this) {
_isDone = true;
notifyAll();
Expand All @@ -219,11 +208,9 @@ public void run() { // Listen for packets
}

private void runLoop() {
byte[] buf = new byte[MAX_RECEIVE_SIZE];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
while (_active) {
try {
realRun(packet);
realRun();
} catch (Throwable t) {
System.err.println("Caught "+t);
t.printStackTrace(System.err);
Expand All @@ -232,13 +219,12 @@ private void runLoop() {
}
}

private void realRun(DatagramPacket packet) {
// Single receiving thread
boolean gotPacket = getPacket(packet);
private void realRun() {
InetSocketAddress remote = receive();
long now = System.currentTimeMillis();
if (gotPacket) {
if (remote != null) {
long startTime = System.currentTimeMillis();
Peer peer = new Peer(packet.getAddress(), packet.getPort());
Peer peer = new Peer(remote.getAddress(), remote.getPort());
tracker.receivedPacketFrom(peer);
long endTime = System.currentTimeMillis();
if(endTime - startTime > 50) {
Expand All @@ -248,13 +234,13 @@ private void realRun(DatagramPacket packet) {
if(logMINOR) Logger.minor(this, "packet creation took "+(endTime-startTime)+"ms");
}
}
byte[] data = packet.getData();
int offset = packet.getOffset();
int length = packet.getLength();

try {
if(logMINOR) Logger.minor(this, "Processing packet of length "+length+" from "+peer);
if(logMINOR) {
Logger.minor(this, "Processing packet of length " + receiveBuffer.limit() + " from " + peer);
}
startTime = System.currentTimeMillis();
lowLevelFilter.process(data, offset, length, peer, now);
lowLevelFilter.process(receiveBuffer.array(), 0, receiveBuffer.limit(), peer, now);
endTime = System.currentTimeMillis();
if(endTime - startTime > 50) {
if(endTime-startTime > 3000) {
Expand All @@ -263,8 +249,9 @@ private void realRun(DatagramPacket packet) {
if(logMINOR) Logger.minor(this, "processing packet took "+(endTime-startTime)+"ms");
}
}
if(logMINOR) Logger.minor(this,
"Successfully handled packet length " + length);
if(logMINOR) {
Logger.minor(this, "Successfully handled packet length " + receiveBuffer.limit());
}
} catch (Throwable t) {
Logger.error(this, "Caught " + t + " from "
+ lowLevelFilter, t);
Expand All @@ -276,24 +263,24 @@ private void realRun(DatagramPacket packet) {

private static final int MAX_RECEIVE_SIZE = 1500;

private boolean getPacket(DatagramPacket packet) {
private InetSocketAddress receive() {
try {
_sock.receive(packet);
InetAddress address = packet.getAddress();
boolean isLocal = !IPUtil.isValidAddress(address, false);
collector.addInfo(address, packet.getPort(),
getHeadersLength(address) + packet.getLength(), 0, isLocal);
receiveBuffer.clear();
InetSocketAddress remote = (InetSocketAddress) datagramChannel.receive(receiveBuffer);
receiveBuffer.flip();
int port = remote.getPort();
InetAddress address = remote.getAddress();
collector.addInfo(address, port, getHeadersLength(address) + receiveBuffer.limit(), 0, isLocal(address));
return remote;
} catch (SocketTimeoutException e1) {
return false;
return null;
} catch (IOException e2) {
if (!_active) { // closed, just return silently
return false;
return null;
} else {
throw new RuntimeException(e2);
}
}
if(logMINOR) Logger.minor(this, "Received packet");
return true;
}

/**
Expand All @@ -304,45 +291,43 @@ private boolean getPacket(DatagramPacket packet) {
*/
@Override
public void sendPacket(byte[] blockToSend, Peer destination, boolean allowLocalAddresses) throws LocalAddressException {
assert(blockToSend != null);
if(!_active) {
Logger.error(this, "Trying to send packet but no longer active");
// It is essential that for recording accurate AddressTracker data that we don't send any more
// packets after shutdown.
return;
}

ByteBuffer packet = ByteBuffer.wrap(blockToSend);
int port = destination.getPort();
InetAddress address;
// there should be no DNS needed here, but go ahead if we can, but complain doing it
if( destination.getAddress(false, allowLocalAddresses) == null ) {
if ((address = destination.getAddress(false, allowLocalAddresses)) == null) {
Logger.error(this, "Tried sending to destination without pre-looked up IP address(needs a real Peer.getHostname()): null:" + destination.getPort(), new Exception("error"));
if( destination.getAddress(true, allowLocalAddresses) == null ) {
if ((address = destination.getAddress(true, allowLocalAddresses)) == null) {
Logger.error(this, "Tried sending to bad destination address: null:" + destination.getPort(), new Exception("error"));
return;
}
}
if (_dropProbability > 0) {
if (dropRandom.nextInt() % _dropProbability == 0) {
Logger.normal(this, "DROPPED: " + _sock.getLocalPort() + " -> " + destination.getPort());
Logger.normal(this, "DROPPED: " + localAddress.getPort() + " -> " + destination.getPort());
return;
}
}
InetAddress address = destination.getAddress(false, allowLocalAddresses);
assert(address != null);
int port = destination.getPort();
DatagramPacket packet = new DatagramPacket(blockToSend, blockToSend.length);
packet.setAddress(address);
packet.setPort(port);

try {
_sock.send(packet);
datagramChannel.send(packet, new InetSocketAddress(address, port));
tracker.sentPacketTo(destination);
boolean isLocal = (!IPUtil.isValidAddress(address, false)) && (IPUtil.isValidAddress(address, true));
collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal);
if(logMINOR) Logger.minor(this, "Sent packet length "+blockToSend.length+" to "+address+':'+port);
collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal(address));
if (logMINOR) {
Logger.minor(this, "Sent packet length " + blockToSend.length + " to " + address + ':' + port);
}
} catch (IOException | UnsupportedAddressTypeException e) {
if(packet.getAddress() instanceof Inet6Address) {
Logger.normal(this, "Error while sending packet to IPv6 address: "+destination+": "+e);
if (address instanceof Inet6Address) {
Logger.normal(this, "Error while sending packet to IPv6 address: " + destination + ": " + e);
} else {
Logger.error(this, "Error while sending packet to " + destination+": "+e, e);
Logger.error(this, "Error while sending packet to " + destination + ": " + e, e);
}
}
}
Expand Down Expand Up @@ -400,14 +385,18 @@ public void start() {
_started = true;
startTime = System.currentTimeMillis();
}
node.getExecutor().execute(this, "UdpSocketHandler for port "+listenPort);
node.getExecutor().execute(this, "UdpSocketHandler for port " + localAddress.getPort());
}

public void close() {
Logger.normal(this, "Closing.", new Exception("error"));
synchronized (this) {
_active = false;
_sock.close();
try {
datagramChannel.close();
} catch (IOException e) {
Logger.error(this, "Error closing DatagramChannel", e);
}

if(!_started) return;
while (!_isDone) {
Expand All @@ -418,7 +407,7 @@ public void close() {
}
}
}
tracker.storeData(node.getBootId(), node.runDir(), listenPort);
tracker.storeData(node.getBootId(), node.runDir(), localAddress.getPort());
}

public int getDropProbability() {
Expand All @@ -430,12 +419,12 @@ public void setDropProbability(int dropProbability) {
}

public int getPortNumber() {
return _sock.getLocalPort();
return localAddress.getPort();
}

@Override
public String toString() {
return _sock.getLocalAddress() + ":" + _sock.getLocalPort();
return localAddress.toString();
}

@Override
Expand Down Expand Up @@ -475,4 +464,8 @@ public long getStartTime() {
return startTime;
}

private static boolean isLocal(InetAddress address) {
return address.isLinkLocalAddress() || address.isLoopbackAddress() || IPUtil.isSiteLocalAddress(address);
}

}