diff --git a/api/service/api.go b/api/service/api.go
index 07a8a53..d5a7059 100644
--- a/api/service/api.go
+++ b/api/service/api.go
@@ -179,7 +179,7 @@ func (a *API) blobSidecarHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	result, storageErr := a.dataStoreClient.Read(r.Context(), beaconBlockHash)
+	result, storageErr := a.dataStoreClient.ReadBlob(r.Context(), beaconBlockHash)
 	if storageErr != nil {
 		if errors.Is(storageErr, storage.ErrNotFound) {
 			errUnknownBlock.write(w)
diff --git a/api/service/api_test.go b/api/service/api_test.go
index 6619e6a..fb8b752 100644
--- a/api/service/api_test.go
+++ b/api/service/api_test.go
@@ -92,10 +92,10 @@ func TestAPIService(t *testing.T) {
 		},
 	}
 
-	err := fs.Write(context.Background(), blockOne)
+	err := fs.WriteBlob(context.Background(), blockOne)
 	require.NoError(t, err)
 
-	err = fs.Write(context.Background(), blockTwo)
+	err = fs.WriteBlob(context.Background(), blockTwo)
 	require.NoError(t, err)
 
 	beaconClient.Headers["finalized"] = &v1.BeaconBlockHeader{
diff --git a/archiver/service/archiver.go b/archiver/service/archiver.go
index caf7f0f..242d946 100644
--- a/archiver/service/archiver.go
+++ b/archiver/service/archiver.go
@@ -15,6 +15,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/retry"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/google/uuid"
 )
 
 const (
@@ -37,6 +38,7 @@ func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage
 		metrics:      m,
 		beaconClient: client,
 		stopCh:       make(chan struct{}),
+		id:           uuid.New().String(),
 	}, nil
 }
 
@@ -47,6 +49,7 @@ type Archiver struct {
 	beaconClient BeaconClient
 	metrics      metrics.Metricer
 	stopCh       chan struct{}
+	id           string
 }
 
 // Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
@@ -63,6 +66,8 @@ func (a *Archiver) Start(ctx context.Context) error {
 		return err
 	}
 
+	a.waitObtainStorageLock(ctx)
+
 	go a.backfillBlobs(ctx, currentBlock)
 
 	return a.trackLatestBlocks(ctx)
@@ -119,7 +124,7 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
 	}
 
 	// The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
-	err = a.dataStoreClient.Write(ctx, blobData)
+	err = a.dataStoreClient.WriteBlob(ctx, blobData)
 
 	if err != nil {
 		a.log.Error("failed to write blob", "err", err)
@@ -131,36 +136,123 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
 	return currentHeader.Data, exists, nil
 }
 
+const LockUpdateInterval = 10 * time.Second
+const LockTimeout = int64(20) // 20 seconds
+var ObtainLockRetryInterval = 10 * time.Second
+
+func (a *Archiver) waitObtainStorageLock(ctx context.Context) {
+	lockfile, err := a.dataStoreClient.ReadLockfile(ctx)
+	if err != nil {
+		a.log.Crit("failed to read lockfile", "err", err)
+	}
+
+	currentTime := time.Now().Unix()
+	emptyLockfile := storage.Lockfile{}
+	if lockfile != emptyLockfile {
+		for lockfile.ArchiverId != a.id && lockfile.Timestamp+LockTimeout > currentTime {
+			// Loop until the timestamp read from storage is expired
+			a.log.Info("waiting for storage lock timestamp to expire",
+				"timestamp", strconv.FormatInt(lockfile.Timestamp, 10),
+				"currentTime", strconv.FormatInt(currentTime, 10),
+			)
+			time.Sleep(ObtainLockRetryInterval)
+			lockfile, err = a.dataStoreClient.ReadLockfile(ctx)
+			if err != nil {
+				a.log.Crit("failed to read lockfile", "err", err)
+			}
+			currentTime = time.Now().Unix()
+		}
+	}
+
+	err = a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
+	if err != nil {
+		a.log.Crit("failed to write to lockfile", "err", err)
+	}
+	a.log.Info("obtained storage lock")
+
+	go func() {
+		// Retain storage lock by continually updating the stored timestamp
+		ticker := time.NewTicker(LockUpdateInterval)
+		for {
+			select {
+			case <-ticker.C:
+				currentTime := time.Now().Unix()
+				err := a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
+				if err != nil {
+					a.log.Error("failed to update lockfile timestamp", "err", err)
+				}
+			case <-ctx.Done():
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+}
+
 // backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted
 // to the archivers storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
 // If an error is encountered persisting a block, it will retry after waiting for a period of time.
 func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
-	current, alreadyExists, err := latest, false, error(nil)
-
-	defer func() {
-		a.log.Info("backfill complete", "endHash", current.Root.String(), "startHash", latest.Root.String())
-	}()
+	// Add backfill process that starts at latest slot, then loop through all backfill processes
+	backfillProcesses, err := a.dataStoreClient.ReadBackfillProcesses(ctx)
+	if err != nil {
+		a.log.Crit("failed to read backfill_processes", "err", err)
+	}
+	backfillProcesses[common.Hash(latest.Root)] = storage.BackfillProcess{Start: *latest, Current: *latest}
+	a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+
+	backfillLoop := func(start *v1.BeaconBlockHeader, current *v1.BeaconBlockHeader) {
+		curr, alreadyExists, err := current, false, error(nil)
+		count := 0
+		a.log.Info("backfill process initiated",
+			"currHash", curr.Root.String(),
+			"currSlot", curr.Header.Message.Slot,
+			"startHash", start.Root.String(),
+			"startSlot", start.Header.Message.Slot,
+		)
+
+		defer func() {
+			a.log.Info("backfill process complete",
+				"endHash", curr.Root.String(),
+				"endSlot", curr.Header.Message.Slot,
+				"startHash", start.Root.String(),
+				"startSlot", start.Header.Message.Slot,
+			)
+			delete(backfillProcesses, common.Hash(start.Root))
+			a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+		}()
+
+		for !alreadyExists {
+			previous := curr
+
+			if common.Hash(curr.Root) == a.cfg.OriginBlock {
+				a.log.Info("reached origin block", "hash", curr.Root.String())
+				return
+			}
 
-	for !alreadyExists {
-		previous := current
+			curr, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
+			if err != nil {
+				a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
+				// Revert back to block we failed to fetch
+				curr = previous
+				time.Sleep(backfillErrorRetryInterval)
+				continue
+			}
 
-		if common.Hash(current.Root) == a.cfg.OriginBlock {
-			a.log.Info("reached origin block", "hash", current.Root.String())
-			return
-		}
+			if !alreadyExists {
+				a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
+			}
 
-		current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
-		if err != nil {
-			a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
-			// Revert back to block we failed to fetch
-			current = previous
-			time.Sleep(backfillErrorRetryInterval)
-			continue
+			count++
+			if count%10 == 0 {
+				backfillProcesses[common.Hash(start.Root)] = storage.BackfillProcess{Start: *start, Current: *curr}
+				a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+			}
 		}
+	}
 
-		if !alreadyExists {
-			a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
-		}
+	for _, process := range backfillProcesses {
+		backfillLoop(&process.Start, &process.Current)
 	}
 }
diff --git a/archiver/service/archiver_test.go b/archiver/service/archiver_test.go
index 7733649..b6c4dee 100644
--- a/archiver/service/archiver_test.go
+++ b/archiver/service/archiver_test.go
@@ -89,7 +89,7 @@ func TestArchiver_BackfillToOrigin(t *testing.T) {
 	svc, fs := setup(t, beacon)
 
 	// We have the current head, which is block 5 written to storage
-	err := fs.Write(context.Background(), storage.BlobData{
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.Five,
 		},
@@ -119,7 +119,7 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
 	svc, fs := setup(t, beacon)
 
 	// We have the current head, which is block 5 written to storage
-	err := fs.Write(context.Background(), storage.BlobData{
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.Five,
 		},
@@ -130,7 +130,7 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
 	require.NoError(t, err)
 
 	// We also have block 1 written to storage
-	err = fs.Write(context.Background(), storage.BlobData{
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.One,
 		},
@@ -156,13 +156,110 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
 		require.NoError(t, err)
 		require.True(t, exists)
 
-		data, err := fs.Read(context.Background(), blob)
+		data, err := fs.ReadBlob(context.Background(), blob)
 		require.NoError(t, err)
 		require.NotNil(t, data)
 		require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
 	}
 }
 
+func TestArchiver_ObtainLockfile(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, _ := setup(t, beacon)
+
+	currentTime := time.Now().Unix()
+	expiredTime := currentTime - 19
+	err := svc.dataStoreClient.WriteLockfile(context.Background(), storage.Lockfile{ArchiverId: "FAKEID", Timestamp: expiredTime})
+	require.NoError(t, err)
+
+	ObtainLockRetryInterval = 1 * time.Second
+	svc.waitObtainStorageLock(context.Background())
+
+	lockfile, err := svc.dataStoreClient.ReadLockfile(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, svc.id, lockfile.ArchiverId)
+	require.True(t, lockfile.Timestamp >= currentTime)
+}
+
+func TestArchiver_BackfillFinishOldProcess(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, fs := setup(t, beacon)
+
+	// We have the current head, which is block 5 written to storage
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Five,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Five.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We also have block 3 written to storage
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Three,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Three.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We also have block 1 written to storage
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.One,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.One.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We expect to backfill blob 4 first, then 2 in a separate process
+	expectedBlobs := []common.Hash{blobtest.Four, blobtest.Two}
+
+	for _, blob := range expectedBlobs {
+		exists, err := fs.Exists(context.Background(), blob)
+		require.NoError(t, err)
+		require.False(t, exists)
+	}
+
+	actualProcesses, err := svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	expectedProcesses := make(storage.BackfillProcesses)
+	require.NoError(t, err)
+	require.Equal(t, expectedProcesses, actualProcesses)
+
+	expectedProcesses[blobtest.Three] = storage.BackfillProcess{Start: *beacon.Headers[blobtest.Three.String()], Current: *beacon.Headers[blobtest.Three.String()]}
+	err = svc.dataStoreClient.WriteBackfillProcesses(context.Background(), expectedProcesses)
+	require.NoError(t, err)
+
+	actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, expectedProcesses, actualProcesses)
+
+	svc.backfillBlobs(context.Background(), beacon.Headers[blobtest.Five.String()])
+
+	for _, blob := range expectedBlobs {
+		exists, err := fs.Exists(context.Background(), blob)
+		require.NoError(t, err)
+		require.True(t, exists)
+
+		data, err := fs.ReadBlob(context.Background(), blob)
+		require.NoError(t, err)
+		require.NotNil(t, data)
+		require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
+	}
+
+	actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	require.NoError(t, err)
+	svc.log.Info("backfill processes", "processes", actualProcesses)
+	require.Equal(t, storage.BackfillProcesses{}, actualProcesses)
+
+}
+
 func TestArchiver_LatestStopsAtExistingBlock(t *testing.T) {
 	beacon := beacontest.NewDefaultStubBeaconClient(t)
 	svc, fs := setup(t, beacon)
diff --git a/common/blobtest/helpers.go b/common/blobtest/helpers.go
index e5d65a8..57d9523 100644
--- a/common/blobtest/helpers.go
+++ b/common/blobtest/helpers.go
@@ -17,6 +17,8 @@ var (
 	Three = common.Hash{3}
 	Four  = common.Hash{4}
 	Five  = common.Hash{5}
+	Six   = common.Hash{6}
+	Seven = common.Hash{7}
 
 	StartSlot = uint64(10)
 	EndSlot   = uint64(15)
diff --git a/common/storage/file.go b/common/storage/file.go
index f143f64..eebcfda 100644
--- a/common/storage/file.go
+++ b/common/storage/file.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"os"
 	"path"
+	"strconv"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
@@ -16,10 +17,30 @@ type FileStorage struct {
 }
 
 func NewFileStorage(dir string, l log.Logger) *FileStorage {
-	return &FileStorage{
+	storage := &FileStorage{
 		log:       l,
 		directory: dir,
 	}
+
+	_, err := storage.ReadBackfillProcesses(context.Background())
+	if err == ErrNotFound {
+		storage.log.Info("creating empty backfill_processes file")
+		err = storage.WriteBackfillProcesses(context.Background(), BackfillProcesses{})
+		if err != nil {
+			storage.log.Crit("failed to create empty backfill_processes file", "err", err)
+		}
+	}
+
+	_, err = storage.ReadLockfile(context.Background())
+	if err == ErrNotFound {
+		storage.log.Info("creating empty lockfile file")
+		err = storage.WriteLockfile(context.Background(), Lockfile{})
+		if err != nil {
+			storage.log.Crit("failed to create empty lockfile file", "err", err)
+		}
+	}
+
+	return storage
 }
 
 func (s *FileStorage) Exists(_ context.Context, hash common.Hash) (bool, error) {
@@ -33,7 +54,7 @@ func (s *FileStorage) Exists(_ context.Context, hash common.Hash) (bool, error)
 	return true, nil
 }
 
-func (s *FileStorage) Read(_ context.Context, hash common.Hash) (BlobData, error) {
+func (s *FileStorage) ReadBlob(_ context.Context, hash common.Hash) (BlobData, error) {
 	data, err := os.ReadFile(s.fileName(hash))
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -51,7 +72,81 @@ func (s *FileStorage) Read(_ context.Context, hash common.Hash) (BlobData, error
 	return result, nil
 }
 
-func (s *FileStorage) Write(_ context.Context, data BlobData) error {
+func (s *FileStorage) ReadBackfillProcesses(ctx context.Context) (BackfillProcesses, error) {
+	BackfillMu.Lock()
+	defer BackfillMu.Unlock()
+
+	data, err := os.ReadFile(path.Join(s.directory, "backfill_processes"))
+	if err != nil {
+		if os.IsNotExist(err) {
+			return BackfillProcesses{}, ErrNotFound
+		}
+
+		return BackfillProcesses{}, err
+	}
+	var result BackfillProcesses
+	err = json.Unmarshal(data, &result)
+	if err != nil {
+		s.log.Warn("error decoding backfill_processes", "err", err)
backfill_processes", "err", err) + return BackfillProcesses{}, ErrMarshaling + } + return result, nil +} + +func (s *FileStorage) ReadLockfile(ctx context.Context) (Lockfile, error) { + data, err := os.ReadFile(path.Join(s.directory, "lockfile")) + if err != nil { + if os.IsNotExist(err) { + return Lockfile{}, ErrNotFound + } + + return Lockfile{}, err + } + var result Lockfile + err = json.Unmarshal(data, &result) + if err != nil { + s.log.Warn("error decoding lockfile", "err", err) + return Lockfile{}, ErrMarshaling + } + return result, nil +} + +func (s *FileStorage) WriteBackfillProcesses(_ context.Context, data BackfillProcesses) error { + BackfillMu.Lock() + defer BackfillMu.Unlock() + + b, err := json.Marshal(data) + if err != nil { + s.log.Warn("error encoding backfill_processes", "err", err) + return ErrMarshaling + } + err = os.WriteFile(path.Join(s.directory, "backfill_processes"), b, 0644) + if err != nil { + s.log.Warn("error writing backfill_processes", "err", err) + return err + } + + s.log.Info("wrote backfill_processes") + return nil +} + +func (s *FileStorage) WriteLockfile(_ context.Context, data Lockfile) error { + b, err := json.Marshal(data) + if err != nil { + s.log.Warn("error encoding lockfile", "err", err) + return ErrMarshaling + } + err = os.WriteFile(path.Join(s.directory, "lockfile"), b, 0644) + if err != nil { + s.log.Warn("error writing lockfile", "err", err) + return err + } + + s.log.Info("wrote to lockfile", "archiverId", data.ArchiverId, "timestamp", strconv.FormatInt(data.Timestamp, 10)) + return nil +} + +func (s *FileStorage) WriteBlob(_ context.Context, data BlobData) error { b, err := json.Marshal(data) if err != nil { s.log.Warn("error encoding blob", "err", err) diff --git a/common/storage/file_test.go b/common/storage/file_test.go index b987099..f8baac4 100644 --- a/common/storage/file_test.go +++ b/common/storage/file_test.go @@ -29,7 +29,7 @@ func runTestExists(t *testing.T, s DataStore) { require.NoError(t, err) require.False(t, exists) - err = s.Write(context.Background(), BlobData{ + err = s.WriteBlob(context.Background(), BlobData{ Header: Header{ BeaconBlockHash: id, }, @@ -52,11 +52,11 @@ func TestExists(t *testing.T) { func runTestRead(t *testing.T, s DataStore) { id := common.Hash{1, 2, 3} - _, err := s.Read(context.Background(), id) + _, err := s.ReadBlob(context.Background(), id) require.Error(t, err) require.True(t, errors.Is(err, ErrNotFound)) - err = s.Write(context.Background(), BlobData{ + err = s.WriteBlob(context.Background(), BlobData{ Header: Header{ BeaconBlockHash: id, }, @@ -64,7 +64,7 @@ func runTestRead(t *testing.T, s DataStore) { }) require.NoError(t, err) - data, err := s.Read(context.Background(), id) + data, err := s.ReadBlob(context.Background(), id) require.NoError(t, err) require.Equal(t, id, data.Header.BeaconBlockHash) } @@ -84,14 +84,14 @@ func TestBrokenStorage(t *testing.T) { // Delete the directory to simulate broken storage cleanup() - _, err := fs.Read(context.Background(), id) + _, err := fs.ReadBlob(context.Background(), id) require.Error(t, err) exists, err := fs.Exists(context.Background(), id) require.False(t, exists) require.NoError(t, err) // No error should be returned, as in this test we've just delted the directory - err = fs.Write(context.Background(), BlobData{ + err = fs.WriteBlob(context.Background(), BlobData{ Header: Header{ BeaconBlockHash: id, }, @@ -109,7 +109,7 @@ func TestReadInvalidData(t *testing.T) { err := os.WriteFile(fs.fileName(id), []byte("invalid json"), 0644) 
 	require.NoError(t, err)
 
-	_, err = fs.Read(context.Background(), id)
+	_, err = fs.ReadBlob(context.Background(), id)
 	require.Error(t, err)
 	require.True(t, errors.Is(err, ErrMarshaling))
 }
diff --git a/common/storage/s3.go b/common/storage/s3.go
index bf68e92..9757e4b 100644
--- a/common/storage/s3.go
+++ b/common/storage/s3.go
@@ -7,6 +7,7 @@ import (
 	"encoding/json"
 	"io"
 	"path"
+	"strconv"
 
 	"github.com/base-org/blob-archiver/common/flags"
 	"github.com/ethereum/go-ethereum/common"
@@ -40,13 +41,33 @@ func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) {
 		return nil, err
 	}
 
-	return &S3Storage{
+	storage := &S3Storage{
 		s3:       client,
 		bucket:   cfg.Bucket,
 		path:     cfg.Path,
 		log:      l,
 		compress: cfg.Compress,
-	}, nil
+	}
+
+	_, err = storage.ReadBackfillProcesses(context.Background())
+	if err == ErrNotFound {
+		storage.log.Info("creating empty backfill_processes object")
+		err = storage.WriteBackfillProcesses(context.Background(), BackfillProcesses{})
+		if err != nil {
+			log.Crit("failed to create backfill_processes key")
+		}
+	}
+
+	_, err = storage.ReadLockfile(context.Background())
+	if err == ErrNotFound {
+		storage.log.Info("creating empty lockfile object")
+		err = storage.WriteLockfile(context.Background(), Lockfile{})
+		if err != nil {
+			log.Crit("failed to create lockfile key")
+		}
+	}
+
+	return storage, nil
 }
 
 func (s *S3Storage) Exists(ctx context.Context, hash common.Hash) (bool, error) {
@@ -63,7 +84,7 @@ func (s *S3Storage) Exists(ctx context.Context, hash common.Hash) (bool, error)
 	return true, nil
 }
 
-func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error) {
+func (s *S3Storage) ReadBlob(ctx context.Context, hash common.Hash) (BlobData, error) {
 	res, err := s.s3.GetObject(ctx, s.bucket, path.Join(s.path, hash.String()), minio.GetObjectOptions{})
 	if err != nil {
 		s.log.Info("unexpected error fetching blob", "hash", hash.String(), "err", err)
@@ -103,7 +124,121 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error
 	return data, nil
 }
 
-func (s *S3Storage) Write(ctx context.Context, data BlobData) error {
+func (s *S3Storage) ReadBackfillProcesses(ctx context.Context) (BackfillProcesses, error) {
+	BackfillMu.Lock()
+	defer BackfillMu.Unlock()
+
+	res, err := s.s3.GetObject(ctx, s.bucket, path.Join(s.path, "backfill_processes"), minio.GetObjectOptions{})
+	if err != nil {
+		s.log.Info("unexpected error fetching backfill_processes", "err", err)
+		return BackfillProcesses{}, ErrStorage
+	}
+	defer res.Close()
+	_, err = res.Stat()
+	if err != nil {
+		errResponse := minio.ToErrorResponse(err)
+		if errResponse.Code == "NoSuchKey" {
+			s.log.Info("unable to find backfill_processes key")
+			return BackfillProcesses{}, ErrNotFound
+		} else {
+			s.log.Info("unexpected error fetching backfill_processes", "err", err)
+			return BackfillProcesses{}, ErrStorage
+		}
+	}
+
+	var reader io.ReadCloser = res
+	defer reader.Close()
+
+	var data BackfillProcesses
+	err = json.NewDecoder(reader).Decode(&data)
+	if err != nil {
+		s.log.Warn("error decoding backfill_processes", "err", err)
+		return BackfillProcesses{}, ErrMarshaling
+	}
+
+	return data, nil
+}
+
+func (s *S3Storage) ReadLockfile(ctx context.Context) (Lockfile, error) {
+	res, err := s.s3.GetObject(ctx, s.bucket, path.Join(s.path, "lockfile"), minio.GetObjectOptions{})
+	if err != nil {
+		s.log.Info("unexpected error fetching lockfile", "err", err)
+		return Lockfile{}, ErrStorage
+	}
+	defer res.Close()
+	_, err = res.Stat()
+	if err != nil {
+		errResponse := minio.ToErrorResponse(err)
+		if errResponse.Code == "NoSuchKey" {
+			s.log.Info("unable to find lockfile key")
+			return Lockfile{}, ErrNotFound
+		} else {
+			s.log.Info("unexpected error fetching lockfile", "err", err)
+			return Lockfile{}, ErrStorage
+		}
+	}
+
+	var reader io.ReadCloser = res
+	defer reader.Close()
+
+	var data Lockfile
+	err = json.NewDecoder(reader).Decode(&data)
+	if err != nil {
+		s.log.Warn("error decoding lockfile", "err", err)
+		return Lockfile{}, ErrMarshaling
+	}
+
+	return data, nil
+}
+
+func (s *S3Storage) WriteBackfillProcesses(ctx context.Context, data BackfillProcesses) error {
+	BackfillMu.Lock()
+	defer BackfillMu.Unlock()
+
+	d, err := json.Marshal(data)
+	if err != nil {
+		s.log.Warn("error encoding backfill_processes", "err", err)
+		return ErrMarshaling
+	}
+
+	options := minio.PutObjectOptions{
+		ContentType: "application/json",
+	}
+	reader := bytes.NewReader(d)
+
+	_, err = s.s3.PutObject(ctx, s.bucket, path.Join(s.path, "backfill_processes"), reader, int64(len(d)), options)
+	if err != nil {
+		s.log.Warn("error writing to backfill_processes", "err", err)
+		return ErrStorage
+	}
+
+	s.log.Info("wrote to backfill_processes")
+	return nil
+}
+
+func (s *S3Storage) WriteLockfile(ctx context.Context, data Lockfile) error {
+	d, err := json.Marshal(data)
+	if err != nil {
+		s.log.Warn("error encoding lockfile", "err", err)
+		return ErrMarshaling
+	}
+
+	options := minio.PutObjectOptions{
+		ContentType: "application/json",
+	}
+	reader := bytes.NewReader(d)
+
+	_, err = s.s3.PutObject(ctx, s.bucket, path.Join(s.path, "lockfile"), reader, int64(len(d)), options)
+	if err != nil {
+		s.log.Warn("error writing to lockfile", "err", err)
+		return ErrStorage
+	}
+
+	s.log.Info("wrote to lockfile", "archiverId", data.ArchiverId, "timestamp", strconv.FormatInt(data.Timestamp, 10))
+	return nil
+}
+
+func (s *S3Storage) WriteBlob(ctx context.Context, data BlobData) error {
 	b, err := json.Marshal(data)
 	if err != nil {
 		s.log.Warn("error encoding blob", "err", err)
diff --git a/common/storage/storage.go b/common/storage/storage.go
index 57d2468..3ebcee2 100644
--- a/common/storage/storage.go
+++ b/common/storage/storage.go
@@ -3,7 +3,9 @@ package storage
 import (
 	"context"
 	"errors"
+	"sync"
 
+	v1 "github.com/attestantio/go-eth2-client/api/v1"
 	"github.com/attestantio/go-eth2-client/spec/deneb"
 	"github.com/base-org/blob-archiver/common/flags"
 	"github.com/ethereum/go-ethereum/common"
@@ -62,6 +64,23 @@ type BlobData struct {
 	BlobSidecars BlobSidecars `json:"blob_sidecars"`
 }
 
+var BackfillMu sync.Mutex
+
+type BackfillProcess struct {
+	Start   v1.BeaconBlockHeader `json:"start_block"`
+	Current v1.BeaconBlockHeader `json:"current_block"`
+}
+
+type Lockfile struct {
+	ArchiverId string `json:"archiver_id"`
+	Timestamp  int64  `json:"timestamp"`
+}
+
+// BackfillProcesses maps backfill start block hash --> BackfillProcess. This allows us to track
+// multiple processes and reengage a previous backfill in case an archiver restart interrupted
+// an active backfill
+type BackfillProcesses map[common.Hash]BackfillProcess
+
 // DataStoreReader is the interface for reading from a data store.
 type DataStoreReader interface {
 	// Exists returns true if the given blob hash exists in the data store, false otherwise.
@@ -69,22 +88,26 @@ type DataStoreReader interface {
 	// - nil: the existence check was successful. In this case the boolean should also be set correctly.
 	// - ErrStorage: there was an error accessing the data store.
 	Exists(ctx context.Context, hash common.Hash) (bool, error)
 
-	// Read reads the blob data for the given beacon block hash from the data store.
+	// ReadBlob reads the blob data for the given beacon block hash from the data store.
 	// It should return one of the following:
 	// - nil: reading the blob was successful. The blob data is also returned.
 	// - ErrNotFound: the blob data was not found in the data store.
 	// - ErrStorage: there was an error accessing the data store.
 	// - ErrMarshaling: there was an error decoding the blob data.
-	Read(ctx context.Context, hash common.Hash) (BlobData, error)
+	ReadBlob(ctx context.Context, hash common.Hash) (BlobData, error)
+	ReadBackfillProcesses(ctx context.Context) (BackfillProcesses, error)
+	ReadLockfile(ctx context.Context) (Lockfile, error)
 }
 
 // DataStoreWriter is the interface for writing to a data store.
 type DataStoreWriter interface {
-	// Write writes the given blob data to the data store. It should return one of the following errors:
+	// WriteBlob writes the given blob data to the data store. It should return one of the following errors:
 	// - nil: writing the blob was successful.
 	// - ErrStorage: there was an error accessing the data store.
 	// - ErrMarshaling: there was an error encoding the blob data.
-	Write(ctx context.Context, data BlobData) error
+	WriteBlob(ctx context.Context, data BlobData) error
+	WriteBackfillProcesses(ctx context.Context, data BackfillProcesses) error
+	WriteLockfile(ctx context.Context, data Lockfile) error
 }
 
 // DataStore is the interface for a data store that can be both written to and read from.
diff --git a/common/storage/storagetest/stub.go b/common/storage/storagetest/stub.go
index 0389e60..60900ae 100644
--- a/common/storage/storagetest/stub.go
+++ b/common/storage/storagetest/stub.go
@@ -26,13 +26,13 @@ func (s *TestFileStorage) WritesFailTimes(times int) {
 	s.writeFailCount = times
 }
 
-func (s *TestFileStorage) Write(_ context.Context, data storage.BlobData) error {
+func (s *TestFileStorage) WriteBlob(_ context.Context, data storage.BlobData) error {
 	if s.writeFailCount > 0 {
 		s.writeFailCount--
 		return storage.ErrStorage
 	}
 
-	return s.FileStorage.Write(context.Background(), data)
+	return s.FileStorage.WriteBlob(context.Background(), data)
 }
 
 func (fs *TestFileStorage) CheckExistsOrFail(t *testing.T, hash common.Hash) {
@@ -48,12 +48,12 @@ func (fs *TestFileStorage) CheckNotExistsOrFail(t *testing.T, hash common.Hash)
 }
 
 func (fs *TestFileStorage) WriteOrFail(t *testing.T, data storage.BlobData) {
-	err := fs.Write(context.Background(), data)
+	err := fs.WriteBlob(context.Background(), data)
 	require.NoError(t, err)
 }
 
 func (fs *TestFileStorage) ReadOrFail(t *testing.T, hash common.Hash) storage.BlobData {
-	data, err := fs.Read(context.Background(), hash)
+	data, err := fs.ReadBlob(context.Background(), hash)
 	require.NoError(t, err)
 	require.NotNil(t, data)
 	return data
diff --git a/go.mod b/go.mod
index d847843..cd254be 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
 	github.com/ethereum-optimism/optimism v1.7.6
 	github.com/ethereum/go-ethereum v1.101315.1
 	github.com/go-chi/chi/v5 v5.0.12
+	github.com/google/uuid v1.6.0
 	github.com/minio/minio-go/v7 v7.0.70
 	github.com/prometheus/client_golang v1.19.0
 	github.com/rs/zerolog v1.32.0
@@ -49,7 +50,6 @@ require (
 	github.com/gofrs/flock v0.8.1 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
-	github.com/google/uuid v1.6.0 // indirect
 	github.com/gorilla/websocket v1.5.1 // indirect
 	github.com/holiman/uint256 v1.2.4 // indirect
 	github.com/huandu/go-clone v1.6.0 // indirect