Skip to content

Commit

Permalink
Backup one block at a time.
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Booth committed Feb 11, 2021
1 parent 48db78b commit 4ca0a04
Showing 1 changed file with 43 additions and 44 deletions.
87 changes: 43 additions & 44 deletions electrumx/server/block_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
from electrumx.lib.script import is_unspendable_legacy, is_unspendable_genesis
from electrumx.lib.util import (
chunks, class_logger, pack_le_uint32, pack_le_uint64, unpack_le_uint64
class_logger, pack_le_uint32, pack_le_uint64, unpack_le_uint64
)
from electrumx.server.db import FlushData

Expand Down Expand Up @@ -209,7 +209,7 @@ def schedule_reorg(self, count):
self.reorg_count = count
self.blocks_event.set()

async def reorg_chain(self, count):
async def _reorg_chain(self, count):
'''Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for
Expand All @@ -220,46 +220,47 @@ async def reorg_chain(self, count):
self.logger.info(f'faking a reorg of {count:,d} blocks')
await self.flush(True)

async def get_raw_blocks(last_height, hex_hashes):
heights = range(last_height, last_height - len(hex_hashes), -1)
async def get_raw_block(hex_hash, height):
try:
blocks = [self.db.read_raw_block(height) for height in heights]
self.logger.info(f'read {len(blocks)} blocks from disk')
return blocks
block = self.db.read_raw_block(height)
self.logger.info(f'read block {hex_hash} at height {height:,d} from disk')
except FileNotFoundError:
return await self.daemon.raw_blocks(hex_hashes)

_start, last, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
raw_blocks = await get_raw_blocks(last, hex_hashes)
await self.backup_blocks(raw_blocks)
block = await self.daemon.raw_blocks([hex_hash])[0]
self.logger.info(f'obtained block {hex_hash} at height {height:,d} from daemon')
return block

_start, height, hashes = await self._reorg_hashes(count)
hex_hashes = [hash_to_hex_str(block_hash) for block_hash in hashes]
for hex_hash in reversed(hex_hashes):
raw_block = await get_raw_block(hex_hash, height)
await self._backup_block(raw_block)
# self.touched can include other addresses which is harmless, but remove None.
self.touched.discard(None)
self.db.flush_backup(self.flush_data(), self.touched)
last -= len(raw_blocks)
height -= 1

self.logger.info('backed up to height {:,d}'.format(self.height))

await self.prefetcher.reset_height(self.height)
self.backed_up_event.set()
self.backed_up_event.clear()

async def reorg_hashes(self, count):
async def _reorg_hashes(self, count):
'''Return a pair (start, last, hashes) of blocks to back up during a
reorg.
The hashes are returned in order of increasing height. Start
is the height of the first hash, last of the last.
'''
start, count = await self.calc_reorg_range(count)
start, count = await self._calc_reorg_range(count)
last = start + count - 1
s = '' if count == 1 else 's'
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')

return start, last, await self.db.fs_block_hashes(start, count)

async def calc_reorg_range(self, count):
async def _calc_reorg_range(self, count):
'''Calculate the reorg range'''

def diff_pos(hashes1, hashes2):
Expand Down Expand Up @@ -441,38 +442,36 @@ def advance_txs(self, txs, is_unspendable):

return undo_info

async def backup_blocks(self, raw_blocks):
'''Backup the raw blocks and flush.
async def _backup_block(self, raw_block):
'''Backup the raw block and flush.
The blocks should be in order of decreasing height, starting at.
self.height. A flush is performed once the blocks are backed up.
The blocks should be in order of decreasing height, starting at. self.height. A
flush is performed once the blocks are backed up.
'''
self.db.assert_flushed(self.flush_data())
assert self.height >= len(raw_blocks)
assert self.height > 0
genesis_activation = self.coin.GENESIS_ACTIVATION

coin = self.coin
for raw_block in raw_blocks:
# Check and update self.tip
block = coin.block(raw_block)
header_hash = coin.header_hash(block.header)
if header_hash != self.tip:
raise ChainError('backup block {} not tip {} at height {:,d}'
.format(hash_to_hex_str(header_hash),
hash_to_hex_str(self.tip),
self.height))
self.tip = coin.header_prevhash(block.header)
is_unspendable = (is_unspendable_genesis if self.height >= genesis_activation
else is_unspendable_legacy)
self.backup_txs(block.transactions, is_unspendable)
self.height -= 1
self.db.tx_counts.pop()

await sleep(0)

self.logger.info('backed up to height {:,d}'.format(self.height))
# Check and update self.tip
block = coin.block(raw_block)
header_hash = coin.header_hash(block.header)
if header_hash != self.tip:
raise ChainError('backup block {} not tip {} at height {:,d}'
.format(hash_to_hex_str(header_hash),
hash_to_hex_str(self.tip),
self.height))
self.tip = coin.header_prevhash(block.header)
is_unspendable = (is_unspendable_genesis if self.height >= genesis_activation
else is_unspendable_legacy)
self._backup_txs(block.transactions, is_unspendable)
self.height -= 1
self.db.tx_counts.pop()

await sleep(0)

def backup_txs(self, txs, is_unspendable):
def _backup_txs(self, txs, is_unspendable):
# Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order
undo_info = self.db.read_undo_info(self.height)
Expand Down Expand Up @@ -612,7 +611,7 @@ async def _process_blocks(self):
async def process_event():
'''Perform any pending reorg, the process prefetched blocks.'''
if self.reorg_count is not None:
await self.reorg_chain(self.reorg_count)
await self._reorg_chain(self.reorg_count)
self.reorg_count = None
blocks = self.prefetcher.get_prefetched_blocks()
await self._advance_blocks(blocks)
Expand Down

0 comments on commit 4ca0a04

Please sign in to comment.