Use lockfile to prevent multiple archivers from writing to same storage
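
Each archiver instance now generates a UUID at startup and must hold a shared lockfile, containing the holder's id and a Unix timestamp, before it starts backfilling or tracking blobs. The lock acts as a lease: the holder refreshes the timestamp every 10 seconds, and a competing archiver may claim the lock only once the stored timestamp is more than 20 seconds stale.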
bitwiseguy committed Jun 19, 2024
1 parent d0e42fb commit 2437e3b
Showing 4 changed files with 170 additions and 2 deletions.
55 changes: 55 additions & 0 deletions archiver/service/archiver.go
@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid"
)

const (
@@ -37,6 +38,7 @@ func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage
metrics: m,
beaconClient: client,
stopCh: make(chan struct{}),
id: uuid.New().String(),
}, nil
}

@@ -47,6 +49,7 @@ type Archiver struct {
beaconClient BeaconClient
metrics metrics.Metricer
stopCh chan struct{}
id string
}

// Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
@@ -63,6 +66,8 @@ func (a *Archiver) Start(ctx context.Context) error {
return err
}

a.waitObtainStorageLock(ctx)

go a.backfillBlobs(ctx, currentBlock)

return a.trackLatestBlocks(ctx)
@@ -131,6 +136,56 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
return currentHeader.Data, exists, nil
}

const (
LockUpdateInterval = 10 * time.Second
ObtainLockRetryInterval = 10 * time.Second
LockTimeout = int64(20) // 20 seconds
)
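// Note: LockUpdateInterval must stay well below LockTimeout. With the values
// above, a healthy lock holder refreshes the lease every 10 seconds, so the
// stored timestamp can only age past the 20-second expiry window if the
// holder has died or stalled.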

func (a *Archiver) waitObtainStorageLock(ctx context.Context) {
lockfile, err := a.dataStoreClient.ReadLockfile(ctx)
if err != nil {
a.log.Crit("failed to read lockfile", "err", err)
}

currentTime := time.Now().Unix()
emptyLockfile := storage.Lockfile{}
if lockfile != emptyLockfile {
for lockfile.ArchiverId != a.id && lockfile.Timestamp+LockTimeout > currentTime {
// Loop until the timestamp read from storage is expired
time.Sleep(ObtainLockRetryInterval)
lockfile, err = a.dataStoreClient.ReadLockfile(ctx)
if err != nil {
a.log.Crit("failed to read lockfile", "err", err)
}
currentTime = time.Now().Unix()
}
}

err = a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
if err != nil {
a.log.Crit("failed to write to lockfile: %v", err)
}

go func() {
// Retain storage lock by continually updating the stored timestamp
ticker := time.NewTicker(LockUpdateInterval)
for {
select {
case <-ticker.C:
currentTime := time.Now().Unix()
err := a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
if err != nil {
a.log.Error("failed to update lockfile timestamp", "err", err)
}
case <-ctx.Done():
ticker.Stop()
return
}
}
}()
}
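
// Example (hypothetical timings, assuming the default intervals above):
// archiver A acquires the lock at time T by writing {ArchiverId: "a",
// Timestamp: T}, then heartbeats at T+10, T+20, and so on. An archiver B
// that starts at T+15 reads the lockfile, sees Timestamp+LockTimeout > now,
// and keeps sleeping in 10-second intervals; only if A stops heartbeating
// does B eventually observe an expired timestamp and take over the lock by
// writing its own id.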

// backfillBlobs will persist all blobs from the provided beacon block header back to either the last block that was
// persisted to the archiver's storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
// If an error is encountered persisting a block, it will retry after waiting for a period of time.
47 changes: 45 additions & 2 deletions common/storage/file.go
@@ -23,10 +23,19 @@ func NewFileStorage(dir string, l log.Logger) *FileStorage {

_, err := storage.ReadBackfillProcesses(context.Background())
if err == ErrNotFound {
storage.log.Info("creating empty backfill_processes object")
storage.log.Info("creating empty backfill_processes file")
err = storage.WriteBackfillProcesses(context.Background(), BackfillProcesses{})
if err != nil {
storage.log.Crit("failed to create backfill_processes file")
storage.log.Crit("failed to create empty backfill_processes file", "err", err)
}
}

_, err = storage.ReadLockfile(context.Background())
if err == ErrNotFound {
storage.log.Info("creating empty lockfile file")
err = storage.WriteLockfile(context.Background(), Lockfile{})
if err != nil {
storage.log.Crit("failed to create empty lockfile file", "err", err)
}
}

@@ -83,6 +92,24 @@ func (s *FileStorage) ReadBackfillProcesses(ctx context.Context) (BackfillProces
return result, nil
}

func (s *FileStorage) ReadLockfile(ctx context.Context) (Lockfile, error) {
data, err := os.ReadFile(path.Join(s.directory, "lockfile"))
if err != nil {
if os.IsNotExist(err) {
return Lockfile{}, ErrNotFound
}

return Lockfile{}, err
}
var result Lockfile
err = json.Unmarshal(data, &result)
if err != nil {
s.log.Warn("error decoding lockfile", "err", err)
return Lockfile{}, ErrMarshaling
}
return result, nil
}

func (s *FileStorage) WriteBackfillProcesses(_ context.Context, data BackfillProcesses) error {
BackfillMu.Lock()
defer BackfillMu.Unlock()
@@ -102,6 +129,22 @@ func (s *FileStorage) WriteBackfillProcesses(_ context.Context, data BackfillPro
return nil
}

func (s *FileStorage) WriteLockfile(_ context.Context, data Lockfile) error {
b, err := json.Marshal(data)
if err != nil {
s.log.Warn("error encoding lockfile", "err", err)
return ErrMarshaling
}
err = os.WriteFile(path.Join(s.directory, "lockfile"), b, 0644)
if err != nil {
s.log.Warn("error writing lockfile", "err", err)
return err
}

s.log.Info("wrote to lockfile", "archiverId", data.ArchiverId, "timestamp", data.Timestamp)
return nil
}
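
// A minimal round-trip sketch (illustrative only, not part of this commit);
// "dir" and "logger" are assumed to be a writable directory and a log.Logger:
//
// fs := storage.NewFileStorage(dir, logger)
// lf := storage.Lockfile{ArchiverId: uuid.New().String(), Timestamp: time.Now().Unix()}
// _ = fs.WriteLockfile(context.Background(), lf)
// got, err := fs.ReadLockfile(context.Background())
// // on success, got == lf unless another archiver overwrote the file in between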

func (s *FileStorage) WriteBlob(_ context.Context, data BlobData) error {
b, err := json.Marshal(data)
if err != nil {
63 changes: 63 additions & 0 deletions common/storage/s3.go
@@ -57,6 +57,15 @@ func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) {
}
}

_, err = storage.ReadLockfile(context.Background())
if err == ErrNotFound {
storage.log.Info("creating empty lockfile object")
err = storage.WriteLockfile(context.Background(), Lockfile{})
if err != nil {
log.Crit("failed to create backfill_processes key")
}
}

return storage, nil
}

@@ -149,6 +158,38 @@ func (s *S3Storage) ReadBackfillProcesses(ctx context.Context) (BackfillProcesse
return data, nil
}

func (s *S3Storage) ReadLockfile(ctx context.Context) (Lockfile, error) {
res, err := s.s3.GetObject(ctx, s.bucket, path.Join(s.path, "lockfile"), minio.GetObjectOptions{})
if err != nil {
s.log.Info("unexpected error fetching lockfile", "err", err)
return Lockfile{}, ErrStorage
}
defer res.Close()
_, err = res.Stat()
if err != nil {
errResponse := minio.ToErrorResponse(err)
if errResponse.Code == "NoSuchKey" {
s.log.Info("unable to find lockfile key")
return Lockfile{}, ErrNotFound
} else {
s.log.Info("unexpected error fetching lockfile", "err", err)
return Lockfile{}, ErrStorage
}
}

var data Lockfile
err = json.NewDecoder(res).Decode(&data)
if err != nil {
s.log.Warn("error decoding lockfile", "err", err)
return Lockfile{}, ErrMarshaling
}

return data, nil
}

func (s *S3Storage) WriteBackfillProcesses(ctx context.Context, data BackfillProcesses) error {
BackfillMu.Lock()
defer BackfillMu.Unlock()
@@ -174,6 +215,28 @@ func (s *S3Storage) WriteBackfillProcesses(ctx context.Context, data BackfillPro
return nil
}

func (s *S3Storage) WriteLockfile(ctx context.Context, data Lockfile) error {
d, err := json.Marshal(data)
if err != nil {
s.log.Warn("error encoding lockfile", "err", err)
return ErrMarshaling
}

options := minio.PutObjectOptions{
ContentType: "application/json",
}
reader := bytes.NewReader(d)

_, err = s.s3.PutObject(ctx, s.bucket, path.Join(s.path, "lockfile"), reader, int64(len(d)), options)
if err != nil {
s.log.Warn("error writing to lockfile", "err", err)
return ErrStorage
}

s.log.Info("wrote to lockfile", "archiverId", data.ArchiverId, "timestamp", data.Timestamp)
return nil
}
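
// Design note: PutObject here is last-writer-wins and the acquire path is a
// plain read-then-write with no compare-and-swap, so the lock is advisory
// rather than strictly mutually exclusive. Two archivers racing within the
// same read/write window could both believe they hold the lock; the
// 10-second heartbeat and 20-second expiry bound how long such a split can
// last.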

func (s *S3Storage) WriteBlob(ctx context.Context, data BlobData) error {
b, err := json.Marshal(data)
if err != nil {
7 changes: 7 additions & 0 deletions common/storage/storage.go
@@ -71,6 +71,11 @@ type BackfillProcess struct {
Current v1.BeaconBlockHeader `json:"current_block"`
}

type Lockfile struct {
ArchiverId string `json:"archiver_id"`
Timestamp int64 `json:"timestamp"`
}
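
// Serialized form, as stored in the "lockfile" file/object (example values):
//
// {"archiver_id":"2f4d9b1e-5c3a-4b7f-9d2e-8a1c6e0f3b7d","timestamp":1718822400}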

// BackfillProcesses maps backfill start block hash --> BackfillProcess. This allows us to track
// multiple processes and reengage a previous backfill in case an archiver restart interrupted
// an active backfill
@@ -91,6 +96,7 @@ type DataStoreReader interface {
// - ErrMarshaling: there was an error decoding the blob data.
ReadBlob(ctx context.Context, hash common.Hash) (BlobData, error)
ReadBackfillProcesses(ctx context.Context) (BackfillProcesses, error)
ReadLockfile(ctx context.Context) (Lockfile, error)
}

// DataStoreWriter is the interface for writing to a data store.
@@ -101,6 +107,7 @@ type DataStoreWriter interface {
// - ErrMarshaling: there was an error encoding the blob data.
WriteBlob(ctx context.Context, data BlobData) error
WriteBackfillProcesses(ctx context.Context, data BackfillProcesses) error
WriteLockfile(ctx context.Context, data Lockfile) error
}

// DataStore is the interface for a data store that can be both written to and read from.
