diff --git a/cmd/es-node/main.go b/cmd/es-node/main.go index 05f0fec3..0ee90b03 100644 --- a/cmd/es-node/main.go +++ b/cmd/es-node/main.go @@ -6,7 +6,6 @@ package main import ( "context" "fmt" - "math/big" "net" "os" "os/signal" @@ -231,20 +230,16 @@ func EsNodeInit(ctx *cli.Context) error { log.Info("Storage config loaded", "storageCfg", storageCfg) var shardIdxList []uint64 if len(shardIndexes) > 0 { - // check existense of shard indexes but add shard 0 anyway + out: for i := 0; i < len(shardIndexes); i++ { - shard := uint64(shardIndexes[i]) - if shard > 0 { - diff, err := getDifficulty(cctx, client, l1Contract, shard) - if err != nil { - log.Error("Failed to get shard info from contract", "error", err) - return err - } - if diff != nil && diff.Cmp(big.NewInt(0)) == 0 { - return fmt.Errorf("Shard not exist: %d", shard) + new := uint64(shardIndexes[i]) + // prevent duplicated + for _, s := range shardIdxList { + if s == new { + continue out } } - shardIdxList = append(shardIdxList, shard) + shardIdxList = append(shardIdxList, new) } } else { // get shard indexes of length shardLen from contract @@ -254,7 +249,7 @@ func EsNodeInit(ctx *cli.Context) error { return err } if len(shardList) == 0 { - return fmt.Errorf("No shard indexes found") + return fmt.Errorf("no shard indexes found") } shardIdxList = shardList } diff --git a/cmd/es-node/utils.go b/cmd/es-node/utils.go index 43ff06e9..66f17b9a 100644 --- a/cmd/es-node/utils.go +++ b/cmd/es-node/utils.go @@ -117,6 +117,14 @@ func getShardList(ctx context.Context, client *ethclient.Client, contract common } func getDifficulty(ctx context.Context, client *ethclient.Client, contract common.Address, shardIdx uint64) (*big.Int, error) { + res, err := getMiningInfo(ctx, client, contract, shardIdx) + if err != nil { + return nil, err + } + return res[1].(*big.Int), nil +} + +func getMiningInfo(ctx context.Context, client *ethclient.Client, contract common.Address, shardIdx uint64) ([]interface{}, error) { uint256Type, _ := abi.NewType("uint256", "", nil) dataField, _ := abi.Arguments{{Type: uint256Type}}.Pack(new(big.Int).SetUint64(shardIdx)) h := crypto.Keccak256Hash([]byte(`infos(uint256)`)) @@ -136,10 +144,10 @@ func getDifficulty(ctx context.Context, client *ethclient.Client, contract commo {Type: uint256Type}, }.UnpackValues(bs) if res == nil || len(res) < 3 { - log.Error("Query difficulty by shard", "error", "invalid result", "result", res) + log.Error("Query mining info by shard", "error", "invalid result", "result", res) return nil, fmt.Errorf("invalid result: %v", res) } - return res[1].(*big.Int), nil + return res, nil } func createDataFile(cfg *storage.StorageConfig, shardIdxList []uint64, datadir string, encodingType int) ([]string, error) { @@ -154,8 +162,8 @@ func createDataFile(cfg *storage.StorageConfig, shardIdxList []uint64, datadir s for _, shardIdx := range shardIdxList { dataFile := filepath.Join(datadir, fmt.Sprintf(fileName, shardIdx)) if _, err := os.Stat(dataFile); err == nil { - log.Error("Creating data file", "error", "file already exists, will not overwrite", "file", dataFile) - return nil, err + log.Warn("Creating data file", "error", "file already exists, will not overwrite", "file", dataFile) + continue } if cfg.ChunkSize == 0 { return nil, fmt.Errorf("chunk size should not be 0") diff --git a/cmd/es-utils/utils/utils.go b/cmd/es-utils/utils/utils.go index 8859752f..be4db2d2 100644 --- a/cmd/es-utils/utils/utils.go +++ b/cmd/es-utils/utils/utils.go @@ -126,9 +126,24 @@ func SendBlobTx( } } - maxFeePerDataGas256, err := DecodeUint256String(maxFeePerDataGas) - if err != nil { - log.Crit("Invalid max_fee_per_data_gas", "error", err) + var blobPrice *uint256.Int + if maxFeePerDataGas != "" { + maxFeePerDataGas256, err := DecodeUint256String(maxFeePerDataGas) + if err != nil { + log.Crit("Invalid max_fee_per_data_gas", "error", err) + } + blobPrice = maxFeePerDataGas256 + } else { + blobBaseFee, err := queryBlobBaseFee(client) + if err != nil { + log.Crit("Error getting blob base fee", "error", err) + } + log.Info("Query blob base fee done", "blobBaseFee", blobBaseFee) + blobBaseFee256, nok := uint256.FromBig(blobBaseFee) + if nok { + log.Crit("Error converting blob base fee to uint256", "blobBaseFee", blobBaseFee) + } + blobPrice = blobBaseFee256 } var blobs []kzg4844.Blob if needEncoding { @@ -159,7 +174,7 @@ func SendBlobTx( To: to, Value: value256, Data: calldataBytes, - BlobFeeCap: maxFeePerDataGas256, + BlobFeeCap: blobPrice, BlobHashes: versionedHashes, Sidecar: sideCar, } @@ -300,6 +315,8 @@ func UploadBlobs( } signer := crypto.PubkeyToAddress(key.PublicKey) var keys []common.Hash + var blobIndex []*big.Int + var lengthes []*big.Int var blobs []kzg4844.Blob if needEncoding { @@ -309,10 +326,23 @@ func UploadBlobs( } for i, blob := range blobs { keys = append(keys, genKey(signer, i, blob[:])) + blobIndex = append(blobIndex, new(big.Int).SetUint64(uint64(i))) + lengthes = append(lengthes, new(big.Int).SetUint64(BlobSize)) } + log.Info("blobs", "keys", keys, "blobIndexes", blobIndex, "sizes", lengthes) bytes32Array, _ := abi.NewType("bytes32[]", "", nil) - dataField, _ := abi.Arguments{{Type: bytes32Array}}.Pack(keys) - h := crypto.Keccak256Hash([]byte("putBlobs(bytes32[])")) + uint256Array, _ := abi.NewType("uint256[]", "", nil) + args := abi.Arguments{ + {Type: bytes32Array}, + {Type: uint256Array}, + {Type: uint256Array}, + } + dataField, err := args.Pack(keys, blobIndex, lengthes) + if err != nil { + log.Error("Failed to pack data", "err", err) + return nil, nil, err + } + h := crypto.Keccak256Hash([]byte("putBlobs(bytes32[],uint256[],uint256[])")) calldata := "0x" + common.Bytes2Hex(append(h[0:4], dataField...)) tx := SendBlobTx( rpc, @@ -325,7 +355,7 @@ func UploadBlobs( 5000000, "", "", - "300000000", + "", chainID, calldata, ) @@ -371,10 +401,23 @@ func UploadBlobs( log.Info("Timed out for receipt, query contract for data hash...") } // if wait receipt timed out or failed, query contract for data hash - return getKvInfo(pc, contractAddr, len(blobs)) + return getKvInfo(pc, len(blobs)) +} + +func queryBlobBaseFee(l1 *ethclient.Client) (*big.Int, error) { + var hex string + err := l1.Client().CallContext(context.Background(), &hex, "eth_blobBaseFee") + if err != nil { + return nil, err + } + blobBaseFee, ok := new(big.Int).SetString(hex, 0) + if !ok { + return nil, errors.New("invalid blob base fee") + } + return blobBaseFee, nil } -func getKvInfo(pc *eth.PollingClient, contractAddr common.Address, blobLen int) ([]uint64, []common.Hash, error) { +func getKvInfo(pc *eth.PollingClient, blobLen int) ([]uint64, []common.Hash, error) { lastIdx, err := pc.GetStorageLastBlobIdx(rpc.LatestBlockNumber.Int64()) if err != nil { return nil, nil, err diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index 2ac02772..c9c31731 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -29,7 +29,6 @@ var ErrSubscriberClosed = errors.New("subscriber closed") type PollingClient struct { *ethclient.Client - queryHeader func() (*types.Header, error) isHTTP bool lgr log.Logger pollRate time.Duration @@ -38,6 +37,8 @@ type PollingClient struct { currHead *types.Header esContract common.Address subID int + NetworkID *big.Int + queryHeader func() (*types.Header, error) // pollReqCh is used to request new polls of the upstream // RPC client. @@ -74,6 +75,10 @@ func NewClient( lgr log.Logger, ) *PollingClient { ctx, cancel := context.WithCancel(ctx) + networkID, err := c.NetworkID(ctx) + if err != nil { + lgr.Crit("Failed to get network id", "err", err) + } res := &PollingClient{ Client: c, isHTTP: isHTTP, @@ -85,6 +90,7 @@ func NewClient( pollReqCh: make(chan struct{}, 1), subs: make(map[int]chan *types.Header), closedCh: make(chan struct{}), + NetworkID: networkID, } if qh == nil { res.queryHeader = res.getLatestHeader @@ -292,6 +298,28 @@ func (w *PollingClient) GetKvMetas(kvIndices []uint64, blockNumber int64) ([][32 return res[0].([][32]byte), nil } +func (w *PollingClient) GetMiningReward(shard uint64, blockNumber int64) (*big.Int, error) { + h := crypto.Keccak256Hash([]byte(`miningReward(uint256,uint256)`)) + uint256Type, _ := abi.NewType("uint256", "", nil) + dataField, err := abi.Arguments{ + {Type: uint256Type}, + {Type: uint256Type}, + }.Pack(new(big.Int).SetUint64(shard), new(big.Int).SetInt64(blockNumber)) + if err != nil { + return nil, err + } + calldata := append(h[0:4], dataField...) + callMsg := ethereum.CallMsg{ + To: &w.esContract, + Data: calldata, + } + bs, err := w.Client.CallContract(context.Background(), callMsg, nil) + if err != nil { + return nil, err + } + return new(big.Int).SetBytes(bs), nil +} + func (w *PollingClient) ReadContractField(fieldName string, blockNumber *big.Int) ([]byte, error) { h := crypto.Keccak256Hash([]byte(fieldName + "()")) msg := ethereum.CallMsg{ diff --git a/ethstorage/miner/l1_mining_api.go b/ethstorage/miner/l1_mining_api.go index 66722bfa..5bea0d10 100644 --- a/ethstorage/miner/l1_mining_api.go +++ b/ethstorage/miner/l1_mining_api.go @@ -21,8 +21,7 @@ import ( ) const ( - gasBufferRatio = 1.2 - rewardDenominator = 10000 + gasBufferRatio = 1.2 ) var ( @@ -154,9 +153,9 @@ func (m *l1MiningAPI) SubmitMinedResult(ctx context.Context, contract common.Add } m.lg.Info("Estimated gas done", "gas", estimatedGas) cost := new(big.Int).Mul(new(big.Int).SetUint64(estimatedGas), gasPrice) - reward, err := m.estimateReward(ctx, cfg, contract, rst.startShardId, rst.blockNumber) + reward, err := m.GetMiningReward(rst.startShardId, rst.blockNumber.Int64()) if err != nil { - m.lg.Error("Calculate reward failed", "error", err.Error()) + m.lg.Error("Query mining reward failed", "error", err.Error()) return common.Hash{}, err } profit := new(big.Int).Sub(reward, cost) @@ -168,13 +167,7 @@ func (m *l1MiningAPI) SubmitMinedResult(ctx context.Context, contract common.Add ) return common.Hash{}, errDropped } - - chainID, err := m.NetworkID(ctx) - if err != nil { - m.lg.Error("Get chainID failed", "error", err.Error()) - return common.Hash{}, err - } - sign := cfg.SignerFnFactory(chainID) + sign := cfg.SignerFnFactory(m.NetworkID) nonce, err := m.NonceAt(ctx, cfg.SignerAddr, big.NewInt(rpc.LatestBlockNumber.Int64())) if err != nil { m.lg.Error("Query nonce failed", "error", err.Error()) @@ -183,7 +176,7 @@ func (m *l1MiningAPI) SubmitMinedResult(ctx context.Context, contract common.Add m.lg.Debug("Query nonce done", "nonce", nonce) gas := uint64(float64(estimatedGas) * gasBufferRatio) rawTx := &types.DynamicFeeTx{ - ChainID: chainID, + ChainID: m.NetworkID, Nonce: nonce, GasTipCap: tip, GasFeeCap: gasPrice, @@ -228,80 +221,3 @@ func (m *l1MiningAPI) getRandaoProof(ctx context.Context, blockNumber *big.Int) } return headerRlp, nil } - -// TODO: implement `miningReward()` in the contract to replace this impl -func (m *l1MiningAPI) estimateReward(ctx context.Context, cfg Config, contract common.Address, shard uint64, block *big.Int) (*big.Int, error) { - - lastKv, err := m.PollingClient.GetStorageLastBlobIdx(rpc.LatestBlockNumber.Int64()) - if err != nil { - m.lg.Error("Failed to get lastKvIdx", "error", err) - return nil, err - } - info, err := m.GetMiningInfo(ctx, contract, shard) - if err != nil { - m.lg.Error("Failed to get es mining info", "error", err.Error()) - return nil, err - } - lastMineTime := info.LastMineTime - - plmt, err := m.ReadContractField("prepaidLastMineTime", nil) - if err != nil { - m.lg.Error("Failed to read prepaidLastMineTime", "error", err.Error()) - return nil, err - } - prepaidLastMineTime := new(big.Int).SetBytes(plmt).Uint64() - - var lastShard uint64 - if lastKv > 0 { - lastShard = (lastKv - 1) / cfg.ShardEntry - } - curBlock, err := m.HeaderByNumber(ctx, big.NewInt(rpc.LatestBlockNumber.Int64())) - if err != nil { - m.lg.Error("Failed to get latest block", "error", err.Error()) - return nil, err - } - - minedTs := curBlock.Time - (new(big.Int).Sub(curBlock.Number, block).Uint64())*12 - reward := big.NewInt(0) - if shard < lastShard { - basePayment := new(big.Int).Mul(cfg.StorageCost, new(big.Int).SetUint64(cfg.ShardEntry)) - reward = paymentIn(basePayment, cfg.DcfFactor, lastMineTime, minedTs, cfg.StartTime) - } else if shard == lastShard { - basePayment := new(big.Int).Mul(cfg.StorageCost, new(big.Int).SetUint64(lastKv%cfg.ShardEntry)) - reward = paymentIn(basePayment, cfg.DcfFactor, lastMineTime, minedTs, cfg.StartTime) - // Additional prepaid for the last shard - if prepaidLastMineTime < minedTs { - additionalReward := paymentIn(cfg.PrepaidAmount, cfg.DcfFactor, prepaidLastMineTime, minedTs, cfg.StartTime) - reward = new(big.Int).Add(reward, additionalReward) - } - } - minerReward := new(big.Int).Div( - new(big.Int).Mul(new(big.Int).SetUint64(rewardDenominator-cfg.TreasuryShare), reward), - new(big.Int).SetUint64(rewardDenominator), - ) - return minerReward, nil -} - -func paymentIn(x, dcfFactor *big.Int, fromTs, toTs, startTime uint64) *big.Int { - return new(big.Int).Rsh( - new(big.Int).Mul( - x, - new(big.Int).Sub( - pow(dcfFactor, fromTs-startTime), - pow(dcfFactor, toTs-startTime), - )), - 128, - ) -} - -func pow(fp *big.Int, n uint64) *big.Int { - v := new(big.Int).Lsh(big.NewInt(1), 128) - for n != 0 { - if (n & 1) == 1 { - v = new(big.Int).Rsh(new(big.Int).Mul(v, fp), 128) - } - fp = new(big.Int).Rsh(new(big.Int).Mul(fp, fp), 128) - n = n / 2 - } - return v -} diff --git a/ethstorage/miner/worker.go b/ethstorage/miner/worker.go index d94d67f7..b2c239f7 100644 --- a/ethstorage/miner/worker.go +++ b/ethstorage/miner/worker.go @@ -355,7 +355,7 @@ func (w *worker) taskLoop(taskCh chan *taskItem) { w.lg.Info("Mine task success", "shard", ti.shardIdx, "thread", ti.thread, "block", ti.blockNumber) } case <-w.exitCh: - w.lg.Warn("Worker is exiting from task loop...") + w.lg.Debug("Worker is exiting from task loop...") return } } @@ -459,6 +459,9 @@ func (w *worker) resultLoop() { errorCache = append(errorCache, err) case <-w.exitCh: w.lg.Warn("Worker is exiting from result loop...") + for _, e := range errorCache { + w.lg.Error("Mining error since es-node launched", "err", e) + } return } } @@ -548,19 +551,19 @@ func (w *worker) mineTask(t *taskItem) (bool, error) { return false, err } if t.requiredDiff.Cmp(new(big.Int).SetBytes(hash1.Bytes())) >= 0 { - w.lg.Info("Calculated a valid hash", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "nonce", nonce) + w.lg.Info("Calculated a valid hash", "shard", t.shardIdx, "block", t.blockNumber, "timestamp", t.mineTime, "randao", t.mixHash, "nonce", nonce, "hash0", hash0, "hash1", hash1, "sampleIdxs", sampleIdxs) dataSet, kvIdxs, sampleIdxsInKv, encodingKeys, encodedSamples, err := w.getMiningData(t.task, sampleIdxs) if err != nil { w.lg.Error("Get sample data failed", "kvIdxs", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv, "err", err.Error()) return false, err } - w.lg.Info("Got sample data", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "kvIdxs", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv) + w.lg.Info("Got sample data", "shard", t.shardIdx, "block", t.blockNumber, "encodedSamples", encodedSamples) masks, decodeProof, inclusiveProofs, err := w.prover.GetStorageProof(dataSet, encodingKeys, sampleIdxsInKv) if err != nil { w.lg.Error("Get storage proof error", "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv, "error", err.Error()) return false, fmt.Errorf("get proof err: %v", err) } - w.lg.Info("Got storage proof", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv) + w.lg.Info("Got storage proof", "shard", t.shardIdx, "block", t.blockNumber, "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv) newResult := &result{ blockNumber: t.blockNumber, startShardId: t.shardIdx, diff --git a/ethstorage/storage_manager.go b/ethstorage/storage_manager.go index 9d9be6ac..bf2270b7 100644 --- a/ethstorage/storage_manager.go +++ b/ethstorage/storage_manager.go @@ -186,7 +186,7 @@ func (s *StorageManager) CommitBlobs(kvIndices []uint64, blobs [][]byte, commits for i := 0; i < len(kvIndices); i++ { encodedBlob, success, err := s.shardManager.TryEncodeKV(kvIndices[i], blobs[i], commits[i]) if !success || err != nil { - log.Warn("Blob encode failed", "index", kvIndices[i], "err", err.Error()) + log.Warn("Blob encode failed", "index", kvIndices[i], "err", err) continue } encodedBlobs[i] = encodedBlob @@ -230,7 +230,7 @@ func (s *StorageManager) CommitEmptyBlobs(start, limit uint64) (uint64, uint64, for i := start; i <= limit; i++ { encodedBlob, success, err := s.shardManager.TryEncodeKV(i, emptyBs, hash) if !success || err != nil { - log.Warn("Blob encode failed", "index", i, "err", err.Error()) + log.Warn("Blob encode failed", "index", i, "err", err) break } encodedBlobs = append(encodedBlobs, encodedBlob) diff --git a/init-l2.sh b/init-l2.sh index 9aa41b42..0436b36a 100755 --- a/init-l2.sh +++ b/init-l2.sh @@ -7,4 +7,4 @@ ./init.sh \ --l1.rpc http://65.109.20.29:8545 \ --storage.l1contract 0x64003adbdf3014f7E38FC6BE752EB047b95da89A \ -$@ \ No newline at end of file +$@ diff --git a/init.sh b/init.sh old mode 100755 new mode 100644 index d938b0f9..9608865d --- a/init.sh +++ b/init.sh @@ -16,6 +16,7 @@ zkp_mode=2 data_dir="./es-data" remaining_args="" +shards="--shard_index 0" while [ $# -gt 0 ]; do if [[ $1 == --miner.zk-prover-impl ]]; then @@ -28,6 +29,9 @@ while [ $# -gt 0 ]; do data_dir=$2 shift 2 else + if [[ $1 == --shard_index ]]; then + shards="" + fi remaining_args="$remaining_args $1" shift fi @@ -103,22 +107,16 @@ if [ "$zkp_impl" = 1 ]; then fi -storage_file_0="$data_dir/shard-0.dat" - -es_node_init="$executable init --shard_index 0 \ +es_node_init="$executable init $shards \ --datadir $data_dir \ --l1.rpc http://88.99.30.186:8545 \ --storage.l1contract 0x804C520d3c084C805E37A35E90057Ac32831F96f \ $remaining_args" -# create data file for shard 0 if not yet -if [ ! -e $storage_file_0 ]; then - if $es_node_init ; then - echo "√ Initialized ${storage_file_0} successfully" - else - echo "Error: failed to initialize ${storage_file_0}" - exit 1 - fi -else - echo "Warning: storage file ${storage_file_0} already exists, skip initialization." +# es-node will skip init if data files already exist +if $es_node_init ; then + echo "√ Initialized data files successfully." +else + echo "Error: failed to initialize data files." + exit 1 fi \ No newline at end of file diff --git a/integration_tests/gen_blob.go b/integration_tests/gen_blob.go index 75b74bb3..9d96c18a 100644 --- a/integration_tests/gen_blob.go +++ b/integration_tests/gen_blob.go @@ -47,3 +47,6 @@ func generateRandomContent(sizeInKB int) []byte { } return []byte(content) } +func generateRandomBlobs(blobLen int) []byte { + return generateRandomContent(128 * 31 / 32 * blobLen) +} diff --git a/integration_tests/node_mine_test.go b/integration_tests/node_mine_test.go index df0030e4..23c60a5c 100644 --- a/integration_tests/node_mine_test.go +++ b/integration_tests/node_mine_test.go @@ -11,13 +11,15 @@ import ( "math/big" "os" "path/filepath" - "strings" "sync" "testing" "time" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" "github.com/ethstorage/go-ethstorage/cmd/es-utils/utils" @@ -32,40 +34,45 @@ import ( "github.com/ethstorage/go-ethstorage/ethstorage/storage" ) +var ( + datadir string + randaoSourceURL = os.Getenv("ES_NODE_RANDAO_RPC") +) + const ( - maxBlobsPerTx = 4 + maxBlobsPerTx = 6 dataFileName = "shard-%d.dat" ) -// TODO: test 2 shards -var shardIds = []uint64{0} - func TestMining(t *testing.T) { + setup(t) + t.Cleanup(func() { + teardown(t) + }) + contract := l1Contract lg.Info("Test mining", "l1Endpoint", l1Endpoint, "contract", contract) - pClient, err := eth.Dial(l1Endpoint, contract, 12, lg) + pClient, err := eth.Dial(l1Endpoint, contract, 2, lg) if err != nil { t.Fatalf("Failed to connect to the Ethereum client: %v", err) } - + storConfig := initStorageConfig(t, pClient, contract, minerAddr) lastKv, err := pClient.GetStorageLastBlobIdx(rpc.LatestBlockNumber.Int64()) if err != nil { - lg.Error("Failed to get lastKvIdx", "error", err) - } else { - lg.Info("lastKv", "lastKv", lastKv) - } - if lastKv != 0 { - t.Fatalf("A newly deployed storage contract is required") - } - storConfig := initStorageConfig(t, pClient, contract, minerAddr) - files, err := createDataFiles(storConfig) + t.Fatalf("Failed to get lastKvIdx: %v", err) + } + lg.Info("lastKv", "lastKv", lastKv) + curShard := lastKv / storConfig.KvEntriesPerShard + lg.Info("Current shard", "shardId", curShard) + shardIds := []uint64{curShard + 1, curShard + 2} + lg.Info("Shards to mine", "shardIds", shardIds) + files, err := createDataFiles(storConfig, shardIds) if err != nil { t.Fatalf("Create data files error: %v", err) } storConfig.Filenames = files - miningConfig := initMiningConfig(t, contract, pClient) + miningConfig := initMiningConfig(t, pClient) lg.Info("Initialzed mining config", "miningConfig", fmt.Sprintf("%+v", miningConfig)) - defer cleanFiles(miningConfig.ZKWorkingDir) shardManager, err := initShardManager(*storConfig) if err != nil { t.Fatalf("init shard manager error: %v", err) @@ -75,7 +82,12 @@ func TestMining(t *testing.T) { resourcesCtx, close := context.WithCancel(context.Background()) feed := new(event.Feed) - l1api := miner.NewL1MiningAPI(pClient, nil, lg) + rc, err := eth.DialRandaoSource(resourcesCtx, randaoSourceURL, l1Endpoint, 2, lg) + if err != nil { + t.Fatalf("Failed to connect to the randao source: %v", err) + } + + l1api := miner.NewL1MiningAPI(pClient, rc, lg) pvr := prover.NewKZGPoseidonProver( miningConfig.ZKWorkingDir, miningConfig.ZKeyFile, @@ -88,11 +100,12 @@ func TestMining(t *testing.T) { mnr := miner.New(miningConfig, db, storageManager, l1api, br, &pvr, feed, lg) lg.Info("Initialized miner") - l1HeadsSub := event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { + randaoHeadsSub := event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { if err != nil { - lg.Warn("Resubscribing after failed L1 subscription", "err", err) + lg.Warn("Resubscribing after failed randao head subscription", "err", err) } - return eth.WatchHeadChanges(resourcesCtx, pClient, func(ctx context.Context, sig eth.L1BlockRef) { + return eth.WatchHeadChanges(resourcesCtx, rc, func(ctx context.Context, sig eth.L1BlockRef) { + lg.Debug("OnNewRandaoSourceHead", "blockNumber", sig.Number) select { case mnr.ChainHeadCh <- sig: default: @@ -100,84 +113,60 @@ func TestMining(t *testing.T) { } }) }) + lg.Info("Randao head subscribed") go func() { - err, ok := <-l1HeadsSub.Err() + err, ok := <-randaoHeadsSub.Err() if !ok { return } - lg.Error("L1 heads subscription error", "err", err) + lg.Error("Randao heads subscription error", "err", err) }() + if err := fillEmpty(storageManager, shardIds); err != nil { + t.Fatalf("Failed to fill empty: %v", err) + } + prepareData(t, pClient, storageManager) mnr.Start() - prepareData(t, pClient, storageManager, miningConfig.StorageCost.String()) - fillEmpty(t, pClient, storageManager) - var wg sync.WaitGroup + minedShardSig := make(chan uint64, len(shardIds)) minedShardCh := make(chan uint64) - for _, s := range shardIds { - feed.Send(protocol.EthStorageSyncDone{ - DoneType: protocol.SingleShardDone, - ShardId: s, - }) - info, err := l1api.GetMiningInfo( - context.Background(), - contract, - s, - ) - if err != nil { - t.Fatalf("Failed to get es mining info for shard %d: %v", s, err) - } - go waitForMined(l1api, contract, mnr.ChainHeadCh, s, info.BlockMined.Uint64(), minedShardCh) - wg.Add(1) - time.Sleep(360 * time.Second) - } - + minedShards := make(map[uint64]bool) go func() { - minedShards := make(map[uint64]bool) for minedShard := range minedShardCh { + minedShardSig <- minedShard + lg.Info("Mined shard", "shard", minedShard) if !minedShards[minedShard] { - lg.Info("Mined shard", "shard", minedShard) minedShards[minedShard] = true wg.Done() - lg.Info("wait group done") } } }() - lg.Info("wait group waiting") - wg.Wait() - l1HeadsSub.Unsubscribe() - mnr.Close() - close() -} - -func cleanFiles(proverDir string) { - for _, shardId := range shardIds { - fileName := fmt.Sprintf(dataFileName, shardId) - if _, err := os.Stat(fileName); !os.IsNotExist(err) { - err = os.Remove(fileName) - if err != nil { - fmt.Println(err) - } - } - } - - folderPath := filepath.Join(proverDir, "snarkbuild") - files, err := os.ReadDir(folderPath) - if err != nil { - fmt.Println(err) - return - } - for _, file := range files { - if !strings.HasPrefix(file.Name(), ".") { - err = os.RemoveAll(filepath.Join(folderPath, file.Name())) - if err != nil { - fmt.Println(err) + for i, s := range shardIds { + feed.Send(protocol.EthStorageSyncDone{ + DoneType: protocol.SingleShardDone, + ShardId: s, + }) + go waitForMined(l1api, contract, mnr.ChainHeadCh, s, minedShardCh) + wg.Add(1) + // defer next shard mining so that the started shard can be mined for a while + if i != len(shardIds)-1 { + var miningTime time.Duration = 60 + timeout := time.After(miningTime * time.Second) + select { + case minedShard := <-minedShardSig: + lg.Info(fmt.Sprintf("Shard %d successfully mined, will start next shard: %d", minedShard, shardIds[i+1])) + case <-timeout: + lg.Info(fmt.Sprintf("Shard %d has been mined for %ds, will start next shard: %d", shardIds[i], miningTime, shardIds[i+1])) } } } + wg.Wait() + randaoHeadsSub.Unsubscribe() + mnr.Close() + close() } -func waitForMined(l1api miner.L1API, contract common.Address, chainHeadCh chan eth.L1BlockRef, shardIdx, lastMined uint64, exitCh chan uint64) { +func waitForMined(l1api miner.L1API, contract common.Address, chainHeadCh chan eth.L1BlockRef, shardIdx uint64, exitCh chan uint64) { for range chainHeadCh { info, err := l1api.GetMiningInfo( context.Background(), @@ -188,8 +177,9 @@ func waitForMined(l1api miner.L1API, contract common.Address, chainHeadCh chan e lg.Warn("Failed to get es mining info", "error", err.Error()) continue } - if info.BlockMined.Uint64() > lastMined { - lg.Info("Mined new", "shard", shardIdx, "lastMined", lastMined, "justMined", info.BlockMined) + lg.Info("Starting shard mining", "shard", shardIdx, "lastMined", info.BlockMined, "blockNumber", info.LastMineTime) + if info.BlockMined.Uint64() > 0 { + lg.Info("Mined new", "shard", shardIdx, "justMined", info.BlockMined) exitCh <- shardIdx return } @@ -239,21 +229,38 @@ func initShardManager(storConfig storage.StorageConfig) (*ethstorage.ShardManage return shardManager, nil } -func fillEmpty(t *testing.T, l1Client *eth.PollingClient, storageMgr *ethstorage.StorageManager) { - lg.Info("Filling empty started") - totalBlobs := storageMgr.KvEntries() * uint64(len(shardIds)) - lastKvIdx := storageMgr.LastKvIndex() - lg.Info("Filling empty", "lastBlobIdx", lastKvIdx, "totalBlobs", totalBlobs) - - inserted, next, err := storageMgr.CommitEmptyBlobs(lastKvIdx, totalBlobs-1) +func fillEmpty(storageMgr *ethstorage.StorageManager, shards []uint64) error { + start := shards[0] * storageMgr.KvEntries() + totalEntries := storageMgr.KvEntries()*uint64(len(shards)) - 1 + lg.Info("Filling empty to shards", "start", start, "end", start+totalEntries) + inserted, next, err := storageMgr.CommitEmptyBlobs(start, start+totalEntries) if err != nil { - t.Fatalf("Commit empty blobs failed %v", err) + return err } lg.Info("Filling empty done", "inserted", inserted, "next", next) + return nil +} + +func getPayment(l1Client *eth.PollingClient, contract common.Address, batch uint64) (*big.Int, error) { + uint256Type, _ := abi.NewType("uint256", "", nil) + dataField, _ := abi.Arguments{{Type: uint256Type}}.Pack(new(big.Int).SetUint64(batch)) + h := crypto.Keccak256Hash([]byte(`upfrontPaymentInBatch(uint256)`)) + calldata := append(h[0:4], dataField...) + msg := ethereum.CallMsg{ + To: &contract, + Data: calldata, + } + bs, err := l1Client.CallContract(context.Background(), msg, nil) + if err != nil { + lg.Error("Failed to call contract", "error", err.Error()) + return nil, err + } + return new(big.Int).SetBytes(bs), nil } -func prepareData(t *testing.T, l1Client *eth.PollingClient, storageMgr *ethstorage.StorageManager, value string) { - data := generateRandomContent(124 * 10) +func prepareData(t *testing.T, l1Client *eth.PollingClient, storageMgr *ethstorage.StorageManager) { + // fill contract with almost 2 shards of blobs + data := generateRandomBlobs(int(storageMgr.KvEntries()*2) - 1) blobs := utils.EncodeBlobs(data) t.Logf("Blobs len %d \n", len(blobs)) var hashs []common.Hash @@ -272,11 +279,11 @@ func prepareData(t *testing.T, l1Client *eth.PollingClient, storageMgr *ethstora t.Fatalf("Get chain id failed %v", err) } for i := 0; i < txs; i++ { - max := maxBlobsPerTx + blobsPerTx := maxBlobsPerTx if i == txs-1 { - max = last + blobsPerTx = last } - blobGroup := blobs[i*maxBlobsPerTx : i*maxBlobsPerTx+max] + blobGroup := blobs[i*maxBlobsPerTx : i*maxBlobsPerTx+blobsPerTx] var blobData []byte for _, bd := range blobGroup { blobData = append(blobData, bd[:]...) @@ -284,9 +291,14 @@ func prepareData(t *testing.T, l1Client *eth.PollingClient, storageMgr *ethstora if len(blobData) == 0 { break } - kvIdxes, dataHashes, err := utils.UploadBlobs(l1Client, l1Endpoint, privateKey, chainID.String(), storageMgr.ContractAddress(), blobData, false, value) + totalValue, err := getPayment(l1Client, l1Contract, uint64(blobsPerTx)) if err != nil { - t.Fatalf("Upload blobs failed %v", err) + t.Fatalf("Get payment failed %v", err) + } + t.Logf("Get payment totalValue=%v \n", totalValue) + kvIdxes, dataHashes, err := utils.UploadBlobs(l1Client, l1Endpoint, privateKey, chainID.String(), storageMgr.ContractAddress(), blobData, false, totalValue.String()) + if err != nil { + t.Fatalf("Upload blobs failed: %v", err) } t.Logf("kvIdxes=%v \n", kvIdxes) t.Logf("dataHashes=%x \n", dataHashes) @@ -302,27 +314,23 @@ func prepareData(t *testing.T, l1Client *eth.PollingClient, storageMgr *ethstora if err != nil { t.Fatalf("Download all metas failed %v", err) } - totalKvs := len(shardIds) * int(storageMgr.KvEntries()) - limit := totalKvs - if limit > len(ids) { - limit = len(ids) - } - for i := 0; i < limit; i++ { + startKv := storageMgr.Shards()[0] * storageMgr.KvEntries() + for i := 0; i < len(ids); i++ { + if ids[i] < startKv { + continue + } err := storageMgr.CommitBlob(ids[i], blobs[i][:], hashs[i]) if err != nil { - t.Fatalf("Failed to commit blob: i=%d, id=%d, error: %v", i, ids[i], err) + t.Fatalf("Failed to commit blob: i=%d, kvIndex=%d, hash=%x, error: %v", i, ids[i], hashs[i], err) } - } - _, _, err = storageMgr.CommitEmptyBlobs(uint64(limit), uint64(totalKvs)-1) - if err != nil { - t.Fatalf("Commit empty blobs failed %v", err) + t.Logf("Committed blob: i=%d, kvIndex=%d, hash=%x", i, ids[i], hashs[i]) } } -func createDataFiles(cfg *storage.StorageConfig) ([]string, error) { +func createDataFiles(cfg *storage.StorageConfig, shardIds []uint64) ([]string, error) { var files []string for _, shardIdx := range shardIds { - fileName := fmt.Sprintf(dataFileName, shardIdx) + fileName := filepath.Join(datadir, fmt.Sprintf(dataFileName, shardIdx)) if _, err := os.Stat(fileName); err == nil { lg.Warn("Creating data file: file already exists, will be overwritten", "file", fileName) } @@ -348,7 +356,7 @@ func createDataFiles(cfg *storage.StorageConfig) ([]string, error) { return files, nil } -func initMiningConfig(t *testing.T, l1Contract common.Address, client *eth.PollingClient) *miner.Config { +func initMiningConfig(t *testing.T, client *eth.PollingClient) *miner.Config { miningConfig := &miner.Config{} factory, addrFrom, err := signer.SignerFactoryFromConfig(signer.CLIConfig{ PrivateKey: privateKey, @@ -414,13 +422,32 @@ func initMiningConfig(t *testing.T, l1Contract common.Address, client *eth.Polli t.Fatal("get prepaidAmount", err) } miningConfig.PrepaidAmount = new(big.Int).SetBytes(result) - - miningConfig.ZKeyFile = zkey2Name proverPath, _ := filepath.Abs(prPath) + zkeyFull := filepath.Join(proverPath, prover.SnarkLib, zkey2Name) + if _, err := os.Stat(zkeyFull); os.IsNotExist(err) { + t.Fatalf("%s not found", zkeyFull) + } + miningConfig.ZKeyFile = zkeyFull miningConfig.ZKWorkingDir = proverPath miningConfig.ZKProverMode = 2 - miningConfig.ZKProverImpl = 1 + miningConfig.ZKProverImpl = 2 miningConfig.ThreadsPerShard = 2 miningConfig.MinimumProfit = new(big.Int).SetInt64(-1e18) return miningConfig } + +func setup(t *testing.T) { + datadir = t.TempDir() + err := os.MkdirAll(datadir, 0700) + if err != nil { + t.Fatalf("Failed to create datadir: %v", err) + } + t.Logf("datadir %s", datadir) +} + +func teardown(t *testing.T) { + err := os.RemoveAll(datadir) + if err != nil { + t.Errorf("Failed to remove datadir: %v", err) + } +} diff --git a/integration_tests/run_tests.sh b/integration_tests/run_tests.sh index b163164f..0b5950fa 100755 --- a/integration_tests/run_tests.sh +++ b/integration_tests/run_tests.sh @@ -18,7 +18,7 @@ if [ -z "$ES_NODE_STORAGE_MINER" ]; then fi # A contract that will be update with new blob uploaded for the KZG test if [ -z "$ES_NODE_STORAGE_L1CONTRACT_KZG" ]; then - export ES_NODE_STORAGE_L1CONTRACT_KZG=0xB6e01Ca0c33B2bAbd2eccf008F0759131FC284dB + export ES_NODE_STORAGE_L1CONTRACT_KZG=0xe8F0898cbA701E677970DB33404A817Ff42D4499 fi # A contract address that clef server checks against before signing the miner transaction if [ -z "$ES_NODE_STORAGE_L1CONTRACT_CLEF" ]; then @@ -26,7 +26,7 @@ if [ -z "$ES_NODE_STORAGE_L1CONTRACT_CLEF" ]; then fi # A newly deployed contract is required for each run for miner test, with zkp verifier of mode 2 if [ -z "$ES_NODE_STORAGE_L1CONTRACT" ]; then - export ES_NODE_STORAGE_L1CONTRACT=0x458FF419F57050a6ec2d3Eb049a28f4F6937c11E + export ES_NODE_STORAGE_L1CONTRACT=0xe8F0898cbA701E677970DB33404A817Ff42D4499 fi # A contract with zkp verifier of mode 1 (one proof per sample) if [ -z "$ES_NODE_STORAGE_L1CONTRACT_ZKP1" ]; then @@ -34,24 +34,21 @@ if [ -z "$ES_NODE_STORAGE_L1CONTRACT_ZKP1" ]; then fi # The commonly used l1 eth rpc endpoint if [ -z "$ES_NODE_L1_ETH_RPC" ]; then - export ES_NODE_L1_ETH_RPC=http://88.99.30.186:8545 + export ES_NODE_L1_ETH_RPC="http://65.109.20.29:8545" # L2 fi # The clef endpoint that the miner will use to sign the transaction if [ -z "$ES_NODE_CLEF_RPC" ]; then export ES_NODE_CLEF_RPC="http://65.108.236.27:8550" fi +if [ -z "$ES_NODE_RANDAO_RPC" ]; then + export ES_NODE_RANDAO_RPC="http://88.99.30.186:8545" +fi + echo ES_NODE_L1_ETH_RPC = $ES_NODE_L1_ETH_RPC echo ES_NODE_STORAGE_L1CONTRACT = $ES_NODE_STORAGE_L1CONTRACT echo ES_NODE_STORAGE_MINER = $ES_NODE_STORAGE_MINER -# download zkeys if not yet -zkey_file="./ethstorage/prover/snark_lib/blob_poseidon.zkey" -if [ ! -e ${zkey_file} ]; then - echo "${zkey_file} not found, start downloading..." - zkey_url="https://drive.usercontent.google.com/download?id=1ZLfhYeCXMnbk6wUiBADRAn1mZ8MI_zg-&export=download&confirm=t&uuid=16ddcd58-2498-4d65-8931-934df3d0065c" - curl $zkey_url -o ${zkey_file} -fi zkey_file="./ethstorage/prover/snark_lib/blob_poseidon2.zkey" if [ ! -e ${zkey_file} ]; then echo "${zkey_file} not found, start downloading..." @@ -59,5 +56,5 @@ if [ ! -e ${zkey_file} ]; then curl $zkey_url -o ${zkey_file} fi -go test -tags rapidsnark_asm -timeout 0 github.com/ethstorage/go-ethstorage/integration_tests -v +go test -tags rapidsnark_asm -run ^TestMining$ -timeout 0 github.com/ethstorage/go-ethstorage/integration_tests -v -count=1 \ No newline at end of file diff --git a/integration_tests/zk_prover2_test.go b/integration_tests/zk_prover2_test.go index 775bc6dd..167daa87 100644 --- a/integration_tests/zk_prover2_test.go +++ b/integration_tests/zk_prover2_test.go @@ -47,8 +47,8 @@ func TestZKProver_GenerateZKProof(t *testing.T) { "12199007973319674300030596965685270475268514105269206407619072166392043015767", } libDir := filepath.Join(proverPath, prover.SnarkLib) - pjs := prover.NewZKProver(proverPath, zkey2Name, prover.Wasm2Name, lg) - pgo, err := prover.NewZKProverGo(libDir, zkey2Name, prover.Wasm2Name, lg) + pjs := prover.NewZKProver(proverPath, zkeyFull, prover.Wasm2Name, lg) + pgo, err := prover.NewZKProverGo(libDir, zkeyFull, prover.Wasm2Name, lg) if err != nil { t.Errorf("NewZKProverGo() error = %v", err) return diff --git a/integration_tests/zk_prover_test.go b/integration_tests/zk_prover_test.go index d5746b9a..42ef8c58 100644 --- a/integration_tests/zk_prover_test.go +++ b/integration_tests/zk_prover_test.go @@ -31,6 +31,8 @@ var zkp1Contract = common.HexToAddress(os.Getenv("ES_NODE_STORAGE_L1CONTRACT_ZKP const zkeyName = "blob_poseidon.zkey" func TestZKProver_GenerateZKProofPerSample(t *testing.T) { + // skip now as zkey is not ready for mode 1 + t.SkipNow() proverPath, _ := filepath.Abs(prPath) zkeyFull := filepath.Join(proverPath, prover.SnarkLib, zkeyName) if _, err := os.Stat(zkeyFull); os.IsNotExist(err) { diff --git a/run.sh b/run.sh index b77ace1c..5cab5994 100755 --- a/run.sh +++ b/run.sh @@ -17,12 +17,18 @@ $executable --version echo "========================================" data_dir="./es-data" -storage_file_0="$data_dir/shard-0.dat" +file_flags="" + +for file in ${data_dir}/shard-[0-9]*.dat; do + if [ -f "$file" ]; then + file_flags+=" --storage.files $file" + fi +done start_flags=" --network devnet \ --datadir $data_dir \ + $file_flags \ --storage.l1contract 0x804C520d3c084C805E37A35E90057Ac32831F96f \ - --storage.files $storage_file_0 \ --l1.rpc http://88.99.30.186:8545 \ --l1.beacon http://88.99.30.186:3500 \ --l1.beacon-based-time 1706684472 \ @@ -36,4 +42,4 @@ start_flags=" --network devnet \ --p2p.bootnodes enr:-Li4QF3vBkkDQYNLHlVjW5NcEpXAsfNtE1lUVb_LgUQ_Ot2afS8jbDfnYQBDABJud_5Hd1hX_1cNeGVU6Tem06WDlfaGAY1e3vNvimV0aHN0b3JhZ2XbAYDY15SATFINPAhMgF43o16QBXrDKDH5b8GAgmlkgnY0gmlwhEFtP5qJc2VjcDI1NmsxoQK8XODtSv0IsrhBxZmTZBZEoLssb7bTX0YOVl6S0yLxuYN0Y3CCJAaDdWRwgnZh \ $@" -exec $executable $start_flags \ No newline at end of file +exec $executable $start_flags