From 4ca0a049e80a0832dd9a34f5a0b3bfc398f80cca Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 11 Feb 2021 19:49:21 +0000 Subject: [PATCH] Backup one block at a time. --- electrumx/server/block_processor.py | 87 ++++++++++++++--------------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index bca47f16e..f3c37bb3c 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -20,7 +20,7 @@ from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN from electrumx.lib.script import is_unspendable_legacy, is_unspendable_genesis from electrumx.lib.util import ( - chunks, class_logger, pack_le_uint32, pack_le_uint64, unpack_le_uint64 + class_logger, pack_le_uint32, pack_le_uint64, unpack_le_uint64 ) from electrumx.server.db import FlushData @@ -209,7 +209,7 @@ def schedule_reorg(self, count): self.reorg_count = count self.blocks_event.set() - async def reorg_chain(self, count): + async def _reorg_chain(self, count): '''Handle a chain reorganisation. Count is the number of blocks to simulate a reorg, or None for @@ -220,38 +220,39 @@ async def reorg_chain(self, count): self.logger.info(f'faking a reorg of {count:,d} blocks') await self.flush(True) - async def get_raw_blocks(last_height, hex_hashes): - heights = range(last_height, last_height - len(hex_hashes), -1) + async def get_raw_block(hex_hash, height): try: - blocks = [self.db.read_raw_block(height) for height in heights] - self.logger.info(f'read {len(blocks)} blocks from disk') - return blocks + block = self.db.read_raw_block(height) + self.logger.info(f'read block {hex_hash} at height {height:,d} from disk') except FileNotFoundError: - return await self.daemon.raw_blocks(hex_hashes) - - _start, last, hashes = await self.reorg_hashes(count) - # Reverse and convert to hex strings. - hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] - for hex_hashes in chunks(hashes, 50): - raw_blocks = await get_raw_blocks(last, hex_hashes) - await self.backup_blocks(raw_blocks) + block = await self.daemon.raw_blocks([hex_hash])[0] + self.logger.info(f'obtained block {hex_hash} at height {height:,d} from daemon') + return block + + _start, height, hashes = await self._reorg_hashes(count) + hex_hashes = [hash_to_hex_str(block_hash) for block_hash in hashes] + for hex_hash in reversed(hex_hashes): + raw_block = await get_raw_block(hex_hash, height) + await self._backup_block(raw_block) # self.touched can include other addresses which is harmless, but remove None. self.touched.discard(None) self.db.flush_backup(self.flush_data(), self.touched) - last -= len(raw_blocks) + height -= 1 + + self.logger.info('backed up to height {:,d}'.format(self.height)) await self.prefetcher.reset_height(self.height) self.backed_up_event.set() self.backed_up_event.clear() - async def reorg_hashes(self, count): + async def _reorg_hashes(self, count): '''Return a pair (start, last, hashes) of blocks to back up during a reorg. The hashes are returned in order of increasing height. Start is the height of the first hash, last of the last. ''' - start, count = await self.calc_reorg_range(count) + start, count = await self._calc_reorg_range(count) last = start + count - 1 s = '' if count == 1 else 's' self.logger.info(f'chain was reorganised replacing {count:,d} ' @@ -259,7 +260,7 @@ async def reorg_hashes(self, count): return start, last, await self.db.fs_block_hashes(start, count) - async def calc_reorg_range(self, count): + async def _calc_reorg_range(self, count): '''Calculate the reorg range''' def diff_pos(hashes1, hashes2): @@ -441,38 +442,36 @@ def advance_txs(self, txs, is_unspendable): return undo_info - async def backup_blocks(self, raw_blocks): - '''Backup the raw blocks and flush. + async def _backup_block(self, raw_block): + '''Backup the raw block and flush. - The blocks should be in order of decreasing height, starting at. - self.height. A flush is performed once the blocks are backed up. + The blocks should be in order of decreasing height, starting at. self.height. A + flush is performed once the blocks are backed up. ''' self.db.assert_flushed(self.flush_data()) - assert self.height >= len(raw_blocks) + assert self.height > 0 genesis_activation = self.coin.GENESIS_ACTIVATION coin = self.coin - for raw_block in raw_blocks: - # Check and update self.tip - block = coin.block(raw_block) - header_hash = coin.header_hash(block.header) - if header_hash != self.tip: - raise ChainError('backup block {} not tip {} at height {:,d}' - .format(hash_to_hex_str(header_hash), - hash_to_hex_str(self.tip), - self.height)) - self.tip = coin.header_prevhash(block.header) - is_unspendable = (is_unspendable_genesis if self.height >= genesis_activation - else is_unspendable_legacy) - self.backup_txs(block.transactions, is_unspendable) - self.height -= 1 - self.db.tx_counts.pop() - - await sleep(0) - self.logger.info('backed up to height {:,d}'.format(self.height)) + # Check and update self.tip + block = coin.block(raw_block) + header_hash = coin.header_hash(block.header) + if header_hash != self.tip: + raise ChainError('backup block {} not tip {} at height {:,d}' + .format(hash_to_hex_str(header_hash), + hash_to_hex_str(self.tip), + self.height)) + self.tip = coin.header_prevhash(block.header) + is_unspendable = (is_unspendable_genesis if self.height >= genesis_activation + else is_unspendable_legacy) + self._backup_txs(block.transactions, is_unspendable) + self.height -= 1 + self.db.tx_counts.pop() + + await sleep(0) - def backup_txs(self, txs, is_unspendable): + def _backup_txs(self, txs, is_unspendable): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order undo_info = self.db.read_undo_info(self.height) @@ -612,7 +611,7 @@ async def _process_blocks(self): async def process_event(): '''Perform any pending reorg, the process prefetched blocks.''' if self.reorg_count is not None: - await self.reorg_chain(self.reorg_count) + await self._reorg_chain(self.reorg_count) self.reorg_count = None blocks = self.prefetcher.get_prefetched_blocks() await self._advance_blocks(blocks)