Skip to content

Commit

Permalink
feat: add machine-advancer service
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed May 21, 2024
1 parent 03626e7 commit 45e8bff
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 0 deletions.
35 changes: 35 additions & 0 deletions internal/node/machineadvancer/advancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package machineadvancer

import "github.com/cartesi/rollups-node/internal/node/nodemachine"

type Input = []byte
type Output = []byte
type Report = []byte
type Hash = [32]byte

func GetInputs() []Input {
return []Input{}
}

func Store(outputs []Output, reports []Report, outputsHash Hash, machineHash Hash) error {
return nil
}

func StartAdvanceServer(machine *nodemachine.NodeMachine) {
for {
for _, input := range GetInputs() {
outputs, reports, outputsHash, machineHash, err := machine.Advance(input)
if err != nil {
panic("TODO")
}

err = Store(outputs, reports, outputsHash, machineHash)
if err != nil {
panic("TODO")
}
}
}
}
108 changes: 108 additions & 0 deletions internal/node/nodemachine/machine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package nodemachine

import (
"context"

"github.com/cartesi/rollups-node/internal/node/nodemachine/pmutex"
"github.com/cartesi/rollups-node/pkg/model"
"github.com/cartesi/rollups-node/pkg/rollupsmachine"

"golang.org/x/sync/semaphore"
)

type NodeMachine struct {
*rollupsmachine.RollupsMachine

// Ensures advance/inspect mutual exclusion when accessing the inner RollupsMachine.
// Advances have a higher priority than Inspects to acquire the lock.
mutex *pmutex.PMutex

// Controls how many inspects can be concurrently active.
inspects *semaphore.Weighted
}

func New(machine *rollupsmachine.RollupsMachine, maxConcurrentInspects int8) *NodeMachine {
return &NodeMachine{
RollupsMachine: machine,
mutex: pmutex.New(),
inspects: semaphore.NewWeighted(int64(maxConcurrentInspects)),
}
}

func (machine *NodeMachine) Advance(input []byte) (
outputs []rollupsmachine.Output,
reports []rollupsmachine.Report,
outputsHash model.Hash,
machineHash model.Hash,
err error) {

var fork *rollupsmachine.RollupsMachine

{ // Forks the machine.
machine.mutex.HLock()
defer machine.mutex.Unlock()
fork, err = machine.Fork()

Check failure on line 47 in internal/node/nodemachine/machine.go

View workflow job for this annotation

GitHub Actions / assess-code-quality

machine.Fork undefined (type *NodeMachine has no field or method Fork) (typecheck)
if err != nil {
return outputs, reports, outputsHash, machineHash, err
}
}

// Sends the advance-state request.
outputs, reports, outputsHash, err = fork.Advance(input)
if err != nil {
return outputs, reports, outputsHash, machineHash, err
}

// Gets the post-advance machine hash.
machineHash, err = fork.Hash()
if err != nil {
return outputs, reports, outputsHash, machineHash, err
}

{ // Destroys the old machine and updates the current one.
machine.mutex.HLock()
defer machine.mutex.Unlock()
err = machine.Destroy()

Check failure on line 68 in internal/node/nodemachine/machine.go

View workflow job for this annotation

GitHub Actions / assess-code-quality

machine.Destroy undefined (type *NodeMachine has no field or method Destroy) (typecheck)
if err != nil {
return outputs, reports, outputsHash, machineHash, err
}
machine.RollupsMachine = fork
}

return outputs, reports, outputsHash, machineHash, err
}

func (machine *NodeMachine) Inspect(ctx context.Context, query []byte) (
[]rollupsmachine.Report,
error) {

// Controls how many inspects can be concurrently active.
err := machine.inspects.Acquire(ctx, 1)
if err != nil {
return nil, err
}
defer machine.inspects.Release(1)

// Forks the machine.
var forkedMachine *rollupsmachine.RollupsMachine
{
machine.mutex.LLock()
defer machine.mutex.Unlock()
forkedMachine, err = machine.Fork()

Check failure on line 94 in internal/node/nodemachine/machine.go

View workflow job for this annotation

GitHub Actions / assess-code-quality

machine.Fork undefined (type *NodeMachine has no field or method Fork) (typecheck)
if err != nil {
return nil, err
}
}

// Sends the inspect-state request.
reports, err := forkedMachine.Inspect(query)
if err != nil {
return nil, err
}

// Destroys the forked machine and returns the reports.
return reports, forkedMachine.Destroy()
}
56 changes: 56 additions & 0 deletions internal/node/nodemachine/pmutex/pmutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package pmutex

import (
"sync"
"sync/atomic"
)

// A PMutex is a mutual exclusion lock with priority capabilities.
// A call to HLock always acquires the mutex before LLock.
type PMutex struct {
// Main mutex.
mutex *sync.Mutex

// Condition variable for the waiting low-priority threads.
waitingLow *sync.Cond

// Quantity of high-priority threads waiting to acquire the lock.
waitingHigh *atomic.Int32
}

// New creates a new PMutex.
func New() *PMutex {
mutex := &sync.Mutex{}
return &PMutex{
mutex: mutex,
waitingLow: sync.NewCond(mutex),
waitingHigh: &atomic.Int32{},
}
}

// HLock acquires the mutex for the high-priority threads.
func (lock *PMutex) HLock() {
lock.waitingHigh.Add(1)
lock.mutex.Lock()
lock.waitingHigh.Add(-1)
}

// LLock acquires the mutex for the low-priority threads.
// It waits until there are no high-priority threads trying to acquire the lock.
func (lock *PMutex) LLock() {
lock.mutex.Lock()
for lock.waitingHigh.Load() != 0 {
// NOTE: a cond.Wait() releases the lock uppon being called
// and tries to acquire it after being awakened.
lock.waitingLow.Wait()
}
}

// Unlock releases the mutex for both types of threads.
func (lock *PMutex) Unlock() {
lock.waitingLow.Broadcast()
lock.mutex.Unlock()
}

0 comments on commit 45e8bff

Please sign in to comment.