From 06bcd9cbe41ca6381055e9ca72d87e3fee118694 Mon Sep 17 00:00:00 2001 From: Renan Santos Date: Fri, 19 Jul 2024 13:36:20 -0300 Subject: [PATCH] temp --- internal/node/machineadvancer/advancer.go | 16 ++- .../node/machineadvancer/advancer_test.go | 67 +++++------ internal/node/machineadvancer/repository.go | 38 +------ internal/node/model/machine.go | 14 --- internal/node/nodemachine/machine.go | 106 ++++++++++++------ 5 files changed, 114 insertions(+), 127 deletions(-) delete mode 100644 internal/node/model/machine.go diff --git a/internal/node/machineadvancer/advancer.go b/internal/node/machineadvancer/advancer.go index 04b6b5db5..4c301b6d3 100644 --- a/internal/node/machineadvancer/advancer.go +++ b/internal/node/machineadvancer/advancer.go @@ -4,20 +4,21 @@ package machineadvancer import ( + "context" "errors" "fmt" "time" - . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/node/nodemachine" ) type Machine interface { - Advance(input []byte) (*nodemachine.AdvanceResponse, error) + Advance(_ context.Context, input []byte) (*nodemachine.AdvanceResponse, error) } type MachineAdvancer struct { - machines map[Address]Machine + machines map[model.Address]Machine repository Repository ticker *time.Ticker } @@ -32,7 +33,7 @@ var ( // Duration must be greater than 0. func New( - machines map[Address]Machine, + machines map[model.Address]Machine, repository Repository, pollingInterval time.Duration, ) (*MachineAdvancer, error) { @@ -69,15 +70,12 @@ func (advancer *MachineAdvancer) Start() error { // Processes all inputs sequentially. for _, input := range inputs { - res, err := machine.Advance(input.Payload) + res, err := machine.Advance(context.Background(), input.RawData) if err != nil { return err } - inputStatus, err := inputStatusFromResponse(res) - fmt.Println(inputStatus) - err = advancer.repository.Store( - input, res.Outputs, res.Reports, res.OutputsHash, res.MachineHash, res.Status) + err = advancer.repository.Store(input, res) if err != nil { return err } diff --git a/internal/node/machineadvancer/advancer_test.go b/internal/node/machineadvancer/advancer_test.go index 7839b8071..970e4a433 100644 --- a/internal/node/machineadvancer/advancer_test.go +++ b/internal/node/machineadvancer/advancer_test.go @@ -4,16 +4,16 @@ package machineadvancer import ( + "context" crand "crypto/rand" "errors" mrand "math/rand" "testing" "time" - . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/node/nodemachine" - "github.com/cartesi/rollups-node/pkg/rollupsmachine" "github.com/stretchr/testify/suite" ) @@ -26,7 +26,7 @@ type MachineAdvancerSuite struct{ suite.Suite } func (s *MachineAdvancerSuite) TestNew() { s.Run("Ok", func() { require := s.Require() - machines := map[Address]Machine{randomAddress(): newMockMachine()} + machines := map[model.Address]Machine{randomAddress(): newMockMachine()} repository := newMockRepository() machineAdvancer, err := New(machines, repository, time.Nanosecond) require.NotNil(machineAdvancer) @@ -43,7 +43,7 @@ func (s *MachineAdvancerSuite) TestNew() { s.Run("InvalidRepository", func() { require := s.Require() - machines := map[Address]Machine{randomAddress(): newMockMachine()} + machines := map[model.Address]Machine{randomAddress(): newMockMachine()} machineAdvancer, err := New(machines, nil, time.Nanosecond) require.Nil(machineAdvancer) require.Equal(ErrInvalidRepository, err) @@ -51,7 +51,7 @@ func (s *MachineAdvancerSuite) TestNew() { s.Run("InvalidPollingInterval", func() { require := s.Require() - machines := map[Address]Machine{randomAddress(): newMockMachine()} + machines := map[model.Address]Machine{randomAddress(): newMockMachine()} repository := newMockRepository() machineAdvancer, err := New(machines, repository, time.Duration(0)) require.Nil(machineAdvancer) @@ -67,16 +67,16 @@ func (s *MachineAdvancerSuite) TestStart() { type StartSuite struct { suite.Suite - machines map[Address]Machine + machines map[model.Address]Machine repository *MockRepository } func (s *StartSuite) SetupTest() { - s.machines = map[Address]Machine{} + s.machines = map[model.Address]Machine{} s.repository = newMockRepository() } -// TODO: this test is absurdly basic. We need more tests. +// NOTE: This test is very basic! We need more tests! func (s *StartSuite) TestBasic() { require := s.Require() @@ -87,7 +87,7 @@ func (s *StartSuite) TestBasic() { machine.add(advanceResponse, nil) s.machines[appAddress] = machine - s.repository.add(map[Address][]MachineInput{appAddress: randomInputs(1)}, nil, nil) + s.repository.add(map[model.Address][]model.Input{appAddress: randomInputs(1)}, nil, nil) machineAdvancer, err := New(s.machines, s.repository, time.Nanosecond) require.NotNil(machineAdvancer) @@ -121,7 +121,10 @@ func (m *MockMachine) add(result *nodemachine.AdvanceResponse, err error) { m.errors = append(m.errors, err) } -func (m *MockMachine) Advance(input []byte) (*nodemachine.AdvanceResponse, error) { +func (m *MockMachine) Advance( + _ context.Context, + input []byte, +) (*nodemachine.AdvanceResponse, error) { result, err := m.results[m.index], m.errors[m.index] m.index += 1 return result, err @@ -131,7 +134,7 @@ func (m *MockMachine) Advance(input []byte) (*nodemachine.AdvanceResponse, error type MockRepository struct { getInputsIndex uint8 - getInputsResults []map[Address][]MachineInput + getInputsResults []map[model.Address][]model.Input getInputsErrors []error storeIndex uint8 @@ -142,7 +145,7 @@ type MockRepository struct { func newMockRepository() *MockRepository { return &MockRepository{ getInputsIndex: 0, - getInputsResults: []map[Address][]MachineInput{}, + getInputsResults: []map[model.Address][]model.Input{}, getInputsErrors: []error{}, storeIndex: 0, storeErrors: []error{}, @@ -151,7 +154,7 @@ func newMockRepository() *MockRepository { } func (r *MockRepository) add( - getInputsResult map[Address][]MachineInput, + getInputsResult map[model.Address][]model.Input, getInputsError error, storeError error, ) { @@ -162,7 +165,9 @@ func (r *MockRepository) add( var testFinished = errors.New("test finished") -func (r *MockRepository) GetInputs(appAddresses []Address) (map[Address][]MachineInput, error) { +func (r *MockRepository) GetInputs( + appAddresses []model.Address, +) (map[model.Address][]model.Input, error) { if int(r.getInputsIndex) == len(r.getInputsResults) { return nil, testFinished } @@ -171,45 +176,31 @@ func (r *MockRepository) GetInputs(appAddresses []Address) (map[Address][]Machin return result, err } -func (r *MockRepository) Store( - input MachineInput, - outputs []MachineOutput, - reports []MachineReport, - outputsHash Hash, - machineHash Hash, - status rollupsmachine.Status) error { - +func (r *MockRepository) Store(input model.Input, res *nodemachine.AdvanceResponse) error { err := r.storeErrors[r.storeIndex] r.storeIndex += 1 - r.stored = append(r.stored, &nodemachine.AdvanceResponse{ - Outputs: outputs, - Reports: reports, - OutputsHash: outputsHash, - MachineHash: machineHash, - Status: status, - }) - + r.stored = append(r.stored, res) return err } // ------------------------------------------------------------------------------------------------ -func randomAddress() Address { +func randomAddress() model.Address { address := make([]byte, 20) _, err := crand.Read(address) if err != nil { panic(err) } - return Address(address) + return model.Address(address) } -func randomHash() Hash { +func randomHash() model.Hash { hash := make([]byte, 32) _, err := crand.Read(hash) if err != nil { panic(err) } - return Hash(hash) + return model.Hash(hash) } func randomBytes() []byte { @@ -231,10 +222,10 @@ func randomSliceOfBytes() [][]byte { return slice } -func randomInputs(size int) []MachineInput { - slice := make([]MachineInput, size) +func randomInputs(size int) []model.Input { + slice := make([]model.Input, size) for i := 0; i < size; i++ { - slice[i] = MachineInput{ID: uint64(i), Payload: randomBytes()} + slice[i] = model.Input{Id: uint64(i), RawData: randomBytes()} } return slice @@ -242,10 +233,10 @@ func randomInputs(size int) []MachineInput { func randomAdvanceResponse() *nodemachine.AdvanceResponse { return &nodemachine.AdvanceResponse{ + Status: model.InputStatusAccepted, Outputs: randomSliceOfBytes(), Reports: randomSliceOfBytes(), OutputsHash: randomHash(), MachineHash: randomHash(), - Status: rollupsmachine.RequestAccepted, } } diff --git a/internal/node/machineadvancer/repository.go b/internal/node/machineadvancer/repository.go index c38ad3204..252f1061c 100644 --- a/internal/node/machineadvancer/repository.go +++ b/internal/node/machineadvancer/repository.go @@ -4,43 +4,13 @@ package machineadvancer import ( - "errors" - - . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/node/nodemachine" - "github.com/cartesi/rollups-node/pkg/rollupsmachine" ) type Repository interface { - GetInputs(appAddresses []Address) (map[Address][]MachineInput, error) - - Store(input MachineInput, - outputs []MachineOutput, - reports []MachineReport, - outputsHash Hash, - machineHash Hash, - status rollupsmachine.Status) error -} + // Only needs Id and RawData fields from model.Input. + GetInputs(appAddresses []model.Address) (map[model.Address][]model.Input, error) -func inputStatusFromResponse(res *nodemachine.AdvanceResponse) (InputCompletionStatus, error) { - switch res.Status { - case rollupsmachine.RequestAccepted: - return InputStatusAccepted, nil - case rollupsmachine.RequestRejected: - return InputStatusRejected, nil - case rollupsmachine.RequestException: - return InputStatusException, nil - case rollupsmachine.MachineHalted: - return InputStatusMachineHalted, nil - case rollupsmachine.CycleLimitExceeded: - return InputStatusCycleLimitExceeded, nil - case rollupsmachine.TimeLimitExceeded: - return InputStatusTimeLimitExceeded, nil - case rollupsmachine.PayloadLengthLimitExceeded: - return InputStatusPayloadLengthLimitExceeded, nil - case rollupsmachine.OutputsLimitExceeded: - return "TODO", errors.New("TODO") - default: - return "TODO", errors.New("TODO") - } + Store(model.Input, *nodemachine.AdvanceResponse) error } diff --git a/internal/node/model/machine.go b/internal/node/model/machine.go deleted file mode 100644 index 099f89558..000000000 --- a/internal/node/model/machine.go +++ /dev/null @@ -1,14 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package model - -type MachineInput struct { - ID uint64 - Payload []byte -} - -type ( - MachineOutput = []byte - MachineReport = []byte -) diff --git a/internal/node/nodemachine/machine.go b/internal/node/nodemachine/machine.go index d96695753..029b81901 100644 --- a/internal/node/nodemachine/machine.go +++ b/internal/node/nodemachine/machine.go @@ -5,6 +5,8 @@ package nodemachine import ( "context" + "errors" + "time" "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/node/nodemachine/pmutex" @@ -13,6 +15,8 @@ import ( "golang.org/x/sync/semaphore" ) +var ErrTimeLimitExceeded = errors.New("time limit exceeded") + type AdvanceResponse struct { Status model.InputCompletionStatus Outputs [][]byte @@ -21,9 +25,14 @@ type AdvanceResponse struct { MachineHash model.Hash } +func (res AdvanceResponse) StatusOk() bool { + return res.Status == model.InputStatusAccepted || res.Status == model.InputStatusRejected +} + type InspectResponse struct { Accepted bool Reports [][]byte + Err error } type RollupsMachine interface { @@ -39,6 +48,9 @@ type RollupsMachine interface { type NodeMachine struct { RollupsMachine + // Timeout in seconds. + timeout time.Duration + // Ensures advance/inspect mutual exclusion when accessing the inner RollupsMachine. // Advances have a higher priority than Inspects to acquire the lock. mutex *pmutex.PMutex @@ -55,9 +67,9 @@ func New(rollupsMachine RollupsMachine, maxConcurrentInspects int8) *NodeMachine } } -func (machine *NodeMachine) Advance(input []byte) (_ *AdvanceResponse, err error) { +func (machine *NodeMachine) Advance(ctx context.Context, input []byte) (*AdvanceResponse, error) { var fork RollupsMachine - res := AdvanceResponse{} + var err error { // Forks the machine. machine.mutex.HLock() @@ -68,42 +80,45 @@ func (machine *NodeMachine) Advance(input []byte) (_ *AdvanceResponse, err error } } - { // Sends the advance-state request. + // Sends the advance-state request to the forked machine. + res, err, timedOut := runWithTimeout(ctx, machine.timeout, func() (*AdvanceResponse, error) { accepted, outputs, reports, outputsHash, err := fork.Advance(input) status, err := toInputStatus(accepted, err) if err != nil { return nil, err } - res.Status = status - res.Outputs = outputs - res.Reports = reports - res.OutputsHash = outputsHash + return &AdvanceResponse{ + Status: status, + Outputs: outputs, + Reports: reports, + OutputsHash: outputsHash, + }, nil + }) + if err != nil { + goto end + } + if timedOut { + res = &AdvanceResponse{Status: model.InputStatusTimeLimitExceeded} } // Only gets the post-advance machine hash if the request was accepted. if res.Status == model.InputStatusAccepted { res.MachineHash, err = fork.Hash() if err != nil { - return nil, err + goto end } } - // If the forked machine is in a valid state. - if res.Status == model.InputStatusAccepted || res.Status == model.InputStatusRejected { - // Destroys the old machine and replaces the current machine with the forked machine. + // If the forked machine is in a valid state: + if res.StatusOk() { + // Switches the current machine and the forked machine. machine.mutex.HLock() defer machine.mutex.Unlock() - err = machine.Close() - if err != nil { - return nil, err - } - rollupsmachine.StopServer() - machine.RollupsMachine = fork - } else { - // Destroys the forked machine. + fork, machine.RollupsMachine = machine.RollupsMachine, fork } - return &res, nil +end: + return res, errors.Join(err, fork.Close()) } func (machine *NodeMachine) Inspect(ctx context.Context, query []byte) (*InspectResponse, error) { @@ -114,9 +129,9 @@ func (machine *NodeMachine) Inspect(ctx context.Context, query []byte) (*Inspect } defer machine.inspects.Release(1) - // Forks the machine. var fork RollupsMachine - { + + { // Forks the machine. machine.mutex.LLock() defer machine.mutex.Unlock() fork, err = machine.RollupsMachine.Fork() @@ -125,15 +140,16 @@ func (machine *NodeMachine) Inspect(ctx context.Context, query []byte) (*Inspect } } - // Sends the inspect-state request. - reports, status, err := fork.Inspect(query) - if err != nil { - return nil, err + // Sends the inspect-state request to the forked machine. + res, _, timedOut := runWithTimeout(ctx, machine.timeout, func() (*InspectResponse, error) { + accepted, reports, err := fork.Inspect(query) + return &InspectResponse{Accepted: accepted, Reports: reports, Err: err}, nil + }) + if timedOut { + res = &InspectResponse{Err: ErrTimeLimitExceeded} } - // Destroys the forked machine and returns the reports. - err = fork.Destroy() - return &InspectResponse{Reports: reports, Status: status}, err + return res, fork.Close() } // ------------------------------------------------------------------------------------------------ @@ -163,8 +179,34 @@ func toInputStatus(accepted bool, err error) (status model.InputCompletionStatus } // ErrPayloadLengthLimitExceeded - // ErrTimeLimitExceeded - - // InputStatusTimeLimitExceeded // InputStatusPayloadLengthLimitExceeded } + +func runWithTimeout[T any]( + ctx context.Context, + timeout time.Duration, + f func() (*T, error), +) (_ *T, _ error, timedOut bool) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + success := make(chan *T, 1) + failure := make(chan error, 1) + go func() { + t, err := f() + if err != nil { + failure <- err + } else { + success <- t + } + }() + + select { + case <-ctx.Done(): + return nil, nil, true + case t := <-success: + return t, nil, false + case err := <-failure: + return nil, err, false + } +}