From e197c24e436d46f56bb1077a0e67485c02ff38e3 Mon Sep 17 00:00:00 2001 From: Renan Santos Date: Wed, 21 Aug 2024 16:41:51 -0300 Subject: [PATCH] feat(advancer): add the advancer's repository --- .../root/db/check/check.go | 13 +- .../root/db/upgrade/upgrade.go | 19 +- internal/node/advancer/advancer.go | 16 +- internal/node/startup/startup.go | 29 +-- internal/repository/advancer.go | 221 +++++++++++++++++ internal/repository/advancer_test.go | 155 ++++++++++++ internal/repository/base.go | 17 +- internal/repository/base_test.go | 6 +- ...ut_claim_output_report_nodeconfig.down.sql | 1 + ...nput_claim_output_report_nodeconfig.up.sql | 31 ++- .../000002_create_postgraphile_view.down.sql | 2 +- .../000002_create_postgraphile_view.up.sql | 0 internal/repository/schema/schema.go | 85 +++++++ internal/repository/schemamanager.go | 102 -------- test/advancer/advancer_test.go | 232 ++++++++++++++++++ test/tooling/db/db.go | 74 ++++++ 16 files changed, 840 insertions(+), 163 deletions(-) create mode 100644 internal/repository/advancer.go create mode 100644 internal/repository/advancer_test.go rename internal/repository/{ => schema}/migrations/000001_create_application_input_claim_output_report_nodeconfig.down.sql (92%) rename internal/repository/{ => schema}/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql (84%) rename internal/repository/{ => schema}/migrations/000002_create_postgraphile_view.down.sql (78%) rename internal/repository/{ => schema}/migrations/000002_create_postgraphile_view.up.sql (100%) create mode 100644 internal/repository/schema/schema.go delete mode 100644 internal/repository/schemamanager.go create mode 100644 test/advancer/advancer_test.go create mode 100644 test/tooling/db/db.go diff --git a/cmd/cartesi-rollups-cli/root/db/check/check.go b/cmd/cartesi-rollups-cli/root/db/check/check.go index 3c5fc7df8..95aa5d2d6 100644 --- a/cmd/cartesi-rollups-cli/root/db/check/check.go +++ b/cmd/cartesi-rollups-cli/root/db/check/check.go @@ -3,10 +3,10 @@ package check import ( - "fmt" + "log/slog" "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/common" - "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/internal/repository/schema" "github.com/spf13/cobra" ) @@ -17,13 +17,12 @@ var Cmd = &cobra.Command{ } func run(cmd *cobra.Command, args []string) { - - schemaManager, err := repository.NewSchemaManager(common.PostgresEndpoint) + schema, err := schema.New(common.PostgresEndpoint) cobra.CheckErr(err) - defer schemaManager.Close() + defer schema.Close() - err = schemaManager.ValidateSchemaVersion() + version, err := schema.ValidateVersion() cobra.CheckErr(err) - fmt.Printf("Database Schema is at the correct version: %d\n", repository.EXPECTED_VERSION) + slog.Info("Database Schema is at the correct version.", "version", version) } diff --git a/cmd/cartesi-rollups-cli/root/db/upgrade/upgrade.go b/cmd/cartesi-rollups-cli/root/db/upgrade/upgrade.go index 6783cccd7..20b443925 100644 --- a/cmd/cartesi-rollups-cli/root/db/upgrade/upgrade.go +++ b/cmd/cartesi-rollups-cli/root/db/upgrade/upgrade.go @@ -3,11 +3,10 @@ package upgrade import ( - "fmt" "log/slog" "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/common" - "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/internal/repository/schema" "github.com/spf13/cobra" ) @@ -18,21 +17,15 @@ var Cmd = &cobra.Command{ } func run(cmd *cobra.Command, args []string) { - - schemaManager, err := 
repository.NewSchemaManager(common.PostgresEndpoint) + schema, err := schema.New(common.PostgresEndpoint) cobra.CheckErr(err) - defer schemaManager.Close() + defer schema.Close() - err = schemaManager.Upgrade() + err = schema.Up() cobra.CheckErr(err) - version, err := schemaManager.GetVersion() + version, err := schema.ValidateVersion() cobra.CheckErr(err) - if repository.EXPECTED_VERSION != version { - slog.Warn("Current version is different to expected one") - } - - fmt.Printf("Database Schema successfully Updated. Current version is %d\n", version) - + slog.Info("Database Schema successfully Updated.", "version", version) } diff --git a/internal/node/advancer/advancer.go b/internal/node/advancer/advancer.go index 32ef675ab..5e13cf756 100644 --- a/internal/node/advancer/advancer.go +++ b/internal/node/advancer/advancer.go @@ -67,6 +67,14 @@ func (advancer *Advancer) Step(ctx context.Context) error { } } + // Updates the status of the epochs. + for _, app := range apps { + err := advancer.repository.UpdateEpochs(ctx, app) + if err != nil { + return err + } + } + return nil } @@ -99,11 +107,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In } } - // Updates the status of the epochs based on the last processed input. - lastInput := inputs[len(inputs)-1] - err := advancer.repository.UpdateEpochs(ctx, app, lastInput) - - return err + return nil } // ------------------------------------------------------------------------------------------------ @@ -114,7 +118,7 @@ type Repository interface { StoreAdvanceResult(context.Context, *Input, *nodemachine.AdvanceResult) error - UpdateEpochs(_ context.Context, app Address, lastInput *Input) error + UpdateEpochs(_ context.Context, app Address) error } // A map of application addresses to machines. 
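For reference, the interface change above means the advancer no longer threads the last processed input into UpdateEpochs: Step first drains each application's pending inputs and then updates that application's epochs in a separate pass. The sketch below is illustrative only and not part of the patch; it mirrors the wiring used by the new test added at the end of this change, and assumes the machines map and the connected database are built elsewhere.

// Sketch only: driving the advancer with the new AdvancerRepository.
package example

import (
	"context"
	"time"

	"github.com/cartesi/rollups-node/internal/node/advancer"
	"github.com/cartesi/rollups-node/internal/repository"
)

func runAdvancer(
	ctx context.Context,
	machines advancer.Machines,
	database *repository.Database,
) error {
	repo := &repository.AdvancerRepository{Database: database}

	adv, err := advancer.New(machines, repo)
	if err != nil {
		return err
	}

	// Each poll runs Step: process the pending inputs of every application,
	// then ask the repository to update that application's epochs.
	poller, err := adv.Poller(5 * time.Second)
	if err != nil {
		return err
	}

	ready := make(chan struct{}, 1)
	return poller.Start(ctx, ready) // runs until poller.Stop is called (see the test below)
}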
diff --git a/internal/node/startup/startup.go b/internal/node/startup/startup.go
index 146448fa0..6555be92b 100644
--- a/internal/node/startup/startup.go
+++ b/internal/node/startup/startup.go
@@ -12,6 +12,7 @@ import (
 	"github.com/cartesi/rollups-node/internal/node/config"
 	"github.com/cartesi/rollups-node/internal/node/model"
 	"github.com/cartesi/rollups-node/internal/repository"
+	"github.com/cartesi/rollups-node/internal/repository/schema"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/jackc/pgx/v5"
 	"github.com/lmittmann/tint"
@@ -20,31 +21,19 @@ import (
 
 // Validates the Node Database Schema Version
 func ValidateSchema(config config.NodeConfig) error {
-	var (
-		schemaManager *repository.SchemaManager
-		err           error
-	)
-
-	if !config.PostgresSslMode {
-		schemaManager, err = repository.NewSchemaManager(
-			fmt.Sprintf("%v?sslmode=disable", config.PostgresEndpoint.Value))
-		if err != nil {
-			return err
-		}
-	} else {
-		schemaManager, err = repository.NewSchemaManager(config.PostgresEndpoint.Value)
-		if err != nil {
-			return err
-		}
+	endpoint := config.PostgresEndpoint.Value
+	if !config.PostgresSslMode {
+		endpoint += "?sslmode=disable"
 	}
-	defer schemaManager.Close()
-	err = schemaManager.ValidateSchemaVersion()
+
+	schema, err := schema.New(endpoint)
 	if err != nil {
 		return err
 	}
+	defer schema.Close()
 
-	return nil
-
+	_, err = schema.ValidateVersion()
+	return err
 }
 
 // Configure the node logs
diff --git a/internal/repository/advancer.go b/internal/repository/advancer.go
new file mode 100644
index 000000000..6c74f225c
--- /dev/null
+++ b/internal/repository/advancer.go
@@ -0,0 +1,221 @@
+// (c) Cartesi and individual authors (see AUTHORS)
+// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+
+package repository
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+
+	. "github.com/cartesi/rollups-node/internal/node/model"
+	"github.com/cartesi/rollups-node/internal/nodemachine"
+	"github.com/jackc/pgx/v5"
+)
+
+var ErrAdvancerRepository = errors.New("advancer repository error")
+
+type AdvancerRepository struct{ *Database }
+
+func (repo *AdvancerRepository) GetInputs(
+	ctx context.Context,
+	apps []Address,
+) (map[Address][]*Input, error) {
+	result := map[Address][]*Input{}
+	if len(apps) == 0 {
+		return result, nil
+	}
+
+	query := fmt.Sprintf(`
+		SELECT id, application_address, raw_data
+		FROM input
+		WHERE status = 'NONE'
+			AND application_address IN %s
+		ORDER BY index ASC, application_address
+	`, toSqlIn(apps)) // NOTE: not sanitized
+	rows, err := repo.db.Query(ctx, query)
+	if err != nil {
+		return nil, fmt.Errorf("%w (failed querying inputs): %w", ErrAdvancerRepository, err)
+	}
+
+	var input Input
+	scans := []any{&input.Id, &input.AppAddress, &input.RawData}
+	_, err = pgx.ForEachRow(rows, scans, func() error {
+		input := input
+		if _, ok := result[input.AppAddress]; ok { //nolint:gosimple
+			result[input.AppAddress] = append(result[input.AppAddress], &input)
+		} else {
+			result[input.AppAddress] = []*Input{&input}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("%w (failed reading input rows): %w", ErrAdvancerRepository, err)
+	}
+
+	return result, nil
+}
+
+func (repo *AdvancerRepository) StoreResults(
+	ctx context.Context,
+	input *Input,
+	res *nodemachine.AdvanceResult,
+) error {
+	tx, err := repo.db.Begin(ctx)
+	if err != nil {
+		return errors.Join(ErrBeginTx, err)
+	}
+
+	// Inserts the outputs.
+ nextOutputIndex, err := repo.getNextIndex(ctx, tx, "output", input.AppAddress) + if err != nil { + return err + } + err = repo.insert(ctx, tx, "output", res.Outputs, input.Id, nextOutputIndex) + if err != nil { + return err + } + + // Inserts the reports. + nextReportIndex, err := repo.getNextIndex(ctx, tx, "report", input.AppAddress) + if err != nil { + return err + } + err = repo.insert(ctx, tx, "report", res.Reports, input.Id, nextReportIndex) + if err != nil { + return err + } + + // Updates the input's status. + err = repo.updateInput(ctx, tx, input.Id, res.Status, res.OutputsHash, res.MachineHash) + if err != nil { + return err + } + + err = tx.Commit(ctx) + if err != nil { + return errors.Join(ErrCommitTx, err, tx.Rollback(ctx)) + } + + return nil +} + +func (repo *AdvancerRepository) UpdateEpochs(ctx context.Context, app Address) error { + query := ` + UPDATE epoch + SET status = 'PROCESSED_ALL_INPUTS' + WHERE id IN (( + SELECT DISTINCT epoch.id + FROM epoch INNER JOIN input ON (epoch.id = input.epoch_id) + WHERE epoch.application_address = @applicationAddress + AND epoch.status = 'CLOSED' + AND input.status != 'NONE' + ) EXCEPT ( + SELECT DISTINCT epoch.id + FROM epoch INNER JOIN input ON (epoch.id = input.epoch_id) + WHERE epoch.application_address = @applicationAddress + AND epoch.status = 'CLOSED' + AND input.status = 'NONE')) + ` + args := pgx.NamedArgs{"applicationAddress": app} + _, err := repo.db.Exec(ctx, query, args) + if err != nil { + return errors.Join(ErrUpdateRow, err) + } + return nil +} + +// ------------------------------------------------------------------------------------------------ + +func (_ *AdvancerRepository) getNextIndex( + ctx context.Context, + tx pgx.Tx, + tableName string, + appAddress Address, +) (uint64, error) { + var nextIndex uint64 + query := fmt.Sprintf(` + SELECT COALESCE(MAX(%s.index) + 1, 0) + FROM input INNER JOIN %s ON input.id = %s.input_id + WHERE input.status = 'ACCEPTED' + AND input.application_address = $1 + `, tableName, tableName, tableName) + err := tx.QueryRow(ctx, query, appAddress).Scan(&nextIndex) + if err != nil { + err = fmt.Errorf("failed to get the next %s index: %w", tableName, err) + return 0, errors.Join(err, tx.Rollback(ctx)) + } + return nextIndex, nil +} + +func (_ *AdvancerRepository) insert( + ctx context.Context, + tx pgx.Tx, + tableName string, + dataArray [][]byte, + inputId uint64, + nextIndex uint64, +) error { + lenOutputs := int64(len(dataArray)) + if lenOutputs < 1 { + return nil + } + + rows := [][]any{} + for i, data := range dataArray { + rows = append(rows, []any{inputId, nextIndex + uint64(i), data}) + } + + count, err := tx.CopyFrom( + ctx, + pgx.Identifier{tableName}, + []string{"input_id", "index", "raw_data"}, + pgx.CopyFromRows(rows), + ) + if err != nil { + return errors.Join(ErrCopyFrom, err, tx.Rollback(ctx)) + } + if lenOutputs != count { + err := fmt.Errorf("not all %ss were inserted (%d != %d)", tableName, lenOutputs, count) + return errors.Join(err, tx.Rollback(ctx)) + } + + return nil +} + +func (_ *AdvancerRepository) updateInput( + ctx context.Context, + tx pgx.Tx, + inputId uint64, + status InputCompletionStatus, + outputsHash Hash, + machineHash *Hash, +) error { + query := ` + UPDATE input + SET (status, outputs_hash, machine_hash) = (@status, @outputsHash, @machineHash) + WHERE id = @id + ` + args := pgx.NamedArgs{ + "status": status, + "outputsHash": outputsHash, + "machineHash": machineHash, + "id": inputId, + } + _, err := tx.Exec(ctx, query, args) + if err != nil { + return 
errors.Join(ErrUpdateRow, err, tx.Rollback(ctx)) + } + return nil +} + +// ------------------------------------------------------------------------------------------------ + +func toSqlIn[T fmt.Stringer](a []T) string { + s := []string{} + for _, x := range a { + s = append(s, fmt.Sprintf("'\\x%s'", x.String()[2:])) + } + return fmt.Sprintf("(%s)", strings.Join(s, ", ")) +} diff --git a/internal/repository/advancer_test.go b/internal/repository/advancer_test.go new file mode 100644 index 000000000..1eb457ff2 --- /dev/null +++ b/internal/repository/advancer_test.go @@ -0,0 +1,155 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package repository + +import ( + "context" + "testing" + + . "github.com/cartesi/rollups-node/internal/node/model" + + "github.com/cartesi/rollups-node/pkg/rollupsmachine" + "github.com/cartesi/rollups-node/test/tooling/db" + "github.com/ethereum/go-ethereum/common" + + "github.com/stretchr/testify/require" +) + +func TestAdvancerRepository(t *testing.T) { + ctx := context.Background() + + endpoint, err := db.Setup(ctx) + require.Nil(t, err) + + database, err := Connect(ctx, endpoint) + require.Nil(t, err) + require.NotNil(t, database) + + app, _, _, err := populate(database) + require.Nil(t, err) + + repository := &AdvancerRepository{Database: database} + + t.Run("UpdateEpochs", func(t *testing.T) { + require := require.New(t) + + err = repository.UpdateEpochs(ctx, app.ContractAddress) + require.Nil(err) + + epoch0, err := repository.GetEpoch(ctx, 0, app.ContractAddress) + require.Nil(err) + require.NotNil(epoch0) + + epoch1, err := repository.GetEpoch(ctx, 1, app.ContractAddress) + require.Nil(err) + require.NotNil(epoch1) + + epoch2, err := repository.GetEpoch(ctx, 2, app.ContractAddress) + require.Nil(err) + require.NotNil(epoch2) + + epoch3, err := repository.GetEpoch(ctx, 3, app.ContractAddress) + require.Nil(err) + require.NotNil(epoch3) + + require.Equal(EpochStatusProcessedAllInputs, epoch0.Status) + require.Equal(EpochStatusProcessedAllInputs, epoch1.Status) + require.Equal(EpochStatusClosed, epoch2.Status) + require.Equal(EpochStatusOpen, epoch3.Status) + }) +} + +// ------------------------------------------------------------------------------------------------ + +func populate(database *Database) (*Application, []*Epoch, []*Input, error) { + ctx := context.Background() + + app := &Application{ + ContractAddress: common.HexToAddress("deadbeef"), + IConsensusAddress: common.HexToAddress("beefdead"), + TemplateHash: [32]byte{}, + LastProcessedBlock: 0, + Status: "RUNNING", + } + + err := database.InsertApplication(ctx, app) + if err != nil { + return nil, nil, nil, err + } + + epochs := []*Epoch{{ + FirstBlock: 0, + LastBlock: 1, + Status: EpochStatusClosed, + }, { + FirstBlock: 2, + LastBlock: 3, + Status: EpochStatusClosed, + }, { + FirstBlock: 4, + LastBlock: 5, + Status: EpochStatusClosed, + }, { + FirstBlock: 6, + LastBlock: 7, + Status: EpochStatusOpen, + }} + + for i, epoch := range epochs { + epoch.Index = uint64(i) + epoch.AppAddress = app.ContractAddress + epoch.Id, err = database.InsertEpoch(ctx, epoch) + if err != nil { + return nil, nil, nil, err + } + } + + inputs := []*Input{{ + EpochId: epochs[0].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("first input"), + }, { + EpochId: epochs[0].Id, + CompletionStatus: InputStatusRejected, + RawData: []byte("second input"), + }, { + EpochId: epochs[1].Id, + CompletionStatus: InputStatusException, + RawData: []byte("third 
input"), + }, { + EpochId: epochs[1].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("fourth input"), + }, { + EpochId: epochs[2].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("fifth input"), + }, { + EpochId: epochs[2].Id, + CompletionStatus: InputStatusNone, + RawData: []byte("sixth input"), + }, { + EpochId: epochs[3].Id, + CompletionStatus: InputStatusNone, + RawData: []byte("seventh input"), + }} + + for i, input := range inputs { + input.Index = uint64(i) + input.BlockNumber = uint64(i) + input.AppAddress = app.ContractAddress + + input.RawData, err = rollupsmachine.Input{Data: input.RawData}.Encode() + if err != nil { + return nil, nil, nil, err + } + + input.Id, err = database.InsertInput(ctx, input) + if err != nil { + return nil, nil, nil, err + } + } + + return app, epochs, inputs, nil +} diff --git a/internal/repository/base.go b/internal/repository/base.go index 10f688bc5..3cc6cf4d0 100644 --- a/internal/repository/base.go +++ b/internal/repository/base.go @@ -19,7 +19,14 @@ type Database struct { db *pgxpool.Pool } -var ErrInsertRow = errors.New("unable to insert row") +var ( + ErrInsertRow = errors.New("unable to insert row") + ErrUpdateRow = errors.New("unable to update row") + ErrCopyFrom = errors.New("unable to COPY FROM") + + ErrBeginTx = errors.New("unable to begin transaction") + ErrCommitTx = errors.New("unable to commit transaction") +) func Connect( ctx context.Context, @@ -119,7 +126,6 @@ func (pg *Database) InsertEpoch( ctx context.Context, epoch *Epoch, ) (uint64, error) { - var id uint64 query := ` @@ -140,7 +146,8 @@ func (pg *Database) InsertEpoch( @status, @applicationAddress) RETURNING - id` + id + ` args := pgx.NamedArgs{ "index": epoch.Index, @@ -283,7 +290,9 @@ func (pg *Database) InsertSnapshot( (@inputId, @appAddress, @uri) - RETURNING id` + RETURNING + id + ` args := pgx.NamedArgs{ "inputId": snapshot.InputId, diff --git a/internal/repository/base_test.go b/internal/repository/base_test.go index be5cd122f..a6459d4e2 100644 --- a/internal/repository/base_test.go +++ b/internal/repository/base_test.go @@ -10,6 +10,8 @@ import ( "time" . 
"github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/internal/repository/schema" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go" @@ -38,10 +40,10 @@ func (s *RepositorySuite) SetupSuite() { endpoint, err := s.postgres.ConnectionString(s.ctx, "sslmode=disable") s.Require().Nil(err) - schemaManager, err := NewSchemaManager(endpoint) + schema, err := schema.New(endpoint) s.Require().Nil(err) - err = schemaManager.Upgrade() + err = schema.Up() s.Require().Nil(err) s.database, err = Connect(s.ctx, endpoint) diff --git a/internal/repository/migrations/000001_create_application_input_claim_output_report_nodeconfig.down.sql b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.down.sql similarity index 92% rename from internal/repository/migrations/000001_create_application_input_claim_output_report_nodeconfig.down.sql rename to internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.down.sql index 2c4628a58..62147fc4b 100644 --- a/internal/repository/migrations/000001_create_application_input_claim_output_report_nodeconfig.down.sql +++ b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.down.sql @@ -9,6 +9,7 @@ DROP TABLE IF EXISTS "input"; DROP TABLE IF EXISTS "epoch"; DROP TABLE IF EXISTS "application"; +DROP FUNCTION IF EXISTS "f_maxuint64"; DROP TYPE IF EXISTS "InputCompletionStatus"; DROP TYPE IF EXISTS "ApplicationStatus"; diff --git a/internal/repository/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql similarity index 84% rename from internal/repository/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql rename to internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql index b62f17042..5bce75001 100644 --- a/internal/repository/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql +++ b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql @@ -3,16 +3,31 @@ CREATE TYPE "ApplicationStatus" AS ENUM ('RUNNING', 'NOT RUNNING'); -CREATE TYPE "InputCompletionStatus" AS ENUM ('NONE', 'ACCEPTED', 'REJECTED', 'EXCEPTION', 'MACHINE_HALTED', 'CYCLE_LIMIT_EXCEEDED', 'TIME_LIMIT_EXCEEDED', 'PAYLOAD_LENGTH_LIMIT_EXCEEDED'); +CREATE TYPE "InputCompletionStatus" AS ENUM ( + 'NONE', + 'ACCEPTED', + 'REJECTED', + 'EXCEPTION', + 'MACHINE_HALTED', + 'CYCLE_LIMIT_EXCEEDED', + 'TIME_LIMIT_EXCEEDED', + 'PAYLOAD_LENGTH_LIMIT_EXCEEDED'); CREATE TYPE "DefaultBlock" AS ENUM ('FINALIZED', 'LATEST', 'PENDING', 'SAFE'); -CREATE TYPE "EpochStatus" AS ENUM ('OPEN', 'CLOSED', 'PROCESSED_ALL_INPUTS', 'CLAIM_COMPUTED', 'CLAIM_SUBMITTED', 'CLAIM_ACCEPTED', 'CLAIM_REJECTED'); +CREATE TYPE "EpochStatus" AS ENUM ( + 'OPEN', + 'CLOSED', + 'PROCESSED_ALL_INPUTS', + 'CLAIM_COMPUTED', + 'CLAIM_SUBMITTED', + 'CLAIM_ACCEPTED', + 'CLAIM_REJECTED'); -CREATE FUNCTION public.f_maxuint64() - RETURNS NUMERIC(20,0) - LANGUAGE sql IMMUTABLE PARALLEL SAFE AS -'SELECT 18446744073709551615'; +CREATE FUNCTION "f_maxuint64"() + RETURNS NUMERIC(20,0) + LANGUAGE sql IMMUTABLE PARALLEL SAFE AS + 'SELECT 18446744073709551615'; CREATE TABLE "application" ( @@ -72,7 +87,7 @@ CREATE TABLE "output" 
CONSTRAINT "output_input_id_fkey" FOREIGN KEY ("input_id") REFERENCES "input"("id") ); -CREATE UNIQUE INDEX "output_idx" ON "output"("index"); +CREATE INDEX "output_idx" ON "output"("index"); CREATE TABLE "report" ( @@ -84,7 +99,7 @@ CREATE TABLE "report" CONSTRAINT "report_input_id_fkey" FOREIGN KEY ("input_id") REFERENCES "input"("id") ); -CREATE UNIQUE INDEX "report_idx" ON "report"("index"); +CREATE INDEX "report_idx" ON "report"("index"); CREATE TABLE "snapshot" ( diff --git a/internal/repository/migrations/000002_create_postgraphile_view.down.sql b/internal/repository/schema/migrations/000002_create_postgraphile_view.down.sql similarity index 78% rename from internal/repository/migrations/000002_create_postgraphile_view.down.sql rename to internal/repository/schema/migrations/000002_create_postgraphile_view.down.sql index 0ba224155..4fc005d2a 100644 --- a/internal/repository/migrations/000002_create_postgraphile_view.down.sql +++ b/internal/repository/schema/migrations/000002_create_postgraphile_view.down.sql @@ -1,4 +1,4 @@ -- (c) Cartesi and individual authors (see AUTHORS) -- SPDX-License-Identifier: Apache-2.0 (see LICENSE) -DROP SCHEMA graphql CASCADE; \ No newline at end of file +DROP SCHEMA graphql CASCADE; diff --git a/internal/repository/migrations/000002_create_postgraphile_view.up.sql b/internal/repository/schema/migrations/000002_create_postgraphile_view.up.sql similarity index 100% rename from internal/repository/migrations/000002_create_postgraphile_view.up.sql rename to internal/repository/schema/migrations/000002_create_postgraphile_view.up.sql diff --git a/internal/repository/schema/schema.go b/internal/repository/schema/schema.go new file mode 100644 index 000000000..7989bf64f --- /dev/null +++ b/internal/repository/schema/schema.go @@ -0,0 +1,85 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package schema + +import ( + "embed" + "errors" + "fmt" + "log/slog" + + "github.com/golang-migrate/migrate/v4" + mig "github.com/golang-migrate/migrate/v4" + _ "github.com/golang-migrate/migrate/v4/database/postgres" + _ "github.com/golang-migrate/migrate/v4/source/file" + "github.com/golang-migrate/migrate/v4/source/iofs" +) + +//go:embed migrations/* +var content embed.FS + +const ExpectedVersion uint = 2 + +type Schema struct { + migrate *mig.Migrate +} + +func New(postgresEndpoint string) (*Schema, error) { + driver, err := iofs.New(content, "migrations") + if err != nil { + return nil, err + } + + migrate, err := mig.NewWithSourceInstance("iofs", driver, postgresEndpoint) + if err != nil { + return nil, err + } + + return &Schema{migrate: migrate}, nil +} + +func (s *Schema) Version() (uint, error) { + version, _, err := s.migrate.Version() + if err != nil && errors.Is(err, migrate.ErrNilVersion) { + return version, fmt.Errorf("No valid database schema found") + } + return version, err +} + +func (s *Schema) Up() error { + if err := s.migrate.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) { + return err + } + return nil +} + +func (s *Schema) Down() error { + if err := s.migrate.Down(); err != nil && !errors.Is(err, migrate.ErrNoChange) { + return err + } + return nil +} + +func (s *Schema) Close() { + source, db := s.migrate.Close() + if source != nil { + slog.Error("Error releasing migration sources", "error", source) + } + if db != nil { + slog.Error("Error closing db connection", "error", db) + } +} + +func (s *Schema) ValidateVersion() (uint, error) { + version, err := s.Version() + if err != 
nil { + return 0, err + } + + if version != ExpectedVersion { + format := "Database schema version mismatch. Expected %d but it is %d" + return 0, fmt.Errorf(format, ExpectedVersion, version) + } + return version, nil +} diff --git a/internal/repository/schemamanager.go b/internal/repository/schemamanager.go deleted file mode 100644 index 4c271b39c..000000000 --- a/internal/repository/schemamanager.go +++ /dev/null @@ -1,102 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package repository - -import ( - "embed" - "errors" - "fmt" - "log/slog" - - "github.com/golang-migrate/migrate/v4" - mig "github.com/golang-migrate/migrate/v4" - _ "github.com/golang-migrate/migrate/v4/database/postgres" - _ "github.com/golang-migrate/migrate/v4/source/file" - "github.com/golang-migrate/migrate/v4/source/iofs" -) - -//go:embed migrations/* -var content embed.FS - -const ( - EXPECTED_VERSION = 2 -) - -type ( - Migrate = mig.Migrate -) - -type SchemaManager struct { - migrate *Migrate -} - -func NewSchemaManager(postgresEndpoint string) (*SchemaManager, error) { - - driver, err := iofs.New(content, "migrations") - if err != nil { - return nil, err - } - - migrate, err := mig.NewWithSourceInstance( - "iofs", - driver, - postgresEndpoint, - ) - if err != nil { - return nil, err - } - return &SchemaManager{ - migrate: migrate, - }, nil - -} - -func (s *SchemaManager) GetVersion() (uint, error) { - - version, _, err := s.migrate.Version() - - if err != nil { - if errors.Is(err, migrate.ErrNilVersion) { - return version, fmt.Errorf("No valid database schema found") - } - } - - return version, err - -} - -func (s *SchemaManager) Upgrade() error { - if err := s.migrate.Up(); err != nil { - if !errors.Is(err, migrate.ErrNoChange) { - return err - } - } - return nil -} - -func (s *SchemaManager) Close() { - source, db := s.migrate.Close() - if source != nil { - slog.Error("Error releasing migration sources", "error", source) - } - if db != nil { - slog.Error("Error closing db connection", "error", db) - } -} - -func (s *SchemaManager) ValidateSchemaVersion() error { - version, err := s.GetVersion() - if err != nil { - return err - } - - if version != EXPECTED_VERSION { - return fmt.Errorf( - "Database schema version mismatch. 
Expected %d but it is %d", - EXPECTED_VERSION, - version, - ) - } - return nil -} diff --git a/test/advancer/advancer_test.go b/test/advancer/advancer_test.go new file mode 100644 index 000000000..6de9536f3 --- /dev/null +++ b/test/advancer/advancer_test.go @@ -0,0 +1,232 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package advancer + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/cartesi/rollups-node/internal/node/advancer" + "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/internal/nodemachine" + "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/pkg/emulator" + "github.com/cartesi/rollups-node/pkg/rollupsmachine" + "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine" + "github.com/cartesi/rollups-node/test/snapshot" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/modules/postgres" + "github.com/testcontainers/testcontainers-go/wait" +) + +var appAddress model.Address + +func TestAdvancer(t *testing.T) { + require := require.New(t) + + // Creates the snapshot. + script := "ioctl-echo-loop --vouchers=1 --notices=3 --reports=5 --verbose=1" + snapshot, err := snapshot.FromScript(script, uint64(1_000_000_000)) + require.Nil(err) + defer func() { require.Nil(snapshot.Close()) }() + + // Starts the server. + verbosity := cartesimachine.ServerVerbosityInfo + address, err := cartesimachine.StartServer(verbosity, 0, os.Stdout, os.Stderr) + require.Nil(err) + + // Loads the cartesimachine. + config := &emulator.MachineRuntimeConfig{} + cartesiMachine, err := cartesimachine.Load(snapshot.Path(), address, config) + require.Nil(err) + require.NotNil(cartesiMachine) + + // Wraps the cartesimachine with rollupsmachine. + rollupsMachine, err := rollupsmachine.New(cartesiMachine, 50_000_000, 5_000_000_000) + require.Nil(err) + require.NotNil(rollupsMachine) + + // Wraps the rollupsmachine with nodemachine. + nodeMachine := nodemachine.New(rollupsMachine, time.Minute, 10) + require.NotNil(nodeMachine) + defer func() { require.Nil(nodeMachine.Close()) }() + + // Creates the machine pool. + appAddress = common.HexToAddress("deadbeef") + machines := advancer.Machines{appAddress: nodeMachine} + + // Creates the background context. + ctx := context.Background() + + // Create the database container. + databaseContainer, err := newDatabaseContainer(ctx) + require.Nil(err) + defer func() { require.Nil(databaseContainer.Terminate(ctx)) }() + + // Setups the database. + database, err := newDatabase(ctx, databaseContainer) + require.Nil(err) + err = populateDatabase(ctx, database) + require.Nil(err, "%v", err) + defer database.Close() + + // Creates the advancer's repository. + repository := &repository.AdvancerRepository{Database: database} + + // Creates the advancer. + advancer, err := advancer.New(machines, repository) + require.Nil(err) + require.NotNil(advancer) + poller, err := advancer.Poller(5 * time.Second) + require.Nil(err) + require.NotNil(poller) + + // Starts the advancer in a separate goroutine. + done := make(chan struct{}, 1) + go func() { + ready := make(chan struct{}, 1) + err = poller.Start(ctx, ready) + <-ready + require.Nil(err, "%v", err) + done <- struct{}{} + }() + + // Orders the advancer to stop after some time has passed. 
+ time.Sleep(5 * time.Second) + poller.Stop() + +wait: + for { + select { + case <-done: + fmt.Println("Done!") + break wait + default: + fmt.Println("Waiting...") + time.Sleep(time.Second) + } + } +} + +func newDatabaseContainer(ctx context.Context) (*postgres.PostgresContainer, error) { + dbName := "cartesinode" + dbUser := "admin" + dbPassword := "password" + + // Start the postgres container + container, err := postgres.Run( + ctx, + "postgres:16-alpine", + postgres.WithDatabase(dbName), + postgres.WithUsername(dbUser), + postgres.WithPassword(dbPassword), + testcontainers.WithWaitStrategy( + wait.ForLog("database system is ready to accept connections"). + WithOccurrence(2). + WithStartupTimeout(10*time.Second)), + ) + + return container, err +} + +// func newLocalDatabase(ctx context.Context) (*repository.Database, error) { +// endpoint := "postgres://renan:renan@localhost:5432/renan?sslmode=disable" +// +// schemaManager, err := repository.NewSchemaManager(endpoint) +// if err != nil { +// return nil, err +// } +// +// err = schemaManager.DeleteAll() +// if err != nil { +// return nil, err +// } +// +// err = schemaManager.Upgrade() +// if err != nil { +// return nil, err +// } +// +// database, err := repository.Connect(ctx, endpoint) +// if err != nil { +// return nil, err +// } +// +// return database, nil +// } + +func newDatabase( + ctx context.Context, + container *postgres.PostgresContainer, +) (*repository.Database, error) { + endpoint, err := container.ConnectionString(ctx, "sslmode=disable") + if err != nil { + return nil, err + } + + schemaManager, err := repository.NewSchemaManager(endpoint) + if err != nil { + return nil, err + } + + err = schemaManager.Upgrade() + if err != nil { + return nil, err + } + + database, err := repository.Connect(ctx, endpoint) + if err != nil { + return nil, err + } + + return database, nil +} + +func populateDatabase(ctx context.Context, database *repository.Database) (err error) { + application := &model.Application{ + ContractAddress: appAddress, + TemplateHash: [32]byte{}, + LastProcessedBlock: 0, + Status: "RUNNING", + } + err = database.InsertApplication(ctx, application) + if err != nil { + return + } + + inputs := []*model.Input{{ + CompletionStatus: model.InputStatusAccepted, + RawData: []byte("first input"), + AppAddress: appAddress, + }, { + CompletionStatus: model.InputStatusNone, + RawData: []byte("second input"), + AppAddress: appAddress, + }, { + CompletionStatus: model.InputStatusNone, + RawData: []byte("third input"), + AppAddress: appAddress, + }} + + for i, input := range inputs { + input.Index = uint64(i) + input.BlockNumber = uint64(i) + input.RawData, err = rollupsmachine.Input{Data: input.RawData}.Encode() + if err != nil { + return + } + id, err := database.InsertInput(ctx, input) + input.Id = id + if err != nil { + return err + } + } + + return +} diff --git a/test/tooling/db/db.go b/test/tooling/db/db.go new file mode 100644 index 000000000..f2a338127 --- /dev/null +++ b/test/tooling/db/db.go @@ -0,0 +1,74 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package db + +import ( + "context" + "os" + "time" + + "github.com/cartesi/rollups-node/internal/repository/schema" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/modules/postgres" + "github.com/testcontainers/testcontainers-go/wait" +) + +const ( + postgresImage = "postgres:16-alpine" + postgresDatabase = "cartesinode" + postgresUsername = "admin" + 
postgresPassword = "password" +) + +func Setup(ctx context.Context) (string, error) { + endpoint, ok := os.LookupEnv("TEST_POSTGRES_ENDPOINT") + if !ok { + container, err := SetupContainer(ctx) + if err != nil { + return "", err + } + + endpoint, err = container.ConnectionString(ctx, "sslmode=disable") + if err != nil { + return "", err + } + } + + err := SetupSchema(endpoint) + if err != nil { + return "", err + } + + return endpoint, nil +} + +func SetupContainer(ctx context.Context) (*postgres.PostgresContainer, error) { + log := "database system is ready to accept connections" + strategy := wait.ForLog(log).WithOccurrence(2).WithStartupTimeout(10 * time.Second) + return postgres.Run(ctx, + postgresImage, + postgres.WithDatabase(postgresDatabase), + postgres.WithUsername(postgresUsername), + postgres.WithPassword(postgresPassword), + testcontainers.WithWaitStrategy(strategy)) +} + +func SetupSchema(endpoint string) error { + schema, err := schema.New(endpoint) + if err != nil { + return err + } + + err = schema.Down() + if err != nil { + return err + } + + err = schema.Up() + if err != nil { + return err + } + + return nil +}
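Taken together, the new schema package and the test/tooling/db helpers reduce database setup to a couple of calls: Setup starts a Postgres container (or reuses TEST_POSTGRES_ENDPOINT when it is set), recreates the schema with Down and Up, and hands back an endpoint ready for repository.Connect. The sketch below is illustrative only and not part of the patch; the ValidateVersion call mirrors what the db check command does and fails unless the migration version equals schema.ExpectedVersion.

// Sketch only: set up a migrated database for a test and connect to it.
package example

import (
	"context"

	"github.com/cartesi/rollups-node/internal/repository"
	"github.com/cartesi/rollups-node/internal/repository/schema"
	"github.com/cartesi/rollups-node/test/tooling/db"
)

func setupRepository(ctx context.Context) (*repository.Database, error) {
	// Starts (or reuses) a Postgres instance and applies the migrations.
	endpoint, err := db.Setup(ctx)
	if err != nil {
		return nil, err
	}

	// Optional sanity check: the schema version must match ExpectedVersion.
	s, err := schema.New(endpoint)
	if err != nil {
		return nil, err
	}
	defer s.Close()
	if _, err := s.ValidateVersion(); err != nil {
		return nil, err
	}

	return repository.Connect(ctx, endpoint)
}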