diff --git a/src/freenet/io/comm/UdpSocketHandler.java b/src/freenet/io/comm/UdpSocketHandler.java index 99712220bd..42c5d065b5 100644 --- a/src/freenet/io/comm/UdpSocketHandler.java +++ b/src/freenet/io/comm/UdpSocketHandler.java @@ -1,16 +1,14 @@ package freenet.io.comm; -import java.io.FileDescriptor; import java.io.IOException; import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.DatagramSocketImpl; import java.net.Inet6Address; import java.net.InetAddress; -import java.net.SocketException; +import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.nio.channels.UnsupportedAddressTypeException; import java.util.Random; @@ -26,11 +24,13 @@ import freenet.support.Logger; import freenet.support.io.NativeThread; import freenet.support.transport.ip.IPUtil; +import sun.misc.Unsafe; public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, PortForwardSensitiveSocketHandler { - private final DatagramSocket _sock; - private final InetAddress _bindTo; + private final ByteBuffer receiveBuffer = ByteBuffer.allocate(MAX_RECEIVE_SIZE); + private final DatagramChannel datagramChannel; + private final InetSocketAddress localAddress; private final AddressTracker tracker; private IncomingPacketFilter lowLevelFilter; /** RNG for debugging, used with _dropProbability. @@ -45,7 +45,6 @@ public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, Port private static volatile boolean logDEBUG; private boolean _isDone; private volatile boolean _active = true; - private final int listenPort; private final String title; private boolean _started; private long startTime; @@ -94,68 +93,58 @@ public enum SOCKET_ADDR_PREFERENCE { } } - private static int getFd(DatagramSocket s) { - int ret = -1; + private static int getFd(DatagramChannel channel) { try { - Method m = s.getClass().getDeclaredMethod("getImpl"); - m.setAccessible(true); - DatagramSocketImpl impl = (DatagramSocketImpl)m.invoke(s); - Field f = DatagramSocketImpl.class.getDeclaredField("fd"); - f.setAccessible(true); - FileDescriptor fdi = (FileDescriptor)f.get(impl); - f = FileDescriptor.class.getDeclaredField("fd"); - f.setAccessible(true); - ret = f.getInt(fdi); + Field unsafe = Unsafe.class.getDeclaredField("theUnsafe"); + unsafe.setAccessible(true); + Unsafe theUnsafe = (Unsafe) unsafe.get(null); + Field fdVal = channel.getClass().getDeclaredField("fdVal"); + return theUnsafe.getInt(channel, theUnsafe.objectFieldOffset(fdVal)); } catch (Exception e) { Logger.error(UdpSocketHandler.class, e.getMessage(), e); + return -1; } - return ret; } - public static boolean setAddressPreference(DatagramSocket s, SOCKET_ADDR_PREFERENCE p) { - if(!Platform.isLinux()) - return false; - int fd = getFd(s); - if(fd <= 2) - return false; - int ret = -1; + public static boolean setAddressPreference(DatagramChannel channel, SOCKET_ADDR_PREFERENCE p) { + if (!Platform.isLinux()) { + return false; + } + int fd = getFd(channel); + if (fd <= 2) { + return false; + } try { - ret = socketOptionsHolder.setsockopt(fd, SOCKET_level.IPPROTO_IPV6.linux, p.option_name.linux, new IntByReference(p.linux).getPointer(), Native.POINTER_SIZE); - } catch(Exception e) { Logger.normal(UdpSocketHandler.class, e.getMessage(),e); } //if it fails that's fine - return (ret == 0 ? true : false); + int ret = socketOptionsHolder.setsockopt(fd, SOCKET_level.IPPROTO_IPV6.linux, p.option_name.linux, new IntByReference(p.linux).getPointer(), Native.POINTER_SIZE); + return ret == 0; + } catch (Exception e) { + Logger.normal(UdpSocketHandler.class, e.getMessage(), e); + return false; + } } } - public UdpSocketHandler(int listenPort, InetAddress bindto, Node node, long startupTime, String title, IOStatisticCollector collector) throws SocketException { + public UdpSocketHandler(int listenPort, InetAddress bindToAddress, Node node, long startupTime, String title, IOStatisticCollector collector) throws IOException { this.node = node; this.collector = collector; this.title = title; - _bindTo = bindto; - // Keep the Updater code in, just commented out, for now - // We may want to be able to do on-line updates. -// if (Updater.hasResource()) { -// _sock = (DatagramSocket) Updater.getResource(); -// } else { - this.listenPort = listenPort; - _sock = new DatagramSocket(listenPort, bindto); - int sz = _sock.getReceiveBufferSize(); - if(sz < 65536) { - _sock.setReceiveBufferSize(65536); - } + localAddress = new InetSocketAddress(bindToAddress, listenPort); + datagramChannel = DatagramChannel.open() + .bind(localAddress) + .setOption(StandardSocketOptions.SO_RCVBUF, 65536) + .setOption(StandardSocketOptions.SO_REUSEADDR, true); + try { - // Exit reasonably quickly - _sock.setReuseAddress(true); - } catch (SocketException e) { - throw new RuntimeException(e); + datagramChannel.setOption(StandardSocketOptions.IP_TOS, node.getTrafficClass().value); + } catch (UnsupportedOperationException e) { + Logger.error(this, "Failed to set IP_TOS socket option", e); } - try { - _sock.setTrafficClass(node.getTrafficClass().value); - } catch (SocketException e) { - Logger.error(this, "Failed to setTrafficClass with "+node.getTrafficClass().value,e); + + boolean r = socketOptions.setAddressPreference(datagramChannel, socketOptions.SOCKET_ADDR_PREFERENCE.IPV6_PREFER_SRC_PUBLIC); + if(logMINOR) { + Logger.minor(this, "Setting IPV6_PREFER_SRC_PUBLIC for port " + listenPort + " is a " + (r ? "success" : "failure")); } - boolean r = socketOptions.setAddressPreference(_sock, socketOptions.SOCKET_ADDR_PREFERENCE.IPV6_PREFER_SRC_PUBLIC); - if(logMINOR) Logger.minor(this, "Setting IPV6_PREFER_SRC_PUBLIC for port "+ listenPort + " is a "+(r ? "success" : "failure")); -// } + // Only used for debugging, no need to seed from Yarrow dropRandom = node.getFastWeakRandom(); tracker = AddressTracker.create(node.getLastBootId(), node.runDir(), listenPort); @@ -169,7 +158,7 @@ public void setLowLevelFilter(IncomingPacketFilter f) { } public InetAddress getBindTo() { - return _bindTo; + return localAddress.getAddress(); } public String getTitle() { @@ -209,8 +198,8 @@ public void run() { // Listen for packets t.printStackTrace(); } catch (Throwable tt) {} } finally { - System.err.println("run() exiting for UdpSocketHandler on port "+_sock.getLocalPort()); - Logger.error(this, "run() exiting for UdpSocketHandler on port "+_sock.getLocalPort()); + System.err.println("run() exiting for UdpSocketHandler on port " + localAddress.getPort()); + Logger.error(this, "run() exiting for UdpSocketHandler on port " + localAddress.getPort()); synchronized (this) { _isDone = true; notifyAll(); @@ -219,11 +208,9 @@ public void run() { // Listen for packets } private void runLoop() { - byte[] buf = new byte[MAX_RECEIVE_SIZE]; - DatagramPacket packet = new DatagramPacket(buf, buf.length); while (_active) { try { - realRun(packet); + realRun(); } catch (Throwable t) { System.err.println("Caught "+t); t.printStackTrace(System.err); @@ -232,13 +219,12 @@ private void runLoop() { } } - private void realRun(DatagramPacket packet) { - // Single receiving thread - boolean gotPacket = getPacket(packet); + private void realRun() { + InetSocketAddress remote = receive(); long now = System.currentTimeMillis(); - if (gotPacket) { + if (remote != null) { long startTime = System.currentTimeMillis(); - Peer peer = new Peer(packet.getAddress(), packet.getPort()); + Peer peer = new Peer(remote.getAddress(), remote.getPort()); tracker.receivedPacketFrom(peer); long endTime = System.currentTimeMillis(); if(endTime - startTime > 50) { @@ -248,13 +234,13 @@ private void realRun(DatagramPacket packet) { if(logMINOR) Logger.minor(this, "packet creation took "+(endTime-startTime)+"ms"); } } - byte[] data = packet.getData(); - int offset = packet.getOffset(); - int length = packet.getLength(); + try { - if(logMINOR) Logger.minor(this, "Processing packet of length "+length+" from "+peer); + if(logMINOR) { + Logger.minor(this, "Processing packet of length " + receiveBuffer.limit() + " from " + peer); + } startTime = System.currentTimeMillis(); - lowLevelFilter.process(data, offset, length, peer, now); + lowLevelFilter.process(receiveBuffer.array(), 0, receiveBuffer.limit(), peer, now); endTime = System.currentTimeMillis(); if(endTime - startTime > 50) { if(endTime-startTime > 3000) { @@ -263,8 +249,9 @@ private void realRun(DatagramPacket packet) { if(logMINOR) Logger.minor(this, "processing packet took "+(endTime-startTime)+"ms"); } } - if(logMINOR) Logger.minor(this, - "Successfully handled packet length " + length); + if(logMINOR) { + Logger.minor(this, "Successfully handled packet length " + receiveBuffer.limit()); + } } catch (Throwable t) { Logger.error(this, "Caught " + t + " from " + lowLevelFilter, t); @@ -276,24 +263,24 @@ private void realRun(DatagramPacket packet) { private static final int MAX_RECEIVE_SIZE = 1500; - private boolean getPacket(DatagramPacket packet) { + private InetSocketAddress receive() { try { - _sock.receive(packet); - InetAddress address = packet.getAddress(); - boolean isLocal = !IPUtil.isValidAddress(address, false); - collector.addInfo(address, packet.getPort(), - getHeadersLength(address) + packet.getLength(), 0, isLocal); + receiveBuffer.clear(); + InetSocketAddress remote = (InetSocketAddress) datagramChannel.receive(receiveBuffer); + receiveBuffer.flip(); + int port = remote.getPort(); + InetAddress address = remote.getAddress(); + collector.addInfo(address, port, getHeadersLength(address) + receiveBuffer.limit(), 0, isLocal(address)); + return remote; } catch (SocketTimeoutException e1) { - return false; + return null; } catch (IOException e2) { if (!_active) { // closed, just return silently - return false; + return null; } else { throw new RuntimeException(e2); } } - if(logMINOR) Logger.minor(this, "Received packet"); - return true; } /** @@ -304,45 +291,43 @@ private boolean getPacket(DatagramPacket packet) { */ @Override public void sendPacket(byte[] blockToSend, Peer destination, boolean allowLocalAddresses) throws LocalAddressException { - assert(blockToSend != null); if(!_active) { Logger.error(this, "Trying to send packet but no longer active"); // It is essential that for recording accurate AddressTracker data that we don't send any more // packets after shutdown. return; } + + ByteBuffer packet = ByteBuffer.wrap(blockToSend); + int port = destination.getPort(); + InetAddress address; // there should be no DNS needed here, but go ahead if we can, but complain doing it - if( destination.getAddress(false, allowLocalAddresses) == null ) { + if ((address = destination.getAddress(false, allowLocalAddresses)) == null) { Logger.error(this, "Tried sending to destination without pre-looked up IP address(needs a real Peer.getHostname()): null:" + destination.getPort(), new Exception("error")); - if( destination.getAddress(true, allowLocalAddresses) == null ) { + if ((address = destination.getAddress(true, allowLocalAddresses)) == null) { Logger.error(this, "Tried sending to bad destination address: null:" + destination.getPort(), new Exception("error")); return; } } if (_dropProbability > 0) { if (dropRandom.nextInt() % _dropProbability == 0) { - Logger.normal(this, "DROPPED: " + _sock.getLocalPort() + " -> " + destination.getPort()); + Logger.normal(this, "DROPPED: " + localAddress.getPort() + " -> " + destination.getPort()); return; } } - InetAddress address = destination.getAddress(false, allowLocalAddresses); - assert(address != null); - int port = destination.getPort(); - DatagramPacket packet = new DatagramPacket(blockToSend, blockToSend.length); - packet.setAddress(address); - packet.setPort(port); try { - _sock.send(packet); + datagramChannel.send(packet, new InetSocketAddress(address, port)); tracker.sentPacketTo(destination); - boolean isLocal = (!IPUtil.isValidAddress(address, false)) && (IPUtil.isValidAddress(address, true)); - collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal); - if(logMINOR) Logger.minor(this, "Sent packet length "+blockToSend.length+" to "+address+':'+port); + collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal(address)); + if (logMINOR) { + Logger.minor(this, "Sent packet length " + blockToSend.length + " to " + address + ':' + port); + } } catch (IOException | UnsupportedAddressTypeException e) { - if(packet.getAddress() instanceof Inet6Address) { - Logger.normal(this, "Error while sending packet to IPv6 address: "+destination+": "+e); + if (address instanceof Inet6Address) { + Logger.normal(this, "Error while sending packet to IPv6 address: " + destination + ": " + e); } else { - Logger.error(this, "Error while sending packet to " + destination+": "+e, e); + Logger.error(this, "Error while sending packet to " + destination + ": " + e, e); } } } @@ -400,14 +385,18 @@ public void start() { _started = true; startTime = System.currentTimeMillis(); } - node.getExecutor().execute(this, "UdpSocketHandler for port "+listenPort); + node.getExecutor().execute(this, "UdpSocketHandler for port " + localAddress.getPort()); } public void close() { Logger.normal(this, "Closing.", new Exception("error")); synchronized (this) { _active = false; - _sock.close(); + try { + datagramChannel.close(); + } catch (IOException e) { + Logger.error(this, "Error closing DatagramChannel", e); + } if(!_started) return; while (!_isDone) { @@ -418,7 +407,7 @@ public void close() { } } } - tracker.storeData(node.getBootId(), node.runDir(), listenPort); + tracker.storeData(node.getBootId(), node.runDir(), localAddress.getPort()); } public int getDropProbability() { @@ -430,12 +419,12 @@ public void setDropProbability(int dropProbability) { } public int getPortNumber() { - return _sock.getLocalPort(); + return localAddress.getPort(); } @Override public String toString() { - return _sock.getLocalAddress() + ":" + _sock.getLocalPort(); + return localAddress.toString(); } @Override @@ -475,4 +464,8 @@ public long getStartTime() { return startTime; } + private static boolean isLocal(InetAddress address) { + return address.isLinkLocalAddress() || address.isLoopbackAddress() || IPUtil.isSiteLocalAddress(address); + } + }