[history]: downloader improvements for better lukkatax compatibility
1. update downloader to include block hashes for harvests and full addresses

2. include full addresses and ISO timestamps in default merger output

3. add splitter tool for breaking up batch downloaded data
Jaguar0625 committed Mar 2, 2022
1 parent 63d99c2 commit 863c8dd
Showing 7 changed files with 114 additions and 23 deletions.
24 changes: 18 additions & 6 deletions client/NemClient.py
@@ -1,4 +1,4 @@
-from symbolchain.core.CryptoTypes import PublicKey
+from symbolchain.core.CryptoTypes import Hash256, PublicKey
from symbolchain.core.nem.Network import Address, Network
from symbolchain.core.nem.NetworkTimestamp import NetworkTimestamp

@@ -14,8 +14,10 @@


class AccountInfo:
-def __init__(self):
-    self.address = None
+def __init__(self, address):
+    self.address = address
+    self.address_name = address

self.vested_balance = 0
self.balance = 0
self.public_key = None
@@ -43,6 +45,16 @@ def get_harvester_signer_public_key(self, height):
json_response = self._post_json('block/at/public', {'height': height})
return PublicKey(json_response['signer'])

+def get_block_hash(self, height):
+    # due to a limitation in the NEM REST API, the next block must be queried to get the current block hash
+    # as a result, the newest block hash will be unknown
+    json_response = self._post_json('block/at/public', {'height': height+1})
+
+    if 'error' in json_response:
+        raise Exception(str(json_response))
+
+    return Hash256(json_response['prevBlockHash']['data']) if 'prevBlockHash' in json_response else None

def get_node_info(self):
json_response = self._get_json('node/info')
return json_response
@@ -58,8 +70,7 @@ def get_account_info(self, address, forwarded=False):
json_account = json_response['account']
json_meta = json_response['meta']

-account_info = AccountInfo()
-account_info.address = Address(json_account['address'])
+account_info = AccountInfo(Address(json_account['address']))
account_info.vested_balance = json_account['vestedBalance'] / MICROXEM_PER_XEM
account_info.balance = json_account['balance'] / MICROXEM_PER_XEM
account_info.public_key = PublicKey(json_account['publicKey']) if json_account['publicKey'] else None
@@ -85,6 +96,7 @@ def get_harvests(self, address, start_id=None):
snapshot.amount = int(json_harvest['totalFee']) / MICROXEM_PER_XEM
snapshot.height = int(json_harvest['height'])
snapshot.collation_id = int(json_harvest['id'])
+snapshot.hash = self.get_block_hash(snapshot.height)
snapshots.append(snapshot)

return snapshots
@@ -110,7 +122,7 @@ def get_transfers(self, address, start_id=None):
snapshot.fee_paid = fee_microxem / MICROXEM_PER_XEM
snapshot.height = int(json_meta['height'])
snapshot.collation_id = json_meta['id']
-snapshot.hash = json_meta['hash']['data']
+snapshot.hash = Hash256(json_meta['hash']['data'])
snapshots.append(snapshot)

return snapshots
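
For context, a minimal standalone sketch of the lookup the new get_block_hash performs: NIS block JSON does not carry the block's own hash, so the client fetches the block at height + 1 and reads its prevBlockHash. The node URL and direct requests usage are assumptions for illustration; the client above goes through its own _post_json helper.

    import requests

    NODE = 'http://localhost:7890'  # assumed NIS endpoint

    def block_hash_via_next_block(height):
        # query the block *after* the one of interest and read the
        # previous-block hash from it
        response = requests.post(f'{NODE}/block/at/public', json={'height': height + 1}, timeout=10)
        json_block = response.json()
        # per the comment in the diff, the chain tip has no successor yet,
        # so its hash resolves to None here
        return json_block.get('prevBlockHash', {}).get('data')

    print(block_hash_via_next_block(1))  # hash of block 1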
25 changes: 15 additions & 10 deletions client/SymbolClient.py
@@ -20,7 +20,8 @@

XYM_NETWORK_MOSAIC_IDS_MAP = {
0x68: '6BED913FA20223F8',
-0x98: 'E74B99BA41F4AFEE'
+0x98: '3A8416DB2D53B6C8',
+'alias': 'E74B99BA41F4AFEE'
}

MICROXYM_PER_XYM = 1000000.0
@@ -40,8 +41,10 @@


class AccountInfo:
-def __init__(self):
-    self.address = None
+def __init__(self, address):
+    self.address = address
+    self.address_name = address

self.balance = 0
self.public_key = None
self.importance = 0.0
@@ -198,8 +201,7 @@ def get_richlist_account_infos(self, page_number, page_size, mosaic_id):

@staticmethod
def _parse_account_info(json_account, mosaic_id=None):
-account_info = AccountInfo()
-account_info.address = Address(unhexlify(json_account['address']))
+account_info = AccountInfo(Address(unhexlify(json_account['address'])))

if not mosaic_id:
mosaic_id = XYM_NETWORK_MOSAIC_IDS_MAP[account_info.address.bytes[0]]
@@ -252,7 +254,7 @@ def get_harvests(self, address, start_id=None):

snapshot = TransactionSnapshot(address, 'harvest')
snapshot.height = int(json_statement['height'])
-snapshot.timestamp = self._get_block_time_and_multiplier(snapshot.height)[0]
+(snapshot.timestamp, _, snapshot.hash) = self._get_block_time_and_multiplier_and_hash(snapshot.height)

for json_receipt in json_statement['receipts']:
receipt_type = json_receipt['type']
@@ -279,7 +281,7 @@ def get_transfers(self, address, start_id=None):

snapshot = TransactionSnapshot(address, 'transfer')
snapshot.height = int(json_meta['height'])
-(snapshot.timestamp, fee_multiplier) = self._get_block_time_and_multiplier(snapshot.height)
+(snapshot.timestamp, fee_multiplier, _) = self._get_block_time_and_multiplier_and_hash(snapshot.height)

snapshot.hash = json_meta['hash']
(amount_microxym, fee_microxym) = self._process_xym_changes(snapshot, json_transaction, snapshot.hash, fee_multiplier)
@@ -328,7 +330,7 @@ def _calculate_transfer_amount(address, json_transactions):
direction = 1

for json_mosaic in json_transaction['mosaics']:
-if json_mosaic['id'] == XYM_NETWORK_MOSAIC_IDS_MAP[Address(address).bytes[0]]:
+if json_mosaic['id'] in (XYM_NETWORK_MOSAIC_IDS_MAP[Address(address).bytes[0]], XYM_NETWORK_MOSAIC_IDS_MAP['alias']):
amount_microxym += int(json_mosaic['amount']) * direction

return amount_microxym
@@ -345,10 +347,13 @@ def _is_recipient(address, json_transaction):
def _is_aggregate(transaction_type):
return any(TRANSACTION_TYPES[name] == transaction_type for name in ['aggregate_complete', 'aggregate_bonded'])

-def _get_block_time_and_multiplier(self, height):
+def _get_block_time_and_multiplier_and_hash(self, height):
json_block_and_meta = self._get_json(f'blocks/{height}')
json_block = json_block_and_meta['block']
-return (NetworkTimestamp(int(json_block['timestamp'])).to_datetime(), json_block['feeMultiplier'])
+return (
+    NetworkTimestamp(int(json_block['timestamp'])).to_datetime(),
+    json_block['feeMultiplier'],
+    Hash256(json_block_and_meta['meta']['hash']))

def _get_page(self, rest_path, start_id):
return self._get_json(rest_path if not start_id else f'{rest_path}&offset={start_id}')
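
One note on the mosaic map change above: a transfer can reference XYM either by the network's mosaic id or by its namespace alias id, so _calculate_transfer_amount now matches both. A minimal sketch with hypothetical transfer mosaics:

    # ids taken from XYM_NETWORK_MOSAIC_IDS_MAP above
    XYM_MOSAIC_ID = '6BED913FA20223F8'  # mainnet network mosaic id
    XYM_ALIAS_ID = 'E74B99BA41F4AFEE'   # namespace alias id

    # hypothetical mosaic entries as they might appear in transfer JSON
    json_mosaics = [
        {'id': XYM_MOSAIC_ID, 'amount': '1000000'},
        {'id': XYM_ALIAS_ID, 'amount': '2000000'},
        {'id': '1234567890ABCDEF', 'amount': '500'}  # unrelated mosaic, ignored
    ]

    # matching only the network mosaic id would silently drop the aliased entry
    amount_microxym = sum(int(mosaic['amount']) for mosaic in json_mosaics if mosaic['id'] in (XYM_MOSAIC_ID, XYM_ALIAS_ID))
    print(amount_microxym)  # 3000000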
6 changes: 3 additions & 3 deletions client/TimeoutHTTPAdapter.py
@@ -21,11 +21,11 @@ def send(self, request, **kwargs):

def create_http_session(**kwargs):
retries = Retry(
-total=kwargs.get('retry_count', 7),
+total=kwargs.get('retry_count', 20),
backoff_factor=1,
status_forcelist=(429, 500, 502, 503, 504),
-allowed_methods=['GET'] if not kwargs.get('retry_post', False) else ['GET', 'POST'])
-adapter = TimeoutHTTPAdapter(max_retries=retries, timeout=kwargs.get('timeout', 10))
+allowed_methods=['GET', 'POST'] if not kwargs.get('retry_post', False) else ['GET', 'POST'])
+adapter = TimeoutHTTPAdapter(max_retries=retries, timeout=kwargs.get('timeout', 30))

http = requests.Session()
http.mount('http://', adapter)
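
A usage sketch under the new defaults; the keyword names come straight from the kwargs.get(...) lookups above. Note that with both branches of the allowed_methods conditional now permitting POST, the retry_post flag no longer changes behavior.

    # defaults after this change: up to 20 retries with exponential
    # backoff on 429/5xx and a 30 second per-request timeout
    session = create_http_session()

    # the knobs can still be overridden per call site
    patient_session = create_http_session(retry_count=5, timeout=60)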
1 change: 1 addition & 0 deletions client/pod.py
@@ -22,6 +22,7 @@ class TransactionSnapshot():

def __init__(self, address, tag):
self.address = address
+self.address_name = address
self.tag = tag

self.timestamp = None
4 changes: 2 additions & 2 deletions history/downloader.py
@@ -19,7 +19,7 @@ def download(self, start_date, end_date, output_filepath):
log.info(f'[{output_filepath}] downloading chain activity from {start_date} to {end_date}')

with open(output_filepath, 'wt', encoding='utf8') as outfile:
-column_names = ['timestamp', 'amount', 'fee_paid', 'height', 'address', 'tag', 'comments', 'hash']
+column_names = ['timestamp', 'amount', 'fee_paid', 'height', 'address', 'address_name', 'tag', 'comments', 'hash']
csv_writer = csv.DictWriter(outfile, column_names, extrasaction='ignore')
csv_writer.writeheader()

@@ -50,7 +50,7 @@ def _download_batch(self, mode, start_date, end_date, output_filepath, csv_writer):
if snapshot.timestamp.date() > end_date:
continue

-snapshot.address = self.account_descriptor.name
+snapshot.address_name = self.account_descriptor.name
csv_writer.writerow(vars(snapshot))
num_rows_written += 1

20 changes: 18 additions & 2 deletions history/merger.py
@@ -1,5 +1,6 @@
import argparse
import csv
+import datetime
from pathlib import Path

from zenlog import log
@@ -9,10 +10,11 @@


class TransactionsLoader():
-def __init__(self, directory, ticker, currency):
+def __init__(self, directory, ticker, currency, human_readable):
self.directory = Path(directory)
self.ticker = ticker
self.currency = currency
+self.human_readable = human_readable

self.price_map = {}
self.transaction_snapshots = []
@@ -40,13 +42,22 @@ def load(self, filename):
for row in csv_reader:
snapshot = client.pod.AugmentedTransactionSnapshot()
snapshot.__dict__.update(row)
+raw_timestamp = snapshot.timestamp

snapshot.fix_types()

price_snapshot = self.price_map[snapshot.timestamp.date()]
snapshot.set_price(price_snapshot.price)

self._fixup_comments(snapshot, price_snapshot)
self._fixup_tag(snapshot)

+if self.human_readable:
+    snapshot.address = snapshot.address_name
+else:
+    snapshot.timestamp = datetime.datetime.fromisoformat(raw_timestamp).isoformat()
+    snapshot.timestamp = snapshot.timestamp.replace('+00:00', 'Z')

self.transaction_snapshots.append(snapshot)

@staticmethod
@@ -94,6 +105,10 @@ def save(self, filename):
f'{self.ticker}/{self.currency}'
] + field_names[6:]

+if not self.human_readable:
+    field_names += ['address_name']
+    column_headers += ['address_name']

csv_writer = csv.DictWriter(outfile, field_names, extrasaction='ignore')
csv_writer.writerow(dict(zip(field_names, column_headers)))

@@ -107,9 +122,10 @@ def main():
parser.add_argument('--output', help='output filename', required=True)
parser.add_argument('--ticker', help='ticker symbol', default='nem')
parser.add_argument('--currency', help='fiat currency', default='usd')
+parser.add_argument('--human-readable', help='outputs a more human readable format', action='store_true')

args = parser.parse_args()
-transactions_loader = TransactionsLoader(args.input, args.ticker, args.currency)
+transactions_loader = TransactionsLoader(args.input, args.ticker, args.currency, args.human_readable)
transactions_loader.load_price_map()

for filepath in Path(args.input).iterdir():
57 changes: 57 additions & 0 deletions history/splitter.py
@@ -0,0 +1,57 @@
+import argparse
+import datetime
+import os
+from pathlib import Path
+
+from zenlog import log
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='filter csv file by date range',
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('--input', help='input directory', required=True)
+    parser.add_argument('--output', help='output directory', required=True)
+    parser.add_argument('--start-date', help='start date', required=True)
+    parser.add_argument('--end-date', help='end date', default=datetime.date.today().isoformat())
+    args = parser.parse_args()
+
+    input_directory = Path(args.input)
+    if not input_directory.exists():
+        log.warn(f'input directory \'{args.input}\' does not exist')
+        return
+
+    output_directory = Path(args.output)
+    if output_directory.exists():
+        log.warn(f'output directory \'{args.output}\' already exists')
+        return
+
+    log.info('starting processing!')
+
+    output_directory.mkdir(parents=True)
+
+    start_date = datetime.date.fromisoformat(args.start_date)
+    end_date = datetime.date.fromisoformat(args.end_date)
+
+    for filename in os.listdir(input_directory):
+        log.info(f'processing {filename}...')
+        with open(input_directory / filename, 'rt', encoding='utf8') as infile:
+            is_empty = True
+            with open(output_directory / filename, 'wt', encoding='utf8') as outfile:
+                input_lines = infile.readlines()
+                outfile.write(input_lines[0])
+
+                for line in input_lines[1:]:
+                    timestamp = datetime.datetime.fromisoformat(line[:line.index(',')])
+                    if timestamp.date() < start_date or timestamp.date() > end_date:
+                        continue
+
+                    outfile.write(line)
+                    is_empty = False
+
+        if is_empty:
+            Path(output_directory / filename).unlink()
+
+
+if '__main__' == __name__:
+    main()
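
The splitter keys its filter off each row's leading ISO timestamp; a minimal sketch of that predicate on an assumed CSV row (invocation follows the argparse definition above, e.g. python history/splitter.py --input <dir> --output <dir> --start-date 2021-01-01):

    import datetime

    # assumed row shape matching the downloader output
    line = '2021-06-15 09:30:00+00:00,12.5,0.0,354001,NALICE,alice,harvest,,A1B2C3'

    start_date = datetime.date.fromisoformat('2021-01-01')
    end_date = datetime.date.fromisoformat('2021-12-31')

    # the same slice-and-compare the splitter performs per line
    timestamp = datetime.datetime.fromisoformat(line[:line.index(',')])
    print(start_date <= timestamp.date() <= end_date)  # True -> row is kept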
