Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Aug 30, 2024
1 parent 490d252 commit 6159eb0
Show file tree
Hide file tree
Showing 11 changed files with 952 additions and 86 deletions.
105 changes: 105 additions & 0 deletions cmd/cartesi-rollups-advancer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package main

import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"

"github.com/cartesi/rollups-node/internal/node/advancer"
"github.com/cartesi/rollups-node/internal/node/advancer/machines"
"github.com/cartesi/rollups-node/internal/node/config"
"github.com/cartesi/rollups-node/internal/node/startup"
"github.com/cartesi/rollups-node/internal/repository"
"github.com/spf13/cobra"
)

const CMD_NAME = "advancer"

var (
buildVersion = "devel"
Cmd = &cobra.Command{
Use: CMD_NAME,
Short: "Runs the Advancer",
Long: "Runs the Advancer in standalone mode",
Run: run,
}
)

func init() {
flags := Cmd.Flags()
flags.BytesHex("application-address", nil, "")
flags.String("server-address", "", "")
flags.String("snapshot", "", "")
flags.Int64("snapshot-input-index", -1, "")
flags.Uint64("machine-inc-cycles", 50_000_000, "")
flags.Uint64("machine-max-cycles", 5_000_000_000, "")
flags.Uint64("machine-advance-timeout", 60, "")
flags.Uint64("machine-inspect-timeout", 10, "")
}

func main() {
err := Cmd.Execute()
if err != nil {
os.Exit(1)
}
}

func getDatabase(ctx context.Context, c config.NodeConfig) (*repository.Database, error) {
err := startup.ValidateSchema(c)
if err != nil {
return nil, fmt.Errorf("invalid database schema: %w", err)
}

database, err := repository.Connect(ctx, c.PostgresEndpoint.Value)
if err != nil {
return nil, fmt.Errorf("failed to connect to the database: %w", err)
}

return database, nil
}

func run(cmd *cobra.Command, args []string) {
startTime := time.Now()

ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

c := config.FromEnv()
startup.ConfigLogs(c)

slog.Info("Starting the Cartesi Rollups Node Advancer", "version", buildVersion, "config", c)

database, err := getDatabase(ctx, c)
if err != nil {
slog.Error(err.Error())
os.Exit(1)
}
defer database.Close()

repo := &repository.AdvancerRepository{Database: database}

machines, err := machines.Load(ctx, c, repo)
if err != nil {
slog.Error(err.Error())
os.Exit(1)
}
defer machines.Close()

advancer, err := advancer.New(machines, repo)

poller, err := advancer.Poller(5 * time.Second)

ready := make(chan struct{}, 1)

if err := poller.Start(ctx, ready); err != nil {
slog.Error("advancer exited with an error", "error", err)
os.Exit(1)
}
}
2 changes: 1 addition & 1 deletion cmd/cartesi-rollups-cli/root/app/add/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func run(cmd *cobra.Command, args []string) {
IConsensusAddress: common.HexToAddress(iConsensusAddress),
}

err := cmdcommom.Database.InsertApplication(ctx, &application)
_, err := cmdcommom.Database.InsertApplication(ctx, &application)
cobra.CheckErr(err)
fmt.Printf("Application %v successfully added\n", application.ContractAddress)
}
58 changes: 24 additions & 34 deletions internal/node/advancer/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log/slog"
"time"

"github.com/cartesi/rollups-node/internal/node/advancer/machines"
"github.com/cartesi/rollups-node/internal/node/advancer/poller"
. "github.com/cartesi/rollups-node/internal/node/model"
"github.com/cartesi/rollups-node/internal/nodemachine"
Expand All @@ -24,19 +25,19 @@ var (
)

type Advancer struct {
machines Machines
repository Repository
machines Machines
repo Repository
}

// New instantiates a new Advancer.
func New(machines Machines, repository Repository) (*Advancer, error) {
func New(machines Machines, repo Repository) (*Advancer, error) {
if machines == nil {
return nil, ErrInvalidMachines
}
if repository == nil {
if repo == nil {
return nil, ErrInvalidRepository
}
return &Advancer{machines: machines, repository: repository}, nil
return &Advancer{machines: machines, repo: repo}, nil
}

// Poller instantiates a new poller.Poller using the Advancer.
Expand All @@ -49,11 +50,11 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller,
// runs them through the cartesi machine,
// and updates the repository with the ouputs.
func (advancer *Advancer) Step(ctx context.Context) error {
apps := keysFrom(advancer.machines)
apps := advancer.machines.Keys()

// Gets the unprocessed inputs (of all apps) from the repository.
slog.Info("advancer: getting unprocessed inputs")
inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps)
inputs, err := advancer.repo.GetUnprocessedInputs(ctx, apps)
if err != nil {
return err
}
Expand All @@ -67,14 +68,22 @@ func (advancer *Advancer) Step(ctx context.Context) error {
}
}

// Updates the status of the epochs.
for _, app := range apps {
err := advancer.repo.UpdateEpochs(ctx, app)
if err != nil {
return err
}
}

return nil
}

// process sequentially processes inputs from the the application.
func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error {
// Asserts that the app has an associated machine.
machine, ok := advancer.machines[app]
if !ok {
machine := advancer.machines.GetAdvanceMachine(app)
if machine == nil {
panic(fmt.Errorf("%w %s", ErrNoApp, app.String()))
}

Expand All @@ -93,17 +102,13 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In
}

// Stores the result in the database.
err = advancer.repository.StoreAdvanceResult(ctx, input, res)
err = advancer.repo.StoreAdvanceResult(ctx, input, res)
if err != nil {
return err
}
}

// Updates the status of the epochs based on the last processed input.
lastInput := inputs[len(inputs)-1]
err := advancer.repository.UpdateEpochs(ctx, app, lastInput)

return err
return nil
}

// ------------------------------------------------------------------------------------------------
Expand All @@ -114,25 +119,10 @@ type Repository interface {

StoreAdvanceResult(context.Context, *Input, *nodemachine.AdvanceResult) error

UpdateEpochs(_ context.Context, app Address, lastInput *Input) error
UpdateEpochs(_ context.Context, app Address) error
}

// A map of application addresses to machines.
type Machines = map[Address]Machine

type Machine interface {
Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error)
}

// ------------------------------------------------------------------------------------------------

// keysFrom returns a slice with the keysFrom of a map.
func keysFrom[K comparable, V any](m map[K]V) []K {
keys := make([]K, len(m))
i := 0
for k := range m {
keys[i] = k
i++
}
return keys
type Machines interface {
GetAdvanceMachine(Address) machines.AdvanceMachine
Keys() []Address
}
43 changes: 10 additions & 33 deletions internal/node/advancer/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *AdvancerSuite) TestRun() {
res3 := randomAdvanceResult()

repository := &MockRepository{
GetInputsReturn: map[Address][]*Input{
GetUnprocessedInputsReturn: map[Address][]*Input{
app1: {
{Id: 1, RawData: marshal(res1)},
{Id: 2, RawData: marshal(res2)},
Expand All @@ -94,7 +94,9 @@ func (s *AdvancerSuite) TestRun() {
require.Len(repository.StoredResults, 3)
})

// NOTE: missing more test cases
s.Run("Error/UpdateEpochs", func() {
s.T().Skip("TODO")
})
}

func (s *AdvancerSuite) TestProcess() {
Expand Down Expand Up @@ -124,7 +126,6 @@ func (s *AdvancerSuite) TestProcess() {
err := advancer.process(context.Background(), app, inputs)
require.Nil(err)
require.Len(repository.StoredResults, 7)
require.Equal(*inputs[6], repository.LastInput)
})

s.Run("Panic", func() {
Expand Down Expand Up @@ -183,25 +184,7 @@ func (s *AdvancerSuite) TestProcess() {
require.Errorf(err, "store-advance error")
require.Len(repository.StoredResults, 1)
})

s.Run("UpdateEpochs", func() {
require := s.Require()

_, repository, advancer, app := setup()
inputs := []*Input{
{Id: 1, RawData: marshal(randomAdvanceResult())},
{Id: 2, RawData: marshal(randomAdvanceResult())},
{Id: 3, RawData: marshal(randomAdvanceResult())},
{Id: 4, RawData: marshal(randomAdvanceResult())},
}
repository.UpdateEpochsError = errors.New("update-epochs error")

err := advancer.process(context.Background(), app, inputs)
require.Errorf(err, "update-epochs error")
require.Len(repository.StoredResults, 4)
})
})

}

func (s *AdvancerSuite) TestKeysFrom() {
Expand All @@ -228,20 +211,19 @@ func (mock *MockMachine) Advance(
// ------------------------------------------------------------------------------------------------

type MockRepository struct {
GetInputsReturn map[Address][]*Input
GetInputsError error
StoreAdvanceError error
UpdateEpochsError error
GetUnprocessedInputsReturn map[Address][]*Input
GetUnprocessedInputsError error
StoreAdvanceError error
UpdateEpochsError error

StoredResults []*nodemachine.AdvanceResult
LastInput Input
}

func (mock *MockRepository) GetUnprocessedInputs(
_ context.Context,
appAddresses []Address,
) (map[Address][]*Input, error) {
return mock.GetInputsReturn, mock.GetInputsError
return mock.GetUnprocessedInputsReturn, mock.GetUnprocessedInputsError
}

func (mock *MockRepository) StoreAdvanceResult(
Expand All @@ -253,12 +235,7 @@ func (mock *MockRepository) StoreAdvanceResult(
return mock.StoreAdvanceError
}

func (mock *MockRepository) UpdateEpochs(
_ context.Context,
_ Address,
lastInput *Input,
) error {
mock.LastInput = *lastInput
func (mock *MockRepository) UpdateEpochs(_ context.Context, _ Address) error {
return mock.UpdateEpochsError
}

Expand Down
Loading

0 comments on commit 6159eb0

Please sign in to comment.