diff --git a/crates/pathfinder/src/bin/pathfinder/config.rs b/crates/pathfinder/src/bin/pathfinder/config.rs
index a0fab2ded1..c8eb2f00b8 100644
--- a/crates/pathfinder/src/bin/pathfinder/config.rs
+++ b/crates/pathfinder/src/bin/pathfinder/config.rs
@@ -283,6 +283,17 @@ This should only be enabled for debugging purposes as it adds substantial proces
     )]
     get_events_max_uncached_bloom_filters_to_load: std::num::NonZeroUsize,

+    #[cfg(feature = "aggregate_bloom")]
+    #[arg(
+        long = "rpc.get-events-max-bloom-filters-to-load",
+        long_help = format!("The number of aggregate Bloom filters to load when querying for events. \
+            Each filter covers a {} block range. \
+            This limit is used to prevent queries from taking too long.", pathfinder_storage::BLOCK_RANGE_LEN),
+        env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOOM_FILTERS_TO_LOAD",
+        default_value = "3"
+    )]
+    get_events_max_bloom_filters_to_load: std::num::NonZeroUsize,
+
     #[arg(
         long = "storage.state-tries",
         long_help = "When set to `archive` all historical Merkle trie state is preserved. When set to an integer N, only the last N+1 states of the Merkle tries are kept in the database. \
@@ -714,6 +725,8 @@ pub struct Config {
     pub event_bloom_filter_cache_size: NonZeroUsize,
     pub get_events_max_blocks_to_scan: NonZeroUsize,
     pub get_events_max_uncached_bloom_filters_to_load: NonZeroUsize,
+    #[cfg(feature = "aggregate_bloom")]
+    pub get_events_max_bloom_filters_to_load: NonZeroUsize,
     pub state_tries: Option<StateTries>,
     pub custom_versioned_constants: Option<VersionedConstants>,
     pub feeder_gateway_fetch_concurrency: NonZeroUsize,
@@ -1005,6 +1018,8 @@ impl Config {
             get_events_max_blocks_to_scan: cli.get_events_max_blocks_to_scan,
             get_events_max_uncached_bloom_filters_to_load: cli
                 .get_events_max_uncached_bloom_filters_to_load,
+            #[cfg(feature = "aggregate_bloom")]
+            get_events_max_bloom_filters_to_load: cli.get_events_max_bloom_filters_to_load,
             gateway_timeout: Duration::from_secs(cli.gateway_timeout.get()),
             feeder_gateway_fetch_concurrency: cli.feeder_gateway_fetch_concurrency,
             state_tries: cli.state_tries,
diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs
index f6b24d7b40..8e69014761 100644
--- a/crates/pathfinder/src/bin/pathfinder/main.rs
+++ b/crates/pathfinder/src/bin/pathfinder/main.rs
@@ -219,6 +219,8 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
         get_events_max_blocks_to_scan: config.get_events_max_blocks_to_scan,
         get_events_max_uncached_bloom_filters_to_load: config
            .get_events_max_uncached_bloom_filters_to_load,
+        #[cfg(feature = "aggregate_bloom")]
+        get_events_max_bloom_filters_to_load: config.get_events_max_bloom_filters_to_load,
         custom_versioned_constants: config.custom_versioned_constants.take(),
     };
diff --git a/crates/rpc/src/context.rs b/crates/rpc/src/context.rs
index 1c9ede45e7..405f5a9e69 100644
--- a/crates/rpc/src/context.rs
+++ b/crates/rpc/src/context.rs
@@ -20,6 +20,8 @@ pub struct RpcConfig {
     pub batch_concurrency_limit: NonZeroUsize,
     pub get_events_max_blocks_to_scan: NonZeroUsize,
     pub get_events_max_uncached_bloom_filters_to_load: NonZeroUsize,
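+    /// Maximum number of aggregate Bloom filters a single `getEvents` request
+    /// may load; configured via `rpc.get-events-max-bloom-filters-to-load`.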
+    #[cfg(feature = "aggregate_bloom")]
+    pub get_events_max_bloom_filters_to_load: NonZeroUsize,
     pub custom_versioned_constants: Option<VersionedConstants>,
 }

@@ -121,6 +123,8 @@ impl RpcContext {
             batch_concurrency_limit: NonZeroUsize::new(8).unwrap(),
             get_events_max_blocks_to_scan: NonZeroUsize::new(1000).unwrap(),
             get_events_max_uncached_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(),
+            #[cfg(feature = "aggregate_bloom")]
+            get_events_max_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(),
             custom_versioned_constants: None,
         };
diff --git a/crates/rpc/src/jsonrpc/router/subscription.rs b/crates/rpc/src/jsonrpc/router/subscription.rs
index 4334778a45..b293351d82 100644
--- a/crates/rpc/src/jsonrpc/router/subscription.rs
+++ b/crates/rpc/src/jsonrpc/router/subscription.rs
@@ -1027,6 +1027,8 @@ mod tests {
                 batch_concurrency_limit: 1.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1.try_into().unwrap(),
                 get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
+                #[cfg(feature = "aggregate_bloom")]
+                get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/rpc/src/method/get_events.rs b/crates/rpc/src/method/get_events.rs
index c52c0c8130..2fc15ca540 100644
--- a/crates/rpc/src/method/get_events.rs
+++ b/crates/rpc/src/method/get_events.rs
@@ -250,7 +250,11 @@ pub async fn get_events(
         let start = std::time::Instant::now();

         let page_from_aggregate = transaction
-            .events_from_aggregate(&filter, context.config.get_events_max_blocks_to_scan)
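+            // The aggregate path now enforces both per-query limits: how many
+            // blocks may be scanned and how many aggregate filters may be loaded.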
+            .events_from_aggregate(
+                &filter,
+                context.config.get_events_max_blocks_to_scan,
+                context.config.get_events_max_bloom_filters_to_load,
+            )
             .map_err(|e| match e {
                 EventFilterError::Internal(e) => GetEventsError::Internal(e),
                 EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()),
diff --git a/crates/rpc/src/method/subscribe_events.rs b/crates/rpc/src/method/subscribe_events.rs
index ca31de9cc5..bfb5796bd4 100644
--- a/crates/rpc/src/method/subscribe_events.rs
+++ b/crates/rpc/src/method/subscribe_events.rs
@@ -736,6 +736,8 @@ mod tests {
                 batch_concurrency_limit: 64.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1024.try_into().unwrap(),
                 get_events_max_uncached_bloom_filters_to_load: 1024.try_into().unwrap(),
+                #[cfg(feature = "aggregate_bloom")]
+                get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/rpc/src/method/subscribe_new_heads.rs b/crates/rpc/src/method/subscribe_new_heads.rs
index f949b2f0d5..13536378f2 100644
--- a/crates/rpc/src/method/subscribe_new_heads.rs
+++ b/crates/rpc/src/method/subscribe_new_heads.rs
@@ -548,6 +548,8 @@ mod tests {
                 batch_concurrency_limit: 1.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1.try_into().unwrap(),
                 get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
+                #[cfg(feature = "aggregate_bloom")]
+                get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/rpc/src/method/subscribe_pending_transactions.rs b/crates/rpc/src/method/subscribe_pending_transactions.rs
index c10784e19d..3cc0bc5e96 100644
--- a/crates/rpc/src/method/subscribe_pending_transactions.rs
+++ b/crates/rpc/src/method/subscribe_pending_transactions.rs
@@ -493,6 +493,8 @@ mod tests {
                 batch_concurrency_limit: 1.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1.try_into().unwrap(),
                 get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
+                #[cfg(feature = "aggregate_bloom")]
+                get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/rpc/src/method/subscribe_transaction_status.rs b/crates/rpc/src/method/subscribe_transaction_status.rs
index 372396ca08..b8aca06d25 100644
--- a/crates/rpc/src/method/subscribe_transaction_status.rs
+++ b/crates/rpc/src/method/subscribe_transaction_status.rs
@@ -1169,6 +1169,8 @@ mod tests {
                 batch_concurrency_limit: 1.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1.try_into().unwrap(),
                 get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
+                #[cfg(feature = "aggregate_bloom")]
+                get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs
index d272d64a41..907a10894a 100644
--- a/crates/storage/src/bloom.rs
+++ b/crates/storage/src/bloom.rs
@@ -82,6 +82,8 @@ pub const BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN;
 /// Before being added to `AggregateBloom`, each [`BloomFilter`] is
 /// rotated by 90 degrees (transposed).
 #[derive(Debug, Clone)]
+// TODO: Delete after cfg flag is removed
+#[allow(dead_code)]
 pub struct AggregateBloom {
     /// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in
     /// a single array.
@@ -95,7 +97,6 @@ pub struct AggregateBloom {
 }

-// TODO:
-// Delete after cfg flag is removed
+// TODO: Delete after cfg flag is removed
 #[allow(dead_code)]
 impl AggregateBloom {
     /// Maximum number of blocks to aggregate in a single `AggregateBloom`.
@@ -364,9 +365,6 @@ impl BloomFilter {
     // Workaround to get the indices of the keys in the filter.
     // Needed because the `bloomfilter` crate doesn't provide a
     // way to get this information.
-    // TODO:
-    // Delete after cfg flag is removed
-    #[allow(dead_code)]
     fn indices_for_key(key: &Felt) -> Vec<usize> {
         // Use key on an empty Bloom filter
         let mut bloom = Self::new();
diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs
index 6bb7b394e4..aa80f6ef02 100644
--- a/crates/storage/src/connection/event.rs
+++ b/crates/storage/src/connection/event.rs
@@ -355,90 +355,116 @@ impl Transaction<'_> {
         }
     }

-    // TODO:
-    // Add a limit to how many aggregate_bloom ranges can be loaded
     #[cfg(feature = "aggregate_bloom")]
     pub fn events_from_aggregate(
         &self,
         filter: &EventFilter,
         max_blocks_to_scan: NonZeroUsize,
+        max_bloom_filters_to_load: NonZeroUsize,
     ) -> Result<PageOfEvents, EventFilterError> {
-        use std::collections::BTreeSet;
-
         if filter.page_size < 1 {
             return Err(EventFilterError::PageSizeTooSmall);
         }

         let from_block = filter.from_block.unwrap_or(BlockNumber::GENESIS);
         let to_block = filter.to_block.unwrap_or(BlockNumber::MAX);
-        let key_filter_is_empty = filter.keys.iter().flatten().count() == 0;
-
-        let mut emitted_events = Vec::new();
-        let mut blocks_scanned: usize = 0;
-        let mut offset = filter.offset;

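+        // First load (at most) `max_bloom_filters_to_load` aggregate filters that
+        // overlap the requested range; `load_limit_reached` reports whether more
+        // stored filters matched than we were willing to load.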
+        let (aggregates, load_limit_reached) = self.load_limited_aggregate_bloom_range(
+            from_block,
+            to_block,
+            max_bloom_filters_to_load,
+        )?;

-        enum ScanResult {
-            Done,
-            PageFull,
-            ContinueFrom(BlockNumber),
-        }
+        let blocks_to_scan = aggregates
+            .iter()
+            .flat_map(|aggregate| aggregate.blocks_for_filter(filter))
+            .filter(|&block| (from_block..=to_block).contains(&block));

-        let aggregates = self.load_aggregate_bloom_range(from_block, to_block)?;
-        let mut filtered_blocks = aggregates
+        let keys: Vec<std::collections::HashSet<_>> = filter
+            .keys
             .iter()
-            .fold(BTreeSet::new(), |mut acc, aggregate| {
-                acc.extend(aggregate.blocks_for_filter(filter));
-                acc
-            });
+            .map(|keys| keys.iter().collect())
+            .collect();

-        filtered_blocks.retain(|&block| block >= from_block && block <= to_block);
+        let key_filter_is_empty = filter.keys.iter().flatten().count() == 0;
+        let mut offset = filter.offset;

-        let mut blocks_iter = filtered_blocks.iter();
-        let result = loop {
-            let Some(&block) = blocks_iter.next() else {
-                break ScanResult::Done;
-            };
+        let mut emitted_events = vec![];

-            // Stop if we're past the last block.
-            if block > to_block {
-                break ScanResult::Done;
+        for (blocks_scanned, block) in blocks_to_scan.enumerate() {
+            if blocks_scanned >= max_blocks_to_scan.get() {
+                tracing::trace!("Reached block scan limit");
+                return Ok(PageOfEvents {
+                    events: emitted_events,
+                    continuation_token: Some(ContinuationToken {
+                        block_number: block,
+                        offset: 0,
+                    }),
+                });
             }

-            // Check if we've reached our block scan limit
-            blocks_scanned += 1;
-            if blocks_scanned > max_blocks_to_scan.get() {
-                tracing::trace!("Block scan limit reached");
-                break ScanResult::ContinueFrom(block);
-            }
+            let events_required = filter.page_size + 1 - emitted_events.len();
+            tracing::trace!(%block, %events_required, "Processing block");

-            match self.scan_block_into(
-                block,
-                filter,
-                key_filter_is_empty,
-                offset,
-                &mut emitted_events,
-            )? {
-                BlockScanResult::NoSuchBlock => break ScanResult::Done,
-                BlockScanResult::Done { new_offset } => {
-                    offset = new_offset;
+            let Some(block_header) = self.block_header(crate::BlockId::Number(block))? else {
+                break;
+            };
+
+            let events = match self.events_for_block(block.into())? {
+                Some(events) => events,
+                // Reached the end of P2P synced events.
+                None => {
+                    return Ok(PageOfEvents {
+                        events: emitted_events,
+                        continuation_token: None,
+                    })
                 }
-            }
+            };
+
+            let events = events
+                .into_iter()
+                .flat_map(|(transaction_hash, events)| {
+                    events.into_iter().zip(std::iter::repeat(transaction_hash))
+                })
+                .filter(|(event, _)| match filter.contract_address {
+                    Some(address) => event.from_address == address,
+                    None => true,
+                })
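+                // Key filter semantics: the set at position i must contain event
+                // key i, and an empty set at a position acts as a wildcard.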
+                .filter(|(event, _)| {
+                    if key_filter_is_empty {
+                        return true;
+                    }
+
+                    if event.keys.len() < keys.len() {
+                        return false;
+                    }
+
+                    event
+                        .keys
+                        .iter()
+                        .zip(keys.iter())
+                        .all(|(key, filter)| filter.is_empty() || filter.contains(key))
+                })
+                .skip_while(|_| {
+                    let should_skip = offset > 0;
+                    offset = offset.saturating_sub(1);
+                    should_skip
+                })
+                .take(events_required)
+                .map(|(event, tx_hash)| EmittedEvent {
+                    data: event.data.clone(),
+                    keys: event.keys.clone(),
+                    from_address: event.from_address,
+                    block_hash: block_header.hash,
+                    block_number: block_header.number,
+                    transaction_hash: tx_hash,
+                });
+
+            emitted_events.extend(events);

             // Stop if we have a page of events plus an extra one to decide if we're on
             // the last page.
             if emitted_events.len() > filter.page_size {
-                break ScanResult::PageFull;
-            }
-        };
-
-        match result {
-            ScanResult::Done => Ok(PageOfEvents {
-                events: emitted_events,
-                continuation_token: None,
-            }),
-            ScanResult::PageFull => {
-                assert!(emitted_events.len() > filter.page_size);
                 let continuation_token = continuation_token(
                     &emitted_events,
                     ContinuationToken {
@@ -447,29 +473,40 @@ impl Transaction<'_> {
                     },
                 )
                 .unwrap();
+
                 emitted_events.truncate(filter.page_size);
-                Ok(PageOfEvents {
+                return Ok(PageOfEvents {
                     events: emitted_events,
                     continuation_token: Some(ContinuationToken {
                         block_number: continuation_token.block_number,
-                        // account for the extra event
+                        // Account for the extra event.
                         offset: continuation_token.offset - 1,
                     }),
-                })
-            }
-            ScanResult::ContinueFrom(block_number) => {
-                // We've reached a search limit without filling the page.
-                // We'll need to continue from the next block.
-                Ok(PageOfEvents {
-                    events: emitted_events,
-                    continuation_token: Some(ContinuationToken {
-                        block_number,
-                        offset: 0,
-                    }),
-                })
+                });
             }
         }
+
+        if load_limit_reached {
+            let last_loaded_block = aggregates
+                .last()
+                .expect("At least one filter is present")
+                .to_block;
+
+            Ok(PageOfEvents {
+                events: emitted_events,
+                continuation_token: Some(ContinuationToken {
+                    // Bloom filter range is inclusive so + 1.
+                    block_number: last_loaded_block + 1,
+                    offset: 0,
+                }),
+            })
+        } else {
+            Ok(PageOfEvents {
+                events: emitted_events,
+                continuation_token: None,
+            })
+        }
     }

     fn scan_block_into(
@@ -574,7 +611,10 @@ impl Transaction<'_> {
         })
     }

+    // TODO:
+    // Use this for `SubscribeEvents`
     #[cfg(feature = "aggregate_bloom")]
+    #[allow(dead_code)]
     fn load_aggregate_bloom_range(
         &self,
         start_block: BlockNumber,
@@ -593,7 +633,7 @@ impl Transaction<'_> {
             .query_map(
                 named_params![
                     ":end_block": &end_block,
-                    ":start_block": &start_block
+                    ":start_block": &start_block,
                 ],
                 |row| {
                     let from_block = row.get_block_number(0)?;
@@ -627,6 +667,79 @@ impl Transaction<'_> {

         Ok(aggregates)
     }
+
+    #[cfg(feature = "aggregate_bloom")]
+    fn load_limited_aggregate_bloom_range(
+        &self,
+        start_block: BlockNumber,
+        end_block: BlockNumber,
+        max_bloom_filters_to_load: NonZeroUsize,
+    ) -> anyhow::Result<(Vec<AggregateBloom>, bool)> {
+        let mut select_filters_stmt = self.inner().prepare_cached(
+            r"
+            SELECT from_block, to_block, bitmap
+            FROM starknet_events_filters_aggregate
+            WHERE from_block <= :end_block AND to_block >= :start_block
+            ORDER BY from_block
+            LIMIT :max_bloom_filters_to_load
+            ",
+        )?;
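+        // A second, un-limited COUNT over the same range tells us whether the
+        // LIMIT above actually truncated the result.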
+        let mut total_filters_stmt = self.inner().prepare_cached(
+            r"
+            SELECT COUNT(*)
+            FROM starknet_events_filters_aggregate
+            WHERE from_block <= :end_block AND to_block >= :start_block
+            ",
+        )?;
+
+        let mut aggregates = select_filters_stmt
+            .query_map(
+                named_params![
+                    ":end_block": &end_block,
+                    ":start_block": &start_block,
+                    ":max_bloom_filters_to_load": &max_bloom_filters_to_load.get(),
+                ],
+                |row| {
+                    let from_block = row.get_block_number(0)?;
+                    let to_block = row.get_block_number(1)?;
+                    let compressed_bitmap: Vec<u8> = row.get(2)?;
+
+                    Ok(AggregateBloom::from_existing_compressed(
+                        from_block,
+                        to_block,
+                        compressed_bitmap,
+                    ))
+                },
+            )
+            .context("Querying bloom filter range")?
+            .collect::<Result<Vec<_>, _>>()?;
+
+        // There are no aggregates in the database yet or the loaded aggregates
+        // don't cover the requested range.
+        let should_include_running = aggregates.last().map_or(true, |a| end_block > a.to_block);
+
+        let total_aggregate_filters = total_filters_stmt.query_row(
+            named_params![
+                ":end_block": &end_block,
+                ":start_block": &start_block,
+            ],
+            |row| row.get::<_, u64>(0),
+        )?;
+        let load_limit_reached = total_aggregate_filters > max_bloom_filters_to_load.get() as u64;
+
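+        // Only append the in-memory running aggregate if the stored filters did
+        // not already exhaust the load limit; when the limit is hit, the caller
+        // resumes from the last loaded range via the continuation token instead.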
+        if should_include_running && !load_limit_reached {
+            let running_aggregate = match self.running_aggregate.lock() {
+                Ok(guard) => guard,
+                Err(poisoned) => {
+                    tracing::error!("Poisoned lock in load_limited_aggregate_bloom_range");
+                    poisoned.into_inner()
+                }
+            };
+            aggregates.push(running_aggregate.clone());
+        }
+
+        Ok((aggregates, load_limit_reached))
+    }
 }

 /// Reconstruct the [aggregate](crate::bloom::AggregateBloom) for the range of
@@ -781,6 +894,9 @@ mod tests {
         LazyLock::new(|| NonZeroUsize::new(100).unwrap());
     static MAX_BLOOM_FILTERS_TO_LOAD: LazyLock<NonZeroUsize> =
         LazyLock::new(|| NonZeroUsize::new(100).unwrap());
+    #[cfg(feature = "aggregate_bloom")]
+    static MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD: LazyLock<NonZeroUsize> =
+        LazyLock::new(|| NonZeroUsize::new(3).unwrap());

     #[test_log::test(test)]
     fn get_events_with_fully_specified_filter() {
@@ -814,7 +930,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -962,7 +1082,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1001,7 +1125,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1041,7 +1169,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1072,7 +1204,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1110,7 +1246,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1148,7 +1288,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1185,7 +1329,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1210,7 +1358,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1246,7 +1398,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1285,7 +1441,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1316,7 +1476,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1344,7 +1508,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1381,7 +1549,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1426,7 +1598,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1458,7 +1634,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1490,7 +1670,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
        {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1519,7 +1703,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1548,7 +1736,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN)
+                .events_from_aggregate(
+                    &filter,
+                    *MAX_BLOCKS_TO_SCAN,
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1587,7 +1779,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, 1.try_into().unwrap())
+                .events_from_aggregate(
+                    &filter,
+                    1.try_into().unwrap(),
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1618,7 +1814,11 @@ mod tests {
         #[cfg(feature = "aggregate_bloom")]
         {
             let events_from_aggregate = tx
-                .events_from_aggregate(&filter, 1.try_into().unwrap())
+                .events_from_aggregate(
+                    &filter,
+                    1.try_into().unwrap(),
+                    *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD,
+                )
                 .unwrap();
             assert_eq!(events_from_aggregate, events);
         }
@@ -1670,6 +1870,98 @@ mod tests {
         );
     }

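+    // Verifies that the aggregate filter load limit caps a single query: with
+    // a limit of one, only events covered by the first stored aggregate range
+    // are returned and the continuation token points at the next range.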
+    #[test]
+    #[cfg(feature = "aggregate_bloom")]
+    fn aggregate_bloom_filter_load_limit() {
+        let blocks: Vec<usize> = [
+            // First aggregate filter start.
+            BlockNumber::GENESIS,
+            BlockNumber::GENESIS + 1,
+            BlockNumber::GENESIS + 2,
+            BlockNumber::GENESIS + 3,
+            // End.
+            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN - 1,
+            // Second aggregate filter start.
+            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN,
+            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 1,
+            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 2,
+            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 3,
+            // End.
+            BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1,
+            // Third aggregate filter start.
+            BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN,
+            BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN + 1,
+        ]
+        .iter()
+        .map(|&n| n.get() as usize)
+        .collect();
+
+        let (storage, test_data) = test_utils::setup_custom_test_storage(&blocks, 2);
+        let emitted_events = test_data.events;
+        let mut connection = storage.connection().unwrap();
+        let tx = connection.transaction().unwrap();
+
+        let filter = EventFilter {
+            from_block: None,
+            to_block: None,
+            contract_address: None,
+            // We're using a key which is present in _all_ events as the 2nd key...
+            keys: vec![vec![], vec![event_key!("0xdeadbeef")]],
+            page_size: emitted_events.len(),
+            offset: 0,
+        };
+
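+        // With the load limit set to one, only the first aggregate range is
+        // consulted even though the filter matches events in all three ranges.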
+ keys: vec![vec![], vec![event_key!("0xdeadbeef")]], + page_size: emitted_events.len(), + offset: 0, + }; + + let events = tx + .events_from_aggregate( + &filter_with_offset, + *MAX_BLOCKS_TO_SCAN, + 1.try_into().unwrap(), + ) + .unwrap(); + assert!(events.continuation_token.is_none()); + + let second_aggregate_filter_range = + AggregateBloom::BLOCK_RANGE_LEN..(2 * AggregateBloom::BLOCK_RANGE_LEN); + let third_aggregate_filter_range = + 2 * AggregateBloom::BLOCK_RANGE_LEN..(3 * AggregateBloom::BLOCK_RANGE_LEN); + for event in events.events { + // ...but only events from the second (loaded) and third (running) bloom filter + // range are returned. + assert!( + (second_aggregate_filter_range.start..third_aggregate_filter_range.end) + .contains(&event.block_number.get()) + ); + } + } + #[test] fn bloom_filter_load_limit() { let (storage, test_data) = test_utils::setup_test_storage(); @@ -1700,17 +1992,6 @@ mod tests { } ); - // TODO: - // This does not match at the moment because aggregate bloom implementation - // does not have a limit on the number of bloom filters to load. - #[cfg(all(feature = "aggregate_bloom", any()))] - { - let events_from_aggregate = tx - .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - let filter = EventFilter { from_block: Some(BlockNumber::new_or_panic(1)), to_block: None, @@ -1733,16 +2014,5 @@ mod tests { }), } ); - - // TODO: - // This does not match at the moment because aggregate bloom implementation - // does not have a limit on the number of bloom filters to load. - #[cfg(all(feature = "aggregate_bloom", any()))] - { - let events_from_aggregate = tx - .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index c663df6803..ec2209d50d 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -661,12 +661,15 @@ mod tests { LazyLock::new(|| NonZeroUsize::new(10).unwrap()); static MAX_BLOOM_FILTERS_TO_LOAD: LazyLock = LazyLock::new(|| NonZeroUsize::new(1000).unwrap()); + #[cfg(feature = "aggregate_bloom")] + static MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD: LazyLock = + LazyLock::new(|| NonZeroUsize::new(3).unwrap()); let blocks = [0, 1, 2, 3, 4, 5]; let transactions_per_block = 2; let headers = create_blocks(&blocks); let transactions_and_receipts = - create_transactions_and_receipts(blocks.len() * transactions_per_block); + create_transactions_and_receipts(blocks.len(), transactions_per_block); let emitted_events = extract_events(&headers, &transactions_and_receipts); let insert_block_data = |tx: &Transaction<'_>, idx: usize| { let header = &headers[idx]; @@ -718,7 +721,11 @@ mod tests { }; let events_from_aggregate_before = tx - .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .events_from_aggregate( + &filter, + *MAX_BLOCKS_TO_SCAN, + *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, + ) .unwrap() .events; let events_before = tx @@ -750,7 +757,11 @@ mod tests { } let events_from_aggregate_after = tx - .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .events_from_aggregate( + &filter, + *MAX_BLOCKS_TO_SCAN, + *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, + ) .unwrap() .events; let events_after = tx diff --git a/crates/storage/src/params.rs b/crates/storage/src/params.rs index 6d7d0aa596..5210234238 100644 --- a/crates/storage/src/params.rs +++ b/crates/storage/src/params.rs @@ -133,6 +133,7 @@ to_sql_builtin!( Vec, &[u8], isize, + usize, i64, i32, i16, diff 
 pub(crate) fn create_transactions_and_receipts(
-    n: usize,
+    block_count: usize,
+    transactions_per_block: usize,
 ) -> Vec<(Transaction, Receipt, Vec<Event>)> {
-    let transactions = (0..n).map(|i| match i % TRANSACTIONS_PER_BLOCK {
+    let n = block_count * transactions_per_block;
+    let transactions = (0..n).map(|i| match i % transactions_per_block {
         x if x < INVOKE_TRANSACTIONS_PER_BLOCK => Transaction {
             hash: TransactionHash(Felt::from_hex_str(&"4".repeat(i + 3)).unwrap()),
             variant: TransactionVariant::InvokeV0(InvokeTransactionV0 {
@@ -131,7 +133,7 @@ pub(crate) fn create_transactions_and_receipts(
             transaction_index: TransactionIndex::new_or_panic(i as u64 + 2311),
             ..Default::default()
         };
-        let events = if i % TRANSACTIONS_PER_BLOCK < EVENTS_PER_BLOCK {
+        let events = if i % transactions_per_block < EVENTS_PER_BLOCK {
             vec![pathfinder_common::event::Event {
                 from_address: ContractAddress::new_or_panic(
                     Felt::from_hex_str(&"2".repeat(i + 3)).unwrap(),
@@ -208,7 +210,7 @@ pub fn setup_custom_test_storage(

     let headers = create_blocks(block_numbers);
     let transactions_and_receipts =
-        create_transactions_and_receipts(block_numbers.len() * transactions_per_block);
+        create_transactions_and_receipts(block_numbers.len(), transactions_per_block);

     for (i, header) in headers.iter().enumerate() {
         tx.insert_block_header(header).unwrap();