Problem: benchmark doesn't support batch txs
Solution:
- add batch mode
yihuang committed Oct 18, 2024
1 parent 85ca58b commit 89da005
Showing 7 changed files with 217 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
 * [#1645](https://github.com/crypto-org-chain/cronos/pull/1645) Gen test tx in parallel even in single node.
 * (testground)[#1644](https://github.com/crypto-org-chain/cronos/pull/1644) load generator retry with backoff on error.
 * [#1648](https://github.com/crypto-org-chain/cronos/pull/1648) Add abort OE in PrepareProposal.
+* (testground)[#]() Benchmark supports batch mode.
 
 *Oct 14, 2024*
127 changes: 127 additions & 0 deletions testground/benchmark/benchmark/batch.py
@@ -0,0 +1,127 @@
import base64
import itertools
import json
import multiprocessing
import os
import tempfile
from collections import namedtuple

from eth_account import Account
from eth_account._utils.legacy_transactions import Transaction, vrs_from
from eth_account._utils.signing import hash_of_signed_transaction
from hexbytes import HexBytes

from .utils import DEFAULT_DENOM, split, split_batch


def build_tx(raw: HexBytes):
    tx = Transaction.from_bytes(raw)
    msg_hash = hash_of_signed_transaction(tx)
    vrs = vrs_from(tx)
    # recover the sender address from the tx signature
    sender = HexBytes(Account._recover_hash(msg_hash, vrs=vrs))
    msg = {
        "@type": "/ethermint.evm.v1.MsgEthereumTx",
        "data": None,
        "size": 0,
        "deprecated_hash": "",
        "deprecated_from": "",
        "from": base64.b64encode(sender).decode(),
        "raw": raw.hex(),
    }
    fee = tx.gas * tx.gasPrice
    return {
        "body": {
            "messages": [msg],
            "memo": "",
            "timeout_height": "0",
            "extension_options": [
                {"@type": "/ethermint.evm.v1.ExtensionOptionsEthereumTx"}
            ],
            "non_critical_extension_options": [],
        },
        "auth_info": {
            "signer_infos": [],
            "fee": {
                "amount": [{"denom": DEFAULT_DENOM, "amount": str(fee)}],
                "gas_limit": str(tx.gas),
                "payer": "",
                "granter": "",
            },
            "tip": None,
        },
        "signatures": [],
    }


def build_batch_tx(signed_txs: [str]):
    "return a cosmos batch tx wrapping the signed eth txs"
    tmp_txs = [build_tx(HexBytes(signed)) for signed in signed_txs]

    msgs = [tx["body"]["messages"][0] for tx in tmp_txs]
    fee = sum(int(tx["auth_info"]["fee"]["amount"][0]["amount"]) for tx in tmp_txs)
    gas_limit = sum(int(tx["auth_info"]["fee"]["gas_limit"]) for tx in tmp_txs)

    return {
        "body": {
            "messages": msgs,
            "memo": "",
            "timeout_height": "0",
            "extension_options": [
                {"@type": "/ethermint.evm.v1.ExtensionOptionsEthereumTx"}
            ],
            "non_critical_extension_options": [],
        },
        "auth_info": {
            "signer_infos": [],
            "fee": {
                "amount": [{"denom": DEFAULT_DENOM, "amount": str(fee)}],
                "gas_limit": str(gas_limit),
                "payer": "",
                "granter": "",
            },
        },
        "signatures": [],
    }


Batch = namedtuple("Batch", ["start", "end", "raw"])
Job = namedtuple("Job", ["batches", "cli"])


def _do_job(job: Job):
    txs = []
    for batch in job.batches:
        tx = build_batch_tx(batch.raw)
        # serialize the JSON tx to raw bytes via the chain CLI's `tx encode`
        with tempfile.NamedTemporaryFile("w") as fp:
            fp.write(json.dumps(tx))
            fp.flush()
            encoded = job.cli("tx", "encode", fp.name).strip()
        txs.append(encoded)
        print("batch built:", batch.start, batch.end, len(encoded))
    return txs


def build_batch_txs(cli, raw: [str], batch) -> [str]:
    batches = split_batch(len(raw), batch)
    chunks = split(len(batches), os.cpu_count())
    jobs = [
        Job([Batch(b, e, raw[b:e]) for b, e in batches[begin:end]], cli)
        for begin, end in chunks
    ]
    txs = []
    with multiprocessing.Pool() as pool:
        txs = pool.map(_do_job, jobs)
    return list(itertools.chain(*txs))


if __name__ == "__main__":
    from .cli import ChainCommand

    raw = [
        "0xf86580843b9aca008252089410000000000000000000000000000000000000000180820636a08acf3003dc8df7e81c2ee5c3e97125f526de84414fef21ea428accf83457a7a5a01dd8f73270b4152f6fd513d12703b7f45480190b4795d39f6d7626a396e7c032",  # noqa
        "0xf86580843b9aca008252089410000000000000000000000000000000000000000180820636a071d777ab815c11af0000b86333498c74dc0dfda45348869f5d11628932c7e1cca02b23fa2620796944df821ba6531eba1affa569d52389b9f799118d5a32a64f4b",  # noqa
        "0xf86580843b9aca008252089410000000000000000000000000000000000000000180820636a05dd5f6bfe94f2de23cc15c1217080b13b53159d73ee71e081639671dc3ce7ea6a02693d3fc4ed1578b2cbdc6e318755bc425ec23a6dcba3856b2e20729f58f2378",  # noqa
    ]
    txs = [HexBytes(tx) for tx in raw]
    for tx in build_batch_txs(ChainCommand("cronosd"), txs, 2):
        print(tx)
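
An offline sanity check of the fee aggregation (editor's sketch, not part of the commit; import path assumed). Each sample tx above spends gas 21000 at a gas price of 1 gwei, so the batch totals can be asserted directly:

from benchmark.batch import build_batch_tx  # assumed import path

# the first sample tx from the __main__ block above: gas 21000 at 1 gwei
sample = "0xf86580843b9aca008252089410000000000000000000000000000000000000000180820636a08acf3003dc8df7e81c2ee5c3e97125f526de84414fef21ea428accf83457a7a5a01dd8f73270b4152f6fd513d12703b7f45480190b4795d39f6d7626a396e7c032"  # noqa

batch = build_batch_tx([sample, sample])
assert len(batch["body"]["messages"]) == 2  # both eth txs ride in one cosmos body
assert batch["auth_info"]["fee"]["gas_limit"] == str(2 * 21000)
assert batch["auth_info"]["fee"]["amount"][0]["amount"] == str(2 * 21000 * 10**9)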
2 changes: 1 addition & 1 deletion testground/benchmark/benchmark/peer.py
@@ -11,6 +11,7 @@
 from .cli import ChainCommand
 from .types import Balance, GenesisAccount, PeerPacket
 from .utils import (
+    DEFAULT_DENOM,
     bech32_to_eth,
     eth_to_bech32,
     gen_account,
@@ -19,7 +20,6 @@
     patch_toml,
 )
 
-DEFAULT_DENOM = "basecro"
 VAL_ACCOUNT = "validator"
 VAL_INITIAL_AMOUNT = Balance(amount="100000000000000000000", denom=DEFAULT_DENOM)
 VAL_STAKED_AMOUNT = Balance(amount="10000000000000000000", denom=DEFAULT_DENOM)
12 changes: 8 additions & 4 deletions testground/benchmark/benchmark/stateless.py
@@ -59,6 +59,7 @@ def validate_json(ctx, param, value):
 @click.option("--num-txs", default=1000)
 @click.option("--num-idle", default=20)
 @click.option("--tx-type", default="simple-transfer")
+@click.option("--batch-size", default=1)
 @click.option("--config-patch", default="{}", callback=validate_json)
 @click.option("--app-patch", default="{}", callback=validate_json)
 @click.option("--genesis-patch", default="{}", callback=validate_json)
@@ -82,6 +83,7 @@ def _gen(
     num_txs: int = 1000,
     num_idle: int = 20,
     tx_type: str = "simple-transfer",
+    batch_size: int = 1,
     validator_generate_load: bool = True,
     config_patch: dict = None,
     app_patch: dict = None,
@@ -145,7 +147,8 @@ def _gen(
         "num_txs": num_txs,
         "num_idle": num_idle,
         "tx_type": tx_type,
-        "validator-generate-load": validator_generate_load,
+        "batch_size": batch_size,
+        "validator_generate_load": validator_generate_load,
     }
     (outdir / "config.json").write_text(json.dumps(cfg))
@@ -234,9 +236,10 @@ def generate_load(datadir: Path, global_seq: int):
     """
     manually generate load against an existing node
     """
+    cli = ChainCommand(LOCAL_CRONOSD_PATH)
     cfg = json.loads((datadir / "config.json").read_text())
     txs = prepare_txs(cfg, datadir, global_seq)
-    asyncio.run(transaction.send(txs))
+    asyncio.run(transaction.send(cli, txs, cfg["batch_size"]))
     print("sent", len(txs), "txs")
     print("wait for 20 idle blocks")
     detect_idle_halted(cfg["num_idle"], 20)
@@ -269,7 +272,7 @@ def job(global_seq):
 def do_run(
     datadir: Path, home: Path, cronosd: str, group: str, global_seq: int, cfg: dict
 ):
-    if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True):
+    if group == FULLNODE_GROUP or cfg.get("validator_generate_load", True):
         txs = prepare_txs(cfg, datadir, global_seq)
     else:
         txs = []
@@ -291,7 +294,7 @@ def do_run(
     wait_for_block(cli, 3)
 
     if txs:
-        asyncio.run(transaction.send(txs))
+        asyncio.run(transaction.send(cli, txs, cfg["batch_size"]))
         print("sent", len(txs), "txs")
 
     # node quits when the chain is idle or halted for a while
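
The batch size travels CLI flag -> config.json -> sender. A minimal sketch of the consumer side (editor's sketch, not part of the commit; datadir path hypothetical):

import json
from pathlib import Path

datadir = Path("/data")  # hypothetical testground data dir
cfg = json.loads((datadir / "config.json").read_text())
# "batch_size" is written by _gen (default 1) and forwarded verbatim:
# asyncio.run(transaction.send(cli, txs, cfg["batch_size"]))
print(cfg["tx_type"], cfg["batch_size"], cfg["validator_generate_load"])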
27 changes: 21 additions & 6 deletions testground/benchmark/benchmark/stats.py
@@ -1,6 +1,6 @@
 from datetime import datetime
 
-from .utils import block, block_height
+from .utils import block, block_eth, block_height
 
 # the tps calculation uses the average of the last TPS_WINDOW blocks
 TPS_WINDOW = 5
@@ -19,18 +19,33 @@ def calculate_tps(blocks):
     return txs / time_diff
 
 
-def dump_block_stats(fp):
+def get_block_info_cosmos(height):
+    blk = block(height)
+    timestamp = datetime.fromisoformat(blk["result"]["block"]["header"]["time"])
+    txs = len(blk["result"]["block"]["data"]["txs"])
+    return timestamp, txs
+
+
+def get_block_info_eth(height):
+    blk = block_eth(height)
+    timestamp = datetime.fromtimestamp(int(blk["timestamp"], 0))
+    txs = len(blk["transactions"])
+    return timestamp, txs
+
+
+def dump_block_stats(fp, eth=True):
     """
-    dump simple statistics for blocks for analysis
+    dump block stats; by default via the web3 json-rpc, which splits batch txs
     """
     tps_list = []
     current = block_height()
     blocks = []
     # skip block 1 whose timestamp is not accurate
     for i in range(2, current + 1):
-        blk = block(i)
-        timestamp = datetime.fromisoformat(blk["result"]["block"]["header"]["time"])
-        txs = len(blk["result"]["block"]["data"]["txs"])
+        if eth:
+            timestamp, txs = get_block_info_eth(i)
+        else:
+            timestamp, txs = get_block_info_cosmos(i)
         blocks.append((txs, timestamp))
         tps = calculate_tps(blocks[-TPS_WINDOW:])
         tps_list.append(tps)
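
Why the eth flag matters for the numbers (illustrative sketch, not part of the commit; assumes a local node on the default ports): a batch tx is a single cosmos tx, so the cometbft view counts it once, while the eth json-rpc view splits it back into its member txs:

from benchmark.stats import get_block_info_cosmos, get_block_info_eth  # assumed path

height = 5  # any block after the skipped first block
_, cosmos_txs = get_block_info_cosmos(height)  # a 50-tx batch counts as 1 here
_, eth_txs = get_block_info_eth(height)        # ...and as 50 individual eth txs here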
40 changes: 35 additions & 5 deletions testground/benchmark/benchmark/transaction.py
@@ -10,8 +10,9 @@
 import eth_abi
 import ujson
 
+from .batch import build_batch_txs
 from .erc20 import CONTRACT_ADDRESS
-from .utils import LOCAL_JSON_RPC, gen_account, split
+from .utils import LOCAL_JSON_RPC, LOCAL_RPC, gen_account, split
 
 GAS_PRICE = 1000000000
 CHAIN_ID = 777
@@ -109,7 +110,7 @@ def load(datadir: Path, global_seq: int) -> [str]:
 
 @backoff.on_predicate(backoff.expo, max_time=60, max_value=5)
 @backoff.on_exception(backoff.expo, aiohttp.ClientError, max_time=60, max_value=5)
-async def async_sendtx(session, raw):
+async def async_sendtx_eth(session, raw):
     async with session.post(
         LOCAL_JSON_RPC,
         json={
@@ -121,15 +122,44 @@ async def async_sendtx(session, raw):
     ) as rsp:
         data = await rsp.json()
         if "error" in data:
-            print("send tx error, will retry,", data["error"])
+            print("send eth tx error, will retry,", data["error"])
             return False
     return True
 
 
-async def send(txs):
+@backoff.on_predicate(backoff.expo, max_time=60, max_value=5)
+@backoff.on_exception(backoff.expo, aiohttp.ClientError, max_time=60, max_value=5)
+async def async_sendtx_cosmos(session, raw):
+    async with session.post(
+        LOCAL_RPC,
+        json={
+            "jsonrpc": "2.0",
+            "method": "broadcast_tx_async",
+            "params": {
+                "tx": raw,
+            },
+            "id": 1,
+        },
+    ) as rsp:
+        data = await rsp.json()
+        if "error" in data:
+            print("send cosmos tx error, will retry,", data["error"])
+            return False
+    return True
+
+
+async def send(cli, txs, batch: int):
+    if batch <= 1:
+        sendtx = async_sendtx_eth
+    else:
+        sendtx = async_sendtx_cosmos
+        print("build batch txs begin", batch)
+        txs = build_batch_txs(cli, txs, batch)
+        print("build batch txs end")
+
     connector = aiohttp.TCPConnector(limit=CONNECTION_POOL_SIZE)
     async with aiohttp.ClientSession(
         connector=connector, json_serialize=ujson.dumps
     ) as session:
-        tasks = [asyncio.ensure_future(async_sendtx(session, raw)) for raw in txs]
+        tasks = [asyncio.ensure_future(sendtx(session, raw)) for raw in txs]
         await asyncio.gather(*tasks)
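
Usage sketch for the new dispatch (editor's sketch, not part of the commit; import paths assumed, local node on default ports): batch <= 1 keeps the per-tx eth_sendRawTransaction path, batch > 1 wraps the txs into cosmos batch txs and broadcasts them via broadcast_tx_async:

import asyncio

from benchmark.cli import ChainCommand  # assumed import path
from benchmark.transaction import send  # assumed import path

cli = ChainCommand("cronosd")
signed = ["0xf865..."]  # pre-signed raw eth txs (placeholder)
asyncio.run(send(cli, signed, 1))   # one JSON-RPC call per tx
asyncio.run(send(cli, signed, 10))  # up to 10 eth txs per cosmos batch tx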
24 changes: 24 additions & 0 deletions testground/benchmark/benchmark/utils.py
@@ -15,6 +15,7 @@
 CRONOS_ADDRESS_PREFIX = "crc"
 LOCAL_RPC = "http://127.0.0.1:26657"
 LOCAL_JSON_RPC = "http://127.0.0.1:8545"
+DEFAULT_DENOM = "basecro"
@@ -170,6 +171,18 @@ def block(height):
     return requests.get(f"{LOCAL_RPC}/block?height={height}").json()
 
 
+def block_eth(height: int):
+    return requests.post(
+        LOCAL_JSON_RPC,
+        json={
+            "jsonrpc": "2.0",
+            "method": "eth_getBlockByNumber",
+            "params": [hex(height), False],
+            "id": 1,
+        },
+    ).json()["result"]
+
+
 def block_txs(height):
     return block(height)["result"]["block"]["data"]["txs"]
@@ -180,3 +193,14 @@ def split(a: int, n: int):
     """
     k, m = divmod(a, n)
     return [(i * k + min(i, m), (i + 1) * k + min(i + 1, m)) for i in range(n)]
+
+
+def split_batch(a: int, size: int):
+    """
+    Split range(0, a) into batches of the given size
+    """
+    k, m = divmod(a, size)
+    parts = [(i * size, (i + 1) * size) for i in range(k)]
+    if m:
+        parts.append((k * size, a))
+    return parts
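
A worked example of the two splitters (editor's note, values computed by hand): split divides a range into n near-equal chunks for the worker pool, while split_batch cuts it into fixed-size batches plus a shorter tail:

from benchmark.utils import split, split_batch  # assumed import path

assert split(10, 3) == [(0, 4), (4, 7), (7, 10)]
assert split_batch(10, 3) == [(0, 3), (3, 6), (6, 9), (9, 10)]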
