Skip to content

Commit

Permalink
connection rotation strategy (#224)
Browse files Browse the repository at this point in the history
* update reopen up stream

* update reopen up stream

* update reopen up stream

* update test

* update connection rotation strategy

* update connection rotation strategy

* update peer test

* update test

* adjust memory copy

* Update Networking/Sources/MsQuicSwift/QuicStream.swift

* try different runner image

* try ubuntu-24.04

* update test

---------

Co-authored-by: Xiliang Chen <[email protected]>
  • Loading branch information
MacOMNI and xlc authored Nov 25, 2024
1 parent eb7a122 commit e50528b
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ concurrency:
jobs:
lint:
name: Swift Lint
runs-on: ubuntu-latest
runs-on: ubuntu-24.04

steps:
- uses: actions/checkout@v4
Expand Down
11 changes: 9 additions & 2 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ private struct Storage {
let connection: QuicConnection
}

public enum SendError: Error {
case emptyData
}

public final class QuicStream: Sendable {
public let id: UniqueId
private let logger: Logger
Expand Down Expand Up @@ -97,9 +101,13 @@ public final class QuicStream: Sendable {
throw QuicError.alreadyClosed
}

// TODO: improve the case when data is empty
let messageLength = data.count

if messageLength == 0 {
logger.trace("No data to send.")
throw SendError.emptyData // Throw a specific error or return
}

let sendBufferRaw = UnsafeMutableRawPointer.allocate( // !! allocate
byteCount: MemoryLayout<QUIC_BUFFER>.size + messageLength,
alignment: MemoryLayout<QUIC_BUFFER>.alignment
Expand All @@ -108,7 +116,6 @@ public final class QuicStream: Sendable {
let sendBuffer = sendBufferRaw.assumingMemoryBound(to: QUIC_BUFFER.self)
let bufferPointer = sendBufferRaw.advanced(by: MemoryLayout<QUIC_BUFFER>.size).assumingMemoryBound(to: UInt8.self)
data.copyBytes(to: bufferPointer, count: messageLength) // TODO: figure out a better way to avoid memory copy here

sendBuffer.pointee.Buffer = bufferPointer
sendBuffer.pointee.Length = UInt32(messageLength)

Expand Down
12 changes: 11 additions & 1 deletion Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import AsyncChannels
import Foundation
import MsQuicSwift
import Synchronization
import TracingUtils
import Utils

Expand Down Expand Up @@ -36,7 +37,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP

public let role: PeerRole
public let remoteAddress: NetAddr

private let lastActive: Atomic<TimeInterval> = Atomic(0)
let presistentStreams: ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
> = .init([:])
Expand All @@ -58,6 +59,10 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
}

func getLastActive() -> TimeInterval {
lastActive.load(ordering: .sequentiallyConsistent)
}

public var id: UniqueId {
connection.id
}
Expand All @@ -68,6 +73,11 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
self.role = role
self.remoteAddress = remoteAddress
self.initiatedByLocal = initiatedByLocal
updateLastActive()
}

func updateLastActive() {
lastActive.store(Date().timeIntervalSince1970, ordering: .releasing)
}

func opened(publicKey: Data) throws {
Expand Down
18 changes: 14 additions & 4 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public struct PeerOptions<Handler: StreamHandler>: Sendable {
}
}

// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol
// TODO: peer reputation system to ban peers not following the protocol
public final class Peer<Handler: StreamHandler>: Sendable {
private let impl: PeerImpl<Handler>

Expand Down Expand Up @@ -271,9 +271,15 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
if role == .builder {
let currentCount = connections.byAddr.values.filter { $0.role == role }.count
if currentCount >= self.settings.maxBuilderConnections {
self.logger.warning("max builder connections reached")
// TODO: consider connection rotation strategy
return false
if let conn = connections.byAddr.values.filter({ $0.role == .builder })
.sorted(by: { $0.getLastActive() < $1.getLastActive() }).first
{
self.logger.warning("Replacing least active builder connection at \(conn.remoteAddress)")
conn.close(abort: false)
} else {
self.logger.warning("Max builder connections reached, no eligible replacement found")
return false
}
}
}
if connections.byAddr[addr] != nil {
Expand Down Expand Up @@ -584,6 +590,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
if let stream {
stream.received(data: data)
let connection = impl.connections.read { connections in
connections.byId[stream.connectionId]
}
connection?.updateLastActive()
}
}

Expand Down
52 changes: 52 additions & 0 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,58 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func connectionRotationStrategy() async throws {
var peers: [Peer<MockStreamHandler>] = []
var handlers: [MockPresentStreamHandler] = []
let centerPeer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
// Create 30 peer nodes
for _ in 0 ..< 30 {
let handler = MockPresentStreamHandler()
handlers.append(handler)
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .builder,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
peers.append(peer)
}

// Make some connections
for i in 0 ..< 30 {
let peer = peers[i]
let con = try peer.connect(to: centerPeer.listenAddress(), role: .builder)
try await con.ready()
}
// Simulate close connections 3~5s
try? await Task.sleep(for: .milliseconds(5000))
centerPeer.broadcast(kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("connection rotation strategy".utf8)))
try? await Task.sleep(for: .milliseconds(1000))
var receivedCount = 0
for handler in handlers {
receivedCount += await handler.receivedData.count
}
#expect(receivedCount == PeerSettings.defaultSettings.maxBuilderConnections)
}

@Test
func mockHandshakeFailure() async throws {
let mockPeerTest = try MockPeerEventTests()
Expand Down

0 comments on commit e50528b

Please sign in to comment.