diff --git a/.gitignore b/.gitignore
index ff5e1f0..562b935 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 *~
 *.pch
 *.pyc
+*.ipynb
 __pycache__/
 .idea
 .vscode/
diff --git a/README.md b/README.md
index c17e327..f2f320c 100644
--- a/README.md
+++ b/README.md
@@ -51,6 +51,112 @@ Example: check accounts in `account/samples/verify_ownership.yaml`.
 python -m account.verify_ownership --input account/samples/verify_ownership.yaml
 ```
 
+## block
+
+Running the block extraction scripts requires installation of the local **block** package, which can be accomplished as follows:
+```sh
+cd block
+pip install -e .
+```
+
+### extractor/extract
+
+_extracts chain data from node files and produces compact output for applications_
+
+The extractor is the first step in processing raw block data with the block-level scripts, whether the goal is driving visualizations or performing chain analysis.
+There are two output types:
+- blocks
+- statements
+
+Example: extract data from node files stored in `block/data`.
+Default output dir is `block/resources`.
+
+```sh
+python extractor/extract.py --input data --output resources --stream
+```
+
+The `--stream` flag will stream block data and write a single output at a time, which dramatically reduces memory footprint at the cost of significantly slower processing.
+Without this flag the extractor is extremely memory-intensive, as it loads the entire chain state representation into memory; omit at your own risk.
+
+### extractor/process
+
+_processes extracted chain data to generate a useful, readable representation of chain state_
+
+The processor streams data output by the extractor and builds human-readable representations of the block headers
+as well as a rich, indexable representation of the chain state.
+There are two output types:
+- block headers
+- chain state
+
+Example: process data from extractor output stored in `block/resources`.
+Default output dir is `block/resources`.
+
+```sh
+python extractor/process.py --input resources --output resources
+```
+
+### delegates/find_delegates
+
+_finds current delegates associated with one or more nodes using serialized state data_
+
+This script requires a JSON file containing accounts similar to what is received from the `/node/info` API endpoint; see the example in `resources/accounts.json`.
+As long as node URLs/names are available, it will attempt to fetch missing information from the nodes.
+
+Example: find all delegates for the nodes listed in `resources/accounts.json` using chain state from `resources/state_map.msgpack`.
+Default output dir is `block/delegates/output`.
+
+```sh
+python delegates/find_delegates.py --input resources/accounts.json --state_path resources/state_map.msgpack
+```
+
+### harvester/get_harvester_stats
+
+_aggregates harvesting statistics using serialized state data_
+
+This script requires a JSON file containing harvester addresses; see the example in `resources/accounts.json`.
+Stats are aggregated over the full chain history and binned based on the provided frequencies.
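+
+The underlying per-account records can also be inspected directly from Python once a state map has been extracted; a minimal sketch (the address is an illustrative placeholder):
+
+```py
+from block.extractor import XYMStateMap
+
+state_map = XYMStateMap.read_msgpack('resources/state_map.msgpack')
+
+# per-height XYM balance changes for one account, in micro-XYM
+balance_changes = state_map['SOME_ADDRESS']['xym_balance']
+total_xym = sum(balance_changes.values()) / 1e6  # divide by 1e6 for units of XYM
+```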
+The output falls into three categories:
+- blocks harvested
+- fees collected
+- total XYM balance
+
+Example: get stats for harvesters listed in `resources/accounts.json` using chain state from `resources/state_map.msgpack` and `resources/block_header_df.pkl`.
+Default output dir is `block/harvester/output`.
+
+```sh
+python harvester/get_harvester_stats.py --input resources/accounts.json --state_path resources/state_map.msgpack --headers_path resources/block_header_df.pkl
+```
+
+### nft/nember_extract
+
+_extracts transactions corresponding to the minting of nember NFTs_
+
+Produces two types of output:
+- NFT descriptions
+- transactions involving NFTs after minting
+
+Example: extract nember data from chain data in `resources/block_data.msgpack`.
+Default output dir is `block/nft/output`.
+
+```sh
+python nft/nember_extract.py --input resources/block_data.msgpack --output nft/output
+```
+
+### nft/nember_scrape
+
+_scrapes transactions corresponding to the minting of nember NFTs from API nodes_
+
+Produces two types of output:
+- NFT descriptions
+- transactions involving NFTs after minting
+
+Example: scrape all transactions corresponding to nember NFTs (takes at least a couple of hours).
+Default output dir is `block/nft/output`.
+
+```sh
+python nft/nember_scrape.py
+```
+
 ## health
 
 ### check_nem_balances
diff --git a/block/block/__init__.py b/block/block/__init__.py
new file mode 100644
index 0000000..0ae78ff
--- /dev/null
+++ b/block/block/__init__.py
@@ -0,0 +1 @@
+__all__ = ['extractor', 'extractor.util', 'extractor.state', 'extractor.format']
diff --git a/block/block/delegates/__init__.py b/block/block/delegates/__init__.py
new file mode 100644
index 0000000..73e9ec2
--- /dev/null
+++ b/block/block/delegates/__init__.py
@@ -0,0 +1,3 @@
+from block.delegates.delegates import find_delegates
+
+__all__ = ['find_delegates']
diff --git a/block/block/delegates/delegates.py b/block/block/delegates/delegates.py
new file mode 100644
index 0000000..1da26e7
--- /dev/null
+++ b/block/block/delegates/delegates.py
@@ -0,0 +1,42 @@
+"""Symbol delegate mapping utilities"""
+
+from binascii import unhexlify
+
+import requests
+
+from block.extractor import public_key_to_address
+
+
+def find_delegates(accounts, state_map):
+    """Find current delegates for each node based on chain state at final height"""
+
+    accounts = accounts.copy()
+    for acc in accounts:
+        if 'nodePublicKey' in acc:
+            node_address = public_key_to_address(unhexlify(acc['nodePublicKey']))
+        else:
+            print('No node public key present, trying to collect from API')
+            try:
+                node_key = requests.get(f'http://{acc["name"]}:3000/node/info').json()['nodePublicKey']
+                node_address = public_key_to_address(unhexlify(node_key))
+            except requests.exceptions.ConnectionError:
+                print(f'Failed to connect, skipping node: {acc["name"]}')
+                continue
+
+        # initialize delegates with node address
+        valid_delegates = [acc['address']]
+        invalid_delegates = []
+
+        for key, val in state_map.items():
+            if node_address in val['node_key_link']:
+                if val['node_key_link'][node_address][-1][1] == float('inf'):
+                    # link still active; require the 10,000 XYM minimum harvesting balance
+                    if sum(val['xym_balance'].values()) >= (10000 * 1e6):
+                        valid_delegates.append(key)
+                    else:
+                        invalid_delegates.append(key)
+        acc.update({
+            'node_address': node_address,
+            'valid_delegates': valid_delegates,
+            'invalid_delegates': invalid_delegates
+        })
+    return accounts
diff --git a/block/block/delegates/find_delegates.py b/block/block/delegates/find_delegates.py
new file mode 100644
index 0000000..1839d83
--- /dev/null
+++ b/block/block/delegates/find_delegates.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+"""Symbol delegate identification script"""
+
+import argparse
+import json
+
+from block.delegates.delegates import find_delegates
+from block.extractor import XYMStateMap
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--input', type=str, default='resources/accounts.json', help='path to load node information from')
+    parser.add_argument('--output', type=str, default='delegates/output/node_delegates.json', help='path to write delegates json')
+    parser.add_argument('--state_path', type=str, default='resources/state_map.msgpack', help='path to load state map from')
+
+    args = parser.parse_args()
+
+    print(f'Reading state from {args.state_path}')
+    state_map = XYMStateMap.read_msgpack(args.state_path)
+
+    print(f'Reading nodes from {args.input}')
+    with open(args.input, 'r', encoding='utf8') as f:
+        accounts = json.loads(f.read())['accounts']
+
+    print('Identifying delegates . . .')
+    delegate_accounts = find_delegates(accounts, state_map)
+
+    print(f'All accounts processed, writing output to {args.output}')
+    with open(args.output, 'w', encoding='utf8') as f:
+        f.write(json.dumps(delegate_accounts, indent=4))
+
+    print('Delegate analysis complete!')
diff --git a/block/block/extractor/__init__.py b/block/block/extractor/__init__.py
new file mode 100644
index 0000000..df937ac
--- /dev/null
+++ b/block/block/extractor/__init__.py
@@ -0,0 +1,15 @@
+from block.extractor.state import XYMStateMap
+from block.extractor.util import encode_address, fmt_unpack, public_key_to_address
+
+__all__ = [
+    'state',
+    'format',
+    'util',
+    'statements',
+    'body',
+    'process',
+    'XYMStateMap',
+    'fmt_unpack',
+    'encode_address',
+    'public_key_to_address'
+]
diff --git a/block/block/extractor/body.py b/block/block/extractor/body.py
new file mode 100644
index 0000000..aadec31
--- /dev/null
+++ b/block/block/extractor/body.py
@@ -0,0 +1,466 @@
+import struct
+from binascii import hexlify, unhexlify
+
+from block.extractor.format import (EMBED_TX_H_FORMAT, EMBED_TX_H_LEN, FOOTER_FORMAT, FOOTER_LEN, HEADER_FORMAT, IMPORTANCE_FOOTER_FORMAT,
+                                    IMPORTANCE_FOOTER_LEN, TX_H_FORMAT, TX_H_LEN)
+from block.extractor.util import encode_address, fmt_unpack, public_key_to_address
+
+
+def deserialize_header(header_data):
+    """Produce a python dict from a raw xym header blob
+
+    Parameters
+    ----------
+    header_data: bytes
+        Byte array containing serialized header
+
+    Returns
+    -------
+    header: dict
+        Dict containing block header field keys and primitive or bytes values
+
+    """
+
+    header = fmt_unpack(header_data, HEADER_FORMAT)
+    for key, val in HEADER_FORMAT.items():
+        if key == 'type':
+            header[key] = hexlify(header[key][::-1])
+        elif key == 'beneficiary_address':
+            header[key] = encode_address(header[key])
+        elif val[-1] == 's':
+            header[key] = hexlify(header[key])
+    header['harvester'] = public_key_to_address(unhexlify(header['signer_public_key']))
+    return header
+
+
+def deserialize_footer(footer_data, header):
+    """Produce a nested python dict from a raw xym footer blob
+
+    Parameters
+    ----------
+    footer_data: bytes
+        Byte array containing serialized footer
+    header: dict
+        Deserialized header dict as produced by :func:`deserialize_header`
+
+    Returns
+    -------
+    footer: dict
+        Dict containing block footer field keys and primitive or bytes values
+        as well as a list of deserialized transaction dicts
+
+    """
+
+    # parse static footer fields
+    if header['type'] == b'8043':  # nemesis
+        footer = fmt_unpack(footer_data[:IMPORTANCE_FOOTER_LEN], IMPORTANCE_FOOTER_FORMAT)
+        i = IMPORTANCE_FOOTER_LEN
+    elif header['type'] == b'8143':  # normal
+        footer = fmt_unpack(footer_data[:FOOTER_LEN], FOOTER_FORMAT)
+        i = FOOTER_LEN
+    elif header['type'] == b'8243':  # importance
+        footer = fmt_unpack(footer_data[:IMPORTANCE_FOOTER_LEN], IMPORTANCE_FOOTER_FORMAT)
+        i = IMPORTANCE_FOOTER_LEN
+    else:
+        raise ValueError(f'Unknown Block Type Encountered: {header["type"]}')
+
+    # parse transactions
+    tx_data = []
+    tx_count = 0
+    statement_count = 0
+    total_fee = 0
+    while i < len(footer_data):
+        tx_header = fmt_unpack(footer_data[i:i+TX_H_LEN], TX_H_FORMAT)
+        tx_header['id'] = statement_count + 1  # tx ids are 1-based
+        tx_header['signature'] = hexlify(tx_header['signature'])
+        tx_header['signer_public_key'] = hexlify(tx_header['signer_public_key'])
+        tx_header['type'] = hexlify(tx_header['type'][::-1])
+        tx_header['payload'] = deserialize_tx_payload(footer_data[i+TX_H_LEN:i+tx_header['size']], tx_header['type'])
+        tx_data.append(tx_header)
+
+        total_fee += min(tx_header['max_fee'], tx_header['size'] * header['fee_multiplier'])
+        tx_count += (1+tx_header['payload']['embedded_tx_count']) if 'embedded_tx_count' in tx_header['payload'] else 1
+        statement_count += 1
+        i += tx_header['size'] + (8 - tx_header['size']) % 8
+
+    footer['total_fee'] = total_fee
+    footer['statement_count'] = statement_count
+    footer['tx_count'] = tx_count
+    footer['transactions'] = tx_data
+
+    return footer
+
+
+def deserialize_tx_payload(payload_data, payload_type):
+    # pylint: disable=too-many-branches, too-many-statements
+    """Produce a nested python dict from a raw xym statement payload
+
+    Parameters
+    ----------
+    payload_data: bytes
+        Byte array containing serialized tx payload
+    payload_type: bytes
+        Byte array containing the hex representation of the type field from
+        the transaction header associated with the payload
+
+    Returns
+    -------
+    payload: dict
+        Dict containing tx payload field keys and primitive or bytes values. In
+        the case of aggregate transactions, will include a list containing dict
+        representations of deserialized embedded transactions.
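+
+    Aggregate payloads (types 4141/4241) are deserialized recursively, so each
+    embedded transaction carries its own fully deserialized payload dict.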
+ + """ + + # Account Link + if payload_type == b'414c': # AccountKeyLinkTransaction + schema = { + 'linked_public_key': '32s', + 'link_action': 'B' + } + payload = fmt_unpack(payload_data, schema) + + elif payload_type == b'424c': # NodeKeyLinkTransaction + schema = { + 'linked_public_key': '32s', + 'link_action': 'B' + } + payload = fmt_unpack(payload_data, schema) + + # Aggregate + elif payload_type == b'4141': # AggregateCompleteTransaction + schema = { + 'transactions_hash': '32s', + 'payload_size': 'I', + 'aggregate_complete_transaction_reserved_1': 'I' + } + i = 40 + payload = fmt_unpack(payload_data[:i], schema) + e_tx_count = 0 + e_tx_data = [] + while i < 8 + payload['payload_size']: + e_tx_header = fmt_unpack(payload_data[i:i+EMBED_TX_H_LEN], EMBED_TX_H_FORMAT) + e_tx_header['id'] = e_tx_count + 1 # tx ids are 1-based + e_tx_header['signer_public_key'] = hexlify(e_tx_header['signer_public_key']) + e_tx_header['type'] = hexlify(e_tx_header['type'][::-1]) + e_tx_header['payload'] = deserialize_tx_payload(payload_data[i+EMBED_TX_H_LEN:i+e_tx_header['size']], e_tx_header['type']) + e_tx_data.append(e_tx_header) + e_tx_count += 1 + i += e_tx_header['size'] + (8 - e_tx_header['size']) % 8 + + payload['embedded_tx_count'] = e_tx_count + payload['embedded_transactions'] = e_tx_data + payload['cosignatures'] = payload_data[i:] + + elif payload_type == b'4241': # AggregateBondedTransaction + schema = { + 'transactions_hash': '32s', + 'payload_size': 'I', + 'aggregate_complete_transaction_reserved_1': 'I' + } + i = 40 + payload = fmt_unpack(payload_data[:i], schema) + e_tx_count = 0 + e_tx_data = [] + while i < 8 + payload['payload_size']: + e_tx_header = fmt_unpack(payload_data[i:i+EMBED_TX_H_LEN], EMBED_TX_H_FORMAT) + e_tx_header['id'] = e_tx_count + 1 # tx ids are 1-based + e_tx_header['signer_public_key'] = hexlify(e_tx_header['signer_public_key']) + e_tx_header['type'] = hexlify(e_tx_header['type'][::-1]) + e_tx_header['payload'] = deserialize_tx_payload(payload_data[i+EMBED_TX_H_LEN:i+e_tx_header['size']], e_tx_header['type']) + e_tx_data.append(e_tx_header) + e_tx_count += 1 + i += e_tx_header['size'] + (8 - e_tx_header['size']) % 8 + + payload['embedded_tx_count'] = e_tx_count + payload['embedded_transactions'] = e_tx_data + payload['cosignatures'] = payload_data[i:] + + # Core + elif payload_type == b'4143': # VotingKeyLinkTransaction + schema = { + 'linked_public_key': '32s', + 'start_point': 'I', + 'end_point': 'I', + 'link_action': 'B' + } + payload = fmt_unpack(payload_data, schema) + + elif payload_type == b'4243': # VrfKeyLinkTransaction + schema = { + 'linked_public_key': '32s', + 'link_action': 'B' + } + payload = fmt_unpack(payload_data, schema) + + # Mosaic + elif payload_type == b'414d': # MosaicDefinitionTransaction + schema = { + 'id': 'Q', + 'duration': 'Q', + 'nonce': 'I', + 'flags': 'B', + 'divisibility': 'B' + } + payload = fmt_unpack(payload_data, schema) + + elif payload_type == b'424d': # MosaicSupplyChangeTransaction + schema = { + 'mosaic_id': 'Q', + 'delta': 'Q', + 'action': 'B', + } + payload = fmt_unpack(payload_data, schema) + + elif payload_type == b'434d': # MosaicSupplyRevocationTransaction + schema = { + 'source_address': '24s', + 'mosaic_id': 'Q', + 'amount': 'Q', + } + payload = fmt_unpack(payload_data, schema) + + # Namespace + elif payload_type == b'414e': # NamespaceRegistrationTransaction + schema = { + 'identifier': 'Q', + 'id': 'Q', + 'registration_type': 'B', + 'name_size': 'B', + } + payload = fmt_unpack(payload_data[:18], schema) + 
payload['name'] = payload_data[18:] + if payload['registration_type'] == 0: + payload['duration'] = payload['identifier'] + elif payload['registration_type'] == 1: + payload['parent_id'] = payload['identifier'] + else: + raise ValueError(f'Unknown registration type for Namespace RegistrationTransaction: {payload["registration_type"]}') + del payload['identifier'] + + elif payload_type == b'424e': # AddressAliasTransaction + schema = { + 'namespace_id': 'Q', + 'address': '24s', + 'alias_action': 'B' + } + payload = fmt_unpack(payload_data, schema) + + elif payload_type == b'434e': # MosaicAliasTransaction + schema = { + 'namespace_id': 'Q', + 'mosaic_id': 'Q', + 'alias_action': 'B' + } + payload = fmt_unpack(payload_data, schema) + + # Metadata + elif payload_type == b'4144': # AccountMetadataTransaction + schema = { + 'target_address': '24s', + 'scoped_metadata_key': 'Q', + 'value_size_delta': 'H', + 'value_size': 'H', + } + payload = fmt_unpack(payload_data[:36], schema) + payload['target_address'] = encode_address(payload['target_address']) + payload['value'] = payload_data[36:] + + elif payload_type == b'4244': # MosaicMetadataTransaction + schema = { + 'target_address': '24s', + 'scoped_metadata_key': 'Q', + 'target_mosaic_id': 'Q', + 'value_size_delta': 'H', + 'value_size': 'H', + } + payload = fmt_unpack(payload_data[:44], schema) + payload['target_address'] = encode_address(payload['target_address']) + payload['value'] = payload_data[44:] + + elif payload_type == b'4344': # NamespaceMetadataTransaction + schema = { + 'target_address': '24s', + 'scoped_metadata_key': 'Q', + 'target_namespace_id': 'Q', + 'value_size_delta': 'H', + 'value_size': 'H', + } + payload = fmt_unpack(payload_data[:44], schema) + payload['target_address'] = encode_address(payload['target_address']) + payload['value'] = payload_data[44:] + + # Multisignature + elif payload_type == b'4155': # MultisigAccountModificationTransaction + schema = { + 'min_removal_delta': 'B', + 'min_approval_delta': 'b', + 'address_additions_count': 'B', + 'address_deletions_count': 'B', + 'multisig_account_modificaion_transacion_body_reserved_1': 'I' + } + payload = fmt_unpack(payload_data[:8], schema) + i = 8 + if payload['address_additions_count'] > 0: + payload['address_additions'] = struct.unpack( + '<' + '24s'*payload['address_additions_count'], payload_data[i:i+payload['address_additions_count']*24]) + i += payload['address_additions_count']*24 + else: + payload['address_additions'] = [] + + if payload['address_deletions_count'] > 0: + payload['address_deletions'] = struct.unpack( + '<' + '24s'*payload['address_deletions_count'], payload_data[i:i+payload['address_deletions_count']*24]) + else: + payload['address_deletions'] = [] + + # Hash Lock + elif payload_type == b'4148': # HashLockTransaction + schema = { + 'reserved_1': '8s', # NOT in the schema but shows up in the data ?!? 
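+            # probably the unresolved mosaic id: the hash lock body is defined
+            # as mosaic (id, amount) + duration + hash, which would make the
+            # 'mosaic' field below the amount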
+            'mosaic': 'Q',
+            'duration': 'Q',
+            'hash': '32s'
+        }
+        payload = fmt_unpack(payload_data, schema)
+
+    # Secret Lock
+    elif payload_type == b'4152':  # SecretLockTransaction
+        schema = {
+            'recipient_address': '24s',
+            'secret': '32s',
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'duration': 'Q',
+            'hash_algorithm': 'B'
+        }
+        payload = fmt_unpack(payload_data, schema)
+        payload['recipient_address'] = encode_address(payload['recipient_address'])
+
+    elif payload_type == b'4252':  # SecretProofTransaction
+        schema = {
+            'recipient_address': '24s',
+            'secret': '32s',
+            'proof_size': 'H',
+            'hash_algorithm': 'B',
+        }
+        payload = fmt_unpack(payload_data[:59], schema)
+        payload['recipient_address'] = encode_address(payload['recipient_address'])
+        payload['proof'] = payload_data[59:]
+
+    # Account restriction
+    elif payload_type == b'4150':  # AccountAddressRestrictionTransaction
+        schema = {
+            'restriction_type': 'H',
+            'restriction_additions_count': 'B',
+            'restriction_deletions_count': 'B',
+            'account_restriction_transaction_body_reserved_1': 'I',
+        }
+        payload = fmt_unpack(payload_data[:8], schema)
+        i = 8
+        if payload['restriction_additions_count'] > 0:
+            payload['restriction_additions'] = struct.unpack(
+                '<' + '24s'*payload['restriction_additions_count'], payload_data[i:i+payload['restriction_additions_count']*24])
+            i += payload['restriction_additions_count']*24
+        else:
+            payload['restriction_additions'] = []
+
+        if payload['restriction_deletions_count'] > 0:
+            payload['restriction_deletions'] = struct.unpack(
+                '<' + '24s'*payload['restriction_deletions_count'], payload_data[i:i+payload['restriction_deletions_count']*24])
+        else:
+            payload['restriction_deletions'] = []
+
+    elif payload_type == b'4250':  # AccountMosaicRestrictionTransaction
+        schema = {
+            'restriction_type': 'H',
+            'restriction_additions_count': 'B',
+            'restriction_deletions_count': 'B',
+            'account_restriction_transaction_body_reserved_1': 'I',
+        }
+        payload = fmt_unpack(payload_data[:8], schema)
+        i = 8
+        if payload['restriction_additions_count'] > 0:
+            payload['restriction_additions'] = struct.unpack(
+                '<' + 'Q'*payload['restriction_additions_count'], payload_data[i:i+payload['restriction_additions_count']*8])
+            i += payload['restriction_additions_count']*8
+        else:
+            payload['restriction_additions'] = []
+
+        if payload['restriction_deletions_count'] > 0:
+            payload['restriction_deletions'] = struct.unpack(
+                '<' + 'Q'*payload['restriction_deletions_count'], payload_data[i:i+payload['restriction_deletions_count']*8])
+        else:
+            payload['restriction_deletions'] = []
+
+    elif payload_type == b'4350':  # AccountOperationRestrictionTransaction
+        schema = {
+            'restriction_type': 'H',
+            'restriction_additions_count': 'B',
+            'restriction_deletions_count': 'B',
+            'account_restriction_transaction_body_reserved_1': 'I',
+        }
+        payload = fmt_unpack(payload_data[:8], schema)
+        i = 8
+        if payload['restriction_additions_count'] > 0:
+            payload['restriction_additions'] = struct.unpack(
+                '<' + '2s'*payload['restriction_additions_count'], payload_data[i:i+payload['restriction_additions_count']*2])
+            i += payload['restriction_additions_count']*2
+        else:
+            payload['restriction_additions'] = []
+
+        if payload['restriction_deletions_count'] > 0:
+            # operation restrictions are 2-byte tx types
+            payload['restriction_deletions'] = struct.unpack(
+                '<' + '2s'*payload['restriction_deletions_count'], payload_data[i:i+payload['restriction_deletions_count']*2])
+        else:
+            payload['restriction_deletions'] = []
+
+    # Mosaic restriction
+    elif payload_type == b'4151':  # MosaicGlobalRestrictionTransaction
+        schema = {
+            'mosaic_id': 'Q',
+            'reference_mosaic_id': 'Q',
+            'restriction_key': 'Q',
+            'previous_restriction_value': 'Q',
+            'new_restriction_value': 'Q',
+            'previous_restriction_type': 'B',
+            'new_restriction_type': 'B'
+        }
+        payload = fmt_unpack(payload_data, schema)
+
+    elif payload_type == b'4251':  # MosaicAddressRestrictionTransaction
+        schema = {
+            'mosaic_id': 'Q',
+            'restriction_key': 'Q',
+            'previous_restriction_value': 'Q',
+            'new_restriction_value': 'Q',
+            'target_address': '24s'
+        }
+        payload = fmt_unpack(payload_data, schema)
+        payload['target_address'] = encode_address(payload['target_address'])
+
+    # Transfer
+    elif payload_type == b'4154':  # TransferTransaction
+        schema = {
+            'recipient_address': '24s',
+            'message_size': 'H',
+            'mosaics_count': 'B',
+            'transfer_transaction_body_reserved_1': 'I',
+            'transfer_transaction_body_reserved_2': 'B',
+        }
+        payload = fmt_unpack(payload_data[:32], schema)
+        i = 32
+        payload['mosaics'] = []
+        for _ in range(payload['mosaics_count']):
+            mosaic = {}
+            mosaic['mosaic_id'] = struct.unpack(' height:
+                    continue
+
+            while statement_height < height:
+                statement_height, statements, _ = next(statements_)
+
+            for stmt in statements['transaction_statements']:
+                for receipt in stmt['receipts']:
+                    state_map.insert_rcpt(receipt, height)
+
+            file.write(msgpack.packb((statement_height, statements,)))
+
+    assert len([*statements_]) == 0, 'ERROR: statement data length does not match block length'
+
+    print('statement data extraction complete!\n')
+    print(f'statement data written to {os.path.join(args.output,"stmt_data.msgpack")}')
+
+    state_map.to_msgpack(os.path.join(args.output, 'state_map.msgpack'))
+
+    print(f'state data written to {os.path.join(args.output,"state_map.msgpack")}')
+
+    print('exiting . . .')
+
+
+def main_stream(args):
+
+    # pylint: disable=too-many-locals, too-many-statements, consider-using-with
+
+    block_format_pattern = re.compile('[0-9]{5}'+args.block_extension)
+    block_paths = glob.glob(os.path.join(args.input, '**', '*'+args.block_extension), recursive=True)
+    block_paths = tqdm(sorted(list(filter(lambda x: block_format_pattern.match(os.path.basename(x)), block_paths))))
+
+    state_map = XYMStateMap()
+    blocks_to_go = []
+
+    statements_ = deserialize_statements(get_statement_paths(block_dir=args.input, statement_extension=args.statement_extension))
+    statement_height, statements, _ = next(statements_)
+
+    packer = msgpack.Packer()
+    statement_store = open(os.path.join(args.output, 'stmt_data.msgpack'), 'wb')
+    block_store = open(os.path.join(args.output, 'block_data.msgpack'), 'wb')
+
+    for path in block_paths:
+
+        block_paths.set_description(f'processing block file: {path}; current queue len: {len(blocks_to_go)}')
+
+        with open(path, mode='rb') as file:
+            block_data = file.read()
+
+        i = args.db_offset_bytes
+
+        while i < len(block_data):
+
+            # get fixed length data
+            header = deserialize_header(block_data[i:i+HEADER_LEN])
+            footer = deserialize_footer(block_data[i+HEADER_LEN:i+header['size']], header)
+            i += header['size']
+            block_hash, generation_hash = struct.unpack('<32s32s', block_data[i:i+64])
+            i += 64
+
+            # get transaction hashes
+            num_tx_hashes = struct.unpack('I', block_data[i:i+4])[0]
+            i += 4
+            tx_hashes = None
+            if args.save_tx_hashes:
+                tx_hashes = []
+                for _ in range(num_tx_hashes):
+                    tx_hashes.append(fmt_unpack(block_data[i:i+TX_HASH_LEN], TX_HASH_FORMAT))
+                    i += TX_HASH_LEN
+            else:
+                i += num_tx_hashes * TX_HASH_LEN
+
+            # get sub cache merkle roots
+            root_hash_len = struct.unpack('I', block_data[i:i+4])[0] * 32
+            i += 4
+            merkle_roots = None
+            if args.save_subcache_merkle_roots:
+                merkle_roots = fmt_unpack(block_data[i:i+root_hash_len], SUBCACHE_MERKLE_ROOT_FORMAT)
+                i += root_hash_len
+
+            # use a distinct name so the raw block_data buffer is not shadowed
+            block_record = {
+                'header': header,
+                'footer': footer,
+                'block_hash': block_hash,
+                'generation_hash': generation_hash,
+                'tx_hashes': tx_hashes,
+                'subcache_merkle_roots': merkle_roots
+            }
+
+            block_store.write(packer.pack(block_record))
+
+            # heap ensures we insert blocks and statements into state map in the right order
+            heapq.heappush(blocks_to_go, (block_record['header']['height'], block_record))
+
+            while len(blocks_to_go) > 0 and statement_height == blocks_to_go[0][0]:
+                height, block_record = heapq.heappop(blocks_to_go)
+                state_map.insert_block(block_record)
+                for stmt in statements['transaction_statements']:
+                    for receipt in stmt['receipts']:
+                        state_map.insert_rcpt(receipt, height)
+                statement_store.write(packer.pack((statement_height, statements)))
+                try:
+                    statement_height, statements, _ = next(statements_)
+                except StopIteration:
+                    break
+
+    print('block data extraction complete!\n')
+    print(f'block data written to {os.path.join(args.output,"block_data.msgpack")}')
+
+    assert len([*statements_]) == 0, 'ERROR: statement data length does not match block length'
+    print('statement data extraction complete!\n')
+    print(f'statement data written to {os.path.join(args.output,"stmt_data.msgpack")}')
+
+    state_map.to_msgpack(os.path.join(args.output, 'state_map.msgpack'))
+
+    print(f'state data written to {os.path.join(args.output,"state_map.msgpack")}')
+
+    print('exiting . . .')
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(argv)
+    parser.add_argument('--input', type=str, default='data', help='directory containing block store')
+    parser.add_argument('--output', type=str, default='resources', help='directory to dump output')
+    parser.add_argument('--block_extension', type=str, default='.dat', help='extension of block files; must be unique')
+    parser.add_argument('--statement_extension', type=str, default='.stmt', help='extension of statement files; must be unique')
+    parser.add_argument('--db_offset_bytes', type=int, default=DB_OFFSET_BYTES, help='padding bytes at start of storage files')
+    parser.add_argument('--save_tx_hashes', action='store_true', help='flag to keep full tx hashes')
+    parser.add_argument('--save_subcache_merkle_roots', action='store_true', help='flag to keep subcache merkle roots')
+    parser.add_argument('--quiet', action='store_true', help='do not show progress bars')
+    parser.add_argument('--stream', action='store_true', help='dramatically reduce memory footprint at the cost of performance')
+
+    return parser.parse_args()
+
+
+if __name__ == '__main__':
+    parsed_args = parse_args(sys.argv)
+    if parsed_args.quiet:
+        tqdm = functools.partial(tqdm, disable=True)
+    if parsed_args.stream:
+        main_stream(parsed_args)
+    else:
+        main(parsed_args)
diff --git a/block/block/extractor/format.py b/block/block/extractor/format.py
new file mode 100644
index 0000000..22fc6bc
--- /dev/null
+++ b/block/block/extractor/format.py
@@ -0,0 +1,134 @@
+"""Fixed structure of Symbol block entity bytes for unpacking"""
+
+HEADER_FORMAT = {
+    'size': 'I',
+    'reserved_1': 'I',
+    'signature': '64s',
+    'signer_public_key': '32s',
+    'reserved_2': 'I',
+    'version': 'B',
+    'network': 'B',
+    'type': '2s',
+    'height': 'Q',
+    'timestamp': 'Q',
+    'difficulty': 'Q',
+    'generation_hash_proof': '80s',
+    'previous_block_hash': '32s',
+    'transactions_hash': '32s',
+    'receipts_hash': '32s',
+    'state_hash': '32s',
+    'beneficiary_address': '24s',
+    'fee_multiplier': 'I'}
+
+HEADER_LEN = 372
+
+DB_OFFSET_BYTES = 800
+
+FOOTER_FORMAT = {
+    'reserved': 'I'}
+
+FOOTER_LEN = 4
+
+IMPORTANCE_FOOTER_FORMAT = {
+    'voting_eligible_accounts_count': 'I',
+    'harvesting_eligible_accounts_count': 'Q',
+    'total_voting_balance': 'Q',
+    'previous_importance_block_hash': '32s'}
+
+IMPORTANCE_FOOTER_LEN = 52
+
+TX_H_FORMAT = {
+    'size': 'I',
+    'reserved_1': 'I',
+    'signature': '64s',
+    'signer_public_key': '32s',
+    'reserved_2': 'I',
+    'version': 'B',
+    'network': 'B',
+    'type': '2s',
+    'max_fee': 'Q',
+    'deadline': 'Q'}
+
+TX_H_LEN = 128
+
+EMBED_TX_H_FORMAT = {
+    'size': 'I',
+    'reserved_1': 'I',
+    'signer_public_key': '32s',
+    'reserved_2': 'I',
+    'version': 'B',
+    'network': 'B',
+    'type': '2s'}
+
+EMBED_TX_H_LEN = 48
+
+SUBCACHE_MERKLE_ROOT_FORMAT = {
+    'account_state': '32s',
+    'namespace': '32s',
+    'mosaic': '32s',
+    'multisig': '32s',
+    'hash_lock_info': '32s',
+    'secret_lock_info': '32s',
+    'account_restriction': '32s',
+    'mosaic_restriction': '32s',
+    'metadata': '32s'}
+
+TX_HASH_FORMAT = {
+    'entity_hash': '32s',
+    'merkle_component_hash': '32s'}
+
+TX_HASH_LEN = 64
+
+RECEIPT_SOURCE_FORMAT = {
+    'primary_id': 'I',
+    'secondary_id': 'I'}
+
+RECEIPT_SOURCE_LEN = 8
+
+RECEIPT_FORMAT = {
+    'size': 'I',
+    'version': 'H',
+    'type': 'H'}
+
+RECEIPT_LEN = 8
+
+ADDRESS_RESOLUTION_FORMAT = {
+    'primary_id': 'I',
+    'secondary_id': 'I',
+    'resolved': '24s'}
+
+ADDRESS_RESOLUTION_LEN = 32
+
+MOSAIC_RESOLUTION_FORMAT = {
+    'primary_id': 'I',
+    'secondary_id': 'I',
+    'resolved': 'Q'}
+
+MOSAIC_RESOLUTION_LEN = 16
+
+
+TX_NAME_MAP = {
+    b'414c': 'Account Key Link',
+    b'424c': 'Node Key Link',
+    b'4141': 'Aggregate Complete',
+    b'4241': 'Aggregate Bonded',
+    b'4143': 'Voting Key Link',
+    b'4243': 'Vrf Key Link',
+    b'414d': 'Mosaic Definition',
+    b'424d': 'Mosaic Supply Change',
+    b'414e': 'Namespace Registration',
+    b'424e': 'Address Alias',
+    b'434e': 'Mosaic Alias',
+    b'4144': 'Account Metadata',
+    b'4244': 'Mosaic Metadata',
+    b'4344': 'Namespace Metadata',
+    b'4155': 'Multisig Account Modification',
+    b'4148': 'Hash Lock',
+    b'4152': 'Secret Lock',
+    b'4252': 'Secret Proof',
+    b'4150': 'Account Address Restriction',
+    b'4250': 'Account Mosaic Restriction',
+    b'4350': 'Account Operation Restriction',
+    b'4151': 'Mosaic Global Restriction',
+    b'4251': 'Mosaic Address Restriction',
+    b'4154': 'Transfer'}
diff --git a/block/block/extractor/process.py b/block/block/extractor/process.py
new file mode 100644
index 0000000..b319a3a
--- /dev/null
+++ b/block/block/extractor/process.py
@@ -0,0 +1,299 @@
+#!/usr/bin/env python3
+"""Symbol block data processing script"""
+
+import argparse
+import csv
+import functools
+import os
+import sys
+from binascii import hexlify
+
+import msgpack
+import pandas as pd
+from tqdm import tqdm
+
+from block.extractor.util import public_key_to_address
+
+HEADER_KEYS = [
+    'timestamp',
+    'size',
+    'reserved_1',
+    'signature',
+    'signer_public_key',
+    'reserved_2',
+    'version',
+    'network',
+    'type',
+    'height',
+    'difficulty',
+    'generation_hash_proof',
+    'previous_block_hash',
+    'transactions_hash',
+    'receipts_hash',
+    'state_hash',
+    'beneficiary_address',
+    'fee_multiplier',
+    'harvester',
+    'statement_count',
+    'tx_count',
+    'total_fee']
+
+
+TX_KEYS = [
+    'timestamp',
+    'size',
+    'signature',
+    'signer_public_key',
+    'type',
+    'max_fee',
+    'deadline',
+    'id',
+    'height',
+    'recipient_address',
+    'message_size',
+    'mosaics',
+    'message',
+    'linked_public_key',
+    'link_action',
+    'mosaic',
+    'duration',
+    'hash',
+    'secret',
+    'mosaic_id',
+    'amount',
+    'hash_algorithm',
+    'restriction_type',
+    'restriction_additions',
+    'restriction_deletions',
+    'nonce',
+    'flags',
+    'divisibility',
+    'delta',
+    'action',
+    'source_address',
+    'min_removal_delta',
+    'min_approval_delta',
+    'address_additions',
+    'address_deletions',
+    'registration_type',
+    'name_size',
+    'name',
+    'parent_id',
+    'namespace_id',
+    'alias_action',
+    'proof_size',
+    'proof',
+    'target_address',
+    'scoped_metadata_key',
+    'value_size_delta',
+    'value_size',
+    'value',
+    'address',
+    'target_mosaic_id',
+    'start_point',
+    'end_point',
+    'target_namespace_id',
+]
+
+ADDR_FIELDS = [
+    'recipient_address',
+    'source_address',
+    'address_additions',
+    'address_deletions',
+    'target_address',
+    'address',
+]
+
+
+PUBKEY_FIELDS = [
+    'signer_public_key',
+    'linked_public_key',
+]
+
+
+TX_KEYS_TO_DROP = [
+    'payload',
+    'version',
+    'network',
+    'reserved_1',
+    'reserved_2',
+    'transfer_transaction_body_reserved_2',
+    'transfer_transaction_body_reserved_1',
+    'multisig_account_modification_transaction_body_reserved_1',
+    'account_restriction_transaction_body_reserved_1',
+    'mosaics_count',
+    'address_additions_count',
+    'address_deletions_count',
+    'restriction_additions_count',
+    'restriction_deletions_count'
+]
+
+
+def get_block_stats(block):
+    """Extract summary data from a block and flatten for tabular manipulation"""
+    data = block['header'].copy()
+    data['statement_count'] = block['footer']['statement_count']
+    data['tx_count'] = block['footer']['tx_count']
+    data['total_fee'] = block['footer']['total_fee']
+    for key, value in data.items():
+        if isinstance(value, bytes):
+            data[key] = value.decode('utf-8')
+    return data
+
+
+def get_tx_stats(block):
+    """Extract transaction data from a block and flatten for tabular manipulation"""
+    data = []
+    header = block['header']
+
+    # handle transactions
+    for transaction in block['footer']['transactions']:
+        if transaction['type'] in [b'4141', b'4241']:  # aggregate transaction, append subtx instead
+            for sub_transaction in transaction['payload']['embedded_transactions']:
+                data.append(sub_transaction.copy())
+        else:
+            data.append(transaction.copy())
+
+    # TODO: determine whether IDs are being handled appropriately; should there be an entry for the headers of aggregate transactions?
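+    # flatten each transaction for tabular output: copy block-level fields onto
+    # it, merge in its payload fields, then stringify bytes and list values so
+    # they survive CSV serialization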
+    for transaction in data:
+        transaction['height'] = header['height']
+        transaction['timestamp'] = header['timestamp']
+        transaction.update(transaction['payload'])
+        for key, value in list(transaction.items()):
+            if key in TX_KEYS_TO_DROP:
+                del transaction[key]
+            elif isinstance(value, bytes):
+                try:
+                    transaction[key] = value.decode('utf-8')
+                except UnicodeDecodeError:
+                    transaction[key] = hexlify(value).decode('utf-8')
+            elif isinstance(value, list):
+                transaction[key] = str(value)
+
+    return data
+
+
+def guarded_convert(pubkey_string):
+    """Convert address conditional on a check to ensure valid public key format"""
+    if isinstance(pubkey_string, str) and len(pubkey_string) == 64:
+        return public_key_to_address(bytes.fromhex(pubkey_string))
+    return pubkey_string
+
+
+def filter_transactions(transaction_df, address=None, transaction_types=None, start_datetime='1900-01-01', end_datetime='2200-01-01'):
+    """Filter processed transactions based on dates, tx types, and address"""
+
+    start_datetime = pd.to_datetime(start_datetime)
+    end_datetime = pd.to_datetime(end_datetime)
+
+    transaction_df = transaction_df.loc[start_datetime:end_datetime]
+    if transaction_df.empty:
+        return transaction_df
+
+    filter_key = None
+
+    # filter based on all address/public key fields for completeness
+    if address is not None:
+        filter_key = pd.Series(False, index=transaction_df.index)
+        for field in PUBKEY_FIELDS:
+            filter_key = filter_key | transaction_df[field].apply(lambda x: guarded_convert(x) == address)
+        for field in ADDR_FIELDS:
+            filter_key = filter_key | (transaction_df[field] == address)
+
+    if transaction_types is not None:
+        if filter_key is None:
+            filter_key = pd.Series(True, index=transaction_df.index)
+        filter_key = filter_key & transaction_df['type'].isin(transaction_types)
+
+    return transaction_df[filter_key]
+
+
+def process_tx_file(transaction_file, address=None, transaction_types=None, start_datetime='1900-01-01', end_datetime='2200-01-01'):
+    """Read a processed transaction file, then stream chunks through the filter"""
+    transaction_chunks = pd.read_csv(transaction_file, index_col=0, parse_dates=True, chunksize=10000)
+    filtered = []
+    for chunk in transaction_chunks:
+        filtered.append(filter_transactions(chunk, address, transaction_types, start_datetime, end_datetime))
+    return pd.concat(filtered, axis=0)
+
+
+def decode_msgpack(packed_data):
+    """Recursively parse msgpack data to decode dict keys"""
+    decoded_data = packed_data
+    if isinstance(packed_data, dict):
+        decoded_data = {}
+        for key, value in packed_data.items():
+            decoded_data[key.decode('utf-8')] = decode_msgpack(value)
+    elif isinstance(packed_data, list):
+        decoded_data = []
+        for value in packed_data:
+            decoded_data.append(decode_msgpack(value))
+    return decoded_data
+
+
+def main(args):
+    # pylint: disable=too-many-locals, consider-using-with
+
+    header_writer = csv.DictWriter(
+        open(os.path.join(args.output, args.header_save_path), 'a' if args.append else 'w', encoding='utf8'),
+        HEADER_KEYS,
+        extrasaction='ignore',
+        escapechar='\\',
+        quoting=csv.QUOTE_MINIMAL)
+
+    transaction_writer = csv.DictWriter(
+        open(os.path.join(args.output, args.tx_save_path), 'a' if args.append else 'w', encoding='utf8'),
+        TX_KEYS,
+        extrasaction='ignore',
+        escapechar='\\',
+        quoting=csv.QUOTE_MINIMAL)
+
+    # build a raw bytes unpacker; unicode errors ignored as tx serialization is not always valid unicode text
+    unpacker = msgpack.Unpacker(open(args.input, 'rb'), unicode_errors=None, raw=True)
+
+    final_height = 0
+    if args.append:
+        old_headers = pd.read_csv(os.path.join(args.output, args.header_save_path), chunksize=1024)
+        while True:
+            try:
+                chunk = next(old_headers)
+            except StopIteration:  # we have found the end of the file
+                final_height = chunk.iloc[-1]['height']
+                break
+        for _ in range(final_height):
+            unpacker.skip()
+    else:
+        header_writer.writeheader()
+        transaction_writer.writeheader()
+
+    for block in tqdm(unpacker, total=args.total-final_height):
+        block = decode_msgpack(block)
+
+        header = get_block_stats(block)
+        header['timestamp'] = pd.to_datetime(header['timestamp'], origin=pd.to_datetime('2021-03-16 00:06:25'), unit='ms')
+        header_writer.writerow(header)
+
+        transactions = get_tx_stats(block)
+        for transaction in transactions:
+            transaction['timestamp'] = pd.to_datetime(transaction['timestamp'], origin=pd.to_datetime('2021-03-16 00:06:25'), unit='ms')
+        transaction_writer.writerows(transactions)
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(argv)
+    parser.add_argument('--input', type=str, default='resources/block_data.msgpack', help='file containing extracted block data')
+    parser.add_argument('--output', type=str, default='resources', help='directory to dump output')
+    parser.add_argument('--append', action='store_true', help='add to existing data instead of rebuilding files from scratch')
+    parser.add_argument('--header_save_path', type=str, default='block_headers.csv', help='file to write the header table to')
+    parser.add_argument('--tx_save_path', type=str, default='transactions.csv', help='file to write the transaction table to')
+    parser.add_argument('--total', type=float, default=float('inf'), help='total number of blocks if known (gives accurate progress stats)')
+    parser.add_argument('--quiet', action='store_true', help='do not show progress bars')
+
+    return parser.parse_args()
+
+
+if __name__ == '__main__':
+    parsed_args = parse_args(sys.argv)
+    if parsed_args.quiet:
+        tqdm = functools.partial(tqdm, disable=True)
+    main(parsed_args)
diff --git a/block/block/extractor/state.py b/block/block/extractor/state.py
new file mode 100644
index 0000000..3f3bc33
--- /dev/null
+++ b/block/block/extractor/state.py
@@ -0,0 +1,389 @@
+"""Symbol chain state representation module"""
+
+from binascii import unhexlify
+from collections import defaultdict
+from functools import partial
+
+import msgpack
+import networkx as nx
+import numpy as np
+import pandas as pd
+
+from block.extractor.util import public_key_to_address
+
+
+class XYMStateMap():
+    """Efficient, mutable representation of XYM network state
+
+    Parameters
+    ----------
+    state_map: dict, optional
+        Pre-existing state map to initialize internal state
+    account_map: dict, optional
+        Pre-existing map from linked remote addresses to main account addresses
+    mosaic_map: dict, optional
+        Pre-existing map from namespace ids to mosaic ids
+
+    Attributes
+    ----------
+    state_map: defaultdict
+        Dict mapping addresses to recorded quantities
+    harvester_mosaics: list[str]
+        List of string alias(es) for the harvester mosaic
+
+    """
+
+    def __init__(self, state_map=None, account_map=None, mosaic_map=None):
+
+        if state_map is None:
+            state_map = {}
+
+        if account_map is None:
+            account_map = {}
+
+        if mosaic_map is None:
+            mosaic_map = {}
+
+        if len(state_map):
+            state_map = {key: {
+                'xym_balance': defaultdict(lambda: 0, val['xym_balance']),
+                'mosaics': defaultdict(list, val['mosaics']),
+                'harvest_fees': defaultdict(lambda: 0, val['harvest_fees']),
+                'delegation_requests': defaultdict(list, val['delegation_requests']),
+                'vrf_key_link': defaultdict(list, val['vrf_key_link']),
+                'node_key_link': defaultdict(list, val['node_key_link']),
+                'account_key_link': defaultdict(list, val['account_key_link']),
+                'harvested': defaultdict(list, val['harvested']),
+                'delegated': defaultdict(list, val['delegated'])
+            } for key, val in state_map.items()}
+
+        self._state_map = defaultdict(lambda: {
+            'xym_balance': defaultdict(lambda: 0),
+            'mosaics': defaultdict(list),
+            'harvest_fees': defaultdict(lambda: 0),
+            'delegation_requests': defaultdict(list),
+            'vrf_key_link': defaultdict(list),
+            'node_key_link': defaultdict(list),
+            'account_key_link': defaultdict(list),
+            'harvested': defaultdict(list),
+            'delegated': defaultdict(list)
+        }, state_map)
+
+        # add an invertible account map?
+        self._account_map = account_map
+
+        self._mosaic_map = mosaic_map
+
+        self._height_ts_map = {}
+
+        self.harvester_mosaics = ['0x6bed913fa20223f8', '0xe74b99ba41f4afee']  # only care about XYM for now, hardcoded alias
+        self.node_color = 'CornflowerBlue'
+        self.harvester_color = 'LightBlue'
+
+    def __getitem__(self, addr):
+        return self._state_map[addr]
+
+    @classmethod
+    def read_msgpack(cls, msgpack_path):
+        """Read data from a msgpack binary blob and build a state map"""
+        if isinstance(msgpack_path, str):
+            with open(msgpack_path, 'rb') as file:
+                state_map, account_map, mosaic_map = msgpack.unpack(file, unicode_errors=None, raw=False)
+        else:
+            raise TypeError(f'Unrecognized type {type(msgpack_path)} for read_msgpack; expected path str')
+
+        return cls(state_map=state_map, account_map=account_map, mosaic_map=mosaic_map)
+
+    def to_dict(self):
+        """Convert internal state map to a serializable dictionary"""
+        sm_dict = dict(self._state_map)
+        for key, val in sm_dict.items():
+            sm_dict[key] = dict(val)
+            for subkey, subval in val.items():
+                sm_dict[key][subkey] = dict(subval)
+        return sm_dict
+
+    def to_msgpack(self, msgpack_path):
+        """Produce a serialized blob with msgpack"""
+        with open(msgpack_path, 'wb') as file:
+            file.write(msgpack.packb((self.to_dict(), self._account_map, self._mosaic_map)))
+
+    def keys(self):
+        """Produce a view of all addresses in the state map"""
+        return self._state_map.keys()
+
+    def values(self):
+        """Produce a view of all address data in the state map"""
+        return self._state_map.values()
+
+    def items(self):
+        """Produce a list of tuples containing addresses and data"""
+        return self._state_map.items()
+
+    def insert_txn(self, txn, height, fee_multiplier):
+        # pylint: disable=too-many-locals, too-many-branches
+
+        """Insert a transaction into the state map and record resulting changes
+
+        Parameters
+        ----------
+        txn: dict
+            Deserialized transaction
+        height: int
+            Height of transaction
+        fee_multiplier: float
+            Fee multiplier for the transaction's containing block
+
+        """
+
+        # need to handle flows for *all* mosaics, not just XYM
+        address = public_key_to_address(unhexlify(txn['signer_public_key']))
+
+        if txn['type'] == b'4154':  # transfer txn
+            if len(txn['payload']['message']) and txn['payload']['message'][0] == 0xfe:
+                self._state_map[address]['delegation_requests'][txn['payload']['recipient_address']].append(height)
+            elif txn['payload']['mosaics_count'] > 0:
+                for mosaic in txn['payload']['mosaics']:
+                    if hex(mosaic['mosaic_id']) in self.harvester_mosaics:
+                        self._state_map[address]['xym_balance'][height] -= mosaic['amount']
+                        self._state_map[txn['payload']['recipient_address']]['xym_balance'][height] += mosaic['amount']
+                    else:
+                        self._state_map[address]['mosaics'][hex(mosaic['mosaic_id'])].append((height, -mosaic['amount']))
+                        self._state_map[txn['payload']['recipient_address']]['mosaics'][hex(mosaic['mosaic_id'])].append(
+                            (height, mosaic['amount']))
+
+        elif txn['type'] in [b'4243', b'424c', b'414c']:  # key link txn
+            if txn['type'] == b'4243':
+                link_key = 'vrf_key_link'
+            elif txn['type'] == b'424c':
+                link_key = 'node_key_link'
+            elif txn['type'] == b'414c':
+                link_key = 'account_key_link'
+            self._account_map[public_key_to_address(txn['payload']['linked_public_key'])] = address
+            if txn['payload']['link_action'] == 1:
+                self._state_map[address][link_key][public_key_to_address(txn['payload']['linked_public_key'])].append([height, np.inf])
+            else:
+                self._state_map[address][link_key][public_key_to_address(txn['payload']['linked_public_key'])][-1][1] = height
+
+        elif txn['type'] in [b'434d']:  # mosaic supply revocation txn
+            if hex(txn['payload']['mosaic_id']) in self.harvester_mosaics:
+                self._state_map[txn['payload']['source_address']]['xym_balance'][height] -= txn['payload']['amount']
+
+        elif txn['type'] == b'434e':  # mosaic alias
+            if txn['payload']['alias_action'] == 0x1:
+                self._mosaic_map[txn['payload']['namespace_id']] = txn['payload']['mosaic_id']
+            else:
+                self._mosaic_map[txn['payload']['namespace_id']] = None
+
+        # need to add address alias resolution
+
+        elif txn['type'] in [b'4141', b'4241']:  # aggregate txn
+            for sub_txn in txn['payload']['embedded_transactions']:
+                self.insert_txn(sub_txn, height, None)
+
+        if fee_multiplier is not None:  # handle fees
+            self._state_map[address]['xym_balance'][height] -= min(txn['max_fee'], txn['size']*fee_multiplier)
+
+    def insert_rcpt(self, rcpt, height):
+        """Insert a receipt into the state map and record resulting changes
+
+        Parameters
+        ----------
+        rcpt: dict
+            Deserialized receipt
+        height: int
+            Height of receipt
+
+        """
+
+        if rcpt['type'] in [0x124D, 0x134E]:  # rental fee receipts
+            if hex(rcpt['payload']['mosaic_id']) in ['0x6bed913fa20223f8', '0xe74b99ba41f4afee']:
+                self._state_map[rcpt['payload']['sender_address']]['xym_balance'][height] -= rcpt['payload']['amount']
+                self._state_map[rcpt['payload']['recipient_address']]['xym_balance'][height] += rcpt['payload']['amount']
+
+        elif rcpt['type'] in [0x2143, 0x2248, 0x2348, 0x2252, 0x2352]:  # balance change receipts (credit)
+            self._state_map[rcpt['payload']['target_address']]['xym_balance'][height] += rcpt['payload']['amount']
+            if rcpt['type'] == 0x2143:  # harvest fee
+                self._state_map[rcpt['payload']['target_address']]['harvest_fees'][height] += rcpt['payload']['amount']
+
+        elif rcpt['type'] in [0x3148, 0x3152]:  # balance change receipts (debit)
+            self._state_map[rcpt['payload']['target_address']]['xym_balance'][height] -= rcpt['payload']['amount']
+
+        if rcpt['type'] == 0xE143:  # aggregate receipts
+            for sub_rcpt in rcpt['receipts']:
+                self.insert_rcpt(sub_rcpt, height)
+
+    def insert_block(self, block):
+        """Insert a block into the state map and record resulting changes
+
+        Parameters
+        ----------
+        block: dict
+            Deserialized block
+
+        """
+        header = block['header']
+        height = header['height']
+        self._height_ts_map[height] = pd.to_datetime(header['timestamp'], origin=pd.to_datetime('2021-03-16 00:06:25'), unit='ms')
+
+        # handle harvester information
+        harvester = header['harvester']
+        if harvester in self._account_map:
+            harvester = self._account_map[harvester]
+
+        self._state_map[harvester]['harvested'][height] = header['beneficiary_address']
+        if harvester != header['beneficiary_address']:
+            self._state_map[header['beneficiary_address']]['delegated'][height] = harvester
+
+        # handle transactions
+        for txn in block['footer']['transactions']:
+            self.insert_txn(txn, height, header['fee_multiplier'])
+
+    def get_balance_series(self, address, freq=None):
+        """Produce a time series representing the XYM balance of a given address
+
+        Parameters
+        ----------
+        address: str
+            Address for which to produce the balance series
+        freq: str, optional
+            Frequency at which the balance series should be resampled
+
+        Returns
+        -------
+        balance_series: pandas.Series
+            Series with balances in XYM and a datetime index
+        """
+        b_series = pd.Series({self._height_ts_map[k]: v for k, v in self[address]['xym_balance'].items()}, dtype=float)
+        if freq is not None:
+            b_series = b_series.resample(freq).sum()
+        return b_series / 1000000  # divide by one million to get units of XYM
+
+    def get_harvester_graph(self, height=np.inf, min_harvester_size=10000, min_node_size=10000, track_remote=False):
+        # pylint: disable=too-many-locals
+        """Produce a graph representing harvester-node relationships for a range of network heights
+
+        Parameters
+        ----------
+        height: int
+            Height at which to represent harvester connections
+        min_harvester_size: int, optional
+            Size (in XYM) below which harvesters are ignored
+        min_node_size: int, optional
+            Size (in XYM, representing total delegated harvester balance) below which nodes are ignored
+        track_remote: bool
+            Determines whether remote harvesters (i.e. non-delegated harvesters) should be tracked
+
+        Returns
+        -------
+        harvester_graph: networkx.DiGraph
+            Graph in which addresses are represented as nodes and edges represent a delegated harvesting relationship
+        """
+        harvester_map = {}
+        node_map = defaultdict(list)
+
+        for key, val in self._state_map.items():
+            balance = sum([x for h, x in val['xym_balance'].items() if h <= height]) / 1e6
+            if balance >= min_harvester_size:
+                curr_node = None
+                link_start = 0
+                for addr, links in val['node_key_link'].items():
+                    for link in links:
+                        if link[0] <= height <= link[1]:
+                            curr_node = addr
+                            link_start = link[0]
+                            break
+                        link_start = max(link_start, link[1])
+                    if curr_node is not None:
+                        break
+
+                num_h = sum(map(partial(lambda s, h, x: int(s <= x <= h), link_start, height), val['harvested'].keys()))
+                min_height = min(val['xym_balance'].keys())
+
+                if curr_node is None:  # not currently a delegated harvester, no node key link
+                    if track_remote:
+                        if num_h > 0:
+                            harvester_map[key] = {
+                                'type': 'remote_harvester',
+                                'color': self.harvester_color,
+                                'balance': balance,
+                                'size': np.sqrt(balance),
+                                'link_age': height-link_start,
+                                'min_height': min_height}
+                            # node_map[k].append((k, balance, height-link_start))
+                else:
+                    harvester_map[key] = {
+                        'type': 'delegated_harvester',
+                        'color': self.harvester_color,
+                        'balance': balance,
+                        'size': np.sqrt(balance),
+                        'link_age': height-link_start,
+                        'min_height': min_height}
+                    node_map[curr_node].append((key, balance, height-link_start, link_start))
+
+        graph = nx.DiGraph()
+        graph.add_nodes_from(harvester_map.items())
+        graph.add_nodes_from([
+            (k, {
+                'type': 'node',
+                'color': self.node_color,
+                'balance': sum([x[1] for x in v]),
+                'size': np.sqrt(sum([x[1] for x in v])),
+                'link_age': np.mean([x[2] for x in v]),
+                'min_height': min([x[3] for x in v])})
+            for k, v in node_map.items()
+            if sum([x[1] for x in v]) >= min_node_size])
+
+        for node, delegates in node_map.items():
+            graph.add_edges_from([(node, d[0], {'link_age': d[2]}) for d in delegates])
+
+        return graph
+
+    def get_harvester_bubbles(self, min_height=0, max_height=np.inf, min_harvester_size=1, min_delegate_size=1):
+        """Produce a bubble chart representing harvester-node relationships for a range of network heights
+
+        Parameters
+        ----------
+        min_height: int
+            Height at which to begin recording harvesting signatures
+        max_height: int
+            Height at which to stop recording harvesting signatures
+        min_harvester_size: int, optional
+            Minimum number of harvested blocks for an address to be included as a node
+        min_delegate_size: int, optional
+            Minimum number of delegated blocks for an address to be included as a delegate
+
+        Returns
+        -------
+        bubble_graph: networkx.Graph
+            Graph in which addresses are represented as nodes and parent attributes represent a delegated harvesting relationship
+        """
+        harvester_map = defaultdict(lambda: [])
+
+        for key, val in self._state_map.items():
+            for height, addr in val['harvested'].items():
+                if min_height <= height <= max_height:
+                    harvester_map[key].append(addr)
+
+        delegate_map = defaultdict(lambda: [])
+
+        for key, val in self._state_map.items():
+            for height, addr in val['delegated'].items():
+                if min_height <= height <= max_height:
+                    delegate_map[key].append(addr)
+
+        harvester_size_map = {k: {
+            'size': len(v),
+            'color': self.node_color,
+            'type': 'node'}
+            for k, v in harvester_map.items() if len(v) >= min_harvester_size}
+
+        delegate_size_map = {k: {
+            'size': len(v),
+            'color': self.harvester_color,
+            'parent': max(set(v), key=v.count),
+            'type': 'delegate'}
+            for k, v in delegate_map.items() if len(v) >= min_delegate_size}
+
+        graph = nx.Graph()
+        graph.add_nodes_from(harvester_size_map.items())
+        graph.add_nodes_from(delegate_size_map.items())
+
+        return graph
+
+
+if __name__ == '__main__':
+    print('Nothing to do here; if you need to build a state map, use extract.py!')
diff --git a/block/block/extractor/statements.py b/block/block/extractor/statements.py
new file mode 100644
index 0000000..988865f
--- /dev/null
+++ b/block/block/extractor/statements.py
@@ -0,0 +1,349 @@
+import glob
+import os
+import re
+import struct
+
+from tqdm import tqdm
+
+from block.extractor.format import (ADDRESS_RESOLUTION_FORMAT, ADDRESS_RESOLUTION_LEN, DB_OFFSET_BYTES, MOSAIC_RESOLUTION_FORMAT,
+                                    MOSAIC_RESOLUTION_LEN, RECEIPT_FORMAT, RECEIPT_LEN, RECEIPT_SOURCE_FORMAT, RECEIPT_SOURCE_LEN)
+from block.extractor.util import encode_address, fmt_unpack
+
+
+def deserialize_receipt_payload(receipt_data, receipt_type):
+    # pylint: disable=too-many-statements, too-many-branches
+
+    """Produce a nested python dict from a raw receipt payload
+
+    Parameters
+    ----------
+    receipt_data: bytes
+        Byte array containing serialized receipt payload
+    receipt_type: bytes
+        Byte array containing the hex representation of the type field from the receipt header
+
+    Returns
+    -------
+    receipt: dict
+        Dict containing receipt payload field keys and primitive or bytes values
+
+    """
+
+    # Reserved
+    if receipt_type == 0x0000:  # reserved receipt
+        payload = None
+
+    # Balance Transfer
+    elif receipt_type == 0x124D:  # mosaic rental fee receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'sender_address': '24s',
+            'recipient_address': '24s'
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['sender_address'] = encode_address(payload['sender_address'])
+        payload['recipient_address'] = encode_address(payload['recipient_address'])
+
+    elif receipt_type == 0x134E:  # namespace rental fee receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'sender_address': '24s',
+            'recipient_address': '24s'
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['sender_address'] = encode_address(payload['sender_address'])
+        payload['recipient_address'] = encode_address(payload['recipient_address'])
+
+    # Balance Change (Credit)
+    elif receipt_type == 0x2143:  # harvest fee receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'target_address': '24s',
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['target_address'] = encode_address(payload['target_address'])
+
+    elif receipt_type == 0x2248:  # lock hash completed receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'target_address': '24s',
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['target_address'] = encode_address(payload['target_address'])
+
+    elif receipt_type == 0x2348:  # lock hash expired receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'target_address': '24s',
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['target_address'] = encode_address(payload['target_address'])
+
+    elif receipt_type == 0x2252:  # lock secret completed receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'target_address': '24s',
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['target_address'] = encode_address(payload['target_address'])
+
+    elif receipt_type == 0x2352:  # lock secret expired receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'target_address': '24s',
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['target_address'] = encode_address(payload['target_address'])
+
+    # Balance Change (Debit)
+    elif receipt_type == 0x3148:  # lock hash created receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'target_address': '24s',
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['target_address'] = encode_address(payload['target_address'])
+
+    elif receipt_type == 0x3152:  # lock secret created receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+            'target_address': '24s',
+        }
+        payload = fmt_unpack(receipt_data, schema)
+        payload['target_address'] = encode_address(payload['target_address'])
+
+    # Artifact Expiry
+    elif receipt_type == 0x414D:  # mosaic expired receipt
+        schema = {
+            'mosaic_id': 'Q'
+        }
+        payload = fmt_unpack(receipt_data, schema)
+
+    elif receipt_type == 0x414E:  # namespace expired receipt
+        schema = {
+            'mosaic_id': 'Q'  # note: for namespace receipts this 8-byte id is the namespace id
+        }
+        payload = fmt_unpack(receipt_data, schema)
+
+    elif receipt_type == 0x424E:  # namespace deleted receipt
+        schema = {
+            'mosaic_id': 'Q'  # note: for namespace receipts this 8-byte id is the namespace id
+        }
+        payload = fmt_unpack(receipt_data, schema)
+
+    # Inflation
+    elif receipt_type == 0x5143:  # inflation receipt
+        schema = {
+            'mosaic_id': 'Q',
+            'amount': 'Q',
+        }
+        payload = fmt_unpack(receipt_data, schema)
+
+    # Transaction Statement
+    elif receipt_type == 0xE143:  # transaction group receipt
+        receipt_source = fmt_unpack(receipt_data[:RECEIPT_SOURCE_LEN], RECEIPT_SOURCE_FORMAT)
+        i = RECEIPT_SOURCE_LEN
+
+        receipt_count = struct.unpack('