Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connection rotation strategy #224

Merged
merged 16 commits into from
Nov 25, 2024
11 changes: 9 additions & 2 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ private struct Storage {
let connection: QuicConnection
}

public enum SendError: Error {
case emptyData
}

public final class QuicStream: Sendable {
public let id: UniqueId
private let logger: Logger
Expand Down Expand Up @@ -97,9 +101,13 @@ public final class QuicStream: Sendable {
throw QuicError.alreadyClosed
}

// TODO: improve the case when data is empty
let messageLength = data.count

if messageLength == 0 {
logger.trace("No data to send.")
throw SendError.emptyData // Throw a specific error or return
}

let sendBufferRaw = UnsafeMutableRawPointer.allocate( // !! allocate
byteCount: MemoryLayout<QUIC_BUFFER>.size + messageLength,
alignment: MemoryLayout<QUIC_BUFFER>.alignment
Expand All @@ -108,7 +116,6 @@ public final class QuicStream: Sendable {
let sendBuffer = sendBufferRaw.assumingMemoryBound(to: QUIC_BUFFER.self)
let bufferPointer = sendBufferRaw.advanced(by: MemoryLayout<QUIC_BUFFER>.size).assumingMemoryBound(to: UInt8.self)
data.copyBytes(to: bufferPointer, count: messageLength) // TODO: figure out a better way to avoid memory copy here

sendBuffer.pointee.Buffer = bufferPointer
sendBuffer.pointee.Length = UInt32(messageLength)

Expand Down
12 changes: 11 additions & 1 deletion Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import AsyncChannels
import Foundation
import MsQuicSwift
import Synchronization
import TracingUtils
import Utils

Expand Down Expand Up @@ -36,7 +37,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP

public let role: PeerRole
public let remoteAddress: NetAddr

private let lastActive: Atomic<TimeInterval> = Atomic(0)
let presistentStreams: ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
> = .init([:])
Expand All @@ -58,6 +59,10 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
}

func getLastActive() -> TimeInterval {
lastActive.load(ordering: .sequentiallyConsistent)
}

public var id: UniqueId {
connection.id
}
Expand All @@ -68,6 +73,11 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
self.role = role
self.remoteAddress = remoteAddress
self.initiatedByLocal = initiatedByLocal
updateLastActive()
}

func updateLastActive() {
lastActive.store(Date().timeIntervalSince1970, ordering: .releasing)
}

func opened(publicKey: Data) throws {
Expand Down
18 changes: 14 additions & 4 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public struct PeerOptions<Handler: StreamHandler>: Sendable {
}
}

// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol
// TODO: peer reputation system to ban peers not following the protocol
public final class Peer<Handler: StreamHandler>: Sendable {
private let impl: PeerImpl<Handler>

Expand Down Expand Up @@ -271,9 +271,15 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
if role == .builder {
let currentCount = connections.byAddr.values.filter { $0.role == role }.count
if currentCount >= self.settings.maxBuilderConnections {
self.logger.warning("max builder connections reached")
// TODO: consider connection rotation strategy
return false
if let conn = connections.byAddr.values.filter({ $0.role == .builder })
.sorted(by: { $0.getLastActive() < $1.getLastActive() }).first
{
self.logger.warning("Replacing least active builder connection at \(conn.remoteAddress)")
conn.close(abort: false)
} else {
self.logger.warning("Max builder connections reached, no eligible replacement found")
return false
}
}
}
if connections.byAddr[addr] != nil {
Expand Down Expand Up @@ -590,6 +596,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
if let stream {
stream.received(data: data)
let connection = impl.connections.read { connections in
connections.byId[stream.connectionId]
}
connection?.updateLastActive()
}
}

Expand Down
52 changes: 52 additions & 0 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,58 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func connectionRotationStrategy() async throws {
var peers: [Peer<MockStreamHandler>] = []
var handlers: [MockPresentStreamHandler] = []
let centerPeer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
// Create 30 peer nodes
for _ in 0 ..< 30 {
let handler = MockPresentStreamHandler()
handlers.append(handler)
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .builder,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
peers.append(peer)
}

// Make some connections
for i in 0 ..< 30 {
let peer = peers[i]
let con = try peer.connect(to: centerPeer.listenAddress(), role: .builder)
try await con.ready()
}
// Simulate close connections 3~5s
try? await Task.sleep(for: .milliseconds(5000))
centerPeer.broadcast(kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("connection rotation strategy".utf8)))
try? await Task.sleep(for: .milliseconds(1000))
var receivedCount = 0
for handler in handlers {
receivedCount += await handler.receivedData.count
}
#expect(receivedCount == PeerSettings.defaultSettings.maxBuilderConnections)
}

@Test
func reopenUpStream() async throws {
let handler2 = MockPresentStreamHandler()
Expand Down