From 4626c643149320a35324e789ff628b1018cdeda6 Mon Sep 17 00:00:00 2001 From: Xiliang Chen Date: Thu, 21 Nov 2024 15:27:46 +1300 Subject: [PATCH 1/3] Rpc refactoring (#227) * rpc wip * wip * server name could be null * rpc refactoring * show rpc methods * fix --- .../Blockchain/State/State+Genesis.swift | 2 - .../Sources/Blockchain/State/State.swift | 8 ++ .../Blockchain/State/StateBackend.swift | 8 +- .../Sources/Blockchain/State/StateLayer.swift | 111 ++++++++++-------- Boka/Sources/Boka.swift | 3 +- Codec/Sources/Codec/JamEncoder.swift | 4 +- .../Sources/MsQuicSwift/QuicListener.swift | 2 +- Node/Sources/Node/Genesis.swift | 2 +- Node/Sources/Node/Node.swift | 17 ++- Node/Sources/Node/NodeDataSource.swift | 29 +++-- RPC/Sources/RPC/DataSource/DataSource.swift | 14 ++- RPC/Sources/RPC/Handlers/ChainHandler.swift | 59 ---------- RPC/Sources/RPC/Handlers/ChainHandlers.swift | 31 +++++ RPC/Sources/RPC/Handlers/RPCHandlers.swift | 25 ++++ RPC/Sources/RPC/Handlers/SystemHandler.swift | 18 --- RPC/Sources/RPC/Handlers/SystemHandlers.swift | 30 +++++ .../RPC/Handlers/TelemetryHandler.swift | 26 ---- .../RPC/Handlers/TelemetryHandlers.swift | 51 ++++++++ RPC/Sources/RPC/JSONRPC/FromJSON.swift | 88 ++++++++++++++ RPC/Sources/RPC/JSONRPC/JSONRPC.swift | 18 ++- .../RPC/JSONRPC/JSONRPCController.swift | 35 ++++-- RPC/Sources/RPC/JSONRPC/RPCHandler.swift | 24 ++++ RPC/Sources/RPC/Server.swift | 7 +- .../RPCTests/JSONRPCControllerTests.swift | 4 +- Utils/Sources/Utils/JSON.swift | 8 ++ 25 files changed, 434 insertions(+), 190 deletions(-) delete mode 100644 RPC/Sources/RPC/Handlers/ChainHandler.swift create mode 100644 RPC/Sources/RPC/Handlers/ChainHandlers.swift create mode 100644 RPC/Sources/RPC/Handlers/RPCHandlers.swift delete mode 100644 RPC/Sources/RPC/Handlers/SystemHandler.swift create mode 100644 RPC/Sources/RPC/Handlers/SystemHandlers.swift delete mode 100644 RPC/Sources/RPC/Handlers/TelemetryHandler.swift create mode 100644 RPC/Sources/RPC/Handlers/TelemetryHandlers.swift create mode 100644 RPC/Sources/RPC/JSONRPC/FromJSON.swift create mode 100644 RPC/Sources/RPC/JSONRPC/RPCHandler.swift diff --git a/Blockchain/Sources/Blockchain/State/State+Genesis.swift b/Blockchain/Sources/Blockchain/State/State+Genesis.swift index 7e142721..35452299 100644 --- a/Blockchain/Sources/Blockchain/State/State+Genesis.swift +++ b/Blockchain/Sources/Blockchain/State/State+Genesis.swift @@ -42,6 +42,4 @@ extension State { return (StateRef(state), block) } - // TODO: add file genesis - // public static func fileGenesis(config: ProtocolConfigRef) throws -> State } diff --git a/Blockchain/Sources/Blockchain/State/State.swift b/Blockchain/Sources/Blockchain/State/State.swift index 6295ae5e..1f5eec79 100644 --- a/Blockchain/Sources/Blockchain/State/State.swift +++ b/Blockchain/Sources/Blockchain/State/State.swift @@ -230,6 +230,14 @@ public struct State: Sendable { await backend.rootHash } } + + public func read(key: Data32) async throws -> Data? { + let res = try layer[key].map { try JamEncoder.encode($0) } + if let res { + return res + } + return try await backend.readRaw(key) + } } extension State { diff --git a/Blockchain/Sources/Blockchain/State/StateBackend.swift b/Blockchain/Sources/Blockchain/State/StateBackend.swift index 36ecf36f..d9d05da0 100644 --- a/Blockchain/Sources/Blockchain/State/StateBackend.swift +++ b/Blockchain/Sources/Blockchain/State/StateBackend.swift @@ -47,11 +47,15 @@ public final class StateBackend: Sendable { return ret } - public func write(_ values: any Sequence<(key: any StateKey, value: (Codable & Sendable)?)>) async throws { - try await trie.update(values.map { try (key: $0.key.encode(), value: $0.value.map { try JamEncoder.encode($0) }) }) + public func write(_ values: any Sequence<(key: Data32, value: (Codable & Sendable)?)>) async throws { + try await trie.update(values.map { try (key: $0.key, value: $0.value.map { try JamEncoder.encode($0) }) }) try await trie.save() } + public func readRaw(_ key: Data32) async throws -> Data? { + try await trie.read(key: key) + } + public func writeRaw(_ values: [(key: Data32, value: Data?)]) async throws { try await trie.update(values) try await trie.save() diff --git a/Blockchain/Sources/Blockchain/State/StateLayer.swift b/Blockchain/Sources/Blockchain/State/StateLayer.swift index 8bfb413e..3ee3d1ed 100644 --- a/Blockchain/Sources/Blockchain/State/StateLayer.swift +++ b/Blockchain/Sources/Blockchain/State/StateLayer.swift @@ -22,190 +22,190 @@ private enum StateLayerValue: Sendable { } // @unchecked because AnyHashable is not Sendable -public struct StateLayer: @unchecked Sendable { - private var changes: [AnyHashable: StateLayerValue] = [:] +public struct StateLayer: Sendable { + private var changes: [Data32: StateLayerValue] = [:] public init(backend: StateBackend) async throws { let results = try await backend.batchRead(StateKeys.prefetchKeys) for (key, value) in results { - changes[AnyHashable(key)] = try .init(value.unwrap()) + changes[key.encode()] = try .init(value.unwrap()) } } public init(changes: [(key: any StateKey, value: Codable & Sendable)]) { for (key, value) in changes { - self.changes[AnyHashable(key)] = .value(value) + self.changes[key.encode()] = .value(value) } } // α: The core αuthorizations pool. public var coreAuthorizationPool: StateKeys.CoreAuthorizationPoolKey.Value { get { - changes[StateKeys.CoreAuthorizationPoolKey()]!.value()! + changes[StateKeys.CoreAuthorizationPoolKey().encode()]!.value()! } set { - changes[StateKeys.CoreAuthorizationPoolKey()] = .init(newValue) + changes[StateKeys.CoreAuthorizationPoolKey().encode()] = .init(newValue) } } // φ: The authorization queue. public var authorizationQueue: StateKeys.AuthorizationQueueKey.Value { get { - changes[StateKeys.AuthorizationQueueKey()]!.value()! + changes[StateKeys.AuthorizationQueueKey().encode()]!.value()! } set { - changes[StateKeys.AuthorizationQueueKey()] = .init(newValue) + changes[StateKeys.AuthorizationQueueKey().encode()] = .init(newValue) } } // β: Information on the most recent βlocks. public var recentHistory: StateKeys.RecentHistoryKey.Value { get { - changes[StateKeys.RecentHistoryKey()]!.value()! + changes[StateKeys.RecentHistoryKey().encode()]!.value()! } set { - changes[StateKeys.RecentHistoryKey()] = .init(newValue) + changes[StateKeys.RecentHistoryKey().encode()] = .init(newValue) } } // γ: State concerning Safrole. public var safroleState: StateKeys.SafroleStateKey.Value { get { - changes[StateKeys.SafroleStateKey()]!.value()! + changes[StateKeys.SafroleStateKey().encode()]!.value()! } set { - changes[StateKeys.SafroleStateKey()] = .init(newValue) + changes[StateKeys.SafroleStateKey().encode()] = .init(newValue) } } // ψ: past judgements public var judgements: StateKeys.JudgementsKey.Value { get { - changes[StateKeys.JudgementsKey()]!.value()! + changes[StateKeys.JudgementsKey().encode()]!.value()! } set { - changes[StateKeys.JudgementsKey()] = .init(newValue) + changes[StateKeys.JudgementsKey().encode()] = .init(newValue) } } // η: The eηtropy accumulator and epochal raηdomness. public var entropyPool: StateKeys.EntropyPoolKey.Value { get { - changes[StateKeys.EntropyPoolKey()]!.value()! + changes[StateKeys.EntropyPoolKey().encode()]!.value()! } set { - changes[StateKeys.EntropyPoolKey()] = .init(newValue) + changes[StateKeys.EntropyPoolKey().encode()] = .init(newValue) } } // ι: The validator keys and metadata to be drawn from next. public var validatorQueue: StateKeys.ValidatorQueueKey.Value { get { - changes[StateKeys.ValidatorQueueKey()]!.value()! + changes[StateKeys.ValidatorQueueKey().encode()]!.value()! } set { - changes[StateKeys.ValidatorQueueKey()] = .init(newValue) + changes[StateKeys.ValidatorQueueKey().encode()] = .init(newValue) } } // κ: The validator κeys and metadata currently active. public var currentValidators: StateKeys.CurrentValidatorsKey.Value { get { - changes[StateKeys.CurrentValidatorsKey()]!.value()! + changes[StateKeys.CurrentValidatorsKey().encode()]!.value()! } set { - changes[StateKeys.CurrentValidatorsKey()] = .init(newValue) + changes[StateKeys.CurrentValidatorsKey().encode()] = .init(newValue) } } // λ: The validator keys and metadata which were active in the prior epoch. public var previousValidators: StateKeys.PreviousValidatorsKey.Value { get { - changes[StateKeys.PreviousValidatorsKey()]!.value()! + changes[StateKeys.PreviousValidatorsKey().encode()]!.value()! } set { - changes[StateKeys.PreviousValidatorsKey()] = .init(newValue) + changes[StateKeys.PreviousValidatorsKey().encode()] = .init(newValue) } } // ρ: The ρending reports, per core, which are being made available prior to accumulation. public var reports: StateKeys.ReportsKey.Value { get { - changes[StateKeys.ReportsKey()]!.value()! + changes[StateKeys.ReportsKey().encode()]!.value()! } set { - changes[StateKeys.ReportsKey()] = .init(newValue) + changes[StateKeys.ReportsKey().encode()] = .init(newValue) } } // τ: The most recent block’s τimeslot. public var timeslot: StateKeys.TimeslotKey.Value { get { - changes[StateKeys.TimeslotKey()]!.value()! + changes[StateKeys.TimeslotKey().encode()]!.value()! } set { - changes[StateKeys.TimeslotKey()] = .init(newValue) + changes[StateKeys.TimeslotKey().encode()] = .init(newValue) } } // χ: The privileged service indices. public var privilegedServices: StateKeys.PrivilegedServicesKey.Value { get { - changes[StateKeys.PrivilegedServicesKey()]!.value()! + changes[StateKeys.PrivilegedServicesKey().encode()]!.value()! } set { - changes[StateKeys.PrivilegedServicesKey()] = .init(newValue) + changes[StateKeys.PrivilegedServicesKey().encode()] = .init(newValue) } } // π: The activity statistics for the validators. public var activityStatistics: StateKeys.ActivityStatisticsKey.Value { get { - changes[StateKeys.ActivityStatisticsKey()]!.value()! + changes[StateKeys.ActivityStatisticsKey().encode()]!.value()! } set { - changes[StateKeys.ActivityStatisticsKey()] = .init(newValue) + changes[StateKeys.ActivityStatisticsKey().encode()] = .init(newValue) } } // ϑ: The accumulation queue. public var accumulationQueue: StateKeys.AccumulationQueueKey.Value { get { - changes[StateKeys.AccumulationQueueKey()]!.value()! + changes[StateKeys.AccumulationQueueKey().encode()]!.value()! } set { - changes[StateKeys.AccumulationQueueKey()] = .init(newValue) + changes[StateKeys.AccumulationQueueKey().encode()] = .init(newValue) } } // ξ: The accumulation history. public var accumulationHistory: StateKeys.AccumulationHistoryKey.Value { get { - changes[StateKeys.AccumulationHistoryKey()]!.value()! + changes[StateKeys.AccumulationHistoryKey().encode()]!.value()! } set { - changes[StateKeys.AccumulationHistoryKey()] = .init(newValue) + changes[StateKeys.AccumulationHistoryKey().encode()] = .init(newValue) } } // δ: The (prior) state of the service accounts. public subscript(serviceAccount index: ServiceIndex) -> StateKeys.ServiceAccountKey.Value? { get { - changes[StateKeys.ServiceAccountKey(index: index)]?.value() + changes[StateKeys.ServiceAccountKey(index: index).encode()]?.value() } set { - changes[StateKeys.ServiceAccountKey(index: index)] = .init(newValue) + changes[StateKeys.ServiceAccountKey(index: index).encode()] = .init(newValue) } } // s public subscript(serviceAccount index: ServiceIndex, storageKey key: Data32) -> StateKeys.ServiceAccountStorageKey.Value? { get { - changes[StateKeys.ServiceAccountStorageKey(index: index, key: key)]?.value() + changes[StateKeys.ServiceAccountStorageKey(index: index, key: key).encode()]?.value() } set { - changes[StateKeys.ServiceAccountStorageKey(index: index, key: key)] = .init(newValue) + changes[StateKeys.ServiceAccountStorageKey(index: index, key: key).encode()] = .init(newValue) } } @@ -214,10 +214,10 @@ public struct StateLayer: @unchecked Sendable { serviceAccount index: ServiceIndex, preimageHash hash: Data32 ) -> StateKeys.ServiceAccountPreimagesKey.Value? { get { - changes[StateKeys.ServiceAccountPreimagesKey(index: index, hash: hash)]?.value() + changes[StateKeys.ServiceAccountPreimagesKey(index: index, hash: hash).encode()]?.value() } set { - changes[StateKeys.ServiceAccountPreimagesKey(index: index, hash: hash)] = .init(newValue) + changes[StateKeys.ServiceAccountPreimagesKey(index: index, hash: hash).encode()] = .init(newValue) } } @@ -227,36 +227,51 @@ public struct StateLayer: @unchecked Sendable { ) -> StateKeys.ServiceAccountPreimageInfoKey.Value? { get { changes[ - StateKeys.ServiceAccountPreimageInfoKey(index: index, hash: hash, length: length) + StateKeys.ServiceAccountPreimageInfoKey( + index: index, hash: hash, length: length + ).encode() ]?.value() } set { - changes[StateKeys.ServiceAccountPreimageInfoKey(index: index, hash: hash, length: length)] = .init(newValue) + changes[ + StateKeys.ServiceAccountPreimageInfoKey( + index: index, hash: hash, length: length + ).encode() + ] = .init(newValue) } } } extension StateLayer { - public func toKV() -> some Sequence<(key: any StateKey, value: (Codable & Sendable)?)> { - changes.map { (key: $0.key.base as! any StateKey, value: $0.value.value()) } + public func toKV() -> some Sequence<(key: Data32, value: (Codable & Sendable)?)> { + changes.map { (key: $0.key, value: $0.value.value()) } } } extension StateLayer { public func read(_ key: Key) -> Key.Value? { - changes[key] as? Key.Value + changes[key.encode()] as? Key.Value } public mutating func write(_ key: Key, value: Key.Value?) { - changes[key] = .init(value) + changes[key.encode()] = .init(value) } public subscript(key: any StateKey) -> (Codable & Sendable)? { get { - changes[AnyHashable(key)]?.value() + changes[key.encode()]?.value() + } + set { + changes[key.encode()] = .init(newValue) + } + } + + public subscript(key: Data32) -> (Codable & Sendable)? { + get { + changes[key]?.value() } set { - changes[AnyHashable(key)] = .init(newValue) + changes[key] = .init(newValue) } } } diff --git a/Boka/Sources/Boka.swift b/Boka/Sources/Boka.swift index 1af35e1e..1ff183eb 100644 --- a/Boka/Sources/Boka.swift +++ b/Boka/Sources/Boka.swift @@ -167,7 +167,8 @@ struct Boka: AsyncParsableCommand { rpc: rpcConfig, network: networkConfig, peers: peers, - local: local + local: local, + name: name ) let node: Node = if validator { diff --git a/Codec/Sources/Codec/JamEncoder.swift b/Codec/Sources/Codec/JamEncoder.swift index 7103e97c..2cc3e467 100644 --- a/Codec/Sources/Codec/JamEncoder.swift +++ b/Codec/Sources/Codec/JamEncoder.swift @@ -11,11 +11,11 @@ public class JamEncoder { encoder = EncodeContext(Data(capacity: capacity)) } - public func encode(_ value: some Encodable) throws { + public func encode(_ value: any Encodable) throws { try encoder.encode(value) } - public static func encode(_ value: some Encodable) throws -> Data { + public static func encode(_ value: any Encodable) throws -> Data { let encoder = if let value = value as? EncodedSize { JamEncoder(capacity: value.encodedSize) } else { diff --git a/Networking/Sources/MsQuicSwift/QuicListener.swift b/Networking/Sources/MsQuicSwift/QuicListener.swift index a8d6e371..777f020c 100644 --- a/Networking/Sources/MsQuicSwift/QuicListener.swift +++ b/Networking/Sources/MsQuicSwift/QuicListener.swift @@ -132,7 +132,7 @@ private final class ListenerHandle: Sendable { localAddress: NetAddr(quicAddr: evtInfo.pointee.LocalAddress.pointee), remoteAddress: NetAddr(quicAddr: evtInfo.pointee.RemoteAddress.pointee), negotiatedAlpn: Data(bytes: evtInfo.pointee.NegotiatedAlpn, count: Int(evtInfo.pointee.NegotiatedAlpnLength)), - serverName: String( + serverName: evtInfo.pointee.ServerNameLength == 0 ? "" : String( bytes: Data(bytes: evtInfo.pointee.ServerName, count: Int(evtInfo.pointee.ServerNameLength)), encoding: .utf8 ) ?? "" diff --git a/Node/Sources/Node/Genesis.swift b/Node/Sources/Node/Genesis.swift index 1e419ac3..2de32f90 100644 --- a/Node/Sources/Node/Genesis.swift +++ b/Node/Sources/Node/Genesis.swift @@ -47,7 +47,7 @@ extension Genesis { var kv = [String: Data]() for (key, value) in state.value.layer.toKV() { if let value { - kv[key.encode().toHexString()] = try JamEncoder.encode(value) + kv[key.toHexString()] = try JamEncoder.encode(value) } } return try ChainSpec( diff --git a/Node/Sources/Node/Node.swift b/Node/Sources/Node/Node.swift index 438759d4..a3d4db09 100644 --- a/Node/Sources/Node/Node.swift +++ b/Node/Sources/Node/Node.swift @@ -15,12 +15,20 @@ public class Node { public var network: NetworkConfig public var peers: [NetAddr] public var local: Bool + public var name: String? - public init(rpc: RPCConfig?, network: NetworkConfig, peers: [NetAddr] = [], local: Bool = false) { + public init( + rpc: RPCConfig?, + network: NetworkConfig, + peers: [NetAddr] = [], + local: Bool = false, + name: String? = nil + ) { self.rpc = rpc self.network = network self.peers = peers self.local = local + self.name = name } } @@ -71,7 +79,12 @@ public class Node { devPeers: Set(config.peers) ) - let nodeDataSource = NodeDataSource(blockchain: blockchain, chainDataProvider: dataProvider, networkManager: network) + let nodeDataSource = NodeDataSource( + blockchain: blockchain, + chainDataProvider: dataProvider, + networkManager: network, + name: config.name + ) rpcServer = try config.rpc.map { try Server(config: $0, source: nodeDataSource) diff --git a/Node/Sources/Node/NodeDataSource.swift b/Node/Sources/Node/NodeDataSource.swift index 8ca8a65b..ebcecf53 100644 --- a/Node/Sources/Node/NodeDataSource.swift +++ b/Node/Sources/Node/NodeDataSource.swift @@ -1,22 +1,30 @@ import Blockchain +import Foundation import RPC import Utils -public final class NodeDataSource: DataSource { +public final class NodeDataSource: Sendable { public let blockchain: Blockchain public let chainDataProvider: BlockchainDataProvider public let networkManager: NetworkManager + public let name: String - public init(blockchain: Blockchain, chainDataProvider: BlockchainDataProvider, networkManager: NetworkManager) { + public init( + blockchain: Blockchain, + chainDataProvider: BlockchainDataProvider, + networkManager: NetworkManager, + name: String? + ) { self.blockchain = blockchain self.chainDataProvider = chainDataProvider self.networkManager = networkManager + self.name = name ?? "(no name)" // TODO: generate a random name } +} - public func importBlock(_ block: BlockRef) async throws { - try await blockchain.importBlock(block) - } +extension NodeDataSource: SystemDataSource {} +extension NodeDataSource: ChainDataSource { public func getBestBlock() async throws -> BlockRef { try await chainDataProvider.getBlock(hash: chainDataProvider.bestHead.hash) } @@ -25,8 +33,15 @@ public final class NodeDataSource: DataSource { try await chainDataProvider.getBlock(hash: hash) } - public func getState(hash: Data32) async throws -> StateRef? { - try await chainDataProvider.getState(hash: hash) + public func getState(blockHash: Data32, key: Data32) async throws -> Data? { + let state = try await chainDataProvider.getState(hash: blockHash) + return try await state.value.read(key: key) + } +} + +extension NodeDataSource: TelemetryDataSource { + public func name() async throws -> String { + name } public func getPeersCount() async throws -> Int { diff --git a/RPC/Sources/RPC/DataSource/DataSource.swift b/RPC/Sources/RPC/DataSource/DataSource.swift index d3de016b..adffa63b 100644 --- a/RPC/Sources/RPC/DataSource/DataSource.swift +++ b/RPC/Sources/RPC/DataSource/DataSource.swift @@ -1,10 +1,18 @@ import Blockchain +import Foundation import Utils -public protocol DataSource: Sendable { +public protocol SystemDataSource: Sendable {} + +public protocol ChainDataSource: Sendable { func getBestBlock() async throws -> BlockRef func getBlock(hash: Data32) async throws -> BlockRef? - func importBlock(_: BlockRef) async throws - func getState(hash: Data32) async throws -> StateRef? + func getState(blockHash: Data32, key: Data32) async throws -> Data? +} + +public protocol TelemetryDataSource: Sendable { + func name() async throws -> String func getPeersCount() async throws -> Int } + +public typealias DataSource = ChainDataSource & SystemDataSource & TelemetryDataSource diff --git a/RPC/Sources/RPC/Handlers/ChainHandler.swift b/RPC/Sources/RPC/Handlers/ChainHandler.swift deleted file mode 100644 index 47686eb5..00000000 --- a/RPC/Sources/RPC/Handlers/ChainHandler.swift +++ /dev/null @@ -1,59 +0,0 @@ -import Blockchain -import Foundation -import Utils - -struct ChainHandler { - let source: DataSource - - static func getHandlers(source: DataSource) -> [String: JSONRPCHandler] { - let handler = ChainHandler(source: source) - - return [ - "chain_getBlock": handler.getBlock, - "chain_getState": handler.getState, - ] - } - - func getBlock(request: JSONRequest) async throws -> any Encodable { - let hash = request.params?["hash"] as? String - if let hash { - guard let data = Data(fromHexString: hash), let data32 = Data32(data) else { - throw JSONError(code: -32602, message: "Invalid block hash") - } - let block = try await source.getBlock(hash: data32) - return block - } else { - let block = try await source.getBestBlock() - return block - } - } - - func getState(request: JSONRequest) async throws -> any Encodable { - let hash = request.params?["hash"] as? String - if let hash { - guard - let data = Data(fromHexString: hash), - let data32 = Data32(data) - else { - throw JSONError(code: -32602, message: "Invalid block hash") - } - let state = try await source.getState(hash: data32) - guard let state else { - return JSON.null - } - // return state root for now - return await [ - "stateRoot": state.value.stateRoot.toHexString(), - "blockHash": hash, - ] - } else { - // return best block state by default - let block = try await source.getBestBlock() - let state = try await source.getState(hash: block.hash) - return await [ - "stateRoot": state?.value.stateRoot.toHexString(), - "blockHash": block.hash.toHexString(), - ] - } - } -} diff --git a/RPC/Sources/RPC/Handlers/ChainHandlers.swift b/RPC/Sources/RPC/Handlers/ChainHandlers.swift new file mode 100644 index 00000000..c086376e --- /dev/null +++ b/RPC/Sources/RPC/Handlers/ChainHandlers.swift @@ -0,0 +1,31 @@ +import Blockchain +import Foundation +import Utils + +enum ChainHandlers { + static func getHandlers(source: ChainDataSource) -> [any RPCHandler] { + [ + GetBlock(source: source), + ] + } + + struct GetBlock: RPCHandler { + var method: String { "chain_getBlock" } + typealias Request = Data32? + typealias Response = BlockRef? + + private let source: ChainDataSource + + init(source: ChainDataSource) { + self.source = source + } + + func handle(request: Request) async throws -> Response? { + if let hash = request { + try await source.getBlock(hash: hash) + } else { + try await source.getBestBlock() + } + } + } +} diff --git a/RPC/Sources/RPC/Handlers/RPCHandlers.swift b/RPC/Sources/RPC/Handlers/RPCHandlers.swift new file mode 100644 index 00000000..7991cb38 --- /dev/null +++ b/RPC/Sources/RPC/Handlers/RPCHandlers.swift @@ -0,0 +1,25 @@ +import Utils + +enum RPCHandlers { + static func getHandlers(source: [any RPCHandler]) -> [any RPCHandler] { + [ + Methods(source: source), + ] + } + + struct Methods: RPCHandler { + var method: String { "rpc_methods" } + typealias Request = VoidRequest + typealias Response = [String] + + private let methods: [String] + + init(source: [any RPCHandler]) { + methods = source.map(\.method) + } + + func handle(request _: Request) async throws -> Response? { + methods + } + } +} diff --git a/RPC/Sources/RPC/Handlers/SystemHandler.swift b/RPC/Sources/RPC/Handlers/SystemHandler.swift deleted file mode 100644 index e81f658b..00000000 --- a/RPC/Sources/RPC/Handlers/SystemHandler.swift +++ /dev/null @@ -1,18 +0,0 @@ -struct SystemHandler { - static func getHandlers() -> [String: JSONRPCHandler] { - let handler = SystemHandler() - - return [ - "system_health": handler.health, - "system_name": handler.name, - ] - } - - func health(request _: JSONRequest) async throws -> any Encodable { - true - } - - func name(request _: JSONRequest) async throws -> any Encodable { - "Boka" - } -} diff --git a/RPC/Sources/RPC/Handlers/SystemHandlers.swift b/RPC/Sources/RPC/Handlers/SystemHandlers.swift new file mode 100644 index 00000000..bee53b6b --- /dev/null +++ b/RPC/Sources/RPC/Handlers/SystemHandlers.swift @@ -0,0 +1,30 @@ +import Utils + +enum SystemHandlers { + static func getHandlers(source _: SystemDataSource) -> [any RPCHandler] { + [ + Health(), + Version(), + ] + } + + struct Health: RPCHandler { + var method: String { "system_health" } + typealias Request = VoidRequest + typealias Response = Bool + + func handle(request _: Request) async throws -> Response? { + true + } + } + + struct Version: RPCHandler { + var method: String { "system_version" } + typealias Request = VoidRequest + typealias Response = String + + func handle(request _: Request) async throws -> Response? { + "0.0.1" + } + } +} diff --git a/RPC/Sources/RPC/Handlers/TelemetryHandler.swift b/RPC/Sources/RPC/Handlers/TelemetryHandler.swift deleted file mode 100644 index d0c21f82..00000000 --- a/RPC/Sources/RPC/Handlers/TelemetryHandler.swift +++ /dev/null @@ -1,26 +0,0 @@ -import Blockchain -import Foundation -import Utils - -struct TelemetryHandler { - let source: DataSource - - static func getHandlers(source: DataSource) -> [String: JSONRPCHandler] { - let handler = TelemetryHandler(source: source) - - return [ - "telemetry_getUpdate": handler.getUpdate, - ] - } - - func getUpdate(request _: JSONRequest) async throws -> any Encodable { - let block = try await source.getBestBlock() - let peerCount = try await source.getPeersCount() - return [ - "name": "Boka", - "chainHead": block.header.timeslot.description, - "blockHash": block.hash.description, - "peerCount": peerCount.description, - ] - } -} diff --git a/RPC/Sources/RPC/Handlers/TelemetryHandlers.swift b/RPC/Sources/RPC/Handlers/TelemetryHandlers.swift new file mode 100644 index 00000000..dcb39d82 --- /dev/null +++ b/RPC/Sources/RPC/Handlers/TelemetryHandlers.swift @@ -0,0 +1,51 @@ +import Blockchain +import Foundation +import Utils + +enum TelemetryHandlers { + static func getHandlers(source: TelemetryDataSource & ChainDataSource) -> [any RPCHandler] { + [ + GetUpdate(source: source), + Name(source: source), + ] + } + + struct GetUpdate: RPCHandler { + var method: String { "telemetry_getUpdate" } + typealias Request = VoidRequest + typealias Response = [String: String] + + private let source: TelemetryDataSource & ChainDataSource + + init(source: TelemetryDataSource & ChainDataSource) { + self.source = source + } + + func handle(request _: Request) async throws -> Response? { + let block = try await source.getBestBlock() + let peerCount = try await source.getPeersCount() + return try await [ + "name": source.name(), + "chainHead": block.header.timeslot.description, + "blockHash": block.hash.description, + "peerCount": peerCount.description, + ] + } + } + + struct Name: RPCHandler { + var method: String { "telemetry_name" } + typealias Request = VoidRequest + typealias Response = String + + private let source: TelemetryDataSource + + init(source: TelemetryDataSource) { + self.source = source + } + + func handle(request _: Request) async throws -> Response? { + try await source.name() + } + } +} diff --git a/RPC/Sources/RPC/JSONRPC/FromJSON.swift b/RPC/Sources/RPC/JSONRPC/FromJSON.swift new file mode 100644 index 00000000..95f32507 --- /dev/null +++ b/RPC/Sources/RPC/JSONRPC/FromJSON.swift @@ -0,0 +1,88 @@ +import Foundation +import Utils + +enum FromJSONError: Error { + case null + case unexpectedJSON +} + +protocol FromJSON { + init(from: JSON?) throws +} + +enum VoidRequest: FromJSON { + case void + + init(from _: JSON?) throws { + // ignore + self = .void + } +} + +extension Optional: FromJSON where Wrapped: FromJSON { + init(from json: JSON?) throws { + guard let json else { + self = .none + return + } + switch json { + case .null: + self = .none + default: + self = try .some(Wrapped(from: json)) + } + } +} + +extension BinaryInteger where Self: FromJSON { + init(from json: JSON?) throws { + guard let json else { + throw FromJSONError.null + } + switch json { + case let .number(n): + self.init(n) + default: + throw FromJSONError.unexpectedJSON + } + } +} + +extension Int8: FromJSON {} +extension Int16: FromJSON {} +extension Int32: FromJSON {} +extension Int64: FromJSON {} +extension Int: FromJSON {} +extension UInt8: FromJSON {} +extension UInt16: FromJSON {} +extension UInt32: FromJSON {} +extension UInt64: FromJSON {} +extension UInt: FromJSON {} + +extension Data: FromJSON { + init(from json: JSON?) throws { + guard let json else { + throw FromJSONError.null + } + switch json { + case let .string(str): + self = try Data(fromHexString: str).unwrap() + default: + throw FromJSONError.unexpectedJSON + } + } +} + +extension Data32: FromJSON { + init(from json: JSON?) throws { + guard let json else { + throw FromJSONError.null + } + switch json { + case let .string(str): + self = try Data32(fromHexString: str).unwrap() + default: + throw FromJSONError.unexpectedJSON + } + } +} diff --git a/RPC/Sources/RPC/JSONRPC/JSONRPC.swift b/RPC/Sources/RPC/JSONRPC/JSONRPC.swift index c95a3e5e..67f19de0 100644 --- a/RPC/Sources/RPC/JSONRPC/JSONRPC.swift +++ b/RPC/Sources/RPC/JSONRPC/JSONRPC.swift @@ -5,14 +5,28 @@ struct JSONRequest: Content { let jsonrpc: String let method: String let params: JSON? - let id: Int + let id: JSON } struct JSONResponse: Content { let jsonrpc: String let result: AnyCodable? let error: JSONError? - let id: Int? + let id: JSON? + + init(id: JSON?, result: (any Encodable)?) { + jsonrpc = "2.0" + self.result = result.map(AnyCodable.init) + error = nil + self.id = id + } + + init(id: JSON?, error: JSONError) { + jsonrpc = "2.0" + result = nil + self.error = error + self.id = id + } } struct JSONError: Content, Error { diff --git a/RPC/Sources/RPC/JSONRPC/JSONRPCController.swift b/RPC/Sources/RPC/JSONRPC/JSONRPCController.swift index ef26a6c6..fa9e94fe 100644 --- a/RPC/Sources/RPC/JSONRPC/JSONRPCController.swift +++ b/RPC/Sources/RPC/JSONRPC/JSONRPCController.swift @@ -5,15 +5,27 @@ import Vapor let logger = Logger(label: "RPC.RPCController") -typealias JSONRPCHandler = @Sendable (JSONRequest) async throws -> any Encodable - final class JSONRPCController: RouteCollection, Sendable { - let handlers: [String: JSONRPCHandler] - let encoder = JSONEncoder() - let decoder = JSONDecoder() + let handlers: [String: any RPCHandler] + let encoder: JSONEncoder + let decoder: JSONDecoder + + init(handlers: [any RPCHandler]) { + var dict = [String: any RPCHandler]() + for handler in handlers { + if dict.keys.contains(handler.method) { + logger.warning("Duplicated handler: \(handler.method)") + } + dict[handler.method] = handler + } + self.handlers = dict + + encoder = JSONEncoder() + encoder.dataEncodingStrategy = .hex + encoder.outputFormatting = [.withoutEscapingSlashes, .sortedKeys] - init(handlers: [String: JSONRPCHandler]) { - self.handlers = handlers + decoder = JSONDecoder() + decoder.dataDecodingStrategy = .hex } func boot(routes: RoutesBuilder) throws { @@ -53,7 +65,7 @@ final class JSONRPCController: RouteCollection, Sendable { logger.debug("Failed to decode JSON request: \(error)") let rpcError = JSONError(code: -32600, message: "Invalid Request") - let rpcResponse = JSONResponse(jsonrpc: "2.0", result: nil, error: rpcError, id: nil) + let rpcResponse = JSONResponse(id: nil, error: rpcError) do { let responseData = try encoder.encode(rpcResponse) @@ -69,16 +81,15 @@ final class JSONRPCController: RouteCollection, Sendable { do { let method = request.method guard let handler = handlers[method] else { - return JSONResponse(jsonrpc: "2.0", result: nil, error: JSONError.methodNotFound(method), id: request.id) + return JSONResponse(id: request.id, error: JSONError.methodNotFound(method)) } - let res = try await handler(request) - return JSONResponse(jsonrpc: "2.0", result: AnyCodable(res), error: nil, id: request.id) + return try await handler.handle(jsonRequest: request) } catch { logger.error("Failed to handle JSON request: \(error)") let rpcError = JSONError(code: -32600, message: "Invalid Request") - return JSONResponse(jsonrpc: "2.0", result: nil, error: rpcError, id: request.id) + return JSONResponse(id: request.id, error: rpcError) } } } diff --git a/RPC/Sources/RPC/JSONRPC/RPCHandler.swift b/RPC/Sources/RPC/JSONRPC/RPCHandler.swift new file mode 100644 index 00000000..67d0bf04 --- /dev/null +++ b/RPC/Sources/RPC/JSONRPC/RPCHandler.swift @@ -0,0 +1,24 @@ +import Foundation +import Utils +import Vapor + +protocol RPCHandler: Sendable { + associatedtype Request: FromJSON + associatedtype Response: Encodable + + var method: String { get } + + func handle(request: Request) async throws -> Response? + func handle(jsonRequest: JSONRequest) async throws -> JSONResponse +} + +extension RPCHandler { + public func handle(jsonRequest: JSONRequest) async throws -> JSONResponse { + let req = try Request(from: jsonRequest.params) + let res = try await handle(request: req) + return JSONResponse( + id: jsonRequest.id, + result: res + ) + } +} diff --git a/RPC/Sources/RPC/Server.swift b/RPC/Sources/RPC/Server.swift index f5692a7b..d46cf1f3 100644 --- a/RPC/Sources/RPC/Server.swift +++ b/RPC/Sources/RPC/Server.swift @@ -27,9 +27,10 @@ public class Server { let env = try Environment.detect(arguments: ["--env"]) app = Application(env) - var handlers: [String: JSONRPCHandler] = SystemHandler.getHandlers() - handlers.merge(ChainHandler.getHandlers(source: source)) { _, new in new } - handlers.merge(TelemetryHandler.getHandlers(source: source)) { _, new in new } + var handlers: [any RPCHandler] = SystemHandlers.getHandlers(source: source) + handlers.append(contentsOf: ChainHandlers.getHandlers(source: source)) + handlers.append(contentsOf: TelemetryHandlers.getHandlers(source: source)) + handlers.append(contentsOf: RPCHandlers.getHandlers(source: handlers)) // Register routes let rpcController = JSONRPCController(handlers: handlers) diff --git a/RPC/Tests/RPCTests/JSONRPCControllerTests.swift b/RPC/Tests/RPCTests/JSONRPCControllerTests.swift index 4d836f2a..ee41ca15 100644 --- a/RPC/Tests/RPCTests/JSONRPCControllerTests.swift +++ b/RPC/Tests/RPCTests/JSONRPCControllerTests.swift @@ -5,13 +5,15 @@ import TracingUtils import Vapor import XCTVapor +struct DummySource: SystemDataSource {} + final class JSONRPCControllerTests { var app: Application init() throws { app = Application(.testing) - let rpcController = JSONRPCController(handlers: SystemHandler.getHandlers()) + let rpcController = JSONRPCController(handlers: SystemHandlers.getHandlers(source: DummySource())) try app.register(collection: rpcController) } diff --git a/Utils/Sources/Utils/JSON.swift b/Utils/Sources/Utils/JSON.swift index 552c9450..f8d3be8f 100644 --- a/Utils/Sources/Utils/JSON.swift +++ b/Utils/Sources/Utils/JSON.swift @@ -74,6 +74,14 @@ extension JSON: CustomDebugStringConvertible { } } +extension JSON: ExpressibleByIntegerLiteral { + public typealias IntegerLiteralType = Int32 + + public init(integerLiteral value: Int32) { + self = .number(Double(value)) + } +} + extension JSON { public subscript(key: Any) -> JSON? { if let array, let index = key as? Int, index < array.count { From 161eb1069216212311bb70c54fc9c7664af68ec6 Mon Sep 17 00:00:00 2001 From: Qiwei Yang Date: Fri, 22 Nov 2024 08:36:59 +0800 Subject: [PATCH 2/3] 0.5.0 updates (#228) * small changes * main updates * fix types * update tests * fix --- .../Blockchain/Config/ProtocolConfig.swift | 4 +- .../RuntimeProtocols/Accumulation.swift | 2 +- .../Blockchain/RuntimeProtocols/Runtime.swift | 25 +++++++--- .../Sources/Blockchain/State/State.swift | 2 +- .../Sources/Blockchain/State/StateKeys.swift | 26 +++++----- .../Types/AvailabilitySpecifications.swift | 12 +++-- .../Sources/Blockchain/Types/Extrinsic.swift | 26 ++++++---- .../Blockchain/Types/PrivilegedServices.swift | 6 +-- .../VMInvocations/HostCall/HostCalls.swift | 2 +- .../Invocations/AccumulateInvocation.swift | 2 +- .../Invocations/OnTransferInvocation.swift | 2 +- .../Blockchain/Validator/BlockAuthor.swift | 2 +- .../ExtrinsicPoolServiceTests.swift | 2 +- JAMTests/Tests/JAMTests/CodecTests.swift | 49 +++++++------------ JAMTests/jamtestvectors | 2 +- 15 files changed, 88 insertions(+), 76 deletions(-) diff --git a/Blockchain/Sources/Blockchain/Config/ProtocolConfig.swift b/Blockchain/Sources/Blockchain/Config/ProtocolConfig.swift index 193f0cdf..988cbe63 100644 --- a/Blockchain/Sources/Blockchain/Config/ProtocolConfig.swift +++ b/Blockchain/Sources/Blockchain/Config/ProtocolConfig.swift @@ -64,7 +64,7 @@ public struct ProtocolConfig: Sendable, Codable, Equatable { // P = 6: The slot period, in seconds. public var slotPeriodSeconds: Int - // Q = 80: The maximum number of items in the authorizations queue. + // Q = 80: The number of items in the authorizations queue. public var maxAuthorizationsQueueItems: Int // R = 10: The rotation period of validator-core assignments, in timeslots. @@ -107,7 +107,7 @@ public struct ProtocolConfig: Sendable, Codable, Equatable { // ZI = 2^24: The standard pvm program initialization input data size. public var pvmProgramInitInputDataSize: Int - // ZP = 2^14: The standard pvm program initialization page size. + // ZG = 2^14: The standard pvm program initialization page size. public var pvmProgramInitPageSize: Int // ZQ = 2^16: The standard pvm program initialization segment size. diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift index d5190e81..6cf55f9b 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift @@ -186,7 +186,7 @@ extension Accumulation { } switch service { - case privilegedServices.empower: + case privilegedServices.blessed: newPrivilegedServices = singleOutput.state.privilegedServices case privilegedServices.assign: newAuthorizationQueue = singleOutput.state.authorizationQueue diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift index eb50be6f..f08814ed 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift @@ -60,10 +60,7 @@ public final class Runtime { throw Error.invalidHeaderStateRoot } - let expectedExtrinsicHash = try Result { try JamEncoder.encode(block.extrinsic).blake2b256hash() } - .mapError(Error.encodeError).get() - - guard block.header.extrinsicsHash == expectedExtrinsicHash else { + guard block.header.extrinsicsHash == block.extrinsic.hash() else { throw Error.invalidExtrinsicHash } @@ -297,7 +294,7 @@ public final class Runtime { } public func updateDisputes(block: BlockRef, state newState: inout State) throws { - let (posState, offenders) = try newState.update(config: config, disputes: block.extrinsic.judgements) + let (posState, offenders) = try newState.update(config: config, disputes: block.extrinsic.disputes) newState.mergeWith(postState: posState) guard offenders == block.header.offendersMarkers else { @@ -379,7 +376,7 @@ public final class Runtime { return availableReports } - public func updatePreimages(block: BlockRef, state newState: inout State) async throws { + public func updatePreimages(block: BlockRef, state newState: inout State, prevState: StateRef) async throws { let preimages = block.extrinsic.preimages.preimages guard preimages.isSortedAndUnique() else { @@ -388,14 +385,26 @@ public final class Runtime { for preimage in preimages { let hash = preimage.data.blake2b256hash() + + // check prior state + let prevPreimageData: Data? = try await prevState.value.get(serviceAccount: preimage.serviceIndex, preimageHash: hash) + let prevInfo = try await prevState.value.get( + serviceAccount: preimage.serviceIndex, preimageHash: hash, length: UInt32(preimage.data.count) + ) + guard prevPreimageData == nil, prevInfo == nil else { + throw Error.duplicatedPreimage + } + + // disregard no longer useful ones in new state let preimageData: Data? = try await newState.get(serviceAccount: preimage.serviceIndex, preimageHash: hash) let info = try await newState.get( serviceAccount: preimage.serviceIndex, preimageHash: hash, length: UInt32(preimage.data.count) ) - guard preimageData == nil, info == nil else { - throw Error.duplicatedPreimage + if preimageData != nil || info != nil { + continue } + // update state newState[serviceAccount: preimage.serviceIndex, preimageHash: hash] = preimage.data newState[ serviceAccount: preimage.serviceIndex, preimageHash: hash, length: UInt32(preimage.data.count) diff --git a/Blockchain/Sources/Blockchain/State/State.swift b/Blockchain/Sources/Blockchain/State/State.swift index 1f5eec79..35851112 100644 --- a/Blockchain/Sources/Blockchain/State/State.swift +++ b/Blockchain/Sources/Blockchain/State/State.swift @@ -282,7 +282,7 @@ extension State: Dummy { let authorizationQueue: StateKeys.AuthorizationQueueKey.Value = try! ConfigFixedSizeArray(config: config, defaultValue: ConfigFixedSizeArray(config: config, defaultValue: Data32())) let privilegedServices: StateKeys.PrivilegedServicesKey.Value = PrivilegedServices( - empower: ServiceIndex(), + blessed: ServiceIndex(), assign: ServiceIndex(), designate: ServiceIndex(), basicGas: [:] diff --git a/Blockchain/Sources/Blockchain/State/StateKeys.swift b/Blockchain/Sources/Blockchain/State/StateKeys.swift index df80ebbe..743f3759 100644 --- a/Blockchain/Sources/Blockchain/State/StateKeys.swift +++ b/Blockchain/Sources/Blockchain/State/StateKeys.swift @@ -33,23 +33,23 @@ private func constructKey(_ idx: UInt8, _ service: ServiceIndex) -> Data32 { return Data32(data)! } -private func constructKey(_ service: ServiceIndex, _ val: UInt32, _: Data) -> Data32 { - var data = Data(capacity: 32) +private func constructKey(_ service: ServiceIndex, _ val: UInt32, _ data: Data) -> Data32 { + var stateKey = Data(capacity: 32) withUnsafeBytes(of: service) { servicePtr in withUnsafeBytes(of: val) { valPtr in - data.append(servicePtr.load(as: UInt8.self)) - data.append(valPtr.load(as: UInt8.self)) - data.append(servicePtr.load(fromByteOffset: 1, as: UInt8.self)) - data.append(valPtr.load(fromByteOffset: 1, as: UInt8.self)) - data.append(servicePtr.load(fromByteOffset: 2, as: UInt8.self)) - data.append(valPtr.load(fromByteOffset: 2, as: UInt8.self)) - data.append(servicePtr.load(fromByteOffset: 3, as: UInt8.self)) - data.append(valPtr.load(fromByteOffset: 3, as: UInt8.self)) + stateKey.append(servicePtr.load(as: UInt8.self)) + stateKey.append(valPtr.load(as: UInt8.self)) + stateKey.append(servicePtr.load(fromByteOffset: 1, as: UInt8.self)) + stateKey.append(valPtr.load(fromByteOffset: 1, as: UInt8.self)) + stateKey.append(servicePtr.load(fromByteOffset: 2, as: UInt8.self)) + stateKey.append(valPtr.load(fromByteOffset: 2, as: UInt8.self)) + stateKey.append(servicePtr.load(fromByteOffset: 3, as: UInt8.self)) + stateKey.append(valPtr.load(fromByteOffset: 3, as: UInt8.self)) } } - data.append(contentsOf: data[relative: 0 ..< 24]) - return Data32(data)! + stateKey.append(contentsOf: data[relative: 0 ..< 24]) + return Data32(stateKey)! } public enum StateKeys { @@ -316,7 +316,7 @@ public enum StateKeys { } public func encode() -> Data32 { - constructKey(index, length, hash.blake2b256hash().data) + constructKey(index, length, hash.blake2b256hash().data[2...]) } } } diff --git a/Blockchain/Sources/Blockchain/Types/AvailabilitySpecifications.swift b/Blockchain/Sources/Blockchain/Types/AvailabilitySpecifications.swift index 0013f9c8..14e6af0d 100644 --- a/Blockchain/Sources/Blockchain/Types/AvailabilitySpecifications.swift +++ b/Blockchain/Sources/Blockchain/Types/AvailabilitySpecifications.swift @@ -14,16 +14,21 @@ public struct AvailabilitySpecifications: Sendable, Equatable, Codable { // e public var segmentRoot: Data32 + // n + public var segmentCount: UInt16 + public init( workPackageHash: Data32, length: DataLength, erasureRoot: Data32, - segmentRoot: Data32 + segmentRoot: Data32, + segmentCount: UInt16 ) { self.workPackageHash = workPackageHash self.length = length self.erasureRoot = erasureRoot self.segmentRoot = segmentRoot + self.segmentCount = segmentCount } } @@ -34,14 +39,15 @@ extension AvailabilitySpecifications: Dummy { workPackageHash: Data32(), length: 0, erasureRoot: Data32(), - segmentRoot: Data32() + segmentRoot: Data32(), + segmentCount: 0 ) } } extension AvailabilitySpecifications: EncodedSize { public var encodedSize: Int { - workPackageHash.encodedSize + length.encodedSize + erasureRoot.encodedSize + segmentRoot.encodedSize + workPackageHash.encodedSize + length.encodedSize + erasureRoot.encodedSize + segmentRoot.encodedSize + segmentCount.encodedSize } public static var encodeedSizeHint: Int? { diff --git a/Blockchain/Sources/Blockchain/Types/Extrinsic.swift b/Blockchain/Sources/Blockchain/Types/Extrinsic.swift index 62dbabef..21ed6be4 100644 --- a/Blockchain/Sources/Blockchain/Types/Extrinsic.swift +++ b/Blockchain/Sources/Blockchain/Types/Extrinsic.swift @@ -9,28 +9,28 @@ public struct Extrinsic: Sendable, Equatable, Codable { // permissioning of block authoring public var tickets: ExtrinsicTickets - // ED: Votes, by validators, on dispute(s) arising between them presently taking place - public var judgements: ExtrinsicDisputes - // EP: Static data which is presently being requested to be available for workloads to be able to fetch on demand public var preimages: ExtrinsicPreimages + // EG: Reports of newly completed workloads whose accuracy is guaranteed by specific validators + public var reports: ExtrinsicGuarantees + // EA: Assurances by each validator concerning which of the input data of workloads they have // correctly received and are storing locally public var availability: ExtrinsicAvailability - // EG: Reports of newly completed workloads whose accuracy is guaranteed by specific validators - public var reports: ExtrinsicGuarantees + // ED: Votes, by validators, on dispute(s) arising between them presently taking place + public var disputes: ExtrinsicDisputes public init( tickets: ExtrinsicTickets, - judgements: ExtrinsicDisputes, + disputes: ExtrinsicDisputes, preimages: ExtrinsicPreimages, availability: ExtrinsicAvailability, reports: ExtrinsicGuarantees ) { self.tickets = tickets - self.judgements = judgements + self.disputes = disputes self.preimages = preimages self.availability = availability self.reports = reports @@ -42,7 +42,7 @@ extension Extrinsic: Dummy { public static func dummy(config: Config) -> Extrinsic { Extrinsic( tickets: ExtrinsicTickets.dummy(config: config), - judgements: ExtrinsicDisputes.dummy(config: config), + disputes: ExtrinsicDisputes.dummy(config: config), preimages: ExtrinsicPreimages.dummy(config: config), availability: ExtrinsicAvailability.dummy(config: config), reports: ExtrinsicGuarantees.dummy(config: config) @@ -55,7 +55,15 @@ extension Extrinsic: Validate {} extension Extrinsic { public func hash() -> Data32 { do { - return try JamEncoder.encode(self).blake2b256hash() + return try JamEncoder.encode([ + JamEncoder.encode(tickets).blake2b256hash(), + JamEncoder.encode(preimages).blake2b256hash(), + JamEncoder.encode(reports.guarantees.array.map { item in + try JamEncoder.encode(item.workReport.hash()) + JamEncoder.encode(item.timeslot) + JamEncoder.encode(item.credential) + }).blake2b256hash(), + JamEncoder.encode(availability).blake2b256hash(), + JamEncoder.encode(disputes).blake2b256hash(), + ]).blake2b256hash() } catch { logger.error("Failed to encode extrinsic, returning empty hash", metadata: ["error": "\(error)"]) return Data32() diff --git a/Blockchain/Sources/Blockchain/Types/PrivilegedServices.swift b/Blockchain/Sources/Blockchain/Types/PrivilegedServices.swift index 30061f48..a0850284 100644 --- a/Blockchain/Sources/Blockchain/Types/PrivilegedServices.swift +++ b/Blockchain/Sources/Blockchain/Types/PrivilegedServices.swift @@ -3,7 +3,7 @@ import Utils public struct PrivilegedServices: Sendable, Equatable, Codable { // m - public var empower: ServiceIndex + public var blessed: ServiceIndex // a public var assign: ServiceIndex // v @@ -11,8 +11,8 @@ public struct PrivilegedServices: Sendable, Equatable, Codable { // g @CodingAs> public var basicGas: [ServiceIndex: Gas] - public init(empower: ServiceIndex, assign: ServiceIndex, designate: ServiceIndex, basicGas: [ServiceIndex: Gas]) { - self.empower = empower + public init(blessed: ServiceIndex, assign: ServiceIndex, designate: ServiceIndex, basicGas: [ServiceIndex: Gas]) { + self.blessed = blessed self.assign = assign self.designate = designate self.basicGas = basicGas diff --git a/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift b/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift index ea414bdd..5ba7e920 100644 --- a/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift +++ b/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift @@ -242,7 +242,7 @@ public class Empower: HostCall { if basicGas.count != 0 { state.writeRegister(Registers.Index(raw: 7), HostCallResultCode.OK.rawValue) - x.accumulateState.privilegedServices.empower = regs[0] + x.accumulateState.privilegedServices.blessed = regs[0] x.accumulateState.privilegedServices.assign = regs[1] x.accumulateState.privilegedServices.designate = regs[2] x.accumulateState.privilegedServices.basicGas = basicGas diff --git a/Blockchain/Sources/Blockchain/VMInvocations/Invocations/AccumulateInvocation.swift b/Blockchain/Sources/Blockchain/VMInvocations/Invocations/AccumulateInvocation.swift index d65f7596..f129f0aa 100644 --- a/Blockchain/Sources/Blockchain/VMInvocations/Invocations/AccumulateInvocation.swift +++ b/Blockchain/Sources/Blockchain/VMInvocations/Invocations/AccumulateInvocation.swift @@ -40,7 +40,7 @@ extension AccumulateFunction { let (exitReason, gas, output) = await invokePVM( config: config, blob: accumulatingAccountDetails.codeHash.data, - pc: 10, + pc: 5, gas: gas, argumentData: argument, ctx: ctx diff --git a/Blockchain/Sources/Blockchain/VMInvocations/Invocations/OnTransferInvocation.swift b/Blockchain/Sources/Blockchain/VMInvocations/Invocations/OnTransferInvocation.swift index 6271b5e2..fedc57e4 100644 --- a/Blockchain/Sources/Blockchain/VMInvocations/Invocations/OnTransferInvocation.swift +++ b/Blockchain/Sources/Blockchain/VMInvocations/Invocations/OnTransferInvocation.swift @@ -28,7 +28,7 @@ extension OnTransferFunction { _ = await invokePVM( config: config, blob: account.codeHash.data, - pc: 15, + pc: 10, gas: gasLimitSum, argumentData: argument, ctx: ctx diff --git a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift index 785a027f..a55f72e8 100644 --- a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift +++ b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift @@ -75,7 +75,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { let extrinsic = try Extrinsic( tickets: ExtrinsicTickets(tickets: ConfigLimitedSizeArray(config: config, array: Array(tickets))), - judgements: ExtrinsicDisputes.dummy(config: config), // TODO: + disputes: ExtrinsicDisputes.dummy(config: config), // TODO: preimages: ExtrinsicPreimages.dummy(config: config), // TODO: availability: ExtrinsicAvailability.dummy(config: config), // TODO: reports: ExtrinsicGuarantees.dummy(config: config) // TODO: diff --git a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift index 9c993ea0..e9a17aa4 100644 --- a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift @@ -144,7 +144,7 @@ struct ExtrinsicPoolServiceTests { let blockTickets = Array(tickets[0 ..< 2]) let extrinsic = try Extrinsic( tickets: ExtrinsicTickets(tickets: ConfigLimitedSizeArray(config: config, array: blockTickets.map(\.ticket))), - judgements: ExtrinsicDisputes.dummy(config: config), + disputes: ExtrinsicDisputes.dummy(config: config), preimages: ExtrinsicPreimages.dummy(config: config), availability: ExtrinsicAvailability.dummy(config: config), reports: ExtrinsicGuarantees.dummy(config: config) diff --git a/JAMTests/Tests/JAMTests/CodecTests.swift b/JAMTests/Tests/JAMTests/CodecTests.swift index adf9b875..0938afa4 100644 --- a/JAMTests/Tests/JAMTests/CodecTests.swift +++ b/JAMTests/Tests/JAMTests/CodecTests.swift @@ -102,11 +102,11 @@ struct CodecTests { if value is RefinementContext { return [ "anchor": json["anchor"]!["headerHash"]!, + "state_root": json["anchor"]!["stateRoot"]!, "beefy_root": json["anchor"]!["beefyRoot"]!, "lookup_anchor": json["lookupAnchor"]!["headerHash"]!, "lookup_anchor_slot": json["lookupAnchor"]!["timeslot"]!, - "prerequisite": json["prerequisiteWorkPackages"] ?? .null, - "state_root": json["anchor"]!["stateRoot"]!, + "prerequisites": json["prerequisiteWorkPackages"] ?? .null, ].json } if value is ExtrinsicTickets { @@ -122,7 +122,7 @@ struct CodecTests { "code_hash": json["codeHash"]!, "gas": json["gasRatio"]!, "payload_hash": json["payloadHash"]!, - "service": json["serviceIndex"]!, + "service_id": json["serviceIndex"]!, "result": json["output"]!["success"] == nil ? json["output"]! : [ "ok": json["output"]!["success"]!, ].json, @@ -175,9 +175,10 @@ struct CodecTests { if value is AvailabilitySpecifications { return [ "hash": json["workPackageHash"]!, - "len": json["length"]!, + "length": json["length"]!, "erasure_root": json["erasureRoot"]!, "exports_root": json["segmentRoot"]!, + "exports_count": json["segmentCount"]!, ].json } if let value = value as? ExtrinsicGuarantees { @@ -197,10 +198,10 @@ struct CodecTests { if let value = value as? Extrinsic { return [ "tickets": transform(json["tickets"]!, value: value.tickets), - "disputes": transform(json["judgements"]!, value: value.judgements), "preimages": transform(json["preimages"]!, value: value.preimages), - "assurances": transform(json["availability"]!, value: value.availability), "guarantees": transform(json["reports"]!, value: value.reports), + "assurances": transform(json["availability"]!, value: value.availability), + "disputes": transform(json["disputes"]!, value: value.disputes), ].json } if let value = value as? Header { @@ -242,10 +243,8 @@ struct CodecTests { @Test func block() throws { - withKnownIssue("waiting for refine_context.prerequisite updates", isIntermittent: true) { - let (actual, expected) = try Self.test(Block.self, path: "block") - #expect(actual == expected) - } + let (actual, expected) = try Self.test(Block.self, path: "block") + #expect(actual == expected) } @Test @@ -256,18 +255,14 @@ struct CodecTests { @Test func extrinsic() throws { - withKnownIssue("waiting for refine_context.prerequisite updates", isIntermittent: true) { - let (actual, expected) = try Self.test(Extrinsic.self, path: "extrinsic") - #expect(actual == expected) - } + let (actual, expected) = try Self.test(Extrinsic.self, path: "extrinsic") + #expect(actual == expected) } @Test func guarantees_extrinsic() throws { - withKnownIssue("waiting for refine_context.prerequisite updates", isIntermittent: true) { - let (actual, expected) = try Self.test(ExtrinsicGuarantees.self, path: "guarantees_extrinsic") - #expect(actual == expected) - } + let (actual, expected) = try Self.test(ExtrinsicGuarantees.self, path: "guarantees_extrinsic") + #expect(actual == expected) } @Test @@ -290,10 +285,8 @@ struct CodecTests { @Test func refine_context() throws { - withKnownIssue("waiting for refine_context.prerequisite updates", isIntermittent: true) { - let (actual, expected) = try Self.test(RefinementContext.self, path: "refine_context") - #expect(actual == expected) - } + let (actual, expected) = try Self.test(RefinementContext.self, path: "refine_context") + #expect(actual == expected) } @Test @@ -310,18 +303,14 @@ struct CodecTests { @Test func work_package() throws { - withKnownIssue("waiting for refine_context.prerequisite updates", isIntermittent: true) { - let (actual, expected) = try Self.test(WorkPackage.self, path: "work_package") - #expect(actual == expected) - } + let (actual, expected) = try Self.test(WorkPackage.self, path: "work_package") + #expect(actual == expected) } @Test func work_report() throws { - withKnownIssue("waiting for refine_context.prerequisite updates", isIntermittent: true) { - let (actual, expected) = try Self.test(WorkReport.self, path: "work_report") - #expect(actual == expected) - } + let (actual, expected) = try Self.test(WorkReport.self, path: "work_report") + #expect(actual == expected) } @Test diff --git a/JAMTests/jamtestvectors b/JAMTests/jamtestvectors index a46c3539..41809243 160000 --- a/JAMTests/jamtestvectors +++ b/JAMTests/jamtestvectors @@ -1 +1 @@ -Subproject commit a46c3539d79188a499fbdde933c50a82ac98a0f1 +Subproject commit 418092431e14b1da52dd8040d12ba246cc351f99 From eb7a1228f9e2a220ec99c530a7846b344b021750 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Sun, 24 Nov 2024 12:59:28 +0800 Subject: [PATCH 3/3] fix issue 226 (#230) * fix issue 226 * update test * update p2p --- .../Sources/MsQuicSwift/QuicConnection.swift | 9 +- Networking/Sources/Networking/Peer.swift | 24 ++-- .../NetworkingTests/MockPeerEventTests.swift | 15 ++- .../Tests/NetworkingTests/PeerTests.swift | 104 +++++++++++++++++- 4 files changed, 128 insertions(+), 24 deletions(-) diff --git a/Networking/Sources/MsQuicSwift/QuicConnection.swift b/Networking/Sources/MsQuicSwift/QuicConnection.swift index dddea21f..5218d34d 100644 --- a/Networking/Sources/MsQuicSwift/QuicConnection.swift +++ b/Networking/Sources/MsQuicSwift/QuicConnection.swift @@ -124,6 +124,12 @@ public final class QuicConnection: Sendable { } } + fileprivate func close() { + storage.write { storage in + storage = nil + } + } + public func shutdown(errorCode: QuicErrorCode = .success) throws { logger.debug("closing connection") try storage.write { storage in @@ -250,12 +256,13 @@ private class ConnectionHandle { } case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: - logger.trace("Shutdown complete") + logger.debug("Shutdown complete") if let connection { connection.handler.shutdownComplete(connection) } if event.pointee.SHUTDOWN_COMPLETE.AppCloseInProgress == 0 { // avoid closing twice + connection?.close() api.call { api in api.pointee.ConnectionClose(ptr) } diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 32829072..342cd881 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -294,7 +294,7 @@ final class PeerImpl: Sendable { } func reconnect(to address: NetAddr, role: PeerRole) throws { - let state = reconnectStates.read { reconnectStates in + var state = reconnectStates.read { reconnectStates in reconnectStates[address] ?? .init() } @@ -302,12 +302,9 @@ final class PeerImpl: Sendable { logger.warning("reconnecting to \(address) exceeded max attempts") return } - + state.applyBackoff() reconnectStates.write { reconnectStates in - if var state = reconnectStates[address] { - state.applyBackoff() - reconnectStates[address] = state - } + reconnectStates[address] = state } Task { try await Task.sleep(for: .seconds(state.delay)) @@ -336,7 +333,7 @@ final class PeerImpl: Sendable { } func reopenUpStream(connection: Connection, kind: Handler.PresistentHandler.StreamKind) { - let state = reopenStates.read { states in + var state = reopenStates.read { states in states[connection.id] ?? .init() } @@ -344,12 +341,9 @@ final class PeerImpl: Sendable { logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts") return } - + state.applyBackoff() reopenStates.write { states in - if var state = states[connection.id] { - state.applyBackoff() - states[connection.id] = state - } + states[connection.id] = state } Task { @@ -557,10 +551,10 @@ private struct PeerEventHandler: QuicEventHandler { false case let .transport(status, _): switch QuicStatusCode(rawValue: status.rawValue) { - case .badCert: - false + case .aborted, .outOfMemory, .connectionTimeout, .unreachable, .bufferTooSmall, .connectionRefused: + true default: - !status.isSucceeded + status.isSucceeded } case let .byPeer(code): // Do not reconnect if the closure was initiated by the peer. diff --git a/Networking/Tests/NetworkingTests/MockPeerEventTests.swift b/Networking/Tests/NetworkingTests/MockPeerEventTests.swift index 71b4793a..f837c576 100644 --- a/Networking/Tests/NetworkingTests/MockPeerEventTests.swift +++ b/Networking/Tests/NetworkingTests/MockPeerEventTests.swift @@ -7,6 +7,11 @@ import Utils final class MockPeerEventTests { final class MockPeerEventHandler: QuicEventHandler { + enum MockPeerAction { + case none + case mockHandshakeFailure + } + enum EventType { case newConnection(listener: QuicListener, connection: QuicConnection, info: ConnectionInfo) case shouldOpen(connection: QuicConnection, certificate: Data?) @@ -18,8 +23,11 @@ final class MockPeerEventTests { } let events: ThreadSafeContainer<[EventType]> = .init([]) + let mockAction: MockPeerAction - init() {} + init(_ action: MockPeerAction = .none) { + mockAction = action + } func newConnection( _ listener: QuicListener, connection: QuicConnection, info: ConnectionInfo @@ -32,6 +40,9 @@ final class MockPeerEventTests { } func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus { + if mockAction == .mockHandshakeFailure { + return .code(.handshakeFailure) + } guard let certificate else { return .code(.requiredCert) } @@ -169,7 +180,7 @@ final class MockPeerEventTests { func connected() async throws { let serverHandler = MockPeerEventHandler() let clientHandler = MockPeerEventHandler() - let privateKey1 = try Ed25519.SecretKey(from: Data32()) + let privateKey1 = try Ed25519.SecretKey(from: Data32.random()) let cert = try generateSelfSignedCertificate(privateKey: privateKey1) let serverConfiguration = try QuicConfiguration( registration: registration, diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index ca189d51..29b2f3dd 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -143,6 +143,98 @@ struct PeerTests { typealias EphemeralHandler = MockEphemeralStreamHandler } + @Test + func mockHandshakeFailure() async throws { + let mockPeerTest = try MockPeerEventTests() + let serverHandler = MockPeerEventTests.MockPeerEventHandler( + MockPeerEventTests.MockPeerEventHandler.MockPeerAction.mockHandshakeFailure + ) + let alpns = [ + PeerRole.validator: Alpn(genesisHeader: Data32(), builder: false).data, + PeerRole.builder: Alpn(genesisHeader: Data32(), builder: true).data, + ] + let allAlpns = Array(alpns.values) + // Server setup with bad certificate + let serverConfiguration = try QuicConfiguration( + registration: mockPeerTest.registration, + pkcs12: mockPeerTest.certData, + alpns: allAlpns, + client: false, + settings: QuicSettings.defaultSettings + ) + + let listener = try QuicListener( + handler: serverHandler, + registration: mockPeerTest.registration, + configuration: serverConfiguration, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + alpns: allAlpns + ) + + let listenAddress = try listener.listenAddress() + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + let connection1 = try peer1.connect(to: listenAddress, role: .validator) + try? await Task.sleep(for: .milliseconds(3000)) + #expect(connection1.isClosed == true) + } + + @Test + func mockShutdownBadCert() async throws { + let mockPeerTest = try MockPeerEventTests() + let serverHandler = MockPeerEventTests.MockPeerEventHandler() + let alpns = [ + PeerRole.validator: Alpn(genesisHeader: Data32(), builder: false).data, + PeerRole.builder: Alpn(genesisHeader: Data32(), builder: true).data, + ] + let allAlpns = Array(alpns.values) + // Server setup with bad certificate + let serverConfiguration = try QuicConfiguration( + registration: mockPeerTest.registration, + pkcs12: mockPeerTest.badCertData, + alpns: allAlpns, + client: false, + settings: QuicSettings.defaultSettings + ) + + let listener = try QuicListener( + handler: serverHandler, + registration: mockPeerTest.registration, + configuration: serverConfiguration, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + alpns: allAlpns + ) + + let listenAddress = try listener.listenAddress() + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + let connection1 = try peer1.connect(to: listenAddress, role: .validator) + try? await Task.sleep(for: .milliseconds(1000)) + #expect(connection1.isClosed == true) + } + @Test func reopenUpStream() async throws { let handler2 = MockPresentStreamHandler() @@ -197,7 +289,7 @@ struct PeerTests { peer1.broadcast( kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData) ) - try await Task.sleep(for: .milliseconds(1000)) + try await Task.sleep(for: .milliseconds(2000)) let lastReceivedData2 = await handler2.lastReceivedData #expect(lastReceivedData2 == messageData) } @@ -290,15 +382,15 @@ struct PeerTests { let connection1 = try peer1.connect(to: peer2.listenAddress(), role: .validator) let connection2 = try peer2.connect(to: peer1.listenAddress(), role: .validator) - try? await Task.sleep(for: .milliseconds(50)) + try? await Task.sleep(for: .milliseconds(1000)) if !connection1.isClosed { let data = try await connection1.request(MockRequest(kind: .typeA, data: Data("hello world".utf8))) - try? await Task.sleep(for: .milliseconds(50)) + try? await Task.sleep(for: .milliseconds(500)) #expect(data == Data("hello world response".utf8)) } if !connection2.isClosed { let data = try await connection2.request(MockRequest(kind: .typeA, data: Data("hello world".utf8))) - try? await Task.sleep(for: .milliseconds(50)) + try? await Task.sleep(for: .milliseconds(500)) #expect(data == Data("hello world response".utf8)) } } @@ -573,7 +665,7 @@ struct PeerTests { to: peer2.listenAddress(), role: .validator ) - try? await Task.sleep(for: .milliseconds(50)) + try? await Task.sleep(for: .milliseconds(500)) peer1.broadcast( kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("hello world".utf8)) @@ -583,7 +675,7 @@ struct PeerTests { kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8)) ) // Verify last received data - try? await Task.sleep(for: .milliseconds(200)) + try? await Task.sleep(for: .milliseconds(500)) await #expect(handler2.lastReceivedData == Data("hello world".utf8)) await #expect(handler1.lastReceivedData == Data("I am jam".utf8)) }