Skip to content

Commit

Permalink
Merge pull request #7 from base-org/rearchive-request
Browse files Browse the repository at this point in the history
Add Rearchive API
  • Loading branch information
danyalprout authored Feb 20, 2024
2 parents ec55442 + d99f0f4 commit 88f0b16
Show file tree
Hide file tree
Showing 9 changed files with 431 additions and 101 deletions.
11 changes: 8 additions & 3 deletions archiver/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,14 @@ func Main() cliapp.LifecycleAction {
return nil, err
}

api := service.NewAPI(m, l)

l.Info("Initializing Archiver Service")
return service.NewService(l, cfg, api, storageClient, beaconClient, m)
archiver, err := service.NewArchiver(l, cfg, storageClient, beaconClient, m)
if err != nil {
return nil, fmt.Errorf("failed to initialize archiver: %w", err)
}

api := service.NewAPI(m, l, archiver)

return service.NewService(l, cfg, api, archiver, m)
}
}
5 changes: 3 additions & 2 deletions archiver/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type BlockSource string
var (
MetricsNamespace = "blob_archiver"

BlockSourceBackfill BlockSource = "backfill"
BlockSourceLive BlockSource = "live"
BlockSourceBackfill BlockSource = "backfill"
BlockSourceLive BlockSource = "live"
BlockSourceRearchive BlockSource = "rearchive"
)

type Metricer interface {
Expand Down
92 changes: 85 additions & 7 deletions archiver/service/api.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package service

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

m "github.com/base-org/blob-archiver/archiver/metrics"
Expand All @@ -16,17 +19,19 @@ const (
)

type API struct {
router *chi.Mux
logger log.Logger
metrics m.Metricer
router *chi.Mux
logger log.Logger
metrics m.Metricer
archiver *Archiver
}

// NewAPI creates a new Archiver API instance. This API exposes an admin interface to control the archiver.
func NewAPI(metrics m.Metricer, logger log.Logger) *API {
func NewAPI(metrics m.Metricer, logger log.Logger, archiver *Archiver) *API {
result := &API{
router: chi.NewRouter(),
logger: logger,
metrics: metrics,
router: chi.NewRouter(),
archiver: archiver,
logger: logger,
metrics: metrics,
}

r := result.router
Expand All @@ -41,6 +46,79 @@ func NewAPI(metrics m.Metricer, logger log.Logger) *API {
})

r.Get("/", http.NotFound)
r.Post("/rearchive", result.rearchiveBlocks)

return result
}

type rearchiveResponse struct {
Error string `json:"error,omitempty"`
BlockStart uint64 `json:"blockStart"`
BlockEnd uint64 `json:"blockEnd"`
}

func toSlot(input string) (uint64, error) {
if input == "" {
return 0, fmt.Errorf("must provide param")
}
res, err := strconv.ParseUint(input, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid slot: \"%s\"", input)
}
return res, nil
}

// rearchiveBlocks rearchives blobs from blocks between the given from and to slots.
// If any blocks are already archived, they will be overwritten with data from the beacon node.
func (a *API) rearchiveBlocks(w http.ResponseWriter, r *http.Request) {
from, err := toSlot(r.URL.Query().Get("from"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(rearchiveResponse{
Error: fmt.Sprintf("invalid from param: %v", err),
})
return
}

to, err := toSlot(r.URL.Query().Get("to"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(rearchiveResponse{
Error: fmt.Sprintf("invalid to param: %v", err),
})
return
}

if from > to {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(rearchiveResponse{
Error: fmt.Sprintf("invalid range: from %d to %d", from, to),
})
return
}

blockStart, blockEnd, err := a.archiver.rearchiveRange(from, to)
if err != nil {
a.logger.Error("Failed to rearchive blocks", "err", err)

w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(rearchiveResponse{
Error: err.Error(),
BlockStart: blockStart,
BlockEnd: blockEnd,
})
} else {
a.logger.Info("Rearchiving blocks complete")
w.WriteHeader(http.StatusOK)

err = json.NewEncoder(w).Encode(rearchiveResponse{
BlockStart: blockStart,
BlockEnd: blockEnd,
})
}

if err != nil {
a.logger.Error("Failed to write response", "err", err)
w.WriteHeader(http.StatusInternalServerError)
}
}
83 changes: 80 additions & 3 deletions archiver/service/api_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package service

import (
"encoding/json"
"net/http/httptest"
"testing"
"time"

"github.com/base-org/blob-archiver/archiver/flags"
"github.com/base-org/blob-archiver/archiver/metrics"
"github.com/base-org/blob-archiver/common/storage/storagetest"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)

func setupAPI(t *testing.T) *API {
func setupAPI(t *testing.T) (*API, *storagetest.TestFileStorage) {
logger := testlog.Logger(t, log.LvlInfo)
m := metrics.NewMetrics()
return NewAPI(m, logger)
fs := storagetest.NewTestFileStorage(t, logger)
archiver, err := NewArchiver(logger, flags.ArchiverConfig{
PollInterval: 10 * time.Second,
}, fs, nil, m)
require.NoError(t, err)
return NewAPI(m, logger, archiver), fs
}

func TestHealthHandler(t *testing.T) {
a := setupAPI(t)
a, _ := setupAPI(t)

request := httptest.NewRequest("GET", "/healthz", nil)
response := httptest.NewRecorder()
Expand All @@ -26,3 +35,71 @@ func TestHealthHandler(t *testing.T) {

require.Equal(t, 200, response.Code)
}

func TestRearchiveHandler(t *testing.T) {
a, _ := setupAPI(t)

tests := []struct {
name string
path string
expectedStatus int
error string
}{
{
name: "should fail with no params",
path: "/rearchive",
expectedStatus: 400,
error: "invalid from param: must provide param",
},
{
name: "should fail with missing to param",
path: "/rearchive?from=1",
expectedStatus: 400,
error: "invalid to param: must provide param",
},
{
name: "should fail with missing from param",
path: "/rearchive?to=1",
expectedStatus: 400,
error: "invalid from param: must provide param",
},
{
name: "should fail with invalid from param",
path: "/rearchive?from=blah&to=1",
expectedStatus: 400,
error: "invalid from param: invalid slot: \"blah\"",
},
{
name: "should fail with invalid to param",
path: "/rearchive?from=1&to=blah",
expectedStatus: 400,
error: "invalid to param: invalid slot: \"blah\"",
},
{
name: "should fail with to greater than equal to from",
path: "/rearchive?from=2&to=1",
expectedStatus: 400,
error: "invalid range: from 2 to 1",
},
}

for _, tt := range tests {
test := tt
t.Run(test.name, func(t *testing.T) {
request := httptest.NewRequest("POST", test.path, nil)
response := httptest.NewRecorder()

a.router.ServeHTTP(response, request)

require.Equal(t, test.expectedStatus, response.Code)

var errResponse rearchiveResponse
err := json.NewDecoder(response.Body).Decode(&errResponse)
require.NoError(t, err)

if test.error != "" {
require.Equal(t, errResponse.Error, test.error)
}
})
}
}
Loading

0 comments on commit 88f0b16

Please sign in to comment.