Merge pull request #29 from bitwiseguy/ss/finish-old-backfills
Store backfill_processes status for protection against interruptions
danyalprout authored Jun 21, 2024
2 parents 1ef67f6 + cfba816 commit 988d545
Showing 11 changed files with 496 additions and 52 deletions.
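The lockfile and backfill-process types the archiver reads and writes below are defined in storage files that are not expanded on this page. As a minimal sketch inferred from the call sites in this diff (field names come from the struct literals; method signatures, the interface name, and the v1 import path are assumptions rather than copies from the repository), the new storage surface looks roughly like this:

package storage

import (
	"context"

	v1 "github.com/attestantio/go-eth2-client/api/v1"
	"github.com/ethereum/go-ethereum/common"
)

// Lockfile records which archiver instance currently holds the storage lock and
// the unix timestamp (seconds) at which it last refreshed that lock.
type Lockfile struct {
	ArchiverId string
	Timestamp  int64
}

// BackfillProcess tracks one in-flight backfill: the header it started from and
// the header it has worked back to so far.
type BackfillProcess struct {
	Start   v1.BeaconBlockHeader
	Current v1.BeaconBlockHeader
}

// BackfillProcesses is keyed by the root of each process's start block.
type BackfillProcesses map[common.Hash]BackfillProcess

// New data-store methods the archiver calls in this change.
type lockAndBackfillStore interface {
	ReadLockfile(ctx context.Context) (Lockfile, error)
	WriteLockfile(ctx context.Context, lockfile Lockfile) error
	ReadBackfillProcesses(ctx context.Context) (BackfillProcesses, error)
	WriteBackfillProcesses(ctx context.Context, processes BackfillProcesses) error
}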
api/service/api.go (2 changes: 1 addition & 1 deletion)
@@ -179,7 +179,7 @@ func (a *API) blobSidecarHandler(w http.ResponseWriter, r *http.Request) {
return
}

result, storageErr := a.dataStoreClient.Read(r.Context(), beaconBlockHash)
result, storageErr := a.dataStoreClient.ReadBlob(r.Context(), beaconBlockHash)
if storageErr != nil {
if errors.Is(storageErr, storage.ErrNotFound) {
errUnknownBlock.write(w)
api/service/api_test.go (4 changes: 2 additions & 2 deletions)
@@ -92,10 +92,10 @@ func TestAPIService(t *testing.T) {
},
}

err := fs.Write(context.Background(), blockOne)
err := fs.WriteBlob(context.Background(), blockOne)
require.NoError(t, err)

err = fs.Write(context.Background(), blockTwo)
err = fs.WriteBlob(context.Background(), blockTwo)
require.NoError(t, err)

beaconClient.Headers["finalized"] = &v1.BeaconBlockHeader{
archiver/service/archiver.go (136 changes: 114 additions & 22 deletions)
@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid"
)

const (
@@ -37,6 +38,7 @@ func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage
metrics: m,
beaconClient: client,
stopCh: make(chan struct{}),
id: uuid.New().String(),
}, nil
}

@@ -47,6 +49,7 @@ type Archiver struct {
beaconClient BeaconClient
metrics metrics.Metricer
stopCh chan struct{}
id string
}

// Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
@@ -63,6 +66,8 @@ func (a *Archiver) Start(ctx context.Context) error {
return err
}

a.waitObtainStorageLock(ctx)

go a.backfillBlobs(ctx, currentBlock)

return a.trackLatestBlocks(ctx)
@@ -119,7 +124,7 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
}

// The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
err = a.dataStoreClient.Write(ctx, blobData)
err = a.dataStoreClient.WriteBlob(ctx, blobData)

if err != nil {
a.log.Error("failed to write blob", "err", err)
@@ -131,36 +136,123 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
return currentHeader.Data, exists, nil
}

const LockUpdateInterval = 10 * time.Second
const LockTimeout = int64(20) // 20 seconds
var ObtainLockRetryInterval = 10 * time.Second

func (a *Archiver) waitObtainStorageLock(ctx context.Context) {
lockfile, err := a.dataStoreClient.ReadLockfile(ctx)
if err != nil {
a.log.Crit("failed to read lockfile", "err", err)
}

currentTime := time.Now().Unix()
emptyLockfile := storage.Lockfile{}
if lockfile != emptyLockfile {
for lockfile.ArchiverId != a.id && lockfile.Timestamp+LockTimeout > currentTime {
// Loop until the timestamp read from storage is expired
a.log.Info("waiting for storage lock timestamp to expire",
"timestamp", strconv.FormatInt(lockfile.Timestamp, 10),
"currentTime", strconv.FormatInt(currentTime, 10),
)
time.Sleep(ObtainLockRetryInterval)
lockfile, err = a.dataStoreClient.ReadLockfile(ctx)
if err != nil {
a.log.Crit("failed to read lockfile", "err", err)
}
currentTime = time.Now().Unix()
}
}

err = a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
if err != nil {
a.log.Crit("failed to write to lockfile: %v", err)
}
a.log.Info("obtained storage lock")

go func() {
// Retain storage lock by continually updating the stored timestamp
ticker := time.NewTicker(LockUpdateInterval)
for {
select {
case <-ticker.C:
currentTime := time.Now().Unix()
err := a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
if err != nil {
a.log.Error("failed to update lockfile timestamp", "err", err)
}
case <-ctx.Done():
ticker.Stop()
return
}
}
}()
}

// backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted
// to the archivers storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
// If an error is encountered persisting a block, it will retry after waiting for a period of time.
func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
current, alreadyExists, err := latest, false, error(nil)

defer func() {
a.log.Info("backfill complete", "endHash", current.Root.String(), "startHash", latest.Root.String())
}()
// Add backfill process that starts at latest slot, then loop through all backfill processes
backfillProcesses, err := a.dataStoreClient.ReadBackfillProcesses(ctx)
if err != nil {
a.log.Crit("failed to read backfill_processes", "err", err)
}
backfillProcesses[common.Hash(latest.Root)] = storage.BackfillProcess{Start: *latest, Current: *latest}
a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)

backfillLoop := func(start *v1.BeaconBlockHeader, current *v1.BeaconBlockHeader) {
curr, alreadyExists, err := current, false, error(nil)
count := 0
a.log.Info("backfill process initiated",
"currHash", curr.Root.String(),
"currSlot", curr.Header.Message.Slot,
"startHash", start.Root.String(),
"startSlot", start.Header.Message.Slot,
)

defer func() {
a.log.Info("backfill process complete",
"endHash", curr.Root.String(),
"endSlot", curr.Header.Message.Slot,
"startHash", start.Root.String(),
"startSlot", start.Header.Message.Slot,
)
delete(backfillProcesses, common.Hash(start.Root))
a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
}()

for !alreadyExists {
previous := curr

if common.Hash(curr.Root) == a.cfg.OriginBlock {
a.log.Info("reached origin block", "hash", curr.Root.String())
return
}

for !alreadyExists {
previous := current
curr, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
if err != nil {
a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
// Revert back to block we failed to fetch
curr = previous
time.Sleep(backfillErrorRetryInterval)
continue
}

if common.Hash(current.Root) == a.cfg.OriginBlock {
a.log.Info("reached origin block", "hash", current.Root.String())
return
}
if !alreadyExists {
a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
}

current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
if err != nil {
a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
// Revert back to block we failed to fetch
current = previous
time.Sleep(backfillErrorRetryInterval)
continue
count++
if count%10 == 0 {
backfillProcesses[common.Hash(start.Root)] = storage.BackfillProcess{Start: *start, Current: *curr}
a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
}
}
}

if !alreadyExists {
a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
}
for _, process := range backfillProcesses {
backfillLoop(&process.Start, &process.Current)
}
}

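A note on the locking scheme introduced above: LockUpdateInterval (10 seconds) is half of LockTimeout (20 seconds), so a live archiver refreshes the lockfile timestamp well before a waiting instance could observe the lock as expired; only a crashed or stopped archiver lets the timestamp age past the timeout and frees the lock for takeover. A hypothetical guard that makes this assumption explicit (illustrative only, not part of this commit) would be:

// The scheme relies on the refresh interval being shorter than the expiry window;
// otherwise a healthy holder could lose the lock between two of its own updates.
if LockUpdateInterval >= time.Duration(LockTimeout)*time.Second {
	panic("LockUpdateInterval must be shorter than LockTimeout")
}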
archiver/service/archiver_test.go (105 changes: 101 additions & 4 deletions)
@@ -89,7 +89,7 @@ func TestArchiver_BackfillToOrigin(t *testing.T) {
svc, fs := setup(t, beacon)

// We have the current head, which is block 5 written to storage
err := fs.Write(context.Background(), storage.BlobData{
err := fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Five,
},
@@ -119,7 +119,7 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
svc, fs := setup(t, beacon)

// We have the current head, which is block 5 written to storage
err := fs.Write(context.Background(), storage.BlobData{
err := fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Five,
},
@@ -130,7 +130,7 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
require.NoError(t, err)

// We also have block 1 written to storage
err = fs.Write(context.Background(), storage.BlobData{
err = fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.One,
},
@@ -156,13 +156,110 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
require.NoError(t, err)
require.True(t, exists)

data, err := fs.Read(context.Background(), blob)
data, err := fs.ReadBlob(context.Background(), blob)
require.NoError(t, err)
require.NotNil(t, data)
require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
}
}

func TestArchiver_ObtainLockfile(t *testing.T) {
beacon := beacontest.NewDefaultStubBeaconClient(t)
svc, _ := setup(t, beacon)

currentTime := time.Now().Unix()
expiredTime := currentTime - 19
err := svc.dataStoreClient.WriteLockfile(context.Background(), storage.Lockfile{ArchiverId: "FAKEID", Timestamp: expiredTime})
require.NoError(t, err)

ObtainLockRetryInterval = 1 * time.Second
svc.waitObtainStorageLock(context.Background())

lockfile, err := svc.dataStoreClient.ReadLockfile(context.Background())
require.NoError(t, err)
require.Equal(t, svc.id, lockfile.ArchiverId)
require.True(t, lockfile.Timestamp >= currentTime)
}

func TestArchiver_BackfillFinishOldProcess(t *testing.T) {
beacon := beacontest.NewDefaultStubBeaconClient(t)
svc, fs := setup(t, beacon)

// We have the current head, which is block 5 written to storage
err := fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Five,
},
BlobSidecars: storage.BlobSidecars{
Data: beacon.Blobs[blobtest.Five.String()],
},
})
require.NoError(t, err)

// We also have block 3 written to storage
err = fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Three,
},
BlobSidecars: storage.BlobSidecars{
Data: beacon.Blobs[blobtest.Three.String()],
},
})
require.NoError(t, err)

// We also have block 1 written to storage
err = fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.One,
},
BlobSidecars: storage.BlobSidecars{
Data: beacon.Blobs[blobtest.One.String()],
},
})
require.NoError(t, err)

// We expect to backfill blob 4 first, then 2 in a separate process
expectedBlobs := []common.Hash{blobtest.Four, blobtest.Two}

for _, blob := range expectedBlobs {
exists, err := fs.Exists(context.Background(), blob)
require.NoError(t, err)
require.False(t, exists)
}

actualProcesses, err := svc.dataStoreClient.ReadBackfillProcesses(context.Background())
expectedProcesses := make(storage.BackfillProcesses)
require.NoError(t, err)
require.Equal(t, expectedProcesses, actualProcesses)

expectedProcesses[blobtest.Three] = storage.BackfillProcess{Start: *beacon.Headers[blobtest.Three.String()], Current: *beacon.Headers[blobtest.Three.String()]}
err = svc.dataStoreClient.WriteBackfillProcesses(context.Background(), expectedProcesses)
require.NoError(t, err)

actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
require.NoError(t, err)
require.Equal(t, expectedProcesses, actualProcesses)

svc.backfillBlobs(context.Background(), beacon.Headers[blobtest.Five.String()])

for _, blob := range expectedBlobs {
exists, err := fs.Exists(context.Background(), blob)
require.NoError(t, err)
require.True(t, exists)

data, err := fs.ReadBlob(context.Background(), blob)
require.NoError(t, err)
require.NotNil(t, data)
require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
}

actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
require.NoError(t, err)
svc.log.Info("backfill processes", "processes", actualProcesses)
require.Equal(t, storage.BackfillProcesses{}, actualProcesses)

}

func TestArchiver_LatestStopsAtExistingBlock(t *testing.T) {
beacon := beacontest.NewDefaultStubBeaconClient(t)
svc, fs := setup(t, beacon)
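TestArchiver_BackfillFinishOldProcess above exercises the recovery path this commit adds: interrupted backfill ranges are finished on the next startup and only then removed from backfill_processes. A standalone toy sketch of that bookkeeping (hashes stand in for the full beacon block headers the real BackfillProcess stores; the values are made up):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// Toy stand-in for storage.BackfillProcess, which stores full block headers.
type backfillProcess struct {
	Start   common.Hash // block the run started from (the head at the time)
	Current common.Hash // block the run had reached when it was interrupted
}

func main() {
	// Hypothetical leftover state after a crash: a run that started at block 5 got
	// back to block 4, and an older run that started at block 3 made no progress.
	processes := map[common.Hash]backfillProcess{
		common.Hash{5}: {Start: common.Hash{5}, Current: common.Hash{4}},
		common.Hash{3}: {Start: common.Hash{3}, Current: common.Hash{3}},
	}
	for start, p := range processes {
		fmt.Printf("resuming backfill %s from %s\n", start.Hex(), p.Current.Hex())
		// ...walk parent roots from Current until hitting an archived block or the origin...
		delete(processes, start) // an entry is dropped only once its range is complete
	}
}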
common/blobtest/helpers.go (2 changes: 2 additions & 0 deletions)
@@ -17,6 +17,8 @@ var (
Three = common.Hash{3}
Four = common.Hash{4}
Five = common.Hash{5}
Six = common.Hash{6}
Seven = common.Hash{7}

StartSlot = uint64(10)
EndSlot = uint64(15)