From cbee884abbb86c856be6e3ca915cc1d2de17e1d5 Mon Sep 17 00:00:00 2001
From: Renan Santos <renan.061@gmail.com>
Date: Thu, 29 Aug 2024 14:34:10 -0300
Subject: [PATCH] whisk away

---
 internal/node/advancer/advancer.go          |  44 ++----
 internal/node/advancer/machines/machines.go | 167 ++++++++++++++++++++
 internal/node/model/models.go               |   1 +
 internal/nodemachine/machine.go             |  11 +-
 4 files changed, 189 insertions(+), 34 deletions(-)
 create mode 100644 internal/node/advancer/machines/machines.go

diff --git a/internal/node/advancer/advancer.go b/internal/node/advancer/advancer.go
index 5e13cf756..9b9c20d80 100644
--- a/internal/node/advancer/advancer.go
+++ b/internal/node/advancer/advancer.go
@@ -10,6 +10,7 @@ import (
 	"log/slog"
 	"time"
 
+	"github.com/cartesi/rollups-node/internal/node/advancer/machines"
 	"github.com/cartesi/rollups-node/internal/node/advancer/poller"
 	. "github.com/cartesi/rollups-node/internal/node/model"
 	"github.com/cartesi/rollups-node/internal/nodemachine"
@@ -24,19 +25,19 @@ var (
 )
 
 type Advancer struct {
-	machines   Machines
-	repository Repository
+	machines Machines
+	repo     Repository
 }
 
 // New instantiates a new Advancer.
-func New(machines Machines, repository Repository) (*Advancer, error) {
+func New(machines Machines, repo Repository) (*Advancer, error) {
 	if machines == nil {
 		return nil, ErrInvalidMachines
 	}
-	if repository == nil {
+	if repo == nil {
 		return nil, ErrInvalidRepository
 	}
-	return &Advancer{machines: machines, repository: repository}, nil
+	return &Advancer{machines: machines, repo: repo}, nil
 }
 
 // Poller instantiates a new poller.Poller using the Advancer.
@@ -49,11 +50,11 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller,
 // runs them through the cartesi machine,
 // and updates the repository with the ouputs.
 func (advancer *Advancer) Step(ctx context.Context) error {
-	apps := keysFrom(advancer.machines)
+	apps := advancer.machines.Keys()
 
 	// Gets the unprocessed inputs (of all apps) from the repository.
 	slog.Info("advancer: getting unprocessed inputs")
-	inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps)
+	inputs, err := advancer.repo.GetUnprocessedInputs(ctx, apps)
 	if err != nil {
 		return err
 	}
@@ -69,7 +70,7 @@ func (advancer *Advancer) Step(ctx context.Context) error {
 
 	// Updates the status of the epochs.
 	for _, app := range apps {
-		err := advancer.repository.UpdateEpochs(ctx, app)
+		err := advancer.repo.UpdateEpochs(ctx, app)
 		if err != nil {
 			return err
 		}
@@ -81,8 +82,8 @@ func (advancer *Advancer) Step(ctx context.Context) error {
 // process sequentially processes inputs from the the application.
 func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error {
 	// Asserts that the app has an associated machine.
-	machine, ok := advancer.machines[app]
-	if !ok {
+	machine := advancer.machines.GetAdvanceMachine(app)
+	if machine == nil {
 		panic(fmt.Errorf("%w %s", ErrNoApp, app.String()))
 	}
 
@@ -101,7 +102,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In
 		}
 
 		// Stores the result in the database.
-		err = advancer.repository.StoreAdvanceResult(ctx, input, res)
+		err = advancer.repo.StoreAdvanceResult(ctx, input, res)
 		if err != nil {
 			return err
 		}
@@ -121,22 +122,7 @@ type Repository interface {
 	UpdateEpochs(_ context.Context, app Address) error
 }
 
-// A map of application addresses to machines.
-type Machines = map[Address]Machine
-
-type Machine interface {
-	Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error)
-}
-
-// ------------------------------------------------------------------------------------------------
-
-// keysFrom returns a slice with the keysFrom of a map.
-func keysFrom[K comparable, V any](m map[K]V) []K {
-	keys := make([]K, len(m))
-	i := 0
-	for k := range m {
-		keys[i] = k
-		i++
-	}
-	return keys
+type Machines interface {
+	GetAdvanceMachine(Address) machines.AdvanceMachine
+	Keys() []Address
 }
diff --git a/internal/node/advancer/machines/machines.go b/internal/node/advancer/machines/machines.go
new file mode 100644
index 000000000..a60bd740c
--- /dev/null
+++ b/internal/node/advancer/machines/machines.go
@@ -0,0 +1,167 @@
+// (c) Cartesi and individual authors (see AUTHORS)
+// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+
+package machines
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"os"
+	"sync"
+
+	"github.com/cartesi/rollups-node/internal/node/config"
+	"github.com/cartesi/rollups-node/internal/node/model"
+	"github.com/cartesi/rollups-node/internal/nodemachine"
+	"github.com/cartesi/rollups-node/internal/repository"
+	"github.com/cartesi/rollups-node/pkg/emulator"
+	"github.com/cartesi/rollups-node/pkg/rollupsmachine"
+	"github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine"
+)
+
+type Repository interface {
+	GetAppData(context.Context) ([]*repository.AppData, error)
+}
+
+type AdvanceMachine interface {
+	Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error)
+}
+
+type InspectMachine interface {
+	Inspect(_ context.Context, query []byte) (*nodemachine.InspectResult, error)
+}
+
+type Machines struct {
+	mutex    sync.RWMutex
+	machines map[model.Address]*nodemachine.NodeMachine
+}
+
+func Load(ctx context.Context, config config.NodeConfig, repo Repository) (*Machines, error) {
+	appData, err := repo.GetAppData(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	machines := map[model.Address]*nodemachine.NodeMachine{}
+
+	maxConcurrentInspects := config.MaxConcurrentInspects
+
+	serverVerbosity := config.MachineServerVerbosity
+	machineInc := config.MachineIncCycles
+	machineMax := config.MachineMaxCycles
+	machineAdvanceTimeout := config.MachineAdvanceTimeout
+	machineInspectTimeout := config.MachineInspectTimeout
+
+	for _, appData := range appData {
+		appAddress := appData.AppAddress
+		snapshotPath := appData.SnapshotPath
+		snapshotInputIndex := appData.InputIndex
+
+		address, err := cartesimachine.StartServer(serverVerbosity, 0, os.Stdout, os.Stderr)
+		if err != nil {
+			return nil, close(machines)
+		}
+
+		config := &emulator.MachineRuntimeConfig{}
+		cartesiMachine, err := cartesimachine.Load(ctx, snapshotPath, address, config)
+		if err != nil {
+			err = errors.Join(err, cartesimachine.StopServer(address), close(machines))
+			return nil, err
+		}
+
+		rollupsMachine, err := rollupsmachine.New(ctx, cartesiMachine, machineInc, machineMax)
+		if err != nil {
+			err = errors.Join(err, cartesiMachine.Close(ctx), close(machines))
+			return nil, err
+		}
+
+		nodeMachine, err := nodemachine.New(rollupsMachine,
+			snapshotInputIndex,
+			machineAdvanceTimeout,
+			machineInspectTimeout,
+			maxConcurrentInspects)
+		if err != nil {
+			err = errors.Join(err, rollupsMachine.Close(ctx), close(machines))
+			return nil, err
+		}
+
+		machines[appAddress] = nodeMachine
+	}
+
+	return &Machines{machines: machines}, nil
+}
+
+func (m *Machines) GetAdvanceMachine(app model.Address) AdvanceMachine {
+	return m.get(app)
+}
+
+func (m *Machines) GetInspectMachine(app model.Address) InspectMachine {
+	return m.get(app)
+}
+
+func (m *Machines) Set(app model.Address, machine *nodemachine.NodeMachine) bool {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if _, ok := m.machines[app]; ok {
+		return false
+	} else {
+		m.machines[app] = machine
+		return true
+	}
+}
+
+func (m *Machines) Remove(app model.Address) *nodemachine.NodeMachine {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if machine, ok := m.machines[app]; ok {
+		return nil
+	} else {
+		delete(m.machines, app)
+		return machine
+	}
+}
+
+func (m *Machines) Keys() []model.Address {
+	m.mutex.RLock()
+	defer m.mutex.Unlock()
+
+	keys := make([]model.Address, len(m.machines))
+	i := 0
+	for k := range m.machines {
+		keys[i] = k
+		i++
+	}
+	return keys
+}
+
+func (m *Machines) Close() error {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	err := close(m.machines)
+	if err != nil {
+		slog.Error(fmt.Sprintf("failed to close some machines: %v", err))
+	}
+	return err
+}
+
+// ------------------------------------------------------------------------------------------------
+
+func (m *Machines) get(app model.Address) *nodemachine.NodeMachine {
+	m.mutex.RLock()
+	defer m.mutex.Unlock()
+	return m.machines[app]
+}
+
+func close(machines map[model.Address]*nodemachine.NodeMachine) (err error) {
+	for _, machine := range machines {
+		err = errors.Join(err, machine.Close())
+	}
+	for app := range machines {
+		delete(machines, app)
+	}
+	return
+}
diff --git a/internal/node/model/models.go b/internal/node/model/models.go
index ab11d1d99..b0bd46c3d 100644
--- a/internal/node/model/models.go
+++ b/internal/node/model/models.go
@@ -65,6 +65,7 @@ type Application struct {
 	Id                 uint64
 	ContractAddress    Address
 	TemplateHash       Hash
+	TemplateUri        string
 	LastProcessedBlock uint64
 	Status             ApplicationStatus
 	IConsensusAddress  Address
diff --git a/internal/nodemachine/machine.go b/internal/nodemachine/machine.go
index 4abf71da6..396d1a0b0 100644
--- a/internal/nodemachine/machine.go
+++ b/internal/nodemachine/machine.go
@@ -34,7 +34,7 @@ type AdvanceResult struct {
 }
 
 type InspectResult struct {
-	InputIndex uint64
+	InputIndex *uint64
 	Accepted   bool
 	Reports    [][]byte
 	Error      error
@@ -44,7 +44,8 @@ type NodeMachine struct {
 	inner rollupsmachine.RollupsMachine
 
 	// Index of the last Input that was processed.
-	lastInputIndex uint64
+	// Can be nil if no inputs were processed.
+	lastInputIndex *uint64
 
 	// How long a call to inner.Advance or inner.Inspect can take.
 	advanceTimeout, inspectTimeout time.Duration
@@ -68,7 +69,7 @@ type NodeMachine struct {
 
 func New(
 	inner rollupsmachine.RollupsMachine,
-	inputIndex uint64,
+	inputIndex *uint64,
 	advanceTimeout time.Duration,
 	inspectTimeout time.Duration,
 	maxConcurrentInspects uint8,
@@ -148,14 +149,14 @@ func (machine *NodeMachine) Advance(ctx context.Context,
 		// Replaces the current machine with the fork and updates lastInputIndex.
 		machine.mutex.HLock()
 		machine.inner = fork
-		machine.lastInputIndex = index
+		machine.lastInputIndex = &index
 		machine.mutex.Unlock()
 	} else {
 		// Closes the forked machine.
 		err = fork.Close(ctx)
 		// Updates lastInputIndex.
 		machine.mutex.HLock()
-		machine.lastInputIndex = index
+		machine.lastInputIndex = &index
 		machine.mutex.Unlock()
 	}