diff --git a/CHANGELOG.md b/CHANGELOG.md index 2888c085be..2e5be034d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ * [#1645](https://github.com/crypto-org-chain/cronos/pull/1645) Gen test tx in parallel even in single node. * (testground)[#1644](https://github.com/crypto-org-chain/cronos/pull/1644) load generator retry with backoff on error. * [#1648](https://github.com/crypto-org-chain/cronos/pull/1648) Add abort OE in PrepareProposal. +* (testground)[#]() Benchmark support batch mode. *Oct 14, 2024* diff --git a/testground/benchmark/benchmark/batch.py b/testground/benchmark/benchmark/batch.py new file mode 100644 index 0000000000..10cf0b770c --- /dev/null +++ b/testground/benchmark/benchmark/batch.py @@ -0,0 +1,127 @@ +import base64 +import itertools +import json +import multiprocessing +import os +import tempfile +from collections import namedtuple + +from eth_account import Account +from eth_account._utils.legacy_transactions import Transaction, vrs_from +from eth_account._utils.signing import hash_of_signed_transaction +from hexbytes import HexBytes + +from .utils import DEFAULT_DENOM, split, split_batch + + +def build_tx(raw: HexBytes): + tx = Transaction.from_bytes(raw) + msg_hash = hash_of_signed_transaction(tx) + vrs = vrs_from(tx) + sender = HexBytes(Account._recover_hash(msg_hash, vrs=vrs)) + msg = { + "@type": "/ethermint.evm.v1.MsgEthereumTx", + "data": None, + "size": 0, + "deprecated_hash": "", + "deprecated_from": "", + "from": base64.b64encode(sender).decode(), + "raw": raw.hex(), + } + fee = tx.gas * tx.gasPrice + return { + "body": { + "messages": [msg], + "memo": "", + "timeout_height": "0", + "extension_options": [ + {"@type": "/ethermint.evm.v1.ExtensionOptionsEthereumTx"} + ], + "non_critical_extension_options": [], + }, + "auth_info": { + "signer_infos": [], + "fee": { + "amount": [{"denom": DEFAULT_DENOM, "amount": str(fee)}], + "gas_limit": str(tx.gas), + "payer": "", + "granter": "", + }, + "tip": None, + }, + "signatures": [], + } + + +def build_batch_tx(signed_txs: [str]): + "return cosmos batch tx and eth tx hashes" + tmp_txs = [build_tx(HexBytes(signed)) for signed in signed_txs] + + msgs = [tx["body"]["messages"][0] for tx in tmp_txs] + fee = sum(int(tx["auth_info"]["fee"]["amount"][0]["amount"]) for tx in tmp_txs) + gas_limit = sum(int(tx["auth_info"]["fee"]["gas_limit"]) for tx in tmp_txs) + + return { + "body": { + "messages": msgs, + "memo": "", + "timeout_height": "0", + "extension_options": [ + {"@type": "/ethermint.evm.v1.ExtensionOptionsEthereumTx"} + ], + "non_critical_extension_options": [], + }, + "auth_info": { + "signer_infos": [], + "fee": { + "amount": [{"denom": DEFAULT_DENOM, "amount": str(fee)}], + "gas_limit": str(gas_limit), + "payer": "", + "granter": "", + }, + }, + "signatures": [], + } + + +Batch = namedtuple("Batch", ["start", "end", "raw"]) +Job = namedtuple("Job", ["batches", "cli"]) + + +def _do_job(job: Job): + txs = [] + for batch in job.batches: + tx = build_batch_tx(batch.raw) + with tempfile.NamedTemporaryFile("w") as fp: + fp.write(json.dumps(tx)) + fp.flush() + encoded = job.cli("tx", "encode", fp.name).strip() + txs.append(encoded) + print("batch built:", batch.start, batch.end, len(encoded)) + return txs + + +def build_batch_txs(cli, raw: [str], batch) -> [str]: + batches = split_batch(len(raw), batch) + chunks = split(len(batches), os.cpu_count()) + jobs = [ + Job([Batch(b, e, raw[b:e]) for b, e in batches[begin:end]], cli) + for begin, end in chunks + ] + txs = [] + with multiprocessing.Pool() as pool: + txs = pool.map(_do_job, jobs) + return list(itertools.chain(*txs)) + + +if __name__ == "__main__": + from .cli import ChainCommand + + raw = [ + "0xf86580843b9aca008252089410000000000000000000000000000000000000000180820636a08acf3003dc8df7e81c2ee5c3e97125f526de84414fef21ea428accf83457a7a5a01dd8f73270b4152f6fd513d12703b7f45480190b4795d39f6d7626a396e7c032", # noqa + "0xf86580843b9aca008252089410000000000000000000000000000000000000000180820636a071d777ab815c11af0000b86333498c74dc0dfda45348869f5d11628932c7e1cca02b23fa2620796944df821ba6531eba1affa569d52389b9f799118d5a32a64f4b", # noqa + "0xf86580843b9aca008252089410000000000000000000000000000000000000000180820636a05dd5f6bfe94f2de23cc15c1217080b13b53159d73ee71e081639671dc3ce7ea6a02693d3fc4ed1578b2cbdc6e318755bc425ec23a6dcba3856b2e20729f58f2378", # noqa + ] + txs = [HexBytes(tx) for tx in raw] + for tx in build_batch_txs(ChainCommand("cronosd"), txs, 2): + print(tx) diff --git a/testground/benchmark/benchmark/peer.py b/testground/benchmark/benchmark/peer.py index 83da764961..a1526f8870 100644 --- a/testground/benchmark/benchmark/peer.py +++ b/testground/benchmark/benchmark/peer.py @@ -11,6 +11,7 @@ from .cli import ChainCommand from .types import Balance, GenesisAccount, PeerPacket from .utils import ( + DEFAULT_DENOM, bech32_to_eth, eth_to_bech32, gen_account, @@ -19,7 +20,6 @@ patch_toml, ) -DEFAULT_DENOM = "basecro" VAL_ACCOUNT = "validator" VAL_INITIAL_AMOUNT = Balance(amount="100000000000000000000", denom=DEFAULT_DENOM) VAL_STAKED_AMOUNT = Balance(amount="10000000000000000000", denom=DEFAULT_DENOM) diff --git a/testground/benchmark/benchmark/stateless.py b/testground/benchmark/benchmark/stateless.py index 8ac87f20b8..c42aa2140a 100644 --- a/testground/benchmark/benchmark/stateless.py +++ b/testground/benchmark/benchmark/stateless.py @@ -59,6 +59,7 @@ def validate_json(ctx, param, value): @click.option("--num-txs", default=1000) @click.option("--num-idle", default=20) @click.option("--tx-type", default="simple-transfer") +@click.option("--batch-size", default=1) @click.option("--config-patch", default="{}", callback=validate_json) @click.option("--app-patch", default="{}", callback=validate_json) @click.option("--genesis-patch", default="{}", callback=validate_json) @@ -82,6 +83,7 @@ def _gen( num_txs: int = 1000, num_idle: int = 20, tx_type: str = "simple-transfer", + batch_size: int = 1, validator_generate_load: bool = True, config_patch: dict = None, app_patch: dict = None, @@ -145,7 +147,7 @@ def _gen( "num_txs": num_txs, "num_idle": num_idle, "tx_type": tx_type, - "validator-generate-load": validator_generate_load, + "validator_generate_load": validator_generate_load, } (outdir / "config.json").write_text(json.dumps(cfg)) @@ -234,9 +236,10 @@ def generate_load(datadir: Path, global_seq: int): """ manually generate load to an existing node """ + cli = ChainCommand(LOCAL_CRONOSD_PATH) cfg = json.loads((datadir / "config.json").read_text()) txs = prepare_txs(cfg, datadir, global_seq) - asyncio.run(transaction.send(txs)) + asyncio.run(transaction.send(cli, txs, cfg["batch_size"])) print("sent", len(txs), "txs") print("wait for 20 idle blocks") detect_idle_halted(cfg["num_idle"], 20) @@ -269,7 +272,7 @@ def job(global_seq): def do_run( datadir: Path, home: Path, cronosd: str, group: str, global_seq: int, cfg: dict ): - if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True): + if group == FULLNODE_GROUP or cfg.get("validator_generate_load", True): txs = prepare_txs(cfg, datadir, global_seq) else: txs = [] @@ -291,7 +294,7 @@ def do_run( wait_for_block(cli, 3) if txs: - asyncio.run(transaction.send(txs)) + asyncio.run(transaction.send(cli, txs, cfg["batch_size"])) print("sent", len(txs), "txs") # node quit when the chain is idle or halted for a while diff --git a/testground/benchmark/benchmark/stats.py b/testground/benchmark/benchmark/stats.py index 1a0978179c..c5a74c8fa9 100644 --- a/testground/benchmark/benchmark/stats.py +++ b/testground/benchmark/benchmark/stats.py @@ -1,6 +1,6 @@ from datetime import datetime -from .utils import block, block_height +from .utils import block, block_eth, block_height # the tps calculation use the average of the last 10 blocks TPS_WINDOW = 5 @@ -19,18 +19,33 @@ def calculate_tps(blocks): return txs / time_diff -def dump_block_stats(fp): +def get_block_info_cosmos(height): + blk = block(height) + timestamp = datetime.fromisoformat(blk["result"]["block"]["header"]["time"]) + txs = len(blk["result"]["block"]["data"]["txs"]) + return timestamp, txs + + +def get_block_info_eth(height): + blk = block_eth(height) + timestamp = datetime.fromtimestamp(int(blk["timestamp"], 0)) + txs = len(blk["transactions"]) + return timestamp, txs + + +def dump_block_stats(fp, eth=True): """ - dump simple statistics for blocks for analysis + dump block stats using web3 json-rpc, which splits batch tx """ tps_list = [] current = block_height() blocks = [] # skip block 1 whose timestamp is not accurate for i in range(2, current + 1): - blk = block(i) - timestamp = datetime.fromisoformat(blk["result"]["block"]["header"]["time"]) - txs = len(blk["result"]["block"]["data"]["txs"]) + if eth: + timestamp, txs = get_block_info_eth(i) + else: + timestamp, txs = get_block_info_cosmos(i) blocks.append((txs, timestamp)) tps = calculate_tps(blocks[-TPS_WINDOW:]) tps_list.append(tps) diff --git a/testground/benchmark/benchmark/transaction.py b/testground/benchmark/benchmark/transaction.py index 78c77a3804..826e68925f 100644 --- a/testground/benchmark/benchmark/transaction.py +++ b/testground/benchmark/benchmark/transaction.py @@ -10,8 +10,9 @@ import eth_abi import ujson +from .batch import build_batch_txs from .erc20 import CONTRACT_ADDRESS -from .utils import LOCAL_JSON_RPC, gen_account, split +from .utils import LOCAL_JSON_RPC, LOCAL_RPC, gen_account, split GAS_PRICE = 1000000000 CHAIN_ID = 777 @@ -109,7 +110,7 @@ def load(datadir: Path, global_seq: int) -> [str]: @backoff.on_predicate(backoff.expo, max_time=60, max_value=5) @backoff.on_exception(backoff.expo, aiohttp.ClientError, max_time=60, max_value=5) -async def async_sendtx(session, raw): +async def async_sendtx_eth(session, raw): async with session.post( LOCAL_JSON_RPC, json={ @@ -121,15 +122,44 @@ async def async_sendtx(session, raw): ) as rsp: data = await rsp.json() if "error" in data: - print("send tx error, will retry,", data["error"]) + print("send eth tx error, will retry,", data["error"]) return False return True -async def send(txs): +@backoff.on_predicate(backoff.expo, max_time=60, max_value=5) +@backoff.on_exception(backoff.expo, aiohttp.ClientError, max_time=60, max_value=5) +async def async_sendtx_cosmos(session, raw): + async with session.post( + LOCAL_RPC, + json={ + "jsonrpc": "2.0", + "method": "broadcast_tx_async", + "params": { + "tx": raw, + }, + "id": 1, + }, + ) as rsp: + data = await rsp.json() + if "error" in data: + print("send cosmos tx error, will retry,", data["error"]) + return False + return True + + +async def send(cli, txs, batch: int): + if batch <= 1: + sendtx = async_sendtx_eth + else: + sendtx = async_sendtx_cosmos + print("build batch txs begin", batch) + txs = build_batch_txs(cli, txs, batch) + print("build batch txs end") + connector = aiohttp.TCPConnector(limit=CONNECTION_POOL_SIZE) async with aiohttp.ClientSession( connector=connector, json_serialize=ujson.dumps ) as session: - tasks = [asyncio.ensure_future(async_sendtx(session, raw)) for raw in txs] + tasks = [asyncio.ensure_future(sendtx(session, raw)) for raw in txs] await asyncio.gather(*tasks) diff --git a/testground/benchmark/benchmark/utils.py b/testground/benchmark/benchmark/utils.py index 5a0c033c98..14e58f55e8 100644 --- a/testground/benchmark/benchmark/utils.py +++ b/testground/benchmark/benchmark/utils.py @@ -15,6 +15,7 @@ CRONOS_ADDRESS_PREFIX = "crc" LOCAL_RPC = "http://127.0.0.1:26657" LOCAL_JSON_RPC = "http://127.0.0.1:8545" +DEFAULT_DENOM = "basecro" def patch_toml_doc(doc, patch): @@ -170,6 +171,18 @@ def block(height): return requests.get(f"{LOCAL_RPC}/block?height={height}").json() +def block_eth(height: int): + return requests.post( + f"{LOCAL_JSON_RPC}", + json={ + "jsonrpc": "2.0", + "method": "eth_getBlockByNumber", + "params": [hex(height), False], + "id": 1, + }, + ).json()["result"] + + def block_txs(height): return block(height)["result"]["block"]["data"]["txs"] @@ -180,3 +193,14 @@ def split(a: int, n: int): """ k, m = divmod(a, n) return [(i * k + min(i, m), (i + 1) * k + min(i + 1, m)) for i in range(n)] + + +def split_batch(a: int, size: int): + """ + Split range(0, a) into batches with size + """ + k, m = divmod(a, size) + parts = [(i * size, (i + 1) * size) for i in range(k)] + if m: + parts.append((k * size, a)) + return parts