Skip to content

Commit

Permalink
Merge branch 'main' into integratiointest
Browse files Browse the repository at this point in the history
  • Loading branch information
ping-ke committed Sep 9, 2024
2 parents 63579fb + 6d2b0e4 commit 6fd3efe
Show file tree
Hide file tree
Showing 15 changed files with 290 additions and 264 deletions.
21 changes: 8 additions & 13 deletions cmd/es-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package main
import (
"context"
"fmt"
"math/big"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -231,20 +230,16 @@ func EsNodeInit(ctx *cli.Context) error {
log.Info("Storage config loaded", "storageCfg", storageCfg)
var shardIdxList []uint64
if len(shardIndexes) > 0 {
// check existense of shard indexes but add shard 0 anyway
out:
for i := 0; i < len(shardIndexes); i++ {
shard := uint64(shardIndexes[i])
if shard > 0 {
diff, err := getDifficulty(cctx, client, l1Contract, shard)
if err != nil {
log.Error("Failed to get shard info from contract", "error", err)
return err
}
if diff != nil && diff.Cmp(big.NewInt(0)) == 0 {
return fmt.Errorf("Shard not exist: %d", shard)
new := uint64(shardIndexes[i])
// prevent duplicated
for _, s := range shardIdxList {
if s == new {
continue out
}
}
shardIdxList = append(shardIdxList, shard)
shardIdxList = append(shardIdxList, new)
}
} else {
// get shard indexes of length shardLen from contract
Expand All @@ -254,7 +249,7 @@ func EsNodeInit(ctx *cli.Context) error {
return err
}
if len(shardList) == 0 {
return fmt.Errorf("No shard indexes found")
return fmt.Errorf("no shard indexes found")
}
shardIdxList = shardList
}
Expand Down
16 changes: 12 additions & 4 deletions cmd/es-node/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ func getShardList(ctx context.Context, client *ethclient.Client, contract common
}

func getDifficulty(ctx context.Context, client *ethclient.Client, contract common.Address, shardIdx uint64) (*big.Int, error) {
res, err := getMiningInfo(ctx, client, contract, shardIdx)
if err != nil {
return nil, err
}
return res[1].(*big.Int), nil
}

func getMiningInfo(ctx context.Context, client *ethclient.Client, contract common.Address, shardIdx uint64) ([]interface{}, error) {
uint256Type, _ := abi.NewType("uint256", "", nil)
dataField, _ := abi.Arguments{{Type: uint256Type}}.Pack(new(big.Int).SetUint64(shardIdx))
h := crypto.Keccak256Hash([]byte(`infos(uint256)`))
Expand All @@ -136,10 +144,10 @@ func getDifficulty(ctx context.Context, client *ethclient.Client, contract commo
{Type: uint256Type},
}.UnpackValues(bs)
if res == nil || len(res) < 3 {
log.Error("Query difficulty by shard", "error", "invalid result", "result", res)
log.Error("Query mining info by shard", "error", "invalid result", "result", res)
return nil, fmt.Errorf("invalid result: %v", res)
}
return res[1].(*big.Int), nil
return res, nil
}

func createDataFile(cfg *storage.StorageConfig, shardIdxList []uint64, datadir string, encodingType int) ([]string, error) {
Expand All @@ -154,8 +162,8 @@ func createDataFile(cfg *storage.StorageConfig, shardIdxList []uint64, datadir s
for _, shardIdx := range shardIdxList {
dataFile := filepath.Join(datadir, fmt.Sprintf(fileName, shardIdx))
if _, err := os.Stat(dataFile); err == nil {
log.Error("Creating data file", "error", "file already exists, will not overwrite", "file", dataFile)
return nil, err
log.Warn("Creating data file", "error", "file already exists, will not overwrite", "file", dataFile)
continue
}
if cfg.ChunkSize == 0 {
return nil, fmt.Errorf("chunk size should not be 0")
Expand Down
61 changes: 52 additions & 9 deletions cmd/es-utils/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,24 @@ func SendBlobTx(
}
}

maxFeePerDataGas256, err := DecodeUint256String(maxFeePerDataGas)
if err != nil {
log.Crit("Invalid max_fee_per_data_gas", "error", err)
var blobPrice *uint256.Int
if maxFeePerDataGas != "" {
maxFeePerDataGas256, err := DecodeUint256String(maxFeePerDataGas)
if err != nil {
log.Crit("Invalid max_fee_per_data_gas", "error", err)
}
blobPrice = maxFeePerDataGas256
} else {
blobBaseFee, err := queryBlobBaseFee(client)
if err != nil {
log.Crit("Error getting blob base fee", "error", err)
}
log.Info("Query blob base fee done", "blobBaseFee", blobBaseFee)
blobBaseFee256, nok := uint256.FromBig(blobBaseFee)
if nok {
log.Crit("Error converting blob base fee to uint256", "blobBaseFee", blobBaseFee)
}
blobPrice = blobBaseFee256
}
var blobs []kzg4844.Blob
if needEncoding {
Expand Down Expand Up @@ -159,7 +174,7 @@ func SendBlobTx(
To: to,
Value: value256,
Data: calldataBytes,
BlobFeeCap: maxFeePerDataGas256,
BlobFeeCap: blobPrice,
BlobHashes: versionedHashes,
Sidecar: sideCar,
}
Expand Down Expand Up @@ -300,6 +315,8 @@ func UploadBlobs(
}
signer := crypto.PubkeyToAddress(key.PublicKey)
var keys []common.Hash
var blobIndex []*big.Int
var lengthes []*big.Int

var blobs []kzg4844.Blob
if needEncoding {
Expand All @@ -309,10 +326,23 @@ func UploadBlobs(
}
for i, blob := range blobs {
keys = append(keys, genKey(signer, i, blob[:]))
blobIndex = append(blobIndex, new(big.Int).SetUint64(uint64(i)))
lengthes = append(lengthes, new(big.Int).SetUint64(BlobSize))
}
log.Info("blobs", "keys", keys, "blobIndexes", blobIndex, "sizes", lengthes)
bytes32Array, _ := abi.NewType("bytes32[]", "", nil)
dataField, _ := abi.Arguments{{Type: bytes32Array}}.Pack(keys)
h := crypto.Keccak256Hash([]byte("putBlobs(bytes32[])"))
uint256Array, _ := abi.NewType("uint256[]", "", nil)
args := abi.Arguments{
{Type: bytes32Array},
{Type: uint256Array},
{Type: uint256Array},
}
dataField, err := args.Pack(keys, blobIndex, lengthes)
if err != nil {
log.Error("Failed to pack data", "err", err)
return nil, nil, err
}
h := crypto.Keccak256Hash([]byte("putBlobs(bytes32[],uint256[],uint256[])"))
calldata := "0x" + common.Bytes2Hex(append(h[0:4], dataField...))
tx := SendBlobTx(
rpc,
Expand All @@ -325,7 +355,7 @@ func UploadBlobs(
5000000,
"",
"",
"300000000",
"",
chainID,
calldata,
)
Expand Down Expand Up @@ -371,10 +401,23 @@ func UploadBlobs(
log.Info("Timed out for receipt, query contract for data hash...")
}
// if wait receipt timed out or failed, query contract for data hash
return getKvInfo(pc, contractAddr, len(blobs))
return getKvInfo(pc, len(blobs))
}

func queryBlobBaseFee(l1 *ethclient.Client) (*big.Int, error) {
var hex string
err := l1.Client().CallContext(context.Background(), &hex, "eth_blobBaseFee")
if err != nil {
return nil, err
}
blobBaseFee, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, errors.New("invalid blob base fee")
}
return blobBaseFee, nil
}

func getKvInfo(pc *eth.PollingClient, contractAddr common.Address, blobLen int) ([]uint64, []common.Hash, error) {
func getKvInfo(pc *eth.PollingClient, blobLen int) ([]uint64, []common.Hash, error) {
lastIdx, err := pc.GetStorageLastBlobIdx(rpc.LatestBlockNumber.Int64())
if err != nil {
return nil, nil, err
Expand Down
30 changes: 29 additions & 1 deletion ethstorage/eth/polling_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ var ErrSubscriberClosed = errors.New("subscriber closed")

type PollingClient struct {
*ethclient.Client
queryHeader func() (*types.Header, error)
isHTTP bool
lgr log.Logger
pollRate time.Duration
Expand All @@ -38,6 +37,8 @@ type PollingClient struct {
currHead *types.Header
esContract common.Address
subID int
NetworkID *big.Int
queryHeader func() (*types.Header, error)

// pollReqCh is used to request new polls of the upstream
// RPC client.
Expand Down Expand Up @@ -74,6 +75,10 @@ func NewClient(
lgr log.Logger,
) *PollingClient {
ctx, cancel := context.WithCancel(ctx)
networkID, err := c.NetworkID(ctx)
if err != nil {
lgr.Crit("Failed to get network id", "err", err)
}
res := &PollingClient{
Client: c,
isHTTP: isHTTP,
Expand All @@ -85,6 +90,7 @@ func NewClient(
pollReqCh: make(chan struct{}, 1),
subs: make(map[int]chan *types.Header),
closedCh: make(chan struct{}),
NetworkID: networkID,
}
if qh == nil {
res.queryHeader = res.getLatestHeader
Expand Down Expand Up @@ -292,6 +298,28 @@ func (w *PollingClient) GetKvMetas(kvIndices []uint64, blockNumber int64) ([][32
return res[0].([][32]byte), nil
}

func (w *PollingClient) GetMiningReward(shard uint64, blockNumber int64) (*big.Int, error) {
h := crypto.Keccak256Hash([]byte(`miningReward(uint256,uint256)`))
uint256Type, _ := abi.NewType("uint256", "", nil)
dataField, err := abi.Arguments{
{Type: uint256Type},
{Type: uint256Type},
}.Pack(new(big.Int).SetUint64(shard), new(big.Int).SetInt64(blockNumber))
if err != nil {
return nil, err
}
calldata := append(h[0:4], dataField...)
callMsg := ethereum.CallMsg{
To: &w.esContract,
Data: calldata,
}
bs, err := w.Client.CallContract(context.Background(), callMsg, nil)
if err != nil {
return nil, err
}
return new(big.Int).SetBytes(bs), nil
}

func (w *PollingClient) ReadContractField(fieldName string, blockNumber *big.Int) ([]byte, error) {
h := crypto.Keccak256Hash([]byte(fieldName + "()"))
msg := ethereum.CallMsg{
Expand Down
94 changes: 5 additions & 89 deletions ethstorage/miner/l1_mining_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
)

const (
gasBufferRatio = 1.2
rewardDenominator = 10000
gasBufferRatio = 1.2
)

var (
Expand Down Expand Up @@ -154,9 +153,9 @@ func (m *l1MiningAPI) SubmitMinedResult(ctx context.Context, contract common.Add
}
m.lg.Info("Estimated gas done", "gas", estimatedGas)
cost := new(big.Int).Mul(new(big.Int).SetUint64(estimatedGas), gasPrice)
reward, err := m.estimateReward(ctx, cfg, contract, rst.startShardId, rst.blockNumber)
reward, err := m.GetMiningReward(rst.startShardId, rst.blockNumber.Int64())
if err != nil {
m.lg.Error("Calculate reward failed", "error", err.Error())
m.lg.Error("Query mining reward failed", "error", err.Error())
return common.Hash{}, err
}
profit := new(big.Int).Sub(reward, cost)
Expand All @@ -168,13 +167,7 @@ func (m *l1MiningAPI) SubmitMinedResult(ctx context.Context, contract common.Add
)
return common.Hash{}, errDropped
}

chainID, err := m.NetworkID(ctx)
if err != nil {
m.lg.Error("Get chainID failed", "error", err.Error())
return common.Hash{}, err
}
sign := cfg.SignerFnFactory(chainID)
sign := cfg.SignerFnFactory(m.NetworkID)
nonce, err := m.NonceAt(ctx, cfg.SignerAddr, big.NewInt(rpc.LatestBlockNumber.Int64()))
if err != nil {
m.lg.Error("Query nonce failed", "error", err.Error())
Expand All @@ -183,7 +176,7 @@ func (m *l1MiningAPI) SubmitMinedResult(ctx context.Context, contract common.Add
m.lg.Debug("Query nonce done", "nonce", nonce)
gas := uint64(float64(estimatedGas) * gasBufferRatio)
rawTx := &types.DynamicFeeTx{
ChainID: chainID,
ChainID: m.NetworkID,
Nonce: nonce,
GasTipCap: tip,
GasFeeCap: gasPrice,
Expand Down Expand Up @@ -228,80 +221,3 @@ func (m *l1MiningAPI) getRandaoProof(ctx context.Context, blockNumber *big.Int)
}
return headerRlp, nil
}

// TODO: implement `miningReward()` in the contract to replace this impl
func (m *l1MiningAPI) estimateReward(ctx context.Context, cfg Config, contract common.Address, shard uint64, block *big.Int) (*big.Int, error) {

lastKv, err := m.PollingClient.GetStorageLastBlobIdx(rpc.LatestBlockNumber.Int64())
if err != nil {
m.lg.Error("Failed to get lastKvIdx", "error", err)
return nil, err
}
info, err := m.GetMiningInfo(ctx, contract, shard)
if err != nil {
m.lg.Error("Failed to get es mining info", "error", err.Error())
return nil, err
}
lastMineTime := info.LastMineTime

plmt, err := m.ReadContractField("prepaidLastMineTime", nil)
if err != nil {
m.lg.Error("Failed to read prepaidLastMineTime", "error", err.Error())
return nil, err
}
prepaidLastMineTime := new(big.Int).SetBytes(plmt).Uint64()

var lastShard uint64
if lastKv > 0 {
lastShard = (lastKv - 1) / cfg.ShardEntry
}
curBlock, err := m.HeaderByNumber(ctx, big.NewInt(rpc.LatestBlockNumber.Int64()))
if err != nil {
m.lg.Error("Failed to get latest block", "error", err.Error())
return nil, err
}

minedTs := curBlock.Time - (new(big.Int).Sub(curBlock.Number, block).Uint64())*12
reward := big.NewInt(0)
if shard < lastShard {
basePayment := new(big.Int).Mul(cfg.StorageCost, new(big.Int).SetUint64(cfg.ShardEntry))
reward = paymentIn(basePayment, cfg.DcfFactor, lastMineTime, minedTs, cfg.StartTime)
} else if shard == lastShard {
basePayment := new(big.Int).Mul(cfg.StorageCost, new(big.Int).SetUint64(lastKv%cfg.ShardEntry))
reward = paymentIn(basePayment, cfg.DcfFactor, lastMineTime, minedTs, cfg.StartTime)
// Additional prepaid for the last shard
if prepaidLastMineTime < minedTs {
additionalReward := paymentIn(cfg.PrepaidAmount, cfg.DcfFactor, prepaidLastMineTime, minedTs, cfg.StartTime)
reward = new(big.Int).Add(reward, additionalReward)
}
}
minerReward := new(big.Int).Div(
new(big.Int).Mul(new(big.Int).SetUint64(rewardDenominator-cfg.TreasuryShare), reward),
new(big.Int).SetUint64(rewardDenominator),
)
return minerReward, nil
}

func paymentIn(x, dcfFactor *big.Int, fromTs, toTs, startTime uint64) *big.Int {
return new(big.Int).Rsh(
new(big.Int).Mul(
x,
new(big.Int).Sub(
pow(dcfFactor, fromTs-startTime),
pow(dcfFactor, toTs-startTime),
)),
128,
)
}

func pow(fp *big.Int, n uint64) *big.Int {
v := new(big.Int).Lsh(big.NewInt(1), 128)
for n != 0 {
if (n & 1) == 1 {
v = new(big.Int).Rsh(new(big.Int).Mul(v, fp), 128)
}
fp = new(big.Int).Rsh(new(big.Int).Mul(fp, fp), 128)
n = n / 2
}
return v
}
Loading

0 comments on commit 6fd3efe

Please sign in to comment.