Store backfill_processes status for protection against interruptions #29

Merged: 12 commits, Jun 21, 2024
2 changes: 1 addition & 1 deletion api/service/api.go
@@ -179,7 +179,7 @@ func (a *API) blobSidecarHandler(w http.ResponseWriter, r *http.Request) {
return
}

result, storageErr := a.dataStoreClient.Read(r.Context(), beaconBlockHash)
result, storageErr := a.dataStoreClient.ReadBlob(r.Context(), beaconBlockHash)
if storageErr != nil {
if errors.Is(storageErr, storage.ErrNotFound) {
errUnknownBlock.write(w)
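Note: the only change to the API service is the rename of the blob read call. The data store client's Read/Write methods become ReadBlob/WriteBlob now that the same client also persists a lockfile and backfill-process state (added in archiver/service/archiver.go below). The following is a rough sketch of what the client is assumed to expose after this PR; method names are taken from call sites in this diff, but the exact signatures and interface name are inferred and may differ from the real storage package.

package storage // sketch only; the real interface lives in the storage package

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
)

// DataStoreClient is an assumed view of the client after this PR. BlobData,
// Lockfile, and BackfillProcesses are the storage package's own types.
type DataStoreClient interface {
	ReadBlob(ctx context.Context, hash common.Hash) (BlobData, error) // was Read
	WriteBlob(ctx context.Context, data BlobData) error               // was Write
	Exists(ctx context.Context, hash common.Hash) (bool, error)

	// New coordination state used by the archiver:
	ReadLockfile(ctx context.Context) (Lockfile, error)
	WriteLockfile(ctx context.Context, lockfile Lockfile) error
	ReadBackfillProcesses(ctx context.Context) (BackfillProcesses, error)
	WriteBackfillProcesses(ctx context.Context, processes BackfillProcesses) error
}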
4 changes: 2 additions & 2 deletions api/service/api_test.go
@@ -92,10 +92,10 @@ func TestAPIService(t *testing.T) {
},
}

err := fs.Write(context.Background(), blockOne)
err := fs.WriteBlob(context.Background(), blockOne)
require.NoError(t, err)

err = fs.Write(context.Background(), blockTwo)
err = fs.WriteBlob(context.Background(), blockTwo)
require.NoError(t, err)

beaconClient.Headers["finalized"] = &v1.BeaconBlockHeader{
136 changes: 114 additions & 22 deletions archiver/service/archiver.go
@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid"
)

const (
@@ -37,6 +38,7 @@ func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage
metrics: m,
beaconClient: client,
stopCh: make(chan struct{}),
id: uuid.New().String(),
}, nil
}

@@ -47,6 +49,7 @@ type Archiver struct {
beaconClient BeaconClient
metrics metrics.Metricer
stopCh chan struct{}
id string
}

// Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
@@ -63,6 +66,8 @@ func (a *Archiver) Start(ctx context.Context) error {
return err
}

a.waitObtainStorageLock(ctx)

go a.backfillBlobs(ctx, currentBlock)

return a.trackLatestBlocks(ctx)
@@ -119,7 +124,7 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
}

// The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
err = a.dataStoreClient.Write(ctx, blobData)
err = a.dataStoreClient.WriteBlob(ctx, blobData)

if err != nil {
a.log.Error("failed to write blob", "err", err)
@@ -131,36 +136,123 @@
return currentHeader.Data, exists, nil
}

const LockUpdateInterval = 10 * time.Second
const LockTimeout = int64(20) // 20 seconds
var ObtainLockRetryInterval = 10 * time.Second

func (a *Archiver) waitObtainStorageLock(ctx context.Context) {
lockfile, err := a.dataStoreClient.ReadLockfile(ctx)
if err != nil {
a.log.Crit("failed to read lockfile", "err", err)
}

currentTime := time.Now().Unix()
emptyLockfile := storage.Lockfile{}
if lockfile != emptyLockfile {
for lockfile.ArchiverId != a.id && lockfile.Timestamp+LockTimeout > currentTime {
// Loop until the timestamp read from storage is expired
a.log.Info("waiting for storage lock timestamp to expire",
"timestamp", strconv.FormatInt(lockfile.Timestamp, 10),
"currentTime", strconv.FormatInt(currentTime, 10),
)
time.Sleep(ObtainLockRetryInterval)
lockfile, err = a.dataStoreClient.ReadLockfile(ctx)
if err != nil {
a.log.Crit("failed to read lockfile", "err", err)
}
currentTime = time.Now().Unix()
}
}

err = a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
if err != nil {
a.log.Crit("failed to write to lockfile: %v", err)
}
a.log.Info("obtained storage lock")

go func() {
// Retain storage lock by continually updating the stored timestamp
ticker := time.NewTicker(LockUpdateInterval)
for {
select {
case <-ticker.C:
currentTime := time.Now().Unix()
err := a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
if err != nil {
a.log.Error("failed to update lockfile timestamp", "err", err)
}
case <-ctx.Done():
ticker.Stop()
return
}
}
}()
}
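The lock is a simple lease: the archiver writes its uuid and a unix timestamp, refreshes the timestamp every LockUpdateInterval (10s), and a competing archiver only takes over once the stored timestamp is older than LockTimeout (20s), so a crashed or paused instance loses the lock within roughly 20 seconds. Since the renewal interval is half the timeout, the holder normally refreshes the lease twice per timeout window. Below is a minimal sketch of the persisted record, with field names taken from the call sites above; the concrete definition (and any serialization tags) belongs to the storage package and is not part of this diff.

// Sketch only: assumed shape of the record behind ReadLockfile/WriteLockfile.
type Lockfile struct {
	ArchiverId string // uuid of the archiver instance currently holding the lease
	Timestamp  int64  // unix seconds of the last renewal; stale once older than LockTimeout
}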

// backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted
// to the archiver's storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
// If an error is encountered persisting a block, it will retry after waiting for a period of time.
func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
-	current, alreadyExists, err := latest, false, error(nil)
-
-	defer func() {
-		a.log.Info("backfill complete", "endHash", current.Root.String(), "startHash", latest.Root.String())
-	}()
-
-	for !alreadyExists {
-		previous := current
-
-		if common.Hash(current.Root) == a.cfg.OriginBlock {
-			a.log.Info("reached origin block", "hash", current.Root.String())
-			return
-		}
-
-		current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
-		if err != nil {
-			a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
-			// Revert back to block we failed to fetch
-			current = previous
-			time.Sleep(backfillErrorRetryInterval)
-			continue
-		}
-
-		if !alreadyExists {
-			a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
-		}
-	}
+	// Add backfill process that starts at latest slot, then loop through all backfill processes
+	backfillProcesses, err := a.dataStoreClient.ReadBackfillProcesses(ctx)
+	if err != nil {
+		a.log.Crit("failed to read backfill_processes", "err", err)
+	}
+	backfillProcesses[common.Hash(latest.Root)] = storage.BackfillProcess{Start: *latest, Current: *latest}
+	a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+
+	backfillLoop := func(start *v1.BeaconBlockHeader, current *v1.BeaconBlockHeader) {
+		curr, alreadyExists, err := current, false, error(nil)
+		count := 0
+		a.log.Info("backfill process initiated",
+			"currHash", curr.Root.String(),
+			"currSlot", curr.Header.Message.Slot,
+			"startHash", start.Root.String(),
+			"startSlot", start.Header.Message.Slot,
+		)
+
+		defer func() {
+			a.log.Info("backfill process complete",
+				"endHash", curr.Root.String(),
+				"endSlot", curr.Header.Message.Slot,
+				"startHash", start.Root.String(),
+				"startSlot", start.Header.Message.Slot,
+			)
+			delete(backfillProcesses, common.Hash(start.Root))
+			a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+		}()
+
+		for !alreadyExists {
+			previous := curr
+
+			if common.Hash(curr.Root) == a.cfg.OriginBlock {
+				a.log.Info("reached origin block", "hash", curr.Root.String())
+				return
+			}
+
+			curr, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
+			if err != nil {
+				a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
+				// Revert back to block we failed to fetch
+				curr = previous
+				time.Sleep(backfillErrorRetryInterval)
+				continue
+			}
+
+			if !alreadyExists {
+				a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
+			}
+
+			count++
+			if count%10 == 0 {
+				backfillProcesses[common.Hash(start.Root)] = storage.BackfillProcess{Start: *start, Current: *curr}
+				a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+			}
+		}
+	}
+
+	for _, process := range backfillProcesses {
+		backfillLoop(&process.Start, &process.Current)
+	}
}
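The persisted backfill state is what makes interrupted backfills resumable: each in-flight range is keyed by the root of the block it started from, its progress is checkpointed every 10 blocks, and the entry is deleted once the range reaches the origin block or an already-archived block. On the next start, backfillBlobs re-reads the map and finishes any ranges a previous run left behind (exercised by TestArchiver_BackfillFinishOldProcess below). A rough sketch of the assumed types follows, inferred from how they are used here; the real definitions live in the storage package, with v1 being the beacon API header type and common.Hash the go-ethereum hash type already imported in archiver.go.

// Sketch only: assumed bookkeeping types behind ReadBackfillProcesses /
// WriteBackfillProcesses, inferred from their use in backfillBlobs.
type BackfillProcess struct {
	Start   v1.BeaconBlockHeader // header the backfill range started from (the map key is Start's root)
	Current v1.BeaconBlockHeader // most recently checkpointed header in that range
}

// BackfillProcesses tracks every unfinished backfill range, keyed by the start
// block root, so multiple interrupted ranges can be resumed independently.
type BackfillProcesses map[common.Hash]BackfillProcess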

105 changes: 101 additions & 4 deletions archiver/service/archiver_test.go
@@ -89,7 +89,7 @@ func TestArchiver_BackfillToOrigin(t *testing.T) {
svc, fs := setup(t, beacon)

// We have the current head, which is block 5 written to storage
err := fs.Write(context.Background(), storage.BlobData{
err := fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Five,
},
@@ -119,7 +119,7 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
svc, fs := setup(t, beacon)

// We have the current head, which is block 5 written to storage
err := fs.Write(context.Background(), storage.BlobData{
err := fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Five,
},
@@ -130,7 +130,7 @@
require.NoError(t, err)

// We also have block 1 written to storage
err = fs.Write(context.Background(), storage.BlobData{
err = fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.One,
},
@@ -156,13 +156,110 @@
require.NoError(t, err)
require.True(t, exists)

data, err := fs.Read(context.Background(), blob)
data, err := fs.ReadBlob(context.Background(), blob)
require.NoError(t, err)
require.NotNil(t, data)
require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
}
}

func TestArchiver_ObtainLockfile(t *testing.T) {
beacon := beacontest.NewDefaultStubBeaconClient(t)
svc, _ := setup(t, beacon)

currentTime := time.Now().Unix()
expiredTime := currentTime - 19
err := svc.dataStoreClient.WriteLockfile(context.Background(), storage.Lockfile{ArchiverId: "FAKEID", Timestamp: expiredTime})
require.NoError(t, err)

ObtainLockRetryInterval = 1 * time.Second
svc.waitObtainStorageLock(context.Background())

lockfile, err := svc.dataStoreClient.ReadLockfile(context.Background())
require.NoError(t, err)
require.Equal(t, svc.id, lockfile.ArchiverId)
require.True(t, lockfile.Timestamp >= currentTime)
}

func TestArchiver_BackfillFinishOldProcess(t *testing.T) {
beacon := beacontest.NewDefaultStubBeaconClient(t)
svc, fs := setup(t, beacon)

// We have the current head, which is block 5 written to storage
err := fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Five,
},
BlobSidecars: storage.BlobSidecars{
Data: beacon.Blobs[blobtest.Five.String()],
},
})
require.NoError(t, err)

// We also have block 3 written to storage
err = fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Three,
},
BlobSidecars: storage.BlobSidecars{
Data: beacon.Blobs[blobtest.Three.String()],
},
})
require.NoError(t, err)

// We also have block 1 written to storage
err = fs.WriteBlob(context.Background(), storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.One,
},
BlobSidecars: storage.BlobSidecars{
Data: beacon.Blobs[blobtest.One.String()],
},
})
require.NoError(t, err)

// We expect to backfill blob 4 first, then 2 in a separate process
expectedBlobs := []common.Hash{blobtest.Four, blobtest.Two}

for _, blob := range expectedBlobs {
exists, err := fs.Exists(context.Background(), blob)
require.NoError(t, err)
require.False(t, exists)
}

actualProcesses, err := svc.dataStoreClient.ReadBackfillProcesses(context.Background())
expectedProcesses := make(storage.BackfillProcesses)
require.NoError(t, err)
require.Equal(t, expectedProcesses, actualProcesses)

expectedProcesses[blobtest.Three] = storage.BackfillProcess{Start: *beacon.Headers[blobtest.Three.String()], Current: *beacon.Headers[blobtest.Three.String()]}
err = svc.dataStoreClient.WriteBackfillProcesses(context.Background(), expectedProcesses)
require.NoError(t, err)

actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
require.NoError(t, err)
require.Equal(t, expectedProcesses, actualProcesses)

svc.backfillBlobs(context.Background(), beacon.Headers[blobtest.Five.String()])

for _, blob := range expectedBlobs {
exists, err := fs.Exists(context.Background(), blob)
require.NoError(t, err)
require.True(t, exists)

data, err := fs.ReadBlob(context.Background(), blob)
require.NoError(t, err)
require.NotNil(t, data)
require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
}

actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
require.NoError(t, err)
svc.log.Info("backfill processes", "processes", actualProcesses)
require.Equal(t, storage.BackfillProcesses{}, actualProcesses)

}

func TestArchiver_LatestStopsAtExistingBlock(t *testing.T) {
beacon := beacontest.NewDefaultStubBeaconClient(t)
svc, fs := setup(t, beacon)
2 changes: 2 additions & 0 deletions common/blobtest/helpers.go
@@ -17,6 +17,8 @@ var (
Three = common.Hash{3}
Four = common.Hash{4}
Five = common.Hash{5}
Six = common.Hash{6}
Seven = common.Hash{7}

StartSlot = uint64(10)
EndSlot = uint64(15)