diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 396ad04c..8eadb92d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ concurrency: jobs: lint: name: Swift Lint - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 9690d75f..22f16c4b 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -8,6 +8,10 @@ private struct Storage { let connection: QuicConnection } +public enum SendError: Error { + case emptyData +} + public final class QuicStream: Sendable { public let id: UniqueId private let logger: Logger @@ -97,9 +101,13 @@ public final class QuicStream: Sendable { throw QuicError.alreadyClosed } - // TODO: improve the case when data is empty let messageLength = data.count + if messageLength == 0 { + logger.trace("No data to send.") + throw SendError.emptyData // Throw a specific error or return + } + let sendBufferRaw = UnsafeMutableRawPointer.allocate( // !! allocate byteCount: MemoryLayout.size + messageLength, alignment: MemoryLayout.alignment @@ -108,7 +116,6 @@ public final class QuicStream: Sendable { let sendBuffer = sendBufferRaw.assumingMemoryBound(to: QUIC_BUFFER.self) let bufferPointer = sendBufferRaw.advanced(by: MemoryLayout.size).assumingMemoryBound(to: UInt8.self) data.copyBytes(to: bufferPointer, count: messageLength) // TODO: figure out a better way to avoid memory copy here - sendBuffer.pointee.Buffer = bufferPointer sendBuffer.pointee.Length = UInt32(messageLength) diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 2fe24b91..998b2116 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -1,6 +1,7 @@ import AsyncChannels import Foundation import MsQuicSwift +import Synchronization import TracingUtils import Utils @@ -36,7 +37,7 @@ public final class Connection: Sendable, ConnectionInfoP public let role: PeerRole public let remoteAddress: NetAddr - + private let lastActive: Atomic = Atomic(0) let presistentStreams: ThreadSafeContainer< [Handler.PresistentHandler.StreamKind: Stream] > = .init([:]) @@ -58,6 +59,10 @@ public final class Connection: Sendable, ConnectionInfoP } } + func getLastActive() -> TimeInterval { + lastActive.load(ordering: .sequentiallyConsistent) + } + public var id: UniqueId { connection.id } @@ -68,6 +73,11 @@ public final class Connection: Sendable, ConnectionInfoP self.role = role self.remoteAddress = remoteAddress self.initiatedByLocal = initiatedByLocal + updateLastActive() + } + + func updateLastActive() { + lastActive.store(Date().timeIntervalSince1970, ordering: .releasing) } func opened(publicKey: Data) throws { diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 342cd881..eef08566 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -70,7 +70,7 @@ public struct PeerOptions: Sendable { } } -// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol +// TODO: peer reputation system to ban peers not following the protocol public final class Peer: Sendable { private let impl: PeerImpl @@ -271,9 +271,15 @@ final class PeerImpl: Sendable { if role == .builder { let currentCount = connections.byAddr.values.filter { $0.role == role }.count if currentCount >= self.settings.maxBuilderConnections { - self.logger.warning("max builder connections reached") - // TODO: consider connection rotation strategy - return false + if let conn = connections.byAddr.values.filter({ $0.role == .builder }) + .sorted(by: { $0.getLastActive() < $1.getLastActive() }).first + { + self.logger.warning("Replacing least active builder connection at \(conn.remoteAddress)") + conn.close(abort: false) + } else { + self.logger.warning("Max builder connections reached, no eligible replacement found") + return false + } } } if connections.byAddr[addr] != nil { @@ -584,6 +590,10 @@ private struct PeerEventHandler: QuicEventHandler { } if let stream { stream.received(data: data) + let connection = impl.connections.read { connections in + connections.byId[stream.connectionId] + } + connection?.updateLastActive() } } diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 29b2f3dd..07079bcd 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -143,6 +143,58 @@ struct PeerTests { typealias EphemeralHandler = MockEphemeralStreamHandler } + @Test + func connectionRotationStrategy() async throws { + var peers: [Peer] = [] + var handlers: [MockPresentStreamHandler] = [] + let centerPeer = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + // Create 30 peer nodes + for _ in 0 ..< 30 { + let handler = MockPresentStreamHandler() + handlers.append(handler) + let peer = try Peer( + options: PeerOptions( + role: .builder, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + peers.append(peer) + } + + // Make some connections + for i in 0 ..< 30 { + let peer = peers[i] + let con = try peer.connect(to: centerPeer.listenAddress(), role: .builder) + try await con.ready() + } + // Simulate close connections 3~5s + try? await Task.sleep(for: .milliseconds(5000)) + centerPeer.broadcast(kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("connection rotation strategy".utf8))) + try? await Task.sleep(for: .milliseconds(1000)) + var receivedCount = 0 + for handler in handlers { + receivedCount += await handler.receivedData.count + } + #expect(receivedCount == PeerSettings.defaultSettings.maxBuilderConnections) + } + @Test func mockHandshakeFailure() async throws { let mockPeerTest = try MockPeerEventTests()