Skip to content

Commit

Permalink
(follower_node)support beacon node client as blob provider (#988)
Browse files Browse the repository at this point in the history
* support beacon node client as blob provider

* fix

* fix formatting

* use url.JoinPath instead of path

* don't move pos each time in blob_client_list

---------

Co-authored-by: jonastheis <[email protected]>
  • Loading branch information
NazariiDenha and jonastheis authored Aug 19, 2024
1 parent 0b2fe3b commit 9e3c838
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 63 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ var (
utils.DASnapshotFileFlag,
utils.DABlockNativeAPIEndpointFlag,
utils.DABlobScanAPIEndpointFlag,
utils.DABeaconNodeAPIEndpointFlag,
}

rpcFlags = []cli.Flag{
Expand Down
14 changes: 10 additions & 4 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,16 +879,19 @@ var (
}
DASnapshotFileFlag = cli.StringFlag{
Name: "da.snapshot.file",
Usage: "Snapshot file to sync from da",
Usage: "Snapshot file to sync from DA",
}
DABlobScanAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.blobscan",
Usage: "BlobScan blob api endpoint",
Value: ethconfig.Defaults.DA.BlobScanAPIEndpoint,
Usage: "BlobScan blob API endpoint",
}
DABlockNativeAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.blocknative",
Usage: "BlockNative blob api endpoint",
Usage: "BlockNative blob API endpoint",
}
DABeaconNodeAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.beaconnode",
Usage: "Beacon node API endpoint",
}
)

Expand Down Expand Up @@ -1625,6 +1628,9 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(DABlockNativeAPIEndpointFlag.Name) {
cfg.DA.BlockNativeAPIEndpoint = ctx.GlobalString(DABlockNativeAPIEndpointFlag.Name)
}
if ctx.GlobalIsSet(DABeaconNodeAPIEndpointFlag.Name) {
cfg.DA.BeaconNodeAPIEndpoint = ctx.GlobalString(DABeaconNodeAPIEndpointFlag.Name)
}
}
}

Expand Down
183 changes: 183 additions & 0 deletions rollup/da_syncer/blob_client/beacon_node_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package blob_client

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
)

type BeaconNodeClient struct {
apiEndpoint string
l1Client *rollup_sync_service.L1Client
genesisTime uint64
secondsPerSlot uint64
}

var (
beaconNodeGenesisEndpoint = "/eth/v1/beacon/genesis"
beaconNodeSpecEndpoint = "/eth/v1/config/spec"
beaconNodeBlobEndpoint = "/eth/v1/beacon/blob_sidecars"
)

func NewBeaconNodeClient(apiEndpoint string, l1Client *rollup_sync_service.L1Client) (*BeaconNodeClient, error) {
// get genesis time
genesisPath, err := url.JoinPath(apiEndpoint, beaconNodeGenesisEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to join path, err: %w", err)
}
resp, err := http.Get(genesisPath)
if err != nil {
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
bodyStr := string(body)
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
}

var genesisResp GenesisResp
err = json.NewDecoder(resp.Body).Decode(&genesisResp)
if err != nil {
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
}
genesisTime, err := strconv.ParseUint(genesisResp.Data.GenesisTime, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to decode genesis time %s, err: %w", genesisResp.Data.GenesisTime, err)
}

// get seconds per slot from spec
specPath, err := url.JoinPath(apiEndpoint, beaconNodeSpecEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to join path, err: %w", err)
}
resp, err = http.Get(specPath)
if err != nil {
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
bodyStr := string(body)
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
}

var specResp SpecResp
err = json.NewDecoder(resp.Body).Decode(&specResp)
if err != nil {
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
}
secondsPerSlot, err := strconv.ParseUint(specResp.Data.SecondsPerSlot, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to decode seconds per slot %s, err: %w", specResp.Data.SecondsPerSlot, err)
}
if secondsPerSlot == 0 {
return nil, fmt.Errorf("failed to make new BeaconNodeClient, secondsPerSlot is 0")
}

return &BeaconNodeClient{
apiEndpoint: apiEndpoint,
l1Client: l1Client,
genesisTime: genesisTime,
secondsPerSlot: secondsPerSlot,
}, nil
}

func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
// get block timestamp to calculate slot
header, err := c.l1Client.GetHeaderByNumber(blockNumber)
if err != nil {
return nil, fmt.Errorf("failed to get header by number, err: %w", err)
}
slot := (header.Time - c.genesisTime) / c.secondsPerSlot

// get blob sidecar for slot
blobSidecarPath, err := url.JoinPath(c.apiEndpoint, beaconNodeBlobEndpoint, fmt.Sprintf("%d", slot))
if err != nil {
return nil, fmt.Errorf("failed to join path, err: %w", err)
}
resp, err := http.Get(blobSidecarPath)
if err != nil {
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
bodyStr := string(body)
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
}

var blobSidecarResp BlobSidecarResp
err = json.NewDecoder(resp.Body).Decode(&blobSidecarResp)
if err != nil {
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
}

// find blob with desired versionedHash
for _, blob := range blobSidecarResp.Data {
// calculate blob hash from commitment and check it with desired
commitmentBytes := common.FromHex(blob.KzgCommitment)
if len(commitmentBytes) != lenKZGCommitment {
return nil, fmt.Errorf("len of kzg commitment is not correct, expected: %d, got: %d", lenKZGCommitment, len(commitmentBytes))
}
commitment := kzg4844.Commitment(commitmentBytes)
blobVersionedHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment)

if blobVersionedHash == versionedHash {
// found desired blob
blobBytes := common.FromHex(blob.Blob)
if len(blobBytes) != lenBlobBytes {
return nil, fmt.Errorf("len of blob data is not correct, expected: %d, got: %d", lenBlobBytes, len(blobBytes))
}

b := kzg4844.Blob(blobBytes)
return &b, nil
}
}

return nil, fmt.Errorf("missing blob %v in slot %d, block number %d", versionedHash, slot, blockNumber)
}

type GenesisResp struct {
Data struct {
GenesisTime string `json:"genesis_time"`
} `json:"data"`
}

type SpecResp struct {
Data struct {
SecondsPerSlot string `json:"SECONDS_PER_SLOT"`
} `json:"data"`
}

type BlobSidecarResp struct {
Data []struct {
Index string `json:"index"`
Blob string `json:"blob"`
KzgCommitment string `json:"kzg_commitment"`
KzgProof string `json:"kzg_proof"`
SignedBlockHeader struct {
Message struct {
Slot string `json:"slot"`
ProposerIndex string `json:"proposer_index"`
ParentRoot string `json:"parent_root"`
StateRoot string `json:"state_root"`
BodyRoot string `json:"body_root"`
} `json:"message"`
Signature string `json:"signature"`
} `json:"signed_block_header"`
KzgCommitmentInclusionProof []string `json:"kzg_commitment_inclusion_proof"`
} `json:"data"`
}
6 changes: 3 additions & 3 deletions rollup/da_syncer/blob_client/blob_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
)

const (
okStatusCode int = 200
lenBlobBytes int = 131072
lenBlobBytes int = 131072
lenKZGCommitment int = 48
)

type BlobClient interface {
GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error)
GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error)
}
9 changes: 4 additions & 5 deletions rollup/da_syncer/blob_client/blob_client_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ func NewBlobClientList(blobClients ...BlobClient) *BlobClientList {
}
}

func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
func (c *BlobClientList) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
if len(c.list) == 0 {
return nil, fmt.Errorf("BlobClientList.GetBlobByVersionedHash: list of BlobClients is empty")
}

for i := 0; i < len(c.list); i++ {
blob, err := c.list[c.nextPos()].GetBlobByVersionedHash(ctx, versionedHash)
blob, err := c.list[c.curPos].GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, blockNumber)
if err == nil {
return blob, nil
}

c.nextPos()
// there was an error, try the next blob client in following iteration
log.Warn("BlobClientList: failed to get blob by versioned hash from BlobClient", "err", err, "blob client pos in BlobClientList", c.curPos)
}
Expand All @@ -42,9 +42,8 @@ func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHa
return nil, serrors.NewTemporaryError(errors.New("BlobClientList.GetBlobByVersionedHash: failed to get blob by versioned hash from all BlobClients"))
}

func (c *BlobClientList) nextPos() int {
func (c *BlobClientList) nextPos() {
c.curPos = (c.curPos + 1) % len(c.list)
return c.curPos
}

func (c *BlobClientList) AddBlobClient(blobClient BlobClient) {
Expand Down
42 changes: 4 additions & 38 deletions rollup/da_syncer/blob_client/blob_scan_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewBlobScanClient(apiEndpoint string) *BlobScanClient {
}
}

func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
func (c *BlobScanClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
// blobscan api docs https://api.blobscan.com/#/blobs/blob-getByBlobId
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
if err != nil {
Expand All @@ -40,8 +40,8 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != okStatusCode {
if resp.StatusCode == 404 {
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("no blob with versioned hash : %s", versionedHash.String())
}
var res ErrorRespBlobScan
Expand Down Expand Up @@ -69,44 +69,10 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa
}

type BlobRespBlobScan struct {
Commitment string `json:"commitment"`
Proof string `json:"proof"`
Size int `json:"size"`
VersionedHash string `json:"versionedHash"`
Data string `json:"data"`
DataStorageReferences []struct {
BlobStorage string `json:"blobStorage"`
DataReference string `json:"dataReference"`
} `json:"dataStorageReferences"`
Transactions []struct {
Hash string `json:"hash"`
Index int `json:"index"`
Block struct {
Number int `json:"number"`
BlobGasUsed string `json:"blobGasUsed"`
BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"`
BlobGasPrice string `json:"blobGasPrice"`
ExcessBlobGas string `json:"excessBlobGas"`
Hash string `json:"hash"`
Timestamp string `json:"timestamp"`
Slot int `json:"slot"`
} `json:"block"`
From string `json:"from"`
To string `json:"to"`
MaxFeePerBlobGas string `json:"maxFeePerBlobGas"`
BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"`
Rollup string `json:"rollup"`
BlobAsCalldataGasFee string `json:"blobAsCalldataGasFee"`
BlobGasBaseFee string `json:"blobGasBaseFee"`
BlobGasMaxFee string `json:"blobGasMaxFee"`
BlobGasUsed string `json:"blobGasUsed"`
} `json:"transactions"`
Data string `json:"data"`
}

type ErrorRespBlobScan struct {
Message string `json:"message"`
Code string `json:"code"`
Issues []struct {
Message string `json:"message"`
} `json:"issues"`
}
11 changes: 3 additions & 8 deletions rollup/da_syncer/blob_client/block_native_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewBlockNativeClient(apiEndpoint string) *BlockNativeClient {
}
}

func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
func (c *BlockNativeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
// blocknative api docs https://docs.blocknative.com/blocknative-data-archive/blob-archive
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
if err != nil {
Expand All @@ -33,7 +33,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != okStatusCode {
if resp.StatusCode != http.StatusOK {
var res ErrorRespBlockNative
err = json.NewDecoder(resp.Body).Decode(&res)
if err != nil {
Expand All @@ -59,12 +59,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione

type BlobRespBlockNative struct {
Blob struct {
VersionedHash string `json:"versionedHash"`
Commitment string `json:"commitment"`
Proof string `json:"proof"`
ZeroBytes int `json:"zeroBytes"`
NonZeroBytes int `json:"nonZeroBytes"`
Data string `json:"data"`
Data string `json:"data"`
} `json:"blob"`
}

Expand Down
2 changes: 1 addition & 1 deletion rollup/da_syncer/da/commitV1.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database
return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err)
}

blob, err := blobClient.GetBlobByVersionedHash(ctx, versionedHash)
blob, err := blobClient.GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, vLog.BlockNumber)
if err != nil {
return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err)
}
Expand Down
Loading

0 comments on commit 9e3c838

Please sign in to comment.