From 89f14158b66ed6c3b2e5a9a03c31b4aaec3a3e7c Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Mon, 28 Oct 2024 20:55:58 +0000 Subject: [PATCH] Ocean tries to catchup on startup if below tip height --- lib/ain-ocean/src/api/pool_pair/mod.rs | 4 +- lib/ain-ocean/src/api/pool_pair/service.rs | 16 ++- lib/ain-ocean/src/indexer/mod.rs | 14 ++- lib/ain-ocean/src/indexer/oracle.rs | 30 ++--- lib/ain-ocean/src/indexer/poolswap.rs | 102 ++++++++++++--- lib/ain-ocean/src/lib.rs | 5 +- lib/ain-ocean/src/model/poolswap.rs | 2 +- lib/ain-rs-exports/src/lib.rs | 1 + lib/ain-rs-exports/src/ocean.rs | 5 + src/Makefile.am | 2 + src/init.cpp | 27 ++-- src/ocean.cpp | 139 +++++++++++++++++++++ src/ocean.h | 9 ++ 13 files changed, 293 insertions(+), 63 deletions(-) create mode 100644 src/ocean.cpp create mode 100644 src/ocean.h diff --git a/lib/ain-ocean/src/api/pool_pair/mod.rs b/lib/ain-ocean/src/api/pool_pair/mod.rs index a7795f46961..045e035cf19 100644 --- a/lib/ain-ocean/src/api/pool_pair/mod.rs +++ b/lib/ain-ocean/src/api/pool_pair/mod.rs @@ -434,9 +434,9 @@ async fn list_pool_swaps_verbose( _ => true, }) .map(|item| async { - let (_, swap) = item?; + let (key, swap) = item?; let from = find_swap_from(&ctx, &swap).await?; - let to = find_swap_to(&ctx, &swap).await?; + let to = find_swap_to(&ctx, &key, &swap).await?; let swap_type = check_swap_type(&ctx, &swap).await?; diff --git a/lib/ain-ocean/src/api/pool_pair/service.rs b/lib/ain-ocean/src/api/pool_pair/service.rs index 9042eaaa701..c12d3780a45 100644 --- a/lib/ain-ocean/src/api/pool_pair/service.rs +++ b/lib/ain-ocean/src/api/pool_pair/service.rs @@ -21,7 +21,7 @@ use crate::{ NotFoundKind, OtherSnafu, }, indexer::PoolSwapAggregatedInterval, - model::{PoolSwap, PoolSwapAggregatedAggregated}, + model::{PoolSwap, PoolSwapAggregatedAggregated, PoolSwapKey}, storage::{RepositoryOps, SecondaryIndex, SortOrder}, Result, }; @@ -673,6 +673,7 @@ pub async fn find_swap_from( pub async fn find_swap_to( ctx: &Arc, + swap_key: &PoolSwapKey, swap: &PoolSwap, ) -> Result> { let PoolSwap { @@ -689,9 +690,20 @@ pub async fn find_swap_to( let display_symbol = parse_display_symbol(&to_token); + // TODO Index to_amount if missing + if to_amount.is_none() { + let amount = 0; + let swap = PoolSwap { + to_amount: Some(amount), + ..swap.clone() + }; + ctx.services.pool.by_id.put(swap_key, &swap)?; + } + Ok(Some(PoolSwapFromToData { address: to_address, - amount: Decimal::new(to_amount.to_owned(), 8).to_string(), + // amount: Decimal::new(to_amount.to_owned(), 8).to_string(), // Need fallback + amount: Decimal::new(to_amount.to_owned().unwrap_or_default(), 8).to_string(), symbol: to_token.symbol, display_symbol, })) diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index b991f22c5e7..ab05e0b2aea 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -60,7 +60,7 @@ fn get_bucket(block: &Block, interval: i64) -> i64 { } fn index_block_start(services: &Arc, block: &Block) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + let mut pool_pairs = services.pool_pair_cache.get(); pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); for interval in AGGREGATED_INTERVALS { @@ -116,7 +116,7 @@ fn index_block_start(services: &Arc, block: &Block) -> Re } fn invalidate_block_start(services: &Arc, block: &Block) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + let mut pool_pairs = services.pool_pair_cache.get(); pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); for interval in AGGREGATED_INTERVALS { @@ -601,6 +601,14 @@ fn invalidate_block_end(services: &Arc, block: &BlockContext) -> Resul Ok(()) } +pub fn get_block_height(services: &Arc) -> Result { + Ok(services + .block + .by_height + .get_highest()? + .map_or(0, |block| block.height)) +} + pub fn index_block(services: &Arc, block: Block) -> Result<()> { trace!("[index_block] Indexing block..."); let start = Instant::now(); @@ -658,6 +666,7 @@ pub fn index_block(services: &Arc, block: Block) -> Resul DfTx::SetLoanToken(data) => data.index(services, &ctx)?, DfTx::CompositeSwap(data) => data.index(services, &ctx)?, DfTx::PlaceAuctionBid(data) => data.index(services, &ctx)?, + DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(), _ => (), } log_elapsed(start, "Indexed dftx"); @@ -755,6 +764,7 @@ pub fn invalidate_block(services: &Arc, block: Block) -> DfTx::SetLoanToken(data) => data.invalidate(services, &ctx)?, DfTx::CompositeSwap(data) => data.invalidate(services, &ctx)?, DfTx::PlaceAuctionBid(data) => data.invalidate(services, &ctx)?, + DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(), _ => (), } log_elapsed(start, "Invalidate dftx"); diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 931bf38a289..c06aeeb0906 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -264,10 +264,7 @@ fn map_price_aggregated( )), SortOrder::Descending, )? - .take_while(|item| match item { - Ok((k, _)) => k.0 == token.clone() && k.1 == currency.clone(), - _ => true, - }) + .take_while(|item| matches!(item, Ok((k, _)) if &k.0 == token && &k.1 == currency)) .flatten() .collect::>(); @@ -361,8 +358,8 @@ fn index_set_oracle_data( let key = ( price_aggregated.aggregated.oracles.total, price_aggregated.block.height, - token.clone(), - currency.clone(), + token, + currency, ); ticker_repo.by_key.put(&key, pair)?; ticker_repo.by_id.put( @@ -534,22 +531,19 @@ pub fn index_interval_mapper( SortOrder::Descending, )? .take(1) - .flatten() - .collect::>(); + .next() + .transpose()?; - if previous.is_empty() { + let Some((_, id)) = previous else { return start_new_bucket(services, block, token, currency, aggregated, interval); - } - - for (_, id) in previous { - let aggregated_interval = repo.by_id.get(&id)?; - if let Some(aggregated_interval) = aggregated_interval { - if block.median_time - aggregated.block.median_time > interval.clone() as i64 { - return start_new_bucket(services, block, token, currency, aggregated, interval); - } + }; - forward_aggregate(services, (id, &aggregated_interval), aggregated)?; + if let Some(aggregated_interval) = repo.by_id.get(&id)? { + if block.median_time - aggregated.block.median_time > interval.clone() as i64 { + return start_new_bucket(services, block, token, currency, aggregated, interval); } + + forward_aggregate(services, (id, &aggregated_interval), aggregated)?; } Ok(()) diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index 844e28b3581..19e29af568f 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -1,15 +1,17 @@ use std::{str::FromStr, sync::Arc}; +use ain_cpp_imports::PoolPairCreationHeight; use ain_dftx::{pool::*, COIN}; use bitcoin::Txid; use log::trace; +use parking_lot::RwLock; use rust_decimal::Decimal; use rust_decimal_macros::dec; use snafu::OptionExt; use super::Context; use crate::{ - error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu}, + error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, NotFoundKind}, indexer::{tx_result, Index, Result}, model::{self, PoolSwapResult, TxResult}, storage::{RepositoryOps, SortOrder}, @@ -149,7 +151,7 @@ fn invalidate_swap_aggregated( impl Index for PoolSwap { fn index(self, services: &Arc, ctx: &Context) -> Result<()> { - trace!("[Poolswap] Indexing..."); + trace!("[Poolswap] Indexing {self:?}..."); let txid = ctx.tx.txid; let idx = ctx.tx_idx; let from = self.from_script; @@ -158,17 +160,28 @@ impl Index for PoolSwap { let from_amount = self.from_amount; let to_token_id = self.to_token_id.0; - let Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) = - services.result.get(&txid)? - else { - // TODO: Commenting out for now, fallback should only be introduced for supporting back CLI indexing - return Err("Missing swap result".into()); - // let pair = find_pair(from_token_id, to_token_id); - // if pair.is_none() { - // return Err(format_err!("Pool not found by {from_token_id}-{to_token_id} or {to_token_id}-{from_token_id}").into()); - // } - // let pair = pair.unwrap(); - // (None, pair.id) + let (to_amount, pool_id) = match services.result.get(&txid)? { + Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) => { + (Some(to_amount), pool_id) + } + _ => { + let poolpairs = services.pool_pair_cache.get(); + + let pool_id = poolpairs + .into_iter() + .find(|pp| { + (pp.id_token_a == self.from_token_id.0 as u32 + && pp.id_token_b == self.to_token_id.0 as u32) + || (pp.id_token_a == self.to_token_id.0 as u32 + && pp.id_token_b == self.from_token_id.0 as u32) + }) + .map(|pp| pp.id) + .ok_or(Error::NotFound { + kind: NotFoundKind::PoolPair, + })?; + + (None, pool_id) + } }; let swap: model::PoolSwap = model::PoolSwap { @@ -221,17 +234,31 @@ impl Index for PoolSwap { impl Index for CompositeSwap { fn index(self, services: &Arc, ctx: &Context) -> Result<()> { - trace!("[CompositeSwap] Indexing..."); + trace!("[CompositeSwap] Indexing {self:?}..."); let txid = ctx.tx.txid; let from_token_id = self.pool_swap.from_token_id.0; let from_amount = self.pool_swap.from_amount; let to_token_id = self.pool_swap.to_token_id.0; - let Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) = - services.result.get(&txid)? - else { - trace!("Missing swap result for {}", txid.to_string()); - return Err("Missing swap result".into()); + let (to_amount, pool_id) = match services.result.get(&txid)? { + Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) => { + (Some(to_amount), Some(pool_id)) + } + _ => { + let poolpairs = services.pool_pair_cache.get(); + + let pool_id = poolpairs + .into_iter() + .find(|pp| { + (pp.id_token_a == self.pool_swap.from_token_id.0 as u32 + && pp.id_token_b == self.pool_swap.to_token_id.0 as u32) + || (pp.id_token_a == self.pool_swap.to_token_id.0 as u32 + && pp.id_token_b == self.pool_swap.from_token_id.0 as u32) + }) + .map(|pp| pp.id); + + (None, pool_id) + } }; let from = self.pool_swap.from_script; @@ -240,6 +267,9 @@ impl Index for CompositeSwap { let pool_ids = if pools.is_empty() { // the pool_id from finals wap is the only swap while pools is empty + let pool_id = pool_id.ok_or(Error::NotFound { + kind: NotFoundKind::PoolPair, + })?; Vec::from([pool_id]) } else { pools.iter().map(|pool| pool.id.0 as u32).collect() @@ -286,3 +316,37 @@ impl Index for CompositeSwap { tx_result::invalidate(services, &ctx.tx.txid) } } + +#[derive(Default)] +pub struct PoolPairCache { + cache: RwLock>>, +} + +impl PoolPairCache { + pub fn new() -> Self { + Self { + cache: RwLock::new(None), + } + } + + pub fn get(&self) -> Vec { + { + let guard = self.cache.read(); + if let Some(poolpairs) = guard.as_ref() { + return poolpairs.clone(); + } + } + + let poolpairs = ain_cpp_imports::get_pool_pairs(); + + let mut guard = self.cache.write(); + *guard = Some(poolpairs.clone()); + + poolpairs + } + + pub fn invalidate(&self) { + let mut guard = self.cache.write(); + *guard = None; + } +} diff --git a/lib/ain-ocean/src/lib.rs b/lib/ain-ocean/src/lib.rs index 4ab49ddcccf..986488d1a21 100644 --- a/lib/ain-ocean/src/lib.rs +++ b/lib/ain-ocean/src/lib.rs @@ -8,8 +8,9 @@ use std::{path::PathBuf, sync::Arc}; pub use api::ocean_router; use error::Error; +use indexer::poolswap::PoolPairCache; pub use indexer::{ - index_block, invalidate_block, + get_block_height, index_block, invalidate_block, oracle::invalidate_oracle_interval, transaction::{index_transaction, invalidate_transaction}, tx_result, @@ -143,6 +144,7 @@ pub struct Services { pub script_unspent: ScriptUnspentService, pub token_graph: Arc>>, pub store: Arc, + pub pool_pair_cache: PoolPairCache, } impl Services { @@ -218,6 +220,7 @@ impl Services { }, token_graph: Arc::new(Mutex::new(UnGraphMap::new())), store: Arc::clone(&store), + pool_pair_cache: PoolPairCache::new(), } } } diff --git a/lib/ain-ocean/src/model/poolswap.rs b/lib/ain-ocean/src/model/poolswap.rs index 051deb7da71..6ad5af8dbe5 100644 --- a/lib/ain-ocean/src/model/poolswap.rs +++ b/lib/ain-ocean/src/model/poolswap.rs @@ -13,7 +13,7 @@ pub struct PoolSwap { pub pool_id: u32, pub from_amount: i64, pub from_token_id: u64, - pub to_amount: i64, + pub to_amount: Option, pub to_token_id: u64, pub from: ScriptBuf, pub to: ScriptBuf, diff --git a/lib/ain-rs-exports/src/lib.rs b/lib/ain-rs-exports/src/lib.rs index 484b6dea17c..347f7b9a216 100644 --- a/lib/ain-rs-exports/src/lib.rs +++ b/lib/ain-rs-exports/src/lib.rs @@ -346,6 +346,7 @@ pub mod ffi { fn evm_try_flush_db(result: &mut CrossBoundaryResult); + fn ocean_get_block_height(result: &mut CrossBoundaryResult) -> u32; fn ocean_index_block(result: &mut CrossBoundaryResult, block_str: String); fn ocean_invalidate_block(result: &mut CrossBoundaryResult, block: String); diff --git a/lib/ain-rs-exports/src/ocean.rs b/lib/ain-rs-exports/src/ocean.rs index d86efa870c8..2519e64f3b4 100644 --- a/lib/ain-rs-exports/src/ocean.rs +++ b/lib/ain-rs-exports/src/ocean.rs @@ -7,6 +7,11 @@ use crate::{ prelude::{cross_boundary_error_return, cross_boundary_success_return}, }; +#[ffi_fallible] +pub fn ocean_get_block_height() -> Result { + ain_ocean::get_block_height(&ain_ocean::SERVICES) +} + #[ffi_fallible] pub fn ocean_index_block(block_str: String) -> Result<()> { let block: Block = serde_json::from_str(&block_str)?; diff --git a/src/Makefile.am b/src/Makefile.am index 385585c02dc..04b884b5c59 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -149,6 +149,7 @@ DEFI_CORE_H = \ index/blockfilterindex.h \ index/txindex.h \ indirectmap.h \ + ocean.h \ init.h \ interfaces/chain.h \ interfaces/handler.h \ @@ -410,6 +411,7 @@ libdefi_server_a_SOURCES = \ index/blockfilterindex.cpp \ index/txindex.cpp \ interfaces/chain.cpp \ + ocean.cpp \ init.cpp \ dbwrapper.cpp \ ffi/ffiexports.cpp \ diff --git a/src/init.cpp b/src/init.cpp index 1def0f90e89..1435d4ac3c5 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -67,6 +67,7 @@ #include #include #include +#include #include #include @@ -1749,21 +1750,6 @@ void SetupInterrupts() { fInterrupt = SetupInterruptArg("-interrupt-block", fInterruptBlockHash, fInterruptBlockHeight); } -bool OceanIndex (const UniValue b) { - CrossBoundaryResult result; - ocean_index_block(result, b.write()); - if (!result.ok) { - LogPrintf("Error indexing genesis block: %s\n", result.reason); - ocean_invalidate_block(result, b.write()); - if (!result.ok) { - LogPrintf("Error invalidating genesis block: %s\n", result.reason); - return false; - } - OceanIndex(b); - } - return true; -}; - bool AppInitMain(InitInterfaces& interfaces) { const CChainParams& chainparams = Params(); @@ -2518,7 +2504,7 @@ bool AppInitMain(InitInterfaces& interfaces) } std::string error; - + if (!pwallet->GetNewDestination(OutputType::BECH32, "", dest, error)) { return InitError("Wallet not able to get new destination for mocknet"); } @@ -2579,14 +2565,19 @@ bool AppInitMain(InitInterfaces& interfaces) const UniValue b = blockToJSON(*pcustomcsview, block, tip, pblockindex, true, 2); - if (bool isIndexed = OceanIndex(b); !isIndexed) { + if (bool isIndexed = OceanIndex(b, 0); !isIndexed) { return false; } LogPrintf("WARNING: -expr-oceanarchive flag is turned on. This feature is not yet stable. Please do not use in production unless you're aware of the risks\n"); } - // ********************************************************* Step 16: start minter thread + // ********************************************************* Step 16: start ocean catchup + if (!CatchupOceanIndexer()) { + return false; + } + + // ********************************************************* Step 17: start minter thread if(gArgs.GetBoolArg("-gen", DEFAULT_GENERATE)) { if (!pos::StartStakingThreads(threadGroup)) { return false; diff --git a/src/ocean.cpp b/src/ocean.cpp new file mode 100644 index 00000000000..62a5d236db0 --- /dev/null +++ b/src/ocean.cpp @@ -0,0 +1,139 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +bool OceanIndex(const UniValue b, uint32_t blockHeight) { + CrossBoundaryResult result; + ocean_index_block(result, b.write()); + if (!result.ok) { + LogPrintf("Error indexing ocean block %d: %s\n", result.reason, blockHeight); + ocean_invalidate_block(result, b.write()); + if (!result.ok) { + LogPrintf("Error invalidating ocean %d block: %s\n", result.reason, blockHeight); + } + return false; + } + return true; +}; + +bool CatchupOceanIndexer() { + if (!gArgs.GetBoolArg("-oceanarchive", DEFAULT_OCEAN_INDEXER_ENABLED) && + !gArgs.GetBoolArg("-expr-oceanarchive", DEFAULT_OCEAN_INDEXER_ENABLED)) { + return true; + } + + CrossBoundaryResult result; + + auto oceanBlockHeight = ocean_get_block_height(result); + if (!result.ok) { + LogPrintf("Error getting Ocean block height: %s\n", result.reason); + return false; + } + + CBlockIndex *tip = nullptr; + { + LOCK(cs_main); + tip = ::ChainActive().Tip(); + if (!tip) { + LogPrintf("Error: Cannot get chain tip\n"); + return false; + } + } + const uint32_t tipHeight = tip->nHeight; + + if (tipHeight == oceanBlockHeight) { + return true; + } + + LogPrintf("Starting Ocean index catchup...\n"); + + uint32_t currentHeight = oceanBlockHeight; + + LogPrintf("Ocean catchup: Current height=%u, Target height=%u\n", currentHeight, tipHeight); + + uint32_t remainingBlocks = tipHeight - currentHeight; + const uint32_t startHeight = oceanBlockHeight; + int lastProgress = -1; + const auto startTime = std::chrono::steady_clock::now(); + + CBlockIndex *pindex = nullptr; + while (currentHeight < tipHeight) { + if (ShutdownRequested()) { + LogPrintf("Shutdown requested, exiting ocean catchup...\n"); + return false; + } + + { + LOCK(cs_main); + pindex = ::ChainActive()[currentHeight]; + if (!pindex) { + LogPrintf("Error: Cannot find block at height %u\n", currentHeight); + return false; + } + } + + CBlock block; + if (!ReadBlockFromDisk(block, pindex, Params().GetConsensus())) { + LogPrintf("Error: Failed to read block %s from disk\n", pindex->GetBlockHash().ToString()); + return false; + } + + const UniValue b = blockToJSON(*pcustomcsview, block, tip, pindex, true, 2); + + if (bool isIndexed = OceanIndex(b, currentHeight); !isIndexed) { + return false; + } + + currentHeight++; + + uint32_t blocksProcessed = currentHeight - startHeight; + int currentProgress = static_cast((static_cast(currentHeight * 100) / tipHeight)); + + if (currentProgress > lastProgress || currentHeight % 10000 == 0) { + auto currentTime = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(currentTime - startTime).count(); + + double blocksPerSecond = elapsed > 0 ? static_cast(blocksProcessed) / elapsed : 0; + + remainingBlocks = tipHeight - currentHeight; + int estimatedSecondsLeft = blocksPerSecond > 0 ? static_cast(remainingBlocks / blocksPerSecond) : 0; + + LogPrintf( + "Ocean indexing progress: %d%% (%u/%u blocks) - %.2f blocks/s - " + "ETA: %d:%02d:%02d\n", + currentProgress, + currentHeight, + tipHeight, + blocksPerSecond, + estimatedSecondsLeft / 3600, + (estimatedSecondsLeft % 3600) / 60, + estimatedSecondsLeft % 60); + + lastProgress = currentProgress; + } + } + + auto totalTime = + std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); + + LogPrintf("Ocean indexes caught up to tip. Total time: %d:%02d:%02d\n", + totalTime / 3600, + (totalTime % 3600) / 60, + totalTime % 60); + + return true; +} diff --git a/src/ocean.h b/src/ocean.h new file mode 100644 index 00000000000..5a92763a79b --- /dev/null +++ b/src/ocean.h @@ -0,0 +1,9 @@ +#ifndef DEFI_OCEAN_H +#define DEFI_OCEAN_H + +#include + +bool CatchupOceanIndexer(); +bool OceanIndex (const UniValue b, uint32_t blockHeight); + +#endif // DEFI_OCEAN_H