Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Rearchive API #7

Merged
merged 4 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions archiver/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,14 @@ func Main() cliapp.LifecycleAction {
return nil, err
}

api := service.NewAPI(m, l)

l.Info("Initializing Archiver Service")
return service.NewService(l, cfg, api, storageClient, beaconClient, m)
archiver, err := service.NewArchiver(l, cfg, storageClient, beaconClient, m)
if err != nil {
return nil, fmt.Errorf("failed to initialize archiver: %w", err)
}

api := service.NewAPI(m, l, archiver)

return service.NewService(l, cfg, api, archiver, m)
}
}
5 changes: 3 additions & 2 deletions archiver/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type BlockSource string
var (
MetricsNamespace = "blob_archiver"

BlockSourceBackfill BlockSource = "backfill"
BlockSourceLive BlockSource = "live"
BlockSourceBackfill BlockSource = "backfill"
BlockSourceLive BlockSource = "live"
BlockSourceRearchive BlockSource = "rearchive"
)

type Metricer interface {
Expand Down
92 changes: 85 additions & 7 deletions archiver/service/api.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package service

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

m "github.com/base-org/blob-archiver/archiver/metrics"
Expand All @@ -16,17 +19,19 @@ const (
)

type API struct {
router *chi.Mux
logger log.Logger
metrics m.Metricer
router *chi.Mux
logger log.Logger
metrics m.Metricer
archiver *Archiver
}

// NewAPI creates a new Archiver API instance. This API exposes an admin interface to control the archiver.
func NewAPI(metrics m.Metricer, logger log.Logger) *API {
func NewAPI(metrics m.Metricer, logger log.Logger, archiver *Archiver) *API {
result := &API{
router: chi.NewRouter(),
logger: logger,
metrics: metrics,
router: chi.NewRouter(),
archiver: archiver,
logger: logger,
metrics: metrics,
}

r := result.router
Expand All @@ -41,6 +46,79 @@ func NewAPI(metrics m.Metricer, logger log.Logger) *API {
})

r.Get("/", http.NotFound)
r.Post("/rearchive", result.rearchiveBlocks)

return result
}

type rearchiveResponse struct {
Error string `json:"error,omitempty"`
BlockStart uint64 `json:"blockStart"`
BlockEnd uint64 `json:"blockEnd"`
}

func toSlot(input string) (uint64, error) {
if input == "" {
return 0, fmt.Errorf("must provide param")
}
res, err := strconv.ParseUint(input, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid slot: \"%s\"", input)
}
return res, nil
}

// rearchiveBlocks rearchives blobs from blocks between the given from and to slots.
// If any blocks are already archived, they will be overwritten with data from the beacon node.
func (a *API) rearchiveBlocks(w http.ResponseWriter, r *http.Request) {
from, err := toSlot(r.URL.Query().Get("from"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(rearchiveResponse{
Error: fmt.Sprintf("invalid from param: %v", err),
})
return
}

to, err := toSlot(r.URL.Query().Get("to"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(rearchiveResponse{
Error: fmt.Sprintf("invalid to param: %v", err),
})
return
}

if from > to {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(rearchiveResponse{
Error: fmt.Sprintf("invalid range: from %d to %d", from, to),
})
return
}

blockStart, blockEnd, err := a.archiver.rearchiveRange(from, to)
if err != nil {
a.logger.Error("Failed to rearchive blocks", "err", err)

w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(rearchiveResponse{
Error: err.Error(),
BlockStart: blockStart,
BlockEnd: blockEnd,
})
} else {
a.logger.Info("Rearchiving blocks complete")
w.WriteHeader(http.StatusOK)

err = json.NewEncoder(w).Encode(rearchiveResponse{
BlockStart: blockStart,
BlockEnd: blockEnd,
})
}

if err != nil {
a.logger.Error("Failed to write response", "err", err)
w.WriteHeader(http.StatusInternalServerError)
}
}
83 changes: 80 additions & 3 deletions archiver/service/api_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package service

import (
"encoding/json"
"net/http/httptest"
"testing"
"time"

"github.com/base-org/blob-archiver/archiver/flags"
"github.com/base-org/blob-archiver/archiver/metrics"
"github.com/base-org/blob-archiver/common/storage/storagetest"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)

func setupAPI(t *testing.T) *API {
func setupAPI(t *testing.T) (*API, *storagetest.TestFileStorage) {
logger := testlog.Logger(t, log.LvlInfo)
m := metrics.NewMetrics()
return NewAPI(m, logger)
fs := storagetest.NewTestFileStorage(t, logger)
archiver, err := NewArchiver(logger, flags.ArchiverConfig{
PollInterval: 10 * time.Second,
}, fs, nil, m)
require.NoError(t, err)
return NewAPI(m, logger, archiver), fs
}

func TestHealthHandler(t *testing.T) {
a := setupAPI(t)
a, _ := setupAPI(t)

request := httptest.NewRequest("GET", "/healthz", nil)
response := httptest.NewRecorder()
Expand All @@ -26,3 +35,71 @@ func TestHealthHandler(t *testing.T) {

require.Equal(t, 200, response.Code)
}

func TestRearchiveHandler(t *testing.T) {
a, _ := setupAPI(t)

tests := []struct {
name string
path string
expectedStatus int
error string
}{
{
name: "should fail with no params",
path: "/rearchive",
expectedStatus: 400,
error: "invalid from param: must provide param",
},
{
name: "should fail with missing to param",
path: "/rearchive?from=1",
expectedStatus: 400,
error: "invalid to param: must provide param",
},
{
name: "should fail with missing from param",
path: "/rearchive?to=1",
expectedStatus: 400,
error: "invalid from param: must provide param",
},
{
name: "should fail with invalid from param",
path: "/rearchive?from=blah&to=1",
expectedStatus: 400,
error: "invalid from param: invalid slot: \"blah\"",
},
{
name: "should fail with invalid to param",
path: "/rearchive?from=1&to=blah",
expectedStatus: 400,
error: "invalid to param: invalid slot: \"blah\"",
},
{
name: "should fail with to greater than equal to from",
path: "/rearchive?from=2&to=1",
expectedStatus: 400,
error: "invalid range: from 2 to 1",
},
}

for _, tt := range tests {
test := tt
t.Run(test.name, func(t *testing.T) {
request := httptest.NewRequest("POST", test.path, nil)
response := httptest.NewRecorder()

a.router.ServeHTTP(response, request)

require.Equal(t, test.expectedStatus, response.Code)

var errResponse rearchiveResponse
err := json.NewDecoder(response.Body).Decode(&errResponse)
require.NoError(t, err)

if test.error != "" {
require.Equal(t, errResponse.Error, test.error)
}
})
}
}
Loading
Loading