Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a socket pool of multiple sockets bound to the same port #293

Merged
merged 24 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
933885a
Add socket pool, to allow sending to a single socket from multiple th…
JonathanLennox Oct 14, 2024
8ef4273
Fix race: Return socket before releasing semaphore.
JonathanLennox Oct 14, 2024
dff4051
Add main() method to SocketPoolTest.
JonathanLennox Oct 14, 2024
753eb8a
Change test sender to be more realistic.
JonathanLennox Oct 14, 2024
d2e792a
Remove blocking inside SocketPool.
JonathanLennox Oct 15, 2024
0800279
More timing tests.
JonathanLennox Oct 15, 2024
4e4afae
Improve timing test more.
JonathanLennox Oct 15, 2024
5470f0a
Ktlint fix.
JonathanLennox Oct 15, 2024
af1c1e6
Simplify socket pool index calculation.
JonathanLennox Oct 15, 2024
4403e8a
Command-line args for SocketPoolTest main().
JonathanLennox Oct 15, 2024
5da22ee
Fix output
JonathanLennox Oct 15, 2024
8f49368
Add warmup for testSendingOnce.
JonathanLennox Oct 15, 2024
6182860
Make order of command-line arguments match output
JonathanLennox Oct 15, 2024
46acf6a
Make send the API on socketpool, so socket choice can be internal.
JonathanLennox Oct 21, 2024
7b3b5a9
Add experimental API annotation.
JonathanLennox Oct 21, 2024
f752308
Keep a lightweight count of which sockets are most heavily used.
JonathanLennox Oct 21, 2024
0950c10
Minor cleanups.
JonathanLennox Oct 21, 2024
902aca9
Change default pool size.
JonathanLennox Oct 21, 2024
c85bdc0
Use SocketPool in AbstractUdpListener.
JonathanLennox Oct 21, 2024
3c39601
Revert "Change default pool size."
JonathanLennox Oct 22, 2024
5017a4f
Make the single-port harvester's socket pool size configurable.
JonathanLennox Oct 22, 2024
eb4511c
Disable SocketPool performance tests by default.
JonathanLennox Oct 22, 2024
07c3ee1
Merge remote-tracking branch 'origin/master' into socketpool
JonathanLennox Oct 22, 2024
af99d83
Log AbstractUdpListener's socket pool size.
JonathanLennox Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions src/main/java/org/ice4j/ice/harvest/AbstractUdpListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.ice4j.*;
import org.ice4j.attribute.*;
import org.ice4j.ice.*;
import org.ice4j.message.*;
import org.ice4j.socket.*;
import org.ice4j.util.*;
Expand All @@ -37,7 +36,7 @@
import static org.ice4j.ice.harvest.HarvestConfig.config;

/**
* A class which holds a {@link DatagramSocket} and runs a thread
* A class which holds a {@link SocketPool} and runs a thread
* ({@link #thread}) which perpetually reads from it.
*
* When a datagram from an unknown source is received, it is parsed as a STUN
Expand Down Expand Up @@ -196,13 +195,18 @@ static String getUfrag(byte[] buf, int off, int len)
*/
protected final TransportAddress localAddress;

/**
* The pool of sockets available for writing.
*/
private final SocketPool socketPool;

/**
* The "main" socket that this harvester reads from.
*/
private final DatagramSocket socket;
private final DatagramSocket receiveSocket;

/**
* The thread reading from {@link #socket}.
* The thread reading from {@link #receiveSocket}.
*/
private final Thread thread;

Expand Down Expand Up @@ -236,32 +240,35 @@ protected AbstractUdpListener(TransportAddress localAddress)
);
}

socket = new DatagramSocket( tempAddress );
socketPool = new SocketPool( tempAddress, config.udpSocketPoolSize() );

receiveSocket = socketPool.getReceiveSocket();

Integer receiveBufferSize = config.udpReceiveBufferSize();
if (receiveBufferSize != null)
{
socket.setReceiveBufferSize(receiveBufferSize);
receiveSocket.setReceiveBufferSize(receiveBufferSize);
}

/* Update the port number if needed. */
if (localAddress.getPort() == 0)
{
tempAddress = new TransportAddress(
tempAddress.getAddress(),
socket.getLocalPort(),
receiveSocket.getLocalPort(),
tempAddress.getTransport()
);
}
this.localAddress = tempAddress;

String logMessage
= "Initialized AbstractUdpListener with address " + this.localAddress;
logMessage += ". Receive buffer size " + socket.getReceiveBufferSize();
logMessage += ". Receive buffer size " + receiveSocket.getReceiveBufferSize();
if (receiveBufferSize != null)
{
logMessage += " (asked for " + receiveBufferSize + ")";
}
logMessage += "; socket pool size " + socketPool.getNumSockets();
logger.info(logMessage);

thread = new Thread(() ->
Expand Down Expand Up @@ -292,11 +299,11 @@ public TransportAddress getLocalAddress()
public void close()
{
close = true;
socket.close(); // causes socket#receive to stop blocking.
socketPool.close(); // causes socket#receive to stop blocking.
}

/**
* Perpetually reads datagrams from {@link #socket} and handles them
* Perpetually reads datagrams from {@link #receiveSocket} and handles them
* accordingly.
*
* It is important that this blocks are little as possible (except on
Expand Down Expand Up @@ -326,7 +333,7 @@ private void runInHarvesterThread()

try
{
socket.receive(pkt);
receiveSocket.receive(pkt);
}
catch (IOException ioe)
{
Expand Down Expand Up @@ -376,13 +383,13 @@ private void runInHarvesterThread()
{
candidateSocket.close();
}
socket.close();
socketPool.close();
}

/**
* Read packets from the socket and forward them via the push API. Note that the memory model here is different
* than the other case. Specifically, we:
* 1. Receive from {@link #socket} into a fixed buffer
* 1. Receive from {@link #receiveSocket} into a fixed buffer
* 2. Obtain a buffer of the required size using {@link BufferPool#getBuffer}
* 3. Copy the data into the buffer and either
* 3.1 Call the associated {@link BufferHandler} if the packet is payload
Expand Down Expand Up @@ -410,7 +417,7 @@ private void runInHarvesterThreadPush()

try
{
socket.receive(pkt);
receiveSocket.receive(pkt);
receivedTime = clock.instant();
}
catch (IOException ioe)
Expand Down Expand Up @@ -467,7 +474,7 @@ private void runInHarvesterThreadPush()
{
candidateSocket.close();
}
socket.close();
socketPool.close();
}

private Buffer bufferFromPacket(DatagramPacket p, Instant receivedTime)
Expand All @@ -478,7 +485,7 @@ private Buffer bufferFromPacket(DatagramPacket p, Instant receivedTime)
System.arraycopy(p.getData(), p.getOffset(), buffer.getBuffer(), off, p.getLength());
buffer.setOffset(off);
buffer.setLength(p.getLength());
buffer.setLocalAddress(socket.getLocalSocketAddress());
buffer.setLocalAddress(receiveSocket.getLocalSocketAddress());
buffer.setRemoteAddress(p.getSocketAddress());
buffer.setReceivedTime(receivedTime);

Expand Down Expand Up @@ -808,14 +815,14 @@ public void receive(DatagramPacket p)
/**
* {@inheritDoc}
*
* Delegates to the actual socket of the harvester.
* Delegates to the socket pool.
*/
@Override
public void send(DatagramPacket p)
throws IOException
{
p.setSocketAddress(remoteAddress);
socket.send(p);
socketPool.send(p);
}
}
}
6 changes: 6 additions & 0 deletions src/main/kotlin/org/ice4j/ice/harvest/HarvestConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ class HarvestConfig {
}
fun udpReceiveBufferSize() = udpReceiveBufferSize

val udpSocketPoolSize: Int by config {
"ice4j.harvest.udp.socket-pool-size".from(configSource)
}

fun udpSocketPoolSize() = udpSocketPoolSize

val useIpv6: Boolean by config {
"org.ice4j.ipv6.DISABLED".from(configSource)
.transformedBy { !it }
Expand Down
114 changes: 114 additions & 0 deletions src/main/kotlin/org/ice4j/socket/SocketPool.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright @ 2020 - Present, 8x8 Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.ice4j.socket

import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.DatagramSocketImpl
import java.net.SocketAddress
import java.nio.channels.DatagramChannel

/** A pool of datagram sockets all bound on the same port.
*
* This is necessary to allow multiple threads to send packets simultaneously from the same source address,
* in JDK 15 and later, because the [DatagramChannel]-based implementation of [DatagramSocketImpl] introduced
* in that version locks the socket during a call to [DatagramSocket.send].
*
* (The old [DatagramSocketImpl] implementation can be used by setting the system property
* `jdk.net.usePlainDatagramSocketImpl` in JDK versions 15 through 17, but was removed in versions 18 and later.)
*
* This feature may also be useful on older JDK versions on non-Linux operating systems, such as macOS,
* which block simultaneous writes through the same UDP socket at the operating system level.
*
* The sockets are opened such that packets will be _received_ on exactly one socket.
*/
class SocketPool(
/** The address to which to bind the pool of sockets. */
address: SocketAddress,
/** The number of sockets to create for the pool. If this is set to zero (the default), the number
* will be set automatically to an appropriate value.
*/
requestedNumSockets: Int = 0
) {
init {
require(requestedNumSockets >= 0) { "RequestedNumSockets must be >= 0" }
}

internal class SocketAndIndex(
val socket: DatagramSocket,
var count: Int = 0
)

val numSockets: Int =
if (requestedNumSockets != 0) {
requestedNumSockets
} else {
// TODO: set this to 1 in situations where pools aren't needed?
Runtime.getRuntime().availableProcessors()
}

private val sockets = buildList {
val multipleSockets = numSockets > 1
var bindAddr = address
for (i in 0 until numSockets) {
val sock = DatagramSocket(null)
if (multipleSockets) {
sock.reuseAddress = true
}
sock.bind(bindAddr)
if (i == 0 && multipleSockets) {
bindAddr = sock.localSocketAddress
}
add(SocketAndIndex(sock, 0))
}
}

/** The socket on which packets will be received. */
val receiveSocket: DatagramSocket
// On all platforms I've tested, the last-bound socket is the one which receives packets.
// TODO: should we support Linux's flavor of SO_REUSEPORT, in which packets can be received on *all* the
// sockets, spreading load?
get() = sockets.last().socket

fun send(packet: DatagramPacket) {
val sendSocket = getSendSocket()
sendSocket.socket.send(packet)
returnSocket(sendSocket)
}

/** Gets a socket on which packets can be sent, chosen from among all the available send sockets. */
internal fun getSendSocket(): SocketAndIndex {
if (numSockets == 1) {
return sockets.first()
}
synchronized(sockets) {
val min = sockets.minBy { it.count }
min.count++

return min
}
}

internal fun returnSocket(socket: SocketAndIndex) {
synchronized(sockets) {
socket.count--
}
}

fun close() {
sockets.forEach { it.socket.close() }
}
}
4 changes: 4 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ ice4j {
// Whether to allocate ephemeral ports for local candidates. This is the default value, and can be overridden
// for Agent instances.
use-dynamic-ports = true

// The size of the socket pool to use to send packets on the "single port" harvester. 0 means the
// default (Java's reported number of available processors). 1 is equivalent to not using a socket pool.
socket-pool-size = 0
}

// The list of IP addresses that are allowed to be used for host candidate allocations. When empty, any address is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,51 +31,6 @@
*/
public class SinglePortUdpHarvesterTest
{
/**
* Verifies that, without closing, the address used by a harvester cannot be re-used.
*
* @see <a href="https://github.com/jitsi/ice4j/issues/139">https://github.com/jitsi/ice4j/issues/139</a>
*/
@Test
public void testRebindWithoutCloseThrows() throws Exception
{
// Setup test fixture.
final TransportAddress address = new TransportAddress( "127.0.0.1", 10000, Transport.UDP );
SinglePortUdpHarvester firstHarvester;
try
{
firstHarvester = new SinglePortUdpHarvester( address );
}
catch (BindException ex)
{
// This is not expected at this stage (the port is likely already in use by another process, voiding this
// test). Rethrow as a different exception than the BindException, that is expected to be thrown later in
// this test.
throw new Exception( "Test fixture is invalid.", ex );
}

// Execute system under test.
SinglePortUdpHarvester secondHarvester = null;
try
{
secondHarvester = new SinglePortUdpHarvester( address );
fail("expected BindException to be thrown at this point");
}
catch (BindException ex)
{
//expected, do nothing
}
finally
{
// Tear down
firstHarvester.close();
if (secondHarvester != null)
{
secondHarvester.close();
}
}
}

/**
* Verifies that, after closing, the address used by a harvester can be re-used.
*
Expand Down
5 changes: 5 additions & 0 deletions src/test/kotlin/org/ice4j/ice/harvest/HarvestConfigTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class HarvestConfigTest : ConfigTest() {
config.useIpv6 shouldBe true
config.useLinkLocalAddresses shouldBe true
config.udpReceiveBufferSize shouldBe null
config.udpSocketPoolSize shouldBe 0
config.stunMappingCandidateHarvesterAddresses shouldBe emptyList()
}
context("Setting via legacy config (system properties)") {
Expand All @@ -39,6 +40,7 @@ class HarvestConfigTest : ConfigTest() {
config.useIpv6 shouldBe false
config.useLinkLocalAddresses shouldBe false
config.udpReceiveBufferSize shouldBe 555
config.udpSocketPoolSize shouldBe 0
config.stunMappingCandidateHarvesterAddresses shouldBe listOf("stun1.legacy:555", "stun2.legacy")
}
}
Expand All @@ -49,6 +51,7 @@ class HarvestConfigTest : ConfigTest() {
config.useIpv6 shouldBe false
config.useLinkLocalAddresses shouldBe false
config.udpReceiveBufferSize shouldBe 666
config.udpSocketPoolSize shouldBe 3
config.stunMappingCandidateHarvesterAddresses shouldBe listOf("stun1.new:666", "stun2.new")
}
}
Expand All @@ -60,6 +63,7 @@ class HarvestConfigTest : ConfigTest() {
config.useIpv6 shouldBe false
config.useLinkLocalAddresses shouldBe false
config.udpReceiveBufferSize shouldBe 555
config.udpSocketPoolSize shouldBe 0
config.stunMappingCandidateHarvesterAddresses shouldBe listOf("stun1.legacy:555", "stun2.legacy")
}
}
Expand Down Expand Up @@ -153,6 +157,7 @@ private val newConfigNonDefault = """
udp {
receive-buffer-size = 666
use-dynamic-ports = false
socket-pool-size = 3
}
mapping {
stun {
Expand Down
Loading
Loading