From 863c8dd2c1d0960cd10158cc03761b54c0084d1e Mon Sep 17 00:00:00 2001 From: Jaguar0625 Date: Sun, 27 Feb 2022 21:14:18 -0400 Subject: [PATCH] [history]: downloader improvements for better lukkatax compatability 1. update downloader to include block hashes for harvests and full addresses 2. include full addresses and ISO timestamps in default merger output 3. add splitter tool for breaking up batch downloaded data --- client/NemClient.py | 24 +++++++++++---- client/SymbolClient.py | 25 +++++++++------- client/TimeoutHTTPAdapter.py | 6 ++-- client/pod.py | 1 + history/downloader.py | 4 +-- history/merger.py | 20 +++++++++++-- history/splitter.py | 57 ++++++++++++++++++++++++++++++++++++ 7 files changed, 114 insertions(+), 23 deletions(-) create mode 100644 history/splitter.py diff --git a/client/NemClient.py b/client/NemClient.py index 53f8bec..f7d0e1e 100644 --- a/client/NemClient.py +++ b/client/NemClient.py @@ -1,4 +1,4 @@ -from symbolchain.core.CryptoTypes import PublicKey +from symbolchain.core.CryptoTypes import Hash256, PublicKey from symbolchain.core.nem.Network import Address, Network from symbolchain.core.nem.NetworkTimestamp import NetworkTimestamp @@ -14,8 +14,10 @@ class AccountInfo: - def __init__(self): - self.address = None + def __init__(self, address): + self.address = address + self.address_name = address + self.vested_balance = 0 self.balance = 0 self.public_key = None @@ -43,6 +45,16 @@ def get_harvester_signer_public_key(self, height): json_response = self._post_json('block/at/public', {'height': height}) return PublicKey(json_response['signer']) + def get_block_hash(self, height): + # due to limitation in NEM REST API, we need to query next block to get current block hash + # as a result, newest block hash will be unknown + json_response = self._post_json('block/at/public', {'height': height+1}) + + if 'error' in json_response: + raise Exception(str(json_response)) + + return Hash256(json_response['prevBlockHash']['data']) if 'prevBlockHash' in json_response else None + def get_node_info(self): json_response = self._get_json('node/info') return json_response @@ -58,8 +70,7 @@ def get_account_info(self, address, forwarded=False): json_account = json_response['account'] json_meta = json_response['meta'] - account_info = AccountInfo() - account_info.address = Address(json_account['address']) + account_info = AccountInfo(Address(json_account['address'])) account_info.vested_balance = json_account['vestedBalance'] / MICROXEM_PER_XEM account_info.balance = json_account['balance'] / MICROXEM_PER_XEM account_info.public_key = PublicKey(json_account['publicKey']) if json_account['publicKey'] else None @@ -85,6 +96,7 @@ def get_harvests(self, address, start_id=None): snapshot.amount = int(json_harvest['totalFee']) / MICROXEM_PER_XEM snapshot.height = int(json_harvest['height']) snapshot.collation_id = int(json_harvest['id']) + snapshot.hash = self.get_block_hash(snapshot.height) snapshots.append(snapshot) return snapshots @@ -110,7 +122,7 @@ def get_transfers(self, address, start_id=None): snapshot.fee_paid = fee_microxem / MICROXEM_PER_XEM snapshot.height = int(json_meta['height']) snapshot.collation_id = json_meta['id'] - snapshot.hash = json_meta['hash']['data'] + snapshot.hash = Hash256(json_meta['hash']['data']) snapshots.append(snapshot) return snapshots diff --git a/client/SymbolClient.py b/client/SymbolClient.py index b547025..6744b3a 100644 --- a/client/SymbolClient.py +++ b/client/SymbolClient.py @@ -20,7 +20,8 @@ XYM_NETWORK_MOSAIC_IDS_MAP = { 0x68: '6BED913FA20223F8', - 0x98: 'E74B99BA41F4AFEE' + 0x98: '3A8416DB2D53B6C8', + 'alias': 'E74B99BA41F4AFEE' } MICROXYM_PER_XYM = 1000000.0 @@ -40,8 +41,10 @@ class AccountInfo: - def __init__(self): - self.address = None + def __init__(self, address): + self.address = address + self.address_name = address + self.balance = 0 self.public_key = None self.importance = 0.0 @@ -198,8 +201,7 @@ def get_richlist_account_infos(self, page_number, page_size, mosaic_id): @staticmethod def _parse_account_info(json_account, mosaic_id=None): - account_info = AccountInfo() - account_info.address = Address(unhexlify(json_account['address'])) + account_info = AccountInfo(Address(unhexlify(json_account['address']))) if not mosaic_id: mosaic_id = XYM_NETWORK_MOSAIC_IDS_MAP[account_info.address.bytes[0]] @@ -252,7 +254,7 @@ def get_harvests(self, address, start_id=None): snapshot = TransactionSnapshot(address, 'harvest') snapshot.height = int(json_statement['height']) - snapshot.timestamp = self._get_block_time_and_multiplier(snapshot.height)[0] + (snapshot.timestamp, _, snapshot.hash) = self._get_block_time_and_multiplier_and_hash(snapshot.height) for json_receipt in json_statement['receipts']: receipt_type = json_receipt['type'] @@ -279,7 +281,7 @@ def get_transfers(self, address, start_id=None): snapshot = TransactionSnapshot(address, 'transfer') snapshot.height = int(json_meta['height']) - (snapshot.timestamp, fee_multiplier) = self._get_block_time_and_multiplier(snapshot.height) + (snapshot.timestamp, fee_multiplier, _) = self._get_block_time_and_multiplier_and_hash(snapshot.height) snapshot.hash = json_meta['hash'] (amount_microxym, fee_microxym) = self._process_xym_changes(snapshot, json_transaction, snapshot.hash, fee_multiplier) @@ -328,7 +330,7 @@ def _calculate_transfer_amount(address, json_transactions): direction = 1 for json_mosaic in json_transaction['mosaics']: - if json_mosaic['id'] == XYM_NETWORK_MOSAIC_IDS_MAP[Address(address).bytes[0]]: + if json_mosaic['id'] in (XYM_NETWORK_MOSAIC_IDS_MAP[Address(address).bytes[0]], XYM_NETWORK_MOSAIC_IDS_MAP['alias']): amount_microxym += int(json_mosaic['amount']) * direction return amount_microxym @@ -345,10 +347,13 @@ def _is_recipient(address, json_transaction): def _is_aggregate(transaction_type): return any(TRANSACTION_TYPES[name] == transaction_type for name in ['aggregate_complete', 'aggregate_bonded']) - def _get_block_time_and_multiplier(self, height): + def _get_block_time_and_multiplier_and_hash(self, height): json_block_and_meta = self._get_json(f'blocks/{height}') json_block = json_block_and_meta['block'] - return (NetworkTimestamp(int(json_block['timestamp'])).to_datetime(), json_block['feeMultiplier']) + return ( + NetworkTimestamp(int(json_block['timestamp'])).to_datetime(), + json_block['feeMultiplier'], + Hash256(json_block_and_meta['meta']['hash'])) def _get_page(self, rest_path, start_id): return self._get_json(rest_path if not start_id else f'{rest_path}&offset={start_id}') diff --git a/client/TimeoutHTTPAdapter.py b/client/TimeoutHTTPAdapter.py index 77448c4..7a16039 100644 --- a/client/TimeoutHTTPAdapter.py +++ b/client/TimeoutHTTPAdapter.py @@ -21,11 +21,11 @@ def send(self, request, **kwargs): def create_http_session(**kwargs): retries = Retry( - total=kwargs.get('retry_count', 7), + total=kwargs.get('retry_count', 20), backoff_factor=1, status_forcelist=(429, 500, 502, 503, 504), - allowed_methods=['GET'] if not kwargs.get('retry_post', False) else ['GET', 'POST']) - adapter = TimeoutHTTPAdapter(max_retries=retries, timeout=kwargs.get('timeout', 10)) + allowed_methods=['GET', 'POST'] if not kwargs.get('retry_post', False) else ['GET', 'POST']) + adapter = TimeoutHTTPAdapter(max_retries=retries, timeout=kwargs.get('timeout', 30)) http = requests.Session() http.mount('http://', adapter) diff --git a/client/pod.py b/client/pod.py index 12f5f00..7b41003 100644 --- a/client/pod.py +++ b/client/pod.py @@ -22,6 +22,7 @@ class TransactionSnapshot(): def __init__(self, address, tag): self.address = address + self.address_name = address self.tag = tag self.timestamp = None diff --git a/history/downloader.py b/history/downloader.py index 57ec768..97d2513 100644 --- a/history/downloader.py +++ b/history/downloader.py @@ -19,7 +19,7 @@ def download(self, start_date, end_date, output_filepath): log.info(f'[{output_filepath}] downloading chain activity from {start_date} to {end_date}') with open(output_filepath, 'wt', encoding='utf8') as outfile: - column_names = ['timestamp', 'amount', 'fee_paid', 'height', 'address', 'tag', 'comments', 'hash'] + column_names = ['timestamp', 'amount', 'fee_paid', 'height', 'address', 'address_name', 'tag', 'comments', 'hash'] csv_writer = csv.DictWriter(outfile, column_names, extrasaction='ignore') csv_writer.writeheader() @@ -50,7 +50,7 @@ def _download_batch(self, mode, start_date, end_date, output_filepath, csv_write if snapshot.timestamp.date() > end_date: continue - snapshot.address = self.account_descriptor.name + snapshot.address_name = self.account_descriptor.name csv_writer.writerow(vars(snapshot)) num_rows_written += 1 diff --git a/history/merger.py b/history/merger.py index 7b90af9..918913a 100644 --- a/history/merger.py +++ b/history/merger.py @@ -1,5 +1,6 @@ import argparse import csv +import datetime from pathlib import Path from zenlog import log @@ -9,10 +10,11 @@ class TransactionsLoader(): - def __init__(self, directory, ticker, currency): + def __init__(self, directory, ticker, currency, human_readable): self.directory = Path(directory) self.ticker = ticker self.currency = currency + self.human_readable = human_readable self.price_map = {} self.transaction_snapshots = [] @@ -40,6 +42,8 @@ def load(self, filename): for row in csv_reader: snapshot = client.pod.AugmentedTransactionSnapshot() snapshot.__dict__.update(row) + raw_timestamp = snapshot.timestamp + snapshot.fix_types() price_snapshot = self.price_map[snapshot.timestamp.date()] @@ -47,6 +51,13 @@ def load(self, filename): self._fixup_comments(snapshot, price_snapshot) self._fixup_tag(snapshot) + + if self.human_readable: + snapshot.address = snapshot.address_name + else: + snapshot.timestamp = datetime.datetime.fromisoformat(raw_timestamp).isoformat() + snapshot.timestamp = snapshot.timestamp.replace('+00:00', 'Z') + self.transaction_snapshots.append(snapshot) @staticmethod @@ -94,6 +105,10 @@ def save(self, filename): f'{self.ticker}/{self.currency}' ] + field_names[6:] + if not self.human_readable: + field_names += ['address_name'] + column_headers += ['address_name'] + csv_writer = csv.DictWriter(outfile, field_names, extrasaction='ignore') csv_writer.writerow(dict(zip(field_names, column_headers))) @@ -107,9 +122,10 @@ def main(): parser.add_argument('--output', help='output filename', required=True) parser.add_argument('--ticker', help='ticker symbol', default='nem') parser.add_argument('--currency', help='fiat currency', default='usd') + parser.add_argument('--human-readable', help='outputs a more human readable format', action='store_true') args = parser.parse_args() - transactions_loader = TransactionsLoader(args.input, args.ticker, args.currency) + transactions_loader = TransactionsLoader(args.input, args.ticker, args.currency, args.human_readable) transactions_loader.load_price_map() for filepath in Path(args.input).iterdir(): diff --git a/history/splitter.py b/history/splitter.py new file mode 100644 index 0000000..d9d27ea --- /dev/null +++ b/history/splitter.py @@ -0,0 +1,57 @@ +import argparse +import datetime +import os +from pathlib import Path + +from zenlog import log + + +def main(): + parser = argparse.ArgumentParser( + description='filter csv file by date range', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('--input', help='input directory', required=True) + parser.add_argument('--output', help='output directory', required=True) + parser.add_argument('--start-date', help='start date', required=True) + parser.add_argument('--end-date', help='end date', default=datetime.date.today().isoformat()) + args = parser.parse_args() + + input_directory = Path(args.input) + if not input_directory.exists(): + log.warn(f'input directory \'{args.input}\' does not exist') + return + + output_directory = Path(args.output) + if output_directory.exists(): + log.warn(f'output directory \'{args.output}\' already exists') + return + + log.info('starting processing!') + + output_directory.mkdir(parents=True) + + start_date = datetime.date.fromisoformat(args.start_date) + end_date = datetime.date.fromisoformat(args.end_date) + + for filename in os.listdir(input_directory): + log.info(f'processing {filename}...') + with open(input_directory / filename, 'rt', encoding='utf8') as infile: + is_empty = True + with open(output_directory / filename, 'wt', encoding='utf8') as outfile: + input_lines = infile.readlines() + outfile.write(input_lines[0]) + + for line in input_lines[1:]: + timestamp = datetime.datetime.fromisoformat(line[:line.index(',')]) + if timestamp.date() < start_date or timestamp.date() > end_date: + continue + + outfile.write(line) + is_empty = False + + if is_empty: + Path(output_directory / filename).unlink() + + +if '__main__' == __name__: + main()