Skip to content

Commit

Permalink
Create a socket pool of multiple sockets bound to the same port (#293)
Browse files Browse the repository at this point in the history
So we can send from the same local IP and port from multiple threads simultaneously on Java 18+.

Use it in AbstractUdpListener (and thus SinglePortUdpHarvester).
  • Loading branch information
JonathanLennox authored Oct 22, 2024
1 parent 816c80b commit 1373788
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 63 deletions.
43 changes: 25 additions & 18 deletions src/main/java/org/ice4j/ice/harvest/AbstractUdpListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.ice4j.*;
import org.ice4j.attribute.*;
import org.ice4j.ice.*;
import org.ice4j.message.*;
import org.ice4j.socket.*;
import org.ice4j.util.*;
Expand All @@ -37,7 +36,7 @@
import static org.ice4j.ice.harvest.HarvestConfig.config;

/**
* A class which holds a {@link DatagramSocket} and runs a thread
* A class which holds a {@link SocketPool} and runs a thread
* ({@link #thread}) which perpetually reads from it.
*
* When a datagram from an unknown source is received, it is parsed as a STUN
Expand Down Expand Up @@ -196,13 +195,18 @@ static String getUfrag(byte[] buf, int off, int len)
*/
protected final TransportAddress localAddress;

/**
* The pool of sockets available for writing.
*/
private final SocketPool socketPool;

/**
* The "main" socket that this harvester reads from.
*/
private final DatagramSocket socket;
private final DatagramSocket receiveSocket;

/**
* The thread reading from {@link #socket}.
* The thread reading from {@link #receiveSocket}.
*/
private final Thread thread;

Expand Down Expand Up @@ -236,32 +240,35 @@ protected AbstractUdpListener(TransportAddress localAddress)
);
}

socket = new DatagramSocket( tempAddress );
socketPool = new SocketPool( tempAddress, config.udpSocketPoolSize() );

receiveSocket = socketPool.getReceiveSocket();

Integer receiveBufferSize = config.udpReceiveBufferSize();
if (receiveBufferSize != null)
{
socket.setReceiveBufferSize(receiveBufferSize);
receiveSocket.setReceiveBufferSize(receiveBufferSize);
}

/* Update the port number if needed. */
if (localAddress.getPort() == 0)
{
tempAddress = new TransportAddress(
tempAddress.getAddress(),
socket.getLocalPort(),
receiveSocket.getLocalPort(),
tempAddress.getTransport()
);
}
this.localAddress = tempAddress;

String logMessage
= "Initialized AbstractUdpListener with address " + this.localAddress;
logMessage += ". Receive buffer size " + socket.getReceiveBufferSize();
logMessage += ". Receive buffer size " + receiveSocket.getReceiveBufferSize();
if (receiveBufferSize != null)
{
logMessage += " (asked for " + receiveBufferSize + ")";
}
logMessage += "; socket pool size " + socketPool.getNumSockets();
logger.info(logMessage);

thread = new Thread(() ->
Expand Down Expand Up @@ -292,11 +299,11 @@ public TransportAddress getLocalAddress()
public void close()
{
close = true;
socket.close(); // causes socket#receive to stop blocking.
socketPool.close(); // causes socket#receive to stop blocking.
}

/**
* Perpetually reads datagrams from {@link #socket} and handles them
* Perpetually reads datagrams from {@link #receiveSocket} and handles them
* accordingly.
*
* It is important that this blocks are little as possible (except on
Expand Down Expand Up @@ -326,7 +333,7 @@ private void runInHarvesterThread()

try
{
socket.receive(pkt);
receiveSocket.receive(pkt);
}
catch (IOException ioe)
{
Expand Down Expand Up @@ -376,13 +383,13 @@ private void runInHarvesterThread()
{
candidateSocket.close();
}
socket.close();
socketPool.close();
}

/**
* Read packets from the socket and forward them via the push API. Note that the memory model here is different
* than the other case. Specifically, we:
* 1. Receive from {@link #socket} into a fixed buffer
* 1. Receive from {@link #receiveSocket} into a fixed buffer
* 2. Obtain a buffer of the required size using {@link BufferPool#getBuffer}
* 3. Copy the data into the buffer and either
* 3.1 Call the associated {@link BufferHandler} if the packet is payload
Expand Down Expand Up @@ -410,7 +417,7 @@ private void runInHarvesterThreadPush()

try
{
socket.receive(pkt);
receiveSocket.receive(pkt);
receivedTime = clock.instant();
}
catch (IOException ioe)
Expand Down Expand Up @@ -467,7 +474,7 @@ private void runInHarvesterThreadPush()
{
candidateSocket.close();
}
socket.close();
socketPool.close();
}

private Buffer bufferFromPacket(DatagramPacket p, Instant receivedTime)
Expand All @@ -478,7 +485,7 @@ private Buffer bufferFromPacket(DatagramPacket p, Instant receivedTime)
System.arraycopy(p.getData(), p.getOffset(), buffer.getBuffer(), off, p.getLength());
buffer.setOffset(off);
buffer.setLength(p.getLength());
buffer.setLocalAddress(socket.getLocalSocketAddress());
buffer.setLocalAddress(receiveSocket.getLocalSocketAddress());
buffer.setRemoteAddress(p.getSocketAddress());
buffer.setReceivedTime(receivedTime);

Expand Down Expand Up @@ -808,14 +815,14 @@ public void receive(DatagramPacket p)
/**
* {@inheritDoc}
*
* Delegates to the actual socket of the harvester.
* Delegates to the socket pool.
*/
@Override
public void send(DatagramPacket p)
throws IOException
{
p.setSocketAddress(remoteAddress);
socket.send(p);
socketPool.send(p);
}
}
}
6 changes: 6 additions & 0 deletions src/main/kotlin/org/ice4j/ice/harvest/HarvestConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ class HarvestConfig {
}
fun udpReceiveBufferSize() = udpReceiveBufferSize

val udpSocketPoolSize: Int by config {
"ice4j.harvest.udp.socket-pool-size".from(configSource)
}

fun udpSocketPoolSize() = udpSocketPoolSize

val useIpv6: Boolean by config {
"org.ice4j.ipv6.DISABLED".from(configSource)
.transformedBy { !it }
Expand Down
114 changes: 114 additions & 0 deletions src/main/kotlin/org/ice4j/socket/SocketPool.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright @ 2020 - Present, 8x8 Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.ice4j.socket

import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.DatagramSocketImpl
import java.net.SocketAddress
import java.nio.channels.DatagramChannel

/** A pool of datagram sockets all bound on the same port.
*
* This is necessary to allow multiple threads to send packets simultaneously from the same source address,
* in JDK 15 and later, because the [DatagramChannel]-based implementation of [DatagramSocketImpl] introduced
* in that version locks the socket during a call to [DatagramSocket.send].
*
* (The old [DatagramSocketImpl] implementation can be used by setting the system property
* `jdk.net.usePlainDatagramSocketImpl` in JDK versions 15 through 17, but was removed in versions 18 and later.)
*
* This feature may also be useful on older JDK versions on non-Linux operating systems, such as macOS,
* which block simultaneous writes through the same UDP socket at the operating system level.
*
* The sockets are opened such that packets will be _received_ on exactly one socket.
*/
class SocketPool(
/** The address to which to bind the pool of sockets. */
address: SocketAddress,
/** The number of sockets to create for the pool. If this is set to zero (the default), the number
* will be set automatically to an appropriate value.
*/
requestedNumSockets: Int = 0
) {
init {
require(requestedNumSockets >= 0) { "RequestedNumSockets must be >= 0" }
}

internal class SocketAndIndex(
val socket: DatagramSocket,
var count: Int = 0
)

val numSockets: Int =
if (requestedNumSockets != 0) {
requestedNumSockets
} else {
// TODO: set this to 1 in situations where pools aren't needed?
Runtime.getRuntime().availableProcessors()
}

private val sockets = buildList {
val multipleSockets = numSockets > 1
var bindAddr = address
for (i in 0 until numSockets) {
val sock = DatagramSocket(null)
if (multipleSockets) {
sock.reuseAddress = true
}
sock.bind(bindAddr)
if (i == 0 && multipleSockets) {
bindAddr = sock.localSocketAddress
}
add(SocketAndIndex(sock, 0))
}
}

/** The socket on which packets will be received. */
val receiveSocket: DatagramSocket
// On all platforms I've tested, the last-bound socket is the one which receives packets.
// TODO: should we support Linux's flavor of SO_REUSEPORT, in which packets can be received on *all* the
// sockets, spreading load?
get() = sockets.last().socket

fun send(packet: DatagramPacket) {
val sendSocket = getSendSocket()
sendSocket.socket.send(packet)
returnSocket(sendSocket)
}

/** Gets a socket on which packets can be sent, chosen from among all the available send sockets. */
internal fun getSendSocket(): SocketAndIndex {
if (numSockets == 1) {
return sockets.first()
}
synchronized(sockets) {
val min = sockets.minBy { it.count }
min.count++

return min
}
}

internal fun returnSocket(socket: SocketAndIndex) {
synchronized(sockets) {
socket.count--
}
}

fun close() {
sockets.forEach { it.socket.close() }
}
}
4 changes: 4 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ ice4j {
// Whether to allocate ephemeral ports for local candidates. This is the default value, and can be overridden
// for Agent instances.
use-dynamic-ports = true

// The size of the socket pool to use to send packets on the "single port" harvester. 0 means the
// default (Java's reported number of available processors). 1 is equivalent to not using a socket pool.
socket-pool-size = 0
}

// The list of IP addresses that are allowed to be used for host candidate allocations. When empty, any address is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,51 +31,6 @@
*/
public class SinglePortUdpHarvesterTest
{
/**
* Verifies that, without closing, the address used by a harvester cannot be re-used.
*
* @see <a href="https://github.com/jitsi/ice4j/issues/139">https://github.com/jitsi/ice4j/issues/139</a>
*/
@Test
public void testRebindWithoutCloseThrows() throws Exception
{
// Setup test fixture.
final TransportAddress address = new TransportAddress( "127.0.0.1", 10000, Transport.UDP );
SinglePortUdpHarvester firstHarvester;
try
{
firstHarvester = new SinglePortUdpHarvester( address );
}
catch (BindException ex)
{
// This is not expected at this stage (the port is likely already in use by another process, voiding this
// test). Rethrow as a different exception than the BindException, that is expected to be thrown later in
// this test.
throw new Exception( "Test fixture is invalid.", ex );
}

// Execute system under test.
SinglePortUdpHarvester secondHarvester = null;
try
{
secondHarvester = new SinglePortUdpHarvester( address );
fail("expected BindException to be thrown at this point");
}
catch (BindException ex)
{
//expected, do nothing
}
finally
{
// Tear down
firstHarvester.close();
if (secondHarvester != null)
{
secondHarvester.close();
}
}
}

/**
* Verifies that, after closing, the address used by a harvester can be re-used.
*
Expand Down
5 changes: 5 additions & 0 deletions src/test/kotlin/org/ice4j/ice/harvest/HarvestConfigTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class HarvestConfigTest : ConfigTest() {
config.useIpv6 shouldBe true
config.useLinkLocalAddresses shouldBe true
config.udpReceiveBufferSize shouldBe null
config.udpSocketPoolSize shouldBe 0
config.stunMappingCandidateHarvesterAddresses shouldBe emptyList()
}
context("Setting via legacy config (system properties)") {
Expand All @@ -39,6 +40,7 @@ class HarvestConfigTest : ConfigTest() {
config.useIpv6 shouldBe false
config.useLinkLocalAddresses shouldBe false
config.udpReceiveBufferSize shouldBe 555
config.udpSocketPoolSize shouldBe 0
config.stunMappingCandidateHarvesterAddresses shouldBe listOf("stun1.legacy:555", "stun2.legacy")
}
}
Expand All @@ -49,6 +51,7 @@ class HarvestConfigTest : ConfigTest() {
config.useIpv6 shouldBe false
config.useLinkLocalAddresses shouldBe false
config.udpReceiveBufferSize shouldBe 666
config.udpSocketPoolSize shouldBe 3
config.stunMappingCandidateHarvesterAddresses shouldBe listOf("stun1.new:666", "stun2.new")
}
}
Expand All @@ -60,6 +63,7 @@ class HarvestConfigTest : ConfigTest() {
config.useIpv6 shouldBe false
config.useLinkLocalAddresses shouldBe false
config.udpReceiveBufferSize shouldBe 555
config.udpSocketPoolSize shouldBe 0
config.stunMappingCandidateHarvesterAddresses shouldBe listOf("stun1.legacy:555", "stun2.legacy")
}
}
Expand Down Expand Up @@ -153,6 +157,7 @@ private val newConfigNonDefault = """
udp {
receive-buffer-size = 666
use-dynamic-ports = false
socket-pool-size = 3
}
mapping {
stun {
Expand Down
Loading

0 comments on commit 1373788

Please sign in to comment.