diff --git a/Cargo.lock b/Cargo.lock index 5de82d54544..ce238a4bda3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4507,6 +4507,7 @@ dependencies = [ "near-config-utils", "near-crypto", "near-dyn-configs", + "near-epoch-manager", "near-indexer-primitives", "near-o11y", "near-parameters", diff --git a/chain/indexer/Cargo.toml b/chain/indexer/Cargo.toml index 1215cfa2081..511a06758bd 100644 --- a/chain/indexer/Cargo.toml +++ b/chain/indexer/Cargo.toml @@ -25,8 +25,9 @@ nearcore.workspace = true near-client.workspace = true near-chain-configs.workspace = true near-config-utils.workspace = true -near-dyn-configs.workspace = true near-crypto.workspace = true +near-dyn-configs.workspace = true +near-epoch-manager.workspace = true near-indexer-primitives.workspace = true near-o11y.workspace = true near-parameters.workspace = true @@ -39,6 +40,7 @@ nightly_protocol = [ "near-chain-configs/nightly_protocol", "near-client/nightly_protocol", "near-dyn-configs/nightly_protocol", + "near-epoch-manager/nightly_protocol", "near-indexer-primitives/nightly_protocol", "near-o11y/nightly_protocol", "near-parameters/nightly_protocol", @@ -52,6 +54,7 @@ nightly = [ "near-chain-configs/nightly", "near-client/nightly", "near-dyn-configs/nightly", + "near-epoch-manager/nightly", "near-indexer-primitives/nightly", "near-o11y/nightly", "near-parameters/nightly", diff --git a/chain/indexer/src/lib.rs b/chain/indexer/src/lib.rs index 72571ef03c8..754f3267f6b 100644 --- a/chain/indexer/src/lib.rs +++ b/chain/indexer/src/lib.rs @@ -15,6 +15,7 @@ pub use near_indexer_primitives::{ StreamerMessage, }; +use near_epoch_manager::shard_tracker::ShardTracker; pub use streamer::build_streamer_message; mod streamer; @@ -92,6 +93,7 @@ pub struct Indexer { near_config: nearcore::NearConfig, view_client: actix::Addr, client: actix::Addr, + shard_tracker: ShardTracker, } impl Indexer { @@ -113,16 +115,16 @@ impl Indexer { .unwrap_or_else(|e| panic!("Error loading config: {:#}", e)); assert!( - !&near_config.client_config.tracked_shards.is_empty(), - "Indexer should track at least one shard. \n\ - Tip: You may want to update {} with `\"tracked_shards\": [0]` - ", + !&near_config.client_config.tracked_shards.is_empty() || !&near_config.client_config.tracked_accounts.is_empty(), + "Indexer should either track at least one shard or track at least one account. \n\ + Tip: You may want to update {} with `\"tracked_shards\": [0]` (which tracks all shards) + or `\"tracked_accounts\": [\"some_account.near\"]` (which tracks whatever shard the account is on)", indexer_config.home_dir.join("config.json").display() ); - let nearcore::NearNode { client, view_client, .. } = + let nearcore::NearNode { client, view_client, shard_tracker, .. } = nearcore::start_with_config(&indexer_config.home_dir, near_config.clone()) .with_context(|| "start_with_config")?; - Ok(Self { view_client, client, near_config, indexer_config }) + Ok(Self { view_client, client, near_config, indexer_config, shard_tracker }) } /// Boots up `near_indexer::streamer`, so it monitors the new blocks with chunks, transactions, receipts, and execution outcomes inside. The returned stream handler should be drained and handled on the user side. @@ -131,6 +133,7 @@ impl Indexer { actix::spawn(streamer::start( self.view_client.clone(), self.client.clone(), + self.shard_tracker.clone(), self.indexer_config.clone(), self.near_config.config.store.clone(), sender, diff --git a/chain/indexer/src/streamer/fetchers.rs b/chain/indexer/src/streamer/fetchers.rs index fda4bbc283e..9e5d93b4665 100644 --- a/chain/indexer/src/streamer/fetchers.rs +++ b/chain/indexer/src/streamer/fetchers.rs @@ -13,6 +13,7 @@ use near_primitives::{types, views}; use super::errors::FailedToFetchData; use super::INDEXER; +use near_epoch_manager::shard_tracker::ShardTracker; pub(crate) async fn fetch_status( client: &Addr, @@ -161,12 +162,16 @@ async fn fetch_single_chunk( pub(crate) async fn fetch_block_chunks( client: &Addr, block: &views::BlockView, + shard_tracker: &ShardTracker, ) -> Result, FailedToFetchData> { tracing::debug!(target: INDEXER, "Fetching chunks for block #{}", block.header.height); let mut futures: futures::stream::FuturesUnordered<_> = block .chunks .iter() - .filter(|chunk| chunk.height_included == block.header.height) + .filter(|chunk| { + shard_tracker.care_about_shard(None, &block.header.prev_hash, chunk.shard_id, false) + && chunk.height_included == block.header.height + }) .map(|chunk| fetch_single_chunk(&client, chunk.chunk_hash)) .collect(); let mut chunks = Vec::::with_capacity(futures.len()); diff --git a/chain/indexer/src/streamer/mod.rs b/chain/indexer/src/streamer/mod.rs index 6267c26c092..43a66b761d0 100644 --- a/chain/indexer/src/streamer/mod.rs +++ b/chain/indexer/src/streamer/mod.rs @@ -26,6 +26,7 @@ use self::utils::convert_transactions_sir_into_local_receipts; use crate::streamer::fetchers::fetch_protocol_config; use crate::INDEXER; use crate::{AwaitForNodeSyncedEnum, IndexerConfig}; +use near_epoch_manager::shard_tracker::ShardTracker; mod errors; mod fetchers; @@ -74,9 +75,10 @@ fn test_problematic_blocks_hash() { pub async fn build_streamer_message( client: &Addr, block: views::BlockView, + shard_tracker: &ShardTracker, ) -> Result { let _timer = metrics::BUILD_STREAMER_MESSAGE_TIME.start_timer(); - let chunks = fetch_block_chunks(&client, &block).await?; + let chunks = fetch_block_chunks(&client, &block, shard_tracker).await?; let protocol_config_view = fetch_protocol_config(&client, block.header.hash).await?; let shard_ids = protocol_config_view.shard_layout.shard_ids(); @@ -200,6 +202,7 @@ pub async fn build_streamer_message( &runtime_config, block.clone(), execution_outcome.id, + shard_tracker, ) .await? } @@ -276,6 +279,7 @@ async fn lookup_delayed_local_receipt_in_previous_blocks( runtime_config: &RuntimeConfig, block: views::BlockView, receipt_id: CryptoHash, + shard_tracker: &ShardTracker, ) -> Result { let mut prev_block_tried = 0u16; let mut prev_block_hash = block.header.prev_hash; @@ -299,9 +303,14 @@ async fn lookup_delayed_local_receipt_in_previous_blocks( prev_block_hash = prev_block.header.prev_hash; - if let Some(receipt) = - find_local_receipt_by_id_in_block(&client, &runtime_config, prev_block, receipt_id) - .await? + if let Some(receipt) = find_local_receipt_by_id_in_block( + &client, + &runtime_config, + prev_block, + receipt_id, + shard_tracker, + ) + .await? { tracing::debug!( target: INDEXER, @@ -324,8 +333,9 @@ async fn find_local_receipt_by_id_in_block( runtime_config: &RuntimeConfig, block: views::BlockView, receipt_id: near_primitives::hash::CryptoHash, + shard_tracker: &ShardTracker, ) -> Result, FailedToFetchData> { - let chunks = fetch_block_chunks(&client, &block).await?; + let chunks = fetch_block_chunks(&client, &block, shard_tracker).await?; let protocol_config_view = fetch_protocol_config(&client, block.header.hash).await?; let mut shards_outcomes = fetch_outcomes(&client, block.header.hash).await?; @@ -371,6 +381,7 @@ async fn find_local_receipt_by_id_in_block( pub(crate) async fn start( view_client: Addr, client: Addr, + shard_tracker: ShardTracker, indexer_config: IndexerConfig, store_config: near_store::StoreConfig, blocks_sink: mpsc::Sender, @@ -437,7 +448,7 @@ pub(crate) async fn start( for block_height in start_syncing_block_height..=latest_block_height { metrics::CURRENT_BLOCK_HEIGHT.set(block_height as i64); if let Ok(block) = fetch_block_by_height(&view_client, block_height).await { - let response = build_streamer_message(&view_client, block).await; + let response = build_streamer_message(&view_client, block, &shard_tracker).await; match response { Ok(streamer_message) => { diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index bc3f88c621e..0b255586fa1 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -225,6 +225,8 @@ pub struct NearNode { pub resharding_handle: ReshardingHandle, // The threads that state sync runs in. pub state_sync_runtime: Arc, + /// Shard tracker, allows querying of which shards are tracked by this node. + pub shard_tracker: ShardTracker, } pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result { @@ -428,7 +430,7 @@ pub fn start_with_config_and_synchronization( client_config: config.client_config.clone(), chain_genesis, epoch_manager, - shard_tracker, + shard_tracker: shard_tracker.clone(), runtime, validator: config.validator_signer.clone(), dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(), @@ -511,5 +513,6 @@ pub fn start_with_config_and_synchronization( state_sync_dumper, resharding_handle, state_sync_runtime, + shard_tracker, }) }