Skip to content

Commit

Permalink
test with rocksdb and many issue fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed Dec 11, 2024
1 parent 7c82511 commit 7fb6e5e
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 16 deletions.
6 changes: 4 additions & 2 deletions Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public final class Runtime {
case invalidReportAuthorizer
case encodeError(any Swift.Error)
case invalidExtrinsicHash
case invalidParentHash
case invalidParentHash(state: Data32, header: Data32)
case invalidHeaderStateRoot
case invalidHeaderEpochMarker
case invalidHeaderWinningTickets
Expand Down Expand Up @@ -53,7 +53,7 @@ public final class Runtime {
let block = block.value

guard block.header.parentHash == state.value.lastBlockHash else {
throw Error.invalidParentHash
throw Error.invalidParentHash(state: state.value.lastBlockHash, header: block.header.parentHash)
}

guard block.header.priorStateRoot == context.stateRoot else {
Expand Down Expand Up @@ -189,6 +189,8 @@ public final class Runtime {

// after reports as it need old recent history
try updateRecentHistory(block: block, state: &newState)

try await newState.save()
} catch let error as Error {
throw error
} catch let error as SafroleError {
Expand Down
1 change: 1 addition & 0 deletions Blockchain/Sources/Blockchain/State/State.swift
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public struct State: Sendable {
// TODO: we don't really want to write to the underlying backend here
// instead, it should be writting to a in memory layer
// and when actually saving the state, save the in memory layer to the presistent store
@discardableResult
public func save() async throws -> Data32 {
try await backend.write(layer.toKV())
return await backend.rootHash
Expand Down
8 changes: 6 additions & 2 deletions Blockchain/Sources/Blockchain/State/StateBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Foundation
import Utils

public enum StateBackendError: Error {
case missingState
case missingState(key: Sendable)
case invalidData
}

Expand Down Expand Up @@ -35,7 +35,7 @@ public final class StateBackend: Sendable {
if Key.optional {
return nil
}
throw StateBackendError.missingState
throw StateBackendError.missingState(key: key)
}

public func batchRead(_ keys: [any StateKey]) async throws -> [(key: any StateKey, value: (Codable & Sendable)?)] {
Expand Down Expand Up @@ -74,4 +74,8 @@ public final class StateBackend: Sendable {
return nil
}
}

public func debugPrint() async throws {
try await trie.debugPrint()
}
}
7 changes: 4 additions & 3 deletions Blockchain/Sources/Blockchain/State/StateTrie.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private struct TrieNode {
right = Data32(data.data.suffix(32))!
self.isNew = isNew
rawValue = nil
switch data.data[0] & 0b1100_0000 {
switch data.data.first! & 0b1100_0000 {
case 0b1000_0000:
type = .embeddedLeaf
case 0b1100_0000:
Expand Down Expand Up @@ -66,7 +66,7 @@ private struct TrieNode {
guard type == .embeddedLeaf else {
return nil
}
let len = left.data[0] & 0b0011_1111
let len = left.data.first! & 0b0011_1111
return right.data[relative: 0 ..< Int(len)]
}

Expand All @@ -85,7 +85,7 @@ private struct TrieNode {

static func branch(left: Data32, right: Data32) -> TrieNode {
var left = left.data
left[0] = left[0] & 0b0111_1111 // clear the highest bit
left[left.startIndex] = left[left.startIndex] & 0b0111_1111 // clear the highest bit
return .init(left: Data32(left)!, right: right, type: .branch, isNew: true, rawValue: nil)
}
}
Expand Down Expand Up @@ -352,6 +352,7 @@ public actor StateTrie: Sendable {
}
}

logger.info("Root hash: \(rootHash.toHexString())")
try await printNode(rootHash, depth: 0)
}
}
Expand Down
40 changes: 36 additions & 4 deletions Database/Sources/Database/RocksDBBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ import Blockchain
import Codec
import Foundation
import RocksDBSwift
import TracingUtils
import Utils

private let logger = Logger(label: "RocksDBBackend")

public enum RocksDBBackendError: Error {
case genesisHashMismatch(expected: Data32, actual: Data)
}
Expand All @@ -16,6 +19,7 @@ public final class RocksDBBackend: Sendable {
private let blockHashByTimeslot: Store<StoreId, JamCoder<TimeslotIndex, Set<Data32>>>
private let blockHashByNumber: Store<StoreId, JamCoder<UInt32, Set<Data32>>>
private let blockNumberByHash: Store<StoreId, JamCoder<Data32, UInt32>>
private let stateRootByHash: Store<StoreId, JamCoder<Data32, Data32>>
private let stateTrie: Store<StoreId, JamCoder<Data, Data>>
private let stateValue: Store<StoreId, JamCoder<Data32, Data>>
private let stateRefs: Store<StoreId, JamCoder<Data, UInt32>>
Expand All @@ -31,6 +35,7 @@ public final class RocksDBBackend: Sendable {
blockHashByTimeslot = Store(db: db, column: .blockIndexes, coder: JamCoder(config: config, prefix: Data([0])))
blockHashByNumber = Store(db: db, column: .blockIndexes, coder: JamCoder(config: config, prefix: Data([1])))
blockNumberByHash = Store(db: db, column: .blockIndexes, coder: JamCoder(config: config, prefix: Data([2])))
stateRootByHash = Store(db: db, column: .blockIndexes, coder: JamCoder(config: config, prefix: Data([3])))
stateTrie = Store(db: db, column: .state, coder: JamCoder(config: config, prefix: Data([0])))
stateValue = Store(db: db, column: .state, coder: JamCoder(config: config, prefix: Data([1])))
stateRefs = Store(db: db, column: .stateRefs, coder: JamCoder(config: config, prefix: Data([0])))
Expand All @@ -43,18 +48,25 @@ public final class RocksDBBackend: Sendable {
guard genesis == genesisBlockHash.data else {
throw RocksDBBackendError.genesisHashMismatch(expected: genesisBlockHash, actual: genesis)
}

logger.trace("DB loaded")
} else {
// must be a new db
try meta.put(key: MetaKey.genesisHash.key, value: genesisBlockHash.data)
try await add(block: genesisBlock)
let backend = StateBackend(self, config: config, rootHash: Data32())
try await backend.writeRaw(Array(genesisStateData))
let rootHash = await backend.rootHash
try stateRootByHash.put(key: genesisBlockHash, value: rootHash)
try setHeads([genesisBlockHash])
try await setFinalizedHead(hash: genesisBlockHash)

logger.trace("New DB initialized")
}
}

private func setHeads(_ heads: Set<Data32>) throws {
logger.trace("setHeads() \(heads)")
try meta.put(key: MetaKey.heads.key, value: JamEncoder.encode(heads))
}
}
Expand All @@ -65,7 +77,7 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
}

public func hasState(hash: Data32) async throws -> Bool {
try stateTrie.exists(key: hash.data)
try stateRootByHash.exists(key: hash)
}

public func isHead(hash: Data32) async throws -> Bool {
Expand All @@ -85,7 +97,12 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
}

public func getState(hash: Data32) async throws -> StateRef? {
try await State(backend: StateBackend(self, config: config, rootHash: hash)).asRef()
logger.trace("getState() \(hash)")

guard let rootHash = try stateRootByHash.get(key: hash) else {
return nil
}
return try await State(backend: StateBackend(self, config: config, rootHash: rootHash)).asRef()
}

public func getFinalizedHead() async throws -> Data32? {
Expand All @@ -109,6 +126,8 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
}

public func add(block: BlockRef) async throws {
logger.trace("add(block:) \(block.hash)")

// TODO: batch put

try blocks.put(key: block.hash, value: block)
Expand All @@ -129,15 +148,22 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
try blockNumberByHash.put(key: block.hash, value: blockNumber)
}

public func add(state _: StateRef) async throws {
// nothing to do
public func add(state: StateRef) async throws {
logger.trace("add(state:) \(state.value.lastBlockHash)")

let rootHash = await state.value.stateRoot
try stateRootByHash.put(key: state.value.lastBlockHash, value: rootHash)
}

public func setFinalizedHead(hash: Data32) async throws {
logger.trace("setFinalizedHead() \(hash)")

try meta.put(key: MetaKey.finalizedHead.key, value: hash.data)
}

public func updateHead(hash: Data32, parent: Data32) async throws {
logger.trace("updateHead() \(hash) \(parent)")

var heads = try await getHeads()

// parent needs to be either
Expand All @@ -155,6 +181,8 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
}

public func remove(hash: Data32) async throws {
logger.trace("remove() \(hash)")

// TODO: batch delete

try blocks.delete(key: hash)
Expand Down Expand Up @@ -203,6 +231,8 @@ extension RocksDBBackend: StateBackendProtocol {
}

public func batchUpdate(_ updates: [StateBackendOperation]) async throws {
logger.trace("batchUpdate() \(updates.count) operations")

// TODO: implement this using merge operator to perform atomic increment
// so we can do the whole thing in a single batch
for update in updates {
Expand All @@ -228,6 +258,8 @@ extension RocksDBBackend: StateBackendProtocol {
}

public func gc(callback _: @Sendable (Data) -> Data32?) async throws {
logger.trace("gc()")

// TODO: implement
}
}
1 change: 1 addition & 0 deletions Database/Sources/Database/Stores.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ enum StoreId: UInt8, ColumnFamilyKey {
// 0x00 + timeslot => Set<BlockHash>
// 0x01 + blockNumber => Set<BlockHash>
// 0x02 + blockHash => blockNumber
// 0x03 + blockHash => stateRootHash
case blockIndexes = 2
// state trie
// 0x00 + hash => trie node
Expand Down
15 changes: 15 additions & 0 deletions Database/Sources/RocksDBSwift/RocksDB.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import Foundation
import rocksdb
import TracingUtils
import Utils

private let logger = Logger(label: "RocksDB")

public protocol ColumnFamilyKey: Sendable, CaseIterable, Hashable, RawRepresentable<UInt8> {}

public enum BatchOperation {
Expand Down Expand Up @@ -159,6 +162,8 @@ extension RocksDB {

extension RocksDB {
public func put(column: CFKey, key: Data, value: Data) throws {
logger.trace("put() \(column) \(key.toHexString()) \(value.toHexString())")

let handle = getHandle(column: column)
try Self.call(key, value) { err, ptrs in
let key = ptrs[0]
Expand All @@ -179,6 +184,8 @@ extension RocksDB {
}

public func get(column: CFKey, key: Data) throws -> Data? {
logger.trace("get() \(column) \(key.toHexString())")

var len = 0
let handle = getHandle(column: column)

Expand All @@ -193,6 +200,8 @@ extension RocksDB {
}

public func delete(column: CFKey, key: Data) throws {
logger.trace("delete() \(column) \(key.toHexString())")

let handle = getHandle(column: column)

try Self.call(key) { err, ptrs in
Expand All @@ -204,19 +213,25 @@ extension RocksDB {
}

public func batch(operations: [BatchOperation]) throws {
logger.trace("batch() \(operations.count) operations")

let writeBatch = rocksdb_writebatch_create()
defer { rocksdb_writebatch_destroy(writeBatch) }

for operation in operations {
switch operation {
case let .delete(column, key):
logger.trace("batch() delete \(column) \(key.toHexString())")

let handle = try getHandle(column: column).unwrap(orError: Error.invalidColumn(column))
try Self.call(key) { ptrs in
let key = ptrs[0]
rocksdb_writebatch_delete_cf(writeBatch, handle, key.ptr, key.count)
}

case let .put(column, key, value):
logger.trace("batch() put \(column) \(key.toHexString()) \(value.toHexString())")

let handle = try getHandle(column: column).unwrap(orError: Error.invalidColumn(column))
try Self.call(key, value) { ptrs in
let key = ptrs[0]
Expand Down
7 changes: 6 additions & 1 deletion Node/Sources/Node/ValidatorNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import Foundation
import TracingUtils
import Utils

private let logger = Logger(label: "ValidatorNode")

public class ValidatorNode: Node {
private var validator: ValidatorService!

Expand Down Expand Up @@ -34,12 +36,15 @@ public class ValidatorNode: Node {
let dataProvider: BlockchainDataProvider = blockchain.dataProvider
let local = config.local
Task {
let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash)
if !local {
logger.trace("Waiting for sync")
await syncManager.waitForSyncCompletion()
}
logger.trace("Sync completed")
await validator.onSyncCompleted()
let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash)
if await dataProvider.bestHead.hash == dataProvider.genesisBlockHash {
logger.trace("Calling on(genesis:)")
await validator.on(genesis: genesisState)
}
}
Expand Down
Loading

0 comments on commit 7fb6e5e

Please sign in to comment.