diff --git a/archiver/cmd/main.go b/archiver/cmd/main.go
index 00535f6..5b8b19a 100644
--- a/archiver/cmd/main.go
+++ b/archiver/cmd/main.go
@@ -66,9 +66,14 @@ func Main() cliapp.LifecycleAction {
 			return nil, err
 		}
 
-		api := service.NewAPI(m, l)
-		l.Info("Initializing Archiver Service")
-		return service.NewService(l, cfg, api, storageClient, beaconClient, m)
+		archiver, err := service.NewArchiver(l, cfg, storageClient, beaconClient, m)
+		if err != nil {
+			return nil, fmt.Errorf("failed to initialize archiver: %w", err)
+		}
+
+		api := service.NewAPI(m, l, archiver)
+
+		return service.NewService(l, cfg, api, archiver, m)
 	}
 }
 
diff --git a/archiver/metrics/metrics.go b/archiver/metrics/metrics.go
index af14c75..b5e56f4 100644
--- a/archiver/metrics/metrics.go
+++ b/archiver/metrics/metrics.go
@@ -11,8 +11,9 @@ type BlockSource string
 var (
 	MetricsNamespace = "blob_archiver"
 
-	BlockSourceBackfill BlockSource = "backfill"
-	BlockSourceLive     BlockSource = "live"
+	BlockSourceBackfill  BlockSource = "backfill"
+	BlockSourceLive      BlockSource = "live"
+	BlockSourceRearchive BlockSource = "rearchive"
 )
 
 type Metricer interface {
diff --git a/archiver/service/api.go b/archiver/service/api.go
index 59e64d9..71b5918 100644
--- a/archiver/service/api.go
+++ b/archiver/service/api.go
@@ -1,7 +1,10 @@
 package service
 
 import (
+	"encoding/json"
+	"fmt"
 	"net/http"
+	"strconv"
 	"time"
 
 	m "github.com/base-org/blob-archiver/archiver/metrics"
@@ -16,17 +19,19 @@ const (
 )
 
 type API struct {
-	router  *chi.Mux
-	logger  log.Logger
-	metrics m.Metricer
+	router   *chi.Mux
+	logger   log.Logger
+	metrics  m.Metricer
+	archiver *Archiver
 }
 
 // NewAPI creates a new Archiver API instance. This API exposes an admin interface to control the archiver.
-func NewAPI(metrics m.Metricer, logger log.Logger) *API {
+func NewAPI(metrics m.Metricer, logger log.Logger, archiver *Archiver) *API {
 	result := &API{
-		router:  chi.NewRouter(),
-		logger:  logger,
-		metrics: metrics,
+		router:   chi.NewRouter(),
+		archiver: archiver,
+		logger:   logger,
+		metrics:  metrics,
 	}
 
 	r := result.router
@@ -41,6 +46,79 @@ func NewAPI(metrics m.Metricer, logger log.Logger) *API {
 	})
 
 	r.Get("/", http.NotFound)
+	r.Post("/rearchive", result.rearchiveBlocks)
 
 	return result
 }
+
+type rearchiveResponse struct {
+	Error      string `json:"error,omitempty"`
+	BlockStart uint64 `json:"blockStart"`
+	BlockEnd   uint64 `json:"blockEnd"`
+}
+
+func toSlot(input string) (uint64, error) {
+	if input == "" {
+		return 0, fmt.Errorf("must provide param")
+	}
+	res, err := strconv.ParseUint(input, 10, 64)
+	if err != nil {
+		return 0, fmt.Errorf("invalid slot: \"%s\"", input)
+	}
+	return res, nil
+}
+
+// rearchiveBlocks rearchives blobs from blocks between the given from and to slots.
+// If any blocks are already archived, they will be overwritten with data from the beacon node.
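+//
+// A hypothetical invocation, with illustrative slot numbers:
+//
+//	POST /rearchive?from=1000&to=1002
+//
+// refetches and overwrites the stored blobs for slots 1000 through 1002.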
+func (a *API) rearchiveBlocks(w http.ResponseWriter, r *http.Request) {
+	from, err := toSlot(r.URL.Query().Get("from"))
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		_ = json.NewEncoder(w).Encode(rearchiveResponse{
+			Error: fmt.Sprintf("invalid from param: %v", err),
+		})
+		return
+	}
+
+	to, err := toSlot(r.URL.Query().Get("to"))
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		_ = json.NewEncoder(w).Encode(rearchiveResponse{
+			Error: fmt.Sprintf("invalid to param: %v", err),
+		})
+		return
+	}
+
+	if from > to {
+		w.WriteHeader(http.StatusBadRequest)
+		_ = json.NewEncoder(w).Encode(rearchiveResponse{
+			Error: fmt.Sprintf("invalid range: from %d to %d", from, to),
+		})
+		return
+	}
+
+	blockStart, blockEnd, err := a.archiver.rearchiveRange(from, to)
+	if err != nil {
+		a.logger.Error("Failed to rearchive blocks", "err", err)
+
+		w.WriteHeader(http.StatusInternalServerError)
+		err = json.NewEncoder(w).Encode(rearchiveResponse{
+			Error:      err.Error(),
+			BlockStart: blockStart,
+			BlockEnd:   blockEnd,
+		})
+	} else {
+		a.logger.Info("Rearchiving blocks complete")
+		w.WriteHeader(http.StatusOK)
+
+		err = json.NewEncoder(w).Encode(rearchiveResponse{
+			BlockStart: blockStart,
+			BlockEnd:   blockEnd,
+		})
+	}
+
+	if err != nil {
+		// The status code has already been written at this point, so the encoding failure can only be logged.
+		a.logger.Error("Failed to write response", "err", err)
+	}
+}
diff --git a/archiver/service/api_test.go b/archiver/service/api_test.go
index 8f2af61..f26f919 100644
--- a/archiver/service/api_test.go
+++ b/archiver/service/api_test.go
@@ -1,23 +1,32 @@
 package service
 
 import (
+	"encoding/json"
 	"net/http/httptest"
 	"testing"
+	"time"
 
+	"github.com/base-org/blob-archiver/archiver/flags"
 	"github.com/base-org/blob-archiver/archiver/metrics"
+	"github.com/base-org/blob-archiver/common/storage/storagetest"
 	"github.com/ethereum-optimism/optimism/op-service/testlog"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/stretchr/testify/require"
 )
 
-func setupAPI(t *testing.T) *API {
+func setupAPI(t *testing.T) (*API, *storagetest.TestFileStorage) {
 	logger := testlog.Logger(t, log.LvlInfo)
 	m := metrics.NewMetrics()
-	return NewAPI(m, logger)
+	fs := storagetest.NewTestFileStorage(t, logger)
+	archiver, err := NewArchiver(logger, flags.ArchiverConfig{
+		PollInterval: 10 * time.Second,
+	}, fs, nil, m)
+	require.NoError(t, err)
+	return NewAPI(m, logger, archiver), fs
 }
 
 func TestHealthHandler(t *testing.T) {
-	a := setupAPI(t)
+	a, _ := setupAPI(t)
 	request := httptest.NewRequest("GET", "/healthz", nil)
 	response := httptest.NewRecorder()
 
@@ -26,3 +35,71 @@ func TestHealthHandler(t *testing.T) {
 
 	require.Equal(t, 200, response.Code)
 }
+
+func TestRearchiveHandler(t *testing.T) {
+	a, _ := setupAPI(t)
+
+	tests := []struct {
+		name           string
+		path           string
+		expectedStatus int
+		error          string
+	}{
+		{
+			name:           "should fail with no params",
+			path:           "/rearchive",
+			expectedStatus: 400,
+			error:          "invalid from param: must provide param",
+		},
+		{
+			name:           "should fail with missing to param",
+			path:           "/rearchive?from=1",
+			expectedStatus: 400,
+			error:          "invalid to param: must provide param",
+		},
+		{
+			name:           "should fail with missing from param",
+			path:           "/rearchive?to=1",
+			expectedStatus: 400,
+			error:          "invalid from param: must provide param",
+		},
+		{
+			name:           "should fail with invalid from param",
+			path:           "/rearchive?from=blah&to=1",
+			expectedStatus: 400,
+			error:          "invalid from param: invalid slot: \"blah\"",
+		},
+		{
+			name:           "should fail with invalid to param",
+			path:           "/rearchive?from=1&to=blah",
+			expectedStatus: 400,
+			error:          "invalid to param: invalid slot: \"blah\"",
+		},
+		{
+			name:           "should fail with from greater than to",
+			path:           "/rearchive?from=2&to=1",
+			expectedStatus: 400,
+			error:          "invalid range: from 2 to 1",
+		},
+	}
+
+	for _, tt := range tests {
+		test := tt
+		t.Run(test.name, func(t *testing.T) {
+			request := httptest.NewRequest("POST", test.path, nil)
+			response := httptest.NewRecorder()
+
+			a.router.ServeHTTP(response, request)
+
+			require.Equal(t, test.expectedStatus, response.Code)
+
+			var errResponse rearchiveResponse
+			err := json.NewDecoder(response.Body).Decode(&errResponse)
+			require.NoError(t, err)
+
+			if test.error != "" {
+				require.Equal(t, test.error, errResponse.Error)
+			}
+		})
+	}
+}
diff --git a/archiver/service/archiver.go b/archiver/service/archiver.go
index 1ad6c1b..fcdbc98 100644
--- a/archiver/service/archiver.go
+++ b/archiver/service/archiver.go
@@ -3,8 +3,7 @@ package service
 import (
 	"context"
 	"errors"
-	"fmt"
-	"sync/atomic"
+	"strconv"
 	"time"
 
 	client "github.com/attestantio/go-eth2-client"
@@ -13,73 +12,50 @@ import (
 	"github.com/base-org/blob-archiver/archiver/flags"
 	"github.com/base-org/blob-archiver/archiver/metrics"
 	"github.com/base-org/blob-archiver/common/storage"
-	"github.com/ethereum-optimism/optimism/op-service/httputil"
-	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
 	"github.com/ethereum-optimism/optimism/op-service/retry"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 )
 
-const liveFetchBlobMaximumRetries = 10
-const startupFetchBlobMaximumRetries = 3
-const backfillErrorRetryInterval = 5 * time.Second
-
-var ErrAlreadyStopped = errors.New("already stopped")
+const (
+	liveFetchBlobMaximumRetries    = 10
+	startupFetchBlobMaximumRetries = 3
+	rearchiveMaximumRetries        = 3
+	backfillErrorRetryInterval     = 5 * time.Second
+)
 
 type BeaconClient interface {
 	client.BlobSidecarsProvider
 	client.BeaconBlockHeadersProvider
 }
 
-func NewService(l log.Logger, cfg flags.ArchiverConfig, api *API, dataStoreClient storage.DataStore, client BeaconClient, m metrics.Metricer) (*ArchiverService, error) {
-	return &ArchiverService{
+func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage.DataStore, client BeaconClient, m metrics.Metricer) (*Archiver, error) {
+	return &Archiver{
 		log:             l,
 		cfg:             cfg,
 		dataStoreClient: dataStoreClient,
 		metrics:         m,
-		stopCh:          make(chan struct{}),
 		beaconClient:    client,
-		api:             api,
+		stopCh:          make(chan struct{}),
 	}, nil
 }
 
-type ArchiverService struct {
-	stopped         atomic.Bool
-	stopCh          chan struct{}
+type Archiver struct {
 	log             log.Logger
+	cfg             flags.ArchiverConfig
 	dataStoreClient storage.DataStore
 	beaconClient    BeaconClient
-	metricsServer   *httputil.HTTPServer
-	cfg             flags.ArchiverConfig
 	metrics         metrics.Metricer
-	api             *API
+	stopCh          chan struct{}
 }
 
-// Start starts the archiver service. It begins polling the beacon node for the latest blocks and persisting blobs for
+// Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
 // them. Concurrently it'll also begin a backfill process (see backfillBlobs) to store all blobs from the current head
 // to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be
 // filled in.
-func (a *ArchiverService) Start(ctx context.Context) error {
-	if a.cfg.MetricsConfig.Enabled {
-		a.log.Info("starting metrics server", "addr", a.cfg.MetricsConfig.ListenAddr, "port", a.cfg.MetricsConfig.ListenPort)
-		srv, err := opmetrics.StartServer(a.metrics.Registry(), a.cfg.MetricsConfig.ListenAddr, a.cfg.MetricsConfig.ListenPort)
-		if err != nil {
-			return err
-		}
-
-		a.log.Info("started metrics server", "addr", srv.Addr())
-		a.metricsServer = srv
-	}
-
-	srv, err := httputil.StartHTTPServer(a.cfg.ListenAddr, a.api.router)
-	if err != nil {
-		return fmt.Errorf("failed to start Archiver API server: %w", err)
-	}
-
-	a.log.Info("Archiver API server started", "address", srv.Addr().String())
-
-	currentBlob, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
-		return a.persistBlobsForBlockToS3(ctx, "head")
+func (a *Archiver) Start(ctx context.Context) error {
+	currentBlock, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
+		return a.persistBlobsForBlockToS3(ctx, "head", false)
 	})
 
 	if err != nil {
@@ -87,17 +63,23 @@ func (a *ArchiverService) Start(ctx context.Context) error {
 		return err
 	}
 
-	go a.backfillBlobs(ctx, currentBlob)
+	go a.backfillBlobs(ctx, currentBlock)
 
 	return a.trackLatestBlocks(ctx)
 }
 
+// Stop stops the archiver service.
+func (a *Archiver) Stop(ctx context.Context) error {
+	close(a.stopCh)
+	return nil
+}
+
 // persistBlobsForBlockToS3 fetches the blobs for a given block and persists them to S3. It returns the block header
 // and a boolean indicating whether the blobs already existed in S3 and any errors that occur.
 // If the blobs are already stored, it will not overwrite the data. Currently, the archiver does not
 // perform any validation of the blobs, it assumes a trusted beacon node. See:
 // https://github.com/base-org/blob-archiver/issues/4.
-func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier string) (*v1.BeaconBlockHeader, bool, error) {
+func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier string, overwrite bool) (*v1.BeaconBlockHeader, bool, error) {
 	currentHeader, err := a.beaconClient.BeaconBlockHeader(ctx, &api.BeaconBlockHeaderOpts{
 		Block: blockIdentifier,
 	})
@@ -113,7 +95,7 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde
 		return nil, false, err
 	}
 
-	if exists {
+	if exists && !overwrite {
 		a.log.Debug("blob already exists", "hash", currentHeader.Data.Root)
 		return currentHeader.Data, true, nil
 	}
@@ -146,36 +128,13 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde
 
 	a.metrics.RecordStoredBlobs(len(blobSidecars.Data))
 
-	return currentHeader.Data, false, nil
-}
-
-// Stops the archiver service.
-func (a *ArchiverService) Stop(ctx context.Context) error {
-	if a.stopped.Load() {
-		return ErrAlreadyStopped
-	}
-	a.log.Info("Stopping Archiver")
-	a.stopped.Store(true)
-
-	close(a.stopCh)
-
-	if a.metricsServer != nil {
-		if err := a.metricsServer.Stop(ctx); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (a *ArchiverService) Stopped() bool {
-	return a.stopped.Load()
+	return currentHeader.Data, exists, nil
 }
 
 // backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted
 // to the archivers storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
 // If an error is encountered persisting a block, it will retry after waiting for a period of time.
-func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
+func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
 	current, alreadyExists, err := latest, false, error(nil)
 
 	for !alreadyExists {
@@ -185,7 +144,7 @@ func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBl
 		}
 
 		previous := current
-		current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String())
+		current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
 		if err != nil {
 			a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
 			// Revert back to block we failed to fetch
@@ -203,7 +162,7 @@ func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBl
 }
 
 // trackLatestBlocks will poll the beacon node for the latest blocks and persist blobs for them.
-func (a *ArchiverService) trackLatestBlocks(ctx context.Context) error {
+func (a *Archiver) trackLatestBlocks(ctx context.Context) error {
 	t := time.NewTicker(a.cfg.PollInterval)
 	defer t.Stop()
 
@@ -222,7 +181,7 @@ func (a *ArchiverService) trackLatestBlocks(ctx context.Context) error {
 // processBlocksUntilKnownBlock will fetch and persist blobs for blocks until it finds a block that has been stored before.
 // In the case of a reorg, it will fetch the new head and then walk back the chain, storing all blobs until it finds a
 // known block -- that already exists in the archivers' storage.
-func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) {
+func (a *Archiver) processBlocksUntilKnownBlock(ctx context.Context) {
 	a.log.Debug("refreshing live data")
 
 	var start *v1.BeaconBlockHeader
@@ -230,7 +189,7 @@ func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) {
 
 	for {
 		current, alreadyExisted, err := retry.Do2(ctx, liveFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
-			return a.persistBlobsForBlockToS3(ctx, currentBlockId)
+			return a.persistBlobsForBlockToS3(ctx, currentBlockId, false)
 		})
 
 		if err != nil {
@@ -254,3 +213,44 @@ func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) {
 
 	a.log.Info("live data refreshed", "startHash", start.Root.String(), "endHash", currentBlockId)
 }
+
+// rearchiveRange will rearchive all blocks in the range from the given start to end. It returns the start and end of the
+// range that was successfully rearchived. On any persistent errors, it will halt archiving and return the range of blocks
+// that were rearchived and the error that halted the process.
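+//
+// For example (slot numbers are illustrative), rearchiveRange(1000, 1002) re-persists slots 1000, 1001
+// and 1002 in order; if slot 1001 then fails after all retries, it returns (1000, 1001, err).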
+func (a *Archiver) rearchiveRange(from uint64, to uint64) (uint64, uint64, error) {
+	for i := from; i <= to; i++ {
+		id := strconv.FormatUint(i, 10)
+
+		l := a.log.New("slot", id)
+
+		l.Info("rearchiving block")
+
+		rewritten, err := retry.Do(context.Background(), rearchiveMaximumRetries, retry.Exponential(), func() (bool, error) {
+			_, _, e := a.persistBlobsForBlockToS3(context.Background(), id, true)
+
+			// If the block is not found, we can assume that the slot has been skipped
+			if e != nil {
+				var apiErr *api.Error
+				if errors.As(e, &apiErr) && apiErr.StatusCode == 404 {
+					return false, nil
+				}
+
+				return false, e
+			}
+
+			return true, nil
+		})
+
+		if err != nil {
+			return from, i, err
+		}
+
+		if !rewritten {
+			l.Info("block not found during rearchiving")
+		}
+
+		a.metrics.RecordProcessedBlock(metrics.BlockSourceRearchive)
+	}
+
+	return from, to, nil
+}
diff --git a/archiver/service/archiver_test.go b/archiver/service/archiver_test.go
index 25a537b..7733649 100644
--- a/archiver/service/archiver_test.go
+++ b/archiver/service/archiver_test.go
@@ -17,15 +17,15 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func setup(t *testing.T, beacon *beacontest.StubBeaconClient) (*ArchiverService, *storagetest.TestFileStorage) {
+func setup(t *testing.T, beacon *beacontest.StubBeaconClient) (*Archiver, *storagetest.TestFileStorage) {
 	l := testlog.Logger(t, log.LvlInfo)
 	fs := storagetest.NewTestFileStorage(t, l)
 	m := metrics.NewMetrics()
-	svc, err := NewService(l, flags.ArchiverConfig{
+	svc, err := NewArchiver(l, flags.ArchiverConfig{
 		PollInterval: 5 * time.Second,
 		OriginBlock:  blobtest.OriginBlock,
-	}, NewAPI(m, l), fs, beacon, m)
+	}, fs, beacon, m)
 	require.NoError(t, err)
 	return svc, fs
 }
@@ -35,7 +35,7 @@ func TestArchiver_FetchAndPersist(t *testing.T) {
 
 	fs.CheckNotExistsOrFail(t, blobtest.OriginBlock)
 
-	header, alreadyExists, err := svc.persistBlobsForBlockToS3(context.Background(), blobtest.OriginBlock.String())
+	header, alreadyExists, err := svc.persistBlobsForBlockToS3(context.Background(), blobtest.OriginBlock.String(), false)
 	require.False(t, alreadyExists)
 	require.NoError(t, err)
 	require.NotNil(t, header)
@@ -43,7 +43,7 @@ func TestArchiver_FetchAndPersist(t *testing.T) {
 
 	fs.CheckExistsOrFail(t, blobtest.OriginBlock)
 
-	header, alreadyExists, err = svc.persistBlobsForBlockToS3(context.Background(), blobtest.OriginBlock.String())
+	header, alreadyExists, err = svc.persistBlobsForBlockToS3(context.Background(), blobtest.OriginBlock.String(), false)
 	require.True(t, alreadyExists)
 	require.NoError(t, err)
 	require.NotNil(t, header)
@@ -52,6 +52,38 @@ func TestArchiver_FetchAndPersist(t *testing.T) {
 	fs.CheckExistsOrFail(t, blobtest.OriginBlock)
 }
 
+func TestArchiver_FetchAndPersistOverwriting(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, fs := setup(t, beacon)
+
+	// Blob 5 already exists
+	fs.WriteOrFail(t, storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Five,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Five.String()],
+		},
+	})
+
+	require.Equal(t, fs.ReadOrFail(t, blobtest.Five).BlobSidecars.Data, beacon.Blobs[blobtest.Five.String()])
+
+	// change the blob data -- this isn't possible without changing the hash, but it allows us to test the overwrite
+	beacon.Blobs[blobtest.Five.String()] = blobtest.NewBlobSidecars(t, 6)
+
+	_, exists, err := svc.persistBlobsForBlockToS3(context.Background(), blobtest.Five.String(), true)
+	require.NoError(t, err)
+	require.True(t, exists)
+
+	// It should have overwritten the blob data
+	require.Equal(t, fs.ReadOrFail(t, blobtest.Five).BlobSidecars.Data, beacon.Blobs[blobtest.Five.String()])
+
+	// Overwriting a non-existent blob should return exists=false
+	_, exists, err = svc.persistBlobsForBlockToS3(context.Background(), blobtest.Four.String(), true)
+	require.NoError(t, err)
+	require.False(t, exists)
+}
+
 func TestArchiver_BackfillToOrigin(t *testing.T) {
 	beacon := beacontest.NewDefaultStubBeaconClient(t)
 	svc, fs := setup(t, beacon)
@@ -303,3 +335,44 @@ func TestArchiver_LatestHaltsOnPersistentError(t *testing.T) {
 	fs.CheckNotExistsOrFail(t, blobtest.Four)
 	fs.CheckExistsOrFail(t, blobtest.Three)
 }
+
+func TestArchiver_RearchiveRange(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, fs := setup(t, beacon)
+
+	// Blob three already exists; rearchiving the range should overwrite it rather than skip it
+	fs.WriteOrFail(t, storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Three,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Three.String()],
+		},
+	})
+
+	// startSlot+1 == One
+	fs.CheckNotExistsOrFail(t, blobtest.One)
+	fs.CheckNotExistsOrFail(t, blobtest.Two)
+	fs.CheckExistsOrFail(t, blobtest.Three)
+	fs.CheckNotExistsOrFail(t, blobtest.Four)
+
+	// this modifies the blobs at 3, purely to test the blob is rearchived
+	beacon.Blobs[blobtest.Three.String()] = blobtest.NewBlobSidecars(t, 6)
+
+	from, to := blobtest.StartSlot+1, blobtest.StartSlot+4
+
+	actualFrom, actualTo, err := svc.rearchiveRange(from, to)
+	// Should report the whole range as rearchived
+	require.NoError(t, err)
+	require.Equal(t, from, actualFrom)
+	require.Equal(t, to, actualTo)
+
+	// Should have written all the blobs
+	fs.CheckExistsOrFail(t, blobtest.One)
+	fs.CheckExistsOrFail(t, blobtest.Two)
+	fs.CheckExistsOrFail(t, blobtest.Three)
+	fs.CheckExistsOrFail(t, blobtest.Four)
+
+	// Should have overwritten any existing blobs
+	require.Equal(t, fs.ReadOrFail(t, blobtest.Three).BlobSidecars.Data, beacon.Blobs[blobtest.Three.String()])
+}
diff --git a/archiver/service/service.go b/archiver/service/service.go
new file mode 100644
index 0000000..b4f265d
--- /dev/null
+++ b/archiver/service/service.go
@@ -0,0 +1,80 @@
+package service
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync/atomic"
+
+	"github.com/base-org/blob-archiver/archiver/flags"
+	"github.com/base-org/blob-archiver/archiver/metrics"
+	"github.com/ethereum-optimism/optimism/op-service/httputil"
+	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
+	"github.com/ethereum/go-ethereum/log"
+)
+
+var ErrAlreadyStopped = errors.New("already stopped")
+
+func NewService(l log.Logger, cfg flags.ArchiverConfig, api *API, archiver *Archiver, m metrics.Metricer) (*ArchiverService, error) {
+	return &ArchiverService{
+		log:      l,
+		cfg:      cfg,
+		archiver: archiver,
+		metrics:  m,
+		api:      api,
+	}, nil
+}
+
+type ArchiverService struct {
+	stopped       atomic.Bool
+	log           log.Logger
+	metricsServer *httputil.HTTPServer
+	cfg           flags.ArchiverConfig
+	metrics       metrics.Metricer
+	api           *API
+	archiver      *Archiver
+}
+
+// Start starts the archiver service. It starts the API server as well as the archiving process.
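+// The metrics server is only started when MetricsConfig.Enabled is set; once the servers are up, this
+// call blocks on the archiving loop until the service is stopped.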
+func (a *ArchiverService) Start(ctx context.Context) error {
+	if a.cfg.MetricsConfig.Enabled {
+		a.log.Info("starting metrics server", "addr", a.cfg.MetricsConfig.ListenAddr, "port", a.cfg.MetricsConfig.ListenPort)
+		srv, err := opmetrics.StartServer(a.metrics.Registry(), a.cfg.MetricsConfig.ListenAddr, a.cfg.MetricsConfig.ListenPort)
+		if err != nil {
+			return err
+		}
+
+		a.log.Info("started metrics server", "addr", srv.Addr())
+		a.metricsServer = srv
+	}
+
+	srv, err := httputil.StartHTTPServer(a.cfg.ListenAddr, a.api.router)
+	if err != nil {
+		return fmt.Errorf("failed to start Archiver API server: %w", err)
+	}
+
+	a.log.Info("Archiver API server started", "address", srv.Addr().String())
+
+	return a.archiver.Start(ctx)
+}
+
+// Stop stops the archiver service.
+func (a *ArchiverService) Stop(ctx context.Context) error {
+	if a.stopped.Load() {
+		return ErrAlreadyStopped
+	}
+	a.log.Info("Stopping Archiver")
+	a.stopped.Store(true)
+
+	if a.metricsServer != nil {
+		if err := a.metricsServer.Stop(ctx); err != nil {
+			return err
+		}
+	}
+
+	return a.archiver.Stop(ctx)
+}
+
+func (a *ArchiverService) Stopped() bool {
+	return a.stopped.Load()
+}
diff --git a/common/beacon/beacontest/stub.go b/common/beacon/beacontest/stub.go
index 7e48440..9e1211a 100644
--- a/common/beacon/beacontest/stub.go
+++ b/common/beacon/beacontest/stub.go
@@ -3,6 +3,7 @@ package beacontest
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"testing"
 
 	"github.com/attestantio/go-eth2-client/api"
@@ -61,16 +62,29 @@ func NewDefaultStubBeaconClient(t *testing.T) *StubBeaconClient {
 	headBlobs := blobtest.NewBlobSidecars(t, 6)
 	finalizedBlobs := blobtest.NewBlobSidecars(t, 4)
 
+	startSlot := blobtest.StartSlot
+
 	return &StubBeaconClient{
 		Headers: map[string]*v1.BeaconBlockHeader{
-			blobtest.OriginBlock.String(): makeHeader(10, blobtest.OriginBlock, common.Hash{9, 9, 9}),
-			blobtest.One.String():         makeHeader(11, blobtest.One, blobtest.OriginBlock),
-			blobtest.Two.String():         makeHeader(12, blobtest.Two, blobtest.One),
-			blobtest.Three.String():       makeHeader(13, blobtest.Three, blobtest.Two),
-			blobtest.Four.String():        makeHeader(14, blobtest.Four, blobtest.Three),
-			blobtest.Five.String():        makeHeader(15, blobtest.Five, blobtest.Four),
-			"head":                        makeHeader(15, blobtest.Five, blobtest.Four),
-			"finalized":                   makeHeader(13, blobtest.Three, blobtest.Two),
+			// Lookup by hash
+			blobtest.OriginBlock.String(): makeHeader(startSlot, blobtest.OriginBlock, common.Hash{9, 9, 9}),
+			blobtest.One.String():         makeHeader(startSlot+1, blobtest.One, blobtest.OriginBlock),
+			blobtest.Two.String():         makeHeader(startSlot+2, blobtest.Two, blobtest.One),
+			blobtest.Three.String():       makeHeader(startSlot+3, blobtest.Three, blobtest.Two),
+			blobtest.Four.String():        makeHeader(startSlot+4, blobtest.Four, blobtest.Three),
+			blobtest.Five.String():        makeHeader(startSlot+5, blobtest.Five, blobtest.Four),
+
+			// Lookup by identifier
+			"head":      makeHeader(startSlot+5, blobtest.Five, blobtest.Four),
+			"finalized": makeHeader(startSlot+3, blobtest.Three, blobtest.Two),
+
+			// Lookup by slot
+			strconv.FormatUint(startSlot, 10):   makeHeader(startSlot, blobtest.OriginBlock, common.Hash{9, 9, 9}),
+			strconv.FormatUint(startSlot+1, 10): makeHeader(startSlot+1, blobtest.One, blobtest.OriginBlock),
+			strconv.FormatUint(startSlot+2, 10): makeHeader(startSlot+2, blobtest.Two, blobtest.One),
+			strconv.FormatUint(startSlot+3, 10): makeHeader(startSlot+3, blobtest.Three, blobtest.Two),
+			strconv.FormatUint(startSlot+4, 10): makeHeader(startSlot+4, blobtest.Four, blobtest.Three),
+			strconv.FormatUint(startSlot+5, 10): makeHeader(startSlot+5, blobtest.Five, blobtest.Four),
 		},
 		Blobs: map[string][]*deneb.BlobSidecar{
 			blobtest.OriginBlock.String(): blobtest.NewBlobSidecars(t, 1),
diff --git a/common/blobtest/helpers.go b/common/blobtest/helpers.go
index fcb7e24..a37d354 100644
--- a/common/blobtest/helpers.go
+++ b/common/blobtest/helpers.go
@@ -17,6 +17,8 @@ var (
 	Three = common.Hash{3}
 	Four  = common.Hash{4}
 	Five  = common.Hash{5}
+
+	StartSlot = uint64(10)
 )
 
 func RandBytes(t *testing.T, size uint) []byte {