Skip to content

Commit

Permalink
Merge pull request eqlabs#2393 from eqlabs/sistemd/aggregate-bloom-lo…
Browse files Browse the repository at this point in the history
…ad-limit

feat(bloom): add aggregate bloom filter load limit
  • Loading branch information
sistemd authored Nov 22, 2024
2 parents 05bee6f + 42ef432 commit c45b23d
Show file tree
Hide file tree
Showing 14 changed files with 445 additions and 128 deletions.
15 changes: 15 additions & 0 deletions crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ This should only be enabled for debugging purposes as it adds substantial proces
)]
get_events_max_uncached_bloom_filters_to_load: std::num::NonZeroUsize,

#[cfg(feature = "aggregate_bloom")]
#[arg(
long = "rpc.get-events-max-bloom-filters-to-load",
long_help = format!("The number of Bloom filters to load for events when querying for events. \
Each filter covers a {} block range. \
This limit is used to prevent queries from taking too long.", pathfinder_storage::BLOCK_RANGE_LEN),
env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOOM_FILTERS_TO_LOAD",
default_value = "3"
)]
get_events_max_bloom_filters_to_load: std::num::NonZeroUsize,

#[arg(
long = "storage.state-tries",
long_help = "When set to `archive` all historical Merkle trie state is preserved. When set to an integer N, only the last N+1 states of the Merkle tries are kept in the database. \
Expand Down Expand Up @@ -714,6 +725,8 @@ pub struct Config {
pub event_bloom_filter_cache_size: NonZeroUsize,
pub get_events_max_blocks_to_scan: NonZeroUsize,
pub get_events_max_uncached_bloom_filters_to_load: NonZeroUsize,
#[cfg(feature = "aggregate_bloom")]
pub get_events_max_bloom_filters_to_load: NonZeroUsize,
pub state_tries: Option<StateTries>,
pub custom_versioned_constants: Option<VersionedConstants>,
pub feeder_gateway_fetch_concurrency: NonZeroUsize,
Expand Down Expand Up @@ -1005,6 +1018,8 @@ impl Config {
get_events_max_blocks_to_scan: cli.get_events_max_blocks_to_scan,
get_events_max_uncached_bloom_filters_to_load: cli
.get_events_max_uncached_bloom_filters_to_load,
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: cli.get_events_max_bloom_filters_to_load,
gateway_timeout: Duration::from_secs(cli.gateway_timeout.get()),
feeder_gateway_fetch_concurrency: cli.feeder_gateway_fetch_concurrency,
state_tries: cli.state_tries,
Expand Down
2 changes: 2 additions & 0 deletions crates/pathfinder/src/bin/pathfinder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
get_events_max_blocks_to_scan: config.get_events_max_blocks_to_scan,
get_events_max_uncached_bloom_filters_to_load: config
.get_events_max_uncached_bloom_filters_to_load,
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: config.get_events_max_bloom_filters_to_load,
custom_versioned_constants: config.custom_versioned_constants.take(),
};

Expand Down
4 changes: 4 additions & 0 deletions crates/rpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub struct RpcConfig {
pub batch_concurrency_limit: NonZeroUsize,
pub get_events_max_blocks_to_scan: NonZeroUsize,
pub get_events_max_uncached_bloom_filters_to_load: NonZeroUsize,
#[cfg(feature = "aggregate_bloom")]
pub get_events_max_bloom_filters_to_load: NonZeroUsize,
pub custom_versioned_constants: Option<VersionedConstants>,
}

Expand Down Expand Up @@ -121,6 +123,8 @@ impl RpcContext {
batch_concurrency_limit: NonZeroUsize::new(8).unwrap(),
get_events_max_blocks_to_scan: NonZeroUsize::new(1000).unwrap(),
get_events_max_uncached_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(),
custom_versioned_constants: None,
};

Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/src/jsonrpc/router/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,8 @@ mod tests {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
6 changes: 5 additions & 1 deletion crates/rpc/src/method/get_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ pub async fn get_events(

let start = std::time::Instant::now();
let page_from_aggregate = transaction
.events_from_aggregate(&filter, context.config.get_events_max_blocks_to_scan)
.events_from_aggregate(
&filter,
context.config.get_events_max_blocks_to_scan,
context.config.get_events_max_bloom_filters_to_load,
)
.map_err(|e| match e {
EventFilterError::Internal(e) => GetEventsError::Internal(e),
EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()),
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/src/method/subscribe_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,8 @@ mod tests {
batch_concurrency_limit: 64.try_into().unwrap(),
get_events_max_blocks_to_scan: 1024.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1024.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/src/method/subscribe_new_heads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ mod tests {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/src/method/subscribe_pending_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ mod tests {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/src/method/subscribe_transaction_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,8 @@ mod tests {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
6 changes: 2 additions & 4 deletions crates/storage/src/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub const BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN;
/// Before being added to `AggregateBloom`, each [`BloomFilter`] is
/// rotated by 90 degrees (transposed).
#[derive(Debug, Clone)]
// TODO:
#[allow(dead_code)]
pub struct AggregateBloom {
/// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in
/// a single array.
Expand All @@ -95,7 +97,6 @@ pub struct AggregateBloom {
}

// TODO:
// Delete after cfg flag is removed
#[allow(dead_code)]
impl AggregateBloom {
/// Maximum number of blocks to aggregate in a single `AggregateBloom`.
Expand Down Expand Up @@ -364,9 +365,6 @@ impl BloomFilter {
// Workaround to get the indices of the keys in the filter.
// Needed because the `bloomfilter` crate doesn't provide a
// way to get this information.
// TODO:
// Delete after cfg flag is removed
#[allow(dead_code)]
fn indices_for_key(key: &Felt) -> Vec<usize> {
// Use key on an empty Bloom filter
let mut bloom = Self::new();
Expand Down
Loading

0 comments on commit c45b23d

Please sign in to comment.