Skip to content

Commit

Permalink
add support multi-shard and verify rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
ping-ke committed Sep 12, 2024
1 parent 09d5f4c commit 89e8505
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 23 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ jobs:
run: |
cd ./integration_tests/scripts
npm install --force
node ituploader.js > upload.log
node ituploader.js 10800 true > upload.log
- name: Test
run: |
./run-l2-it.sh > es-node-it.log&
cp ./integration_tests/scripts/.data ./cmd/integration-test-server/.data
cd ./cmd/integration-test-server
node ituploader.js 12288 false > upload.log&
cd ./cmd/integration-test-server --contract_addr $ES_NODE_CONTRACT_ADDRESS
go build
./integration-test-server > itserver.log
130 changes: 110 additions & 20 deletions cmd/integration-test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ import (
"errors"
"flag"
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
"io"
"math"
"net/http"
"os"
"time"

"github.com/crate-crypto/go-proto-danksharding-crypto/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethstorage/go-ethstorage/cmd/es-utils/utils"
es "github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/node"
Expand All @@ -29,18 +33,19 @@ const (
expectedStateRefreshTime = 5 * time.Minute
executionTime = 2 * time.Hour

kvEntries = 32768
kvSize = 128 * 1024
dataSize = 126976

blobEmptyFillingMask = byte(0b10000000)
kvEntries = 8192
kvSize = 32 * 4096
dataSize = 31 * 4096

rpcEndpoint = "http://127.0.0.1:9595"
uploadedDataFile = ".data"
shardFile = "../../es-data-it/shard-0.dat"
shardFile0 = "../../es-data-it/shard-0.dat"
shardFile1 = "../../es-data-it/shard-1.dat"
)

var (
portFlag = flag.Int("port", 9096, "Listener port for the es-node to report node status")
portFlag = flag.Int("port", 9096, "Listener port for the es-node to report node status")
contractAddr = flag.String("contract_addr", "", "EthStorage contract address")
)

var (
Expand All @@ -50,6 +55,7 @@ var (
hasConnectedPeer = false
testLog = log.New("IntegrationTest")
prover = prv.NewKZGProver(testLog)
contractAddress = common.Address{}
)

func HelloHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -102,7 +108,6 @@ func ReportStateHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"status":"ok"}`))
}

// TODO support multi shards
func checkState(oldState, newState *node.NodeState) {
if len(oldState.Shards) != len(newState.Shards) {
addErrorMessage(fmt.Sprintf("shards count mismatch between two state, new %d, old %d", len(newState.Shards), len(oldState.Shards)))
Expand Down Expand Up @@ -180,8 +185,29 @@ func checkFinalState(state *node.NodeState) {
}
}

// TODO support multi shards
// TODO read data from RPC
func createShardManager() (*es.ShardManager, error) {
sm := es.NewShardManager(contractAddress, kvSize, kvEntries, kvSize)
df0, err := es.OpenDataFile(shardFile0)
if err != nil {
return nil, err
}
err = sm.AddDataFileAndShard(df0)
if err != nil {
return nil, err
}

df1, err := es.OpenDataFile(shardFile1)
if err != nil {
return nil, err
}
err = sm.AddDataFileAndShard(df1)
if err != nil {
return nil, err
}

return sm, nil
}

func verifyData() error {
file, err := os.OpenFile(uploadedDataFile, os.O_RDONLY, 0755)
if err != nil {
Expand All @@ -193,32 +219,88 @@ func verifyData() error {
fileScanner.Buffer(make([]byte, dataSize*2), kvSize*2)
fileScanner.Split(bufio.ScanLines)

df, err := es.OpenDataFile(shardFile)
sm, err := createShardManager()
if err != nil {
return err
}

ds := es.NewDataShard(0, kvSize, kvEntries, kvSize)
ds.AddDataFile(df)

i := uint64(0)
for fileScanner.Scan() {
expectedData := common.Hex2Bytes(fileScanner.Text())
blobs := utils.EncodeBlobs(expectedData)
commit, _ := ds.ReadMeta(i)
data, err := ds.Read(i, kvSize, common.BytesToHash(commit))
blob := utils.EncodeBlobs(expectedData)[0]
commit, _, _ := sm.TryReadMeta(i)
data, _, err := sm.TryRead(i, kvSize, common.BytesToHash(commit))
if err != nil {
return errors.New(fmt.Sprintf("read %d from shard fail with err: %s", i, err.Error()))
}
if bytes.Compare(blobs[0][:], data) != 0 {
return errors.New(fmt.Sprintf("compare data %d fail, expected data %s; data: %s",
i, common.Bytes2Hex(blobs[0][:256]), common.Bytes2Hex(data[:256])))
if bytes.Compare(blob[:], data) != 0 {
return errors.New(fmt.Sprintf("compare shard data %d fail, expected data %s; data: %s",
i, common.Bytes2Hex(blob[:256]), common.Bytes2Hex(data[:256])))
}
i++
}
return nil
}

func verifyDataFromRPC() error {
client, err := rpc.DialHTTP(rpcEndpoint)
if err != nil {
return err
}
defer client.Close()

file, err := os.OpenFile(uploadedDataFile, os.O_RDONLY, 0755)
if err != nil {
return err
}
defer file.Close()

fileScanner := bufio.NewScanner(file)
fileScanner.Buffer(make([]byte, dataSize*2), kvSize*2)
fileScanner.Split(bufio.ScanLines)

i := uint64(0)
for fileScanner.Scan() {
expectedData := common.Hex2Bytes(fileScanner.Text())
blob := utils.EncodeBlobs(expectedData)[0]
commit, err := kzg4844.BlobToCommitment(blob)
if err != nil {
return fmt.Errorf("blobToCommitment failed: %w", err)
}
hash := common.Hash(eth.KZGToVersionedHash(commit))

data, err := downloadBlobFromRPC(client, i, hash)
if err != nil {
return errors.New(fmt.Sprintf("get data %d from rpc fail with err: %s", i, err.Error()))
}
if bytes.Compare(blob[:], data) != 0 {
return errors.New(fmt.Sprintf("compare rpc data %d fail, expected data %s; data: %s",
i, common.Bytes2Hex(blob[:256]), common.Bytes2Hex(data[:256])))
}
i++
}
return nil
}

func downloadBlobFromRPC(client *rpc.Client, kvIndex uint64, hash common.Hash) ([]byte, error) {
var result hexutil.Bytes
err := client.Call(&result, "es_getBlob", kvIndex, hash, 0, 0, 4096*32)
if err != nil {
return nil, err
}

var blob kzg4844.Blob
copy(blob[:], result)
commit, err := kzg4844.BlobToCommitment(blob)
if err != nil {
return nil, fmt.Errorf("blobToCommitment failed: %w", err)
}
if common.Hash(eth.KZGToVersionedHash(commit)) != hash {
return nil, fmt.Errorf("invalid blob for %s", hash)
}
return result, nil
}

func addErrorMessage(errMessage string) {
log.Warn("Add error message", "msg", errMessage)
errorMessages = append(errorMessages, errMessage+"\n")
Expand All @@ -238,6 +320,11 @@ func main() {
if *portFlag < 0 || *portFlag > math.MaxUint16 {
log.Crit("Invalid port")
}
if *contractAddr == "" {
log.Crit("Invalid contract address")
} else {
contractAddress = common.HexToAddress(*contractAddr)
}

go listenAndServe(*portFlag)

Expand All @@ -246,6 +333,9 @@ func main() {
if err := verifyData(); err != nil {
addErrorMessage(err.Error())
}
if err := verifyDataFromRPC(); err != nil {
addErrorMessage(err.Error())
}

if len(errorMessages) > 0 {
log.Crit(fmt.Sprintf("integration test fail %v", errorMessages))
Expand Down
7 changes: 6 additions & 1 deletion integration_tests/scripts/ituploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ const contractABI = [

const provider = new ethers.JsonRpcProvider(RPC);
const contract = new Contract(contractAddr, contractABI, provider);
const MAX_BLOB = 10800n;
const MAX_BLOB = BigInt(process.argv[2]);
const BATCH_SIZE = 6n;
const NEED_WAIT = (process.argv[3] === 'true');

async function UploadBlobsForIntegrationTest() {
// put blobs
Expand Down Expand Up @@ -55,6 +56,10 @@ async function UploadBlobsForIntegrationTest() {
}
}

if (!NEED_WAIT) {
return
}

let latestBlock
try {
latestBlock = await provider.getBlock();
Expand Down
4 changes: 4 additions & 0 deletions run-l2-it-rpc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

data_dir="./es-data-it-bootnode"
storage_file_0="$data_dir/shard-0.dat"
storage_file_1="$data_dir/shard-1.dat"
zkey_file="./build/bin/snark_lib/zkey/blob_poseidon2.zkey"

if test -d ${data_dir} ; then
Expand All @@ -11,6 +12,8 @@ mkdir ${data_dir}
echo "8714eb2672bb7ab01089a1060150b30bc374a3b00e18926460f169256d126339" > "${data_dir}/esnode_p2p_priv.txt"

./init-l2.sh \
--shard_index 0 \
--shard_index 1 \
--encoding_type=0 \
--datadir $data_dir \
--storage.l1contract $ES_NODE_CONTRACT_ADDRESS
Expand All @@ -20,6 +23,7 @@ exec ./build/bin/es-node \
--network integration \
--datadir $data_dir \
--storage.files $storage_file_0 \
--storage.files $storage_file_1 \
--storage.l1contract $ES_NODE_CONTRACT_ADDRESS \
--miner.enabled=false \
--miner.zkey $zkey_file \
Expand Down
4 changes: 4 additions & 0 deletions run-l2-it.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

data_dir="./es-data-it"
storage_file_0="$data_dir/shard-0.dat"
storage_file_1="$data_dir/shard-1.dat"
zkey_file="./build/bin/snark_lib/zkey/blob_poseidon2.zkey"

if test -d ${data_dir} ; then
Expand All @@ -10,6 +11,8 @@ fi
mkdir ${data_dir}

./init-l2.sh \
--shard_index 0 \
--shard_index 1 \
--datadir $data_dir \
--storage.l1contract $ES_NODE_CONTRACT_ADDRESS

Expand All @@ -18,6 +21,7 @@ exec ./build/bin/es-node \
--network integration \
--datadir $data_dir \
--storage.files $storage_file_0 \
--storage.files $storage_file_1 \
--storage.l1contract $ES_NODE_CONTRACT_ADDRESS \
--miner.enabled \
--miner.zkey $zkey_file \
Expand Down

0 comments on commit 89e8505

Please sign in to comment.