Skip to content

Commit

Permalink
Make indexer support single shard tracking. (#12607)
Browse files Browse the repository at this point in the history
Allow using tracked_accounts. In that case, use a ShardTracker to filter
down the chunks we should query.

Is there any tests for the indexer? 🤔
  • Loading branch information
robin-near authored Dec 12, 2024
1 parent 861c842 commit fb444c6
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ nearcore.workspace = true
near-client.workspace = true
near-chain-configs.workspace = true
near-config-utils.workspace = true
near-dyn-configs.workspace = true
near-crypto.workspace = true
near-dyn-configs.workspace = true
near-epoch-manager.workspace = true
near-indexer-primitives.workspace = true
near-o11y.workspace = true
near-parameters.workspace = true
Expand All @@ -39,6 +40,7 @@ nightly_protocol = [
"near-chain-configs/nightly_protocol",
"near-client/nightly_protocol",
"near-dyn-configs/nightly_protocol",
"near-epoch-manager/nightly_protocol",
"near-indexer-primitives/nightly_protocol",
"near-o11y/nightly_protocol",
"near-parameters/nightly_protocol",
Expand All @@ -52,6 +54,7 @@ nightly = [
"near-chain-configs/nightly",
"near-client/nightly",
"near-dyn-configs/nightly",
"near-epoch-manager/nightly",
"near-indexer-primitives/nightly",
"near-o11y/nightly",
"near-parameters/nightly",
Expand Down
15 changes: 9 additions & 6 deletions chain/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use near_indexer_primitives::{
StreamerMessage,
};

use near_epoch_manager::shard_tracker::ShardTracker;
pub use streamer::build_streamer_message;

mod streamer;
Expand Down Expand Up @@ -92,6 +93,7 @@ pub struct Indexer {
near_config: nearcore::NearConfig,
view_client: actix::Addr<near_client::ViewClientActor>,
client: actix::Addr<near_client::ClientActor>,
shard_tracker: ShardTracker,
}

impl Indexer {
Expand All @@ -113,16 +115,16 @@ impl Indexer {
.unwrap_or_else(|e| panic!("Error loading config: {:#}", e));

assert!(
!&near_config.client_config.tracked_shards.is_empty(),
"Indexer should track at least one shard. \n\
Tip: You may want to update {} with `\"tracked_shards\": [0]`
",
!&near_config.client_config.tracked_shards.is_empty() || !&near_config.client_config.tracked_accounts.is_empty(),
"Indexer should either track at least one shard or track at least one account. \n\
Tip: You may want to update {} with `\"tracked_shards\": [0]` (which tracks all shards)
or `\"tracked_accounts\": [\"some_account.near\"]` (which tracks whatever shard the account is on)",
indexer_config.home_dir.join("config.json").display()
);
let nearcore::NearNode { client, view_client, .. } =
let nearcore::NearNode { client, view_client, shard_tracker, .. } =
nearcore::start_with_config(&indexer_config.home_dir, near_config.clone())
.with_context(|| "start_with_config")?;
Ok(Self { view_client, client, near_config, indexer_config })
Ok(Self { view_client, client, near_config, indexer_config, shard_tracker })
}

/// Boots up `near_indexer::streamer`, so it monitors the new blocks with chunks, transactions, receipts, and execution outcomes inside. The returned stream handler should be drained and handled on the user side.
Expand All @@ -131,6 +133,7 @@ impl Indexer {
actix::spawn(streamer::start(
self.view_client.clone(),
self.client.clone(),
self.shard_tracker.clone(),
self.indexer_config.clone(),
self.near_config.config.store.clone(),
sender,
Expand Down
7 changes: 6 additions & 1 deletion chain/indexer/src/streamer/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use near_primitives::{types, views};

use super::errors::FailedToFetchData;
use super::INDEXER;
use near_epoch_manager::shard_tracker::ShardTracker;

pub(crate) async fn fetch_status(
client: &Addr<near_client::ClientActor>,
Expand Down Expand Up @@ -161,12 +162,16 @@ async fn fetch_single_chunk(
pub(crate) async fn fetch_block_chunks(
client: &Addr<near_client::ViewClientActor>,
block: &views::BlockView,
shard_tracker: &ShardTracker,
) -> Result<Vec<views::ChunkView>, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching chunks for block #{}", block.header.height);
let mut futures: futures::stream::FuturesUnordered<_> = block
.chunks
.iter()
.filter(|chunk| chunk.height_included == block.header.height)
.filter(|chunk| {
shard_tracker.care_about_shard(None, &block.header.prev_hash, chunk.shard_id, false)
&& chunk.height_included == block.header.height
})
.map(|chunk| fetch_single_chunk(&client, chunk.chunk_hash))
.collect();
let mut chunks = Vec::<views::ChunkView>::with_capacity(futures.len());
Expand Down
23 changes: 17 additions & 6 deletions chain/indexer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use self::utils::convert_transactions_sir_into_local_receipts;
use crate::streamer::fetchers::fetch_protocol_config;
use crate::INDEXER;
use crate::{AwaitForNodeSyncedEnum, IndexerConfig};
use near_epoch_manager::shard_tracker::ShardTracker;

mod errors;
mod fetchers;
Expand Down Expand Up @@ -74,9 +75,10 @@ fn test_problematic_blocks_hash() {
pub async fn build_streamer_message(
client: &Addr<near_client::ViewClientActor>,
block: views::BlockView,
shard_tracker: &ShardTracker,
) -> Result<StreamerMessage, FailedToFetchData> {
let _timer = metrics::BUILD_STREAMER_MESSAGE_TIME.start_timer();
let chunks = fetch_block_chunks(&client, &block).await?;
let chunks = fetch_block_chunks(&client, &block, shard_tracker).await?;

let protocol_config_view = fetch_protocol_config(&client, block.header.hash).await?;
let shard_ids = protocol_config_view.shard_layout.shard_ids();
Expand Down Expand Up @@ -200,6 +202,7 @@ pub async fn build_streamer_message(
&runtime_config,
block.clone(),
execution_outcome.id,
shard_tracker,
)
.await?
}
Expand Down Expand Up @@ -276,6 +279,7 @@ async fn lookup_delayed_local_receipt_in_previous_blocks(
runtime_config: &RuntimeConfig,
block: views::BlockView,
receipt_id: CryptoHash,
shard_tracker: &ShardTracker,
) -> Result<views::ReceiptView, FailedToFetchData> {
let mut prev_block_tried = 0u16;
let mut prev_block_hash = block.header.prev_hash;
Expand All @@ -299,9 +303,14 @@ async fn lookup_delayed_local_receipt_in_previous_blocks(

prev_block_hash = prev_block.header.prev_hash;

if let Some(receipt) =
find_local_receipt_by_id_in_block(&client, &runtime_config, prev_block, receipt_id)
.await?
if let Some(receipt) = find_local_receipt_by_id_in_block(
&client,
&runtime_config,
prev_block,
receipt_id,
shard_tracker,
)
.await?
{
tracing::debug!(
target: INDEXER,
Expand All @@ -324,8 +333,9 @@ async fn find_local_receipt_by_id_in_block(
runtime_config: &RuntimeConfig,
block: views::BlockView,
receipt_id: near_primitives::hash::CryptoHash,
shard_tracker: &ShardTracker,
) -> Result<Option<views::ReceiptView>, FailedToFetchData> {
let chunks = fetch_block_chunks(&client, &block).await?;
let chunks = fetch_block_chunks(&client, &block, shard_tracker).await?;

let protocol_config_view = fetch_protocol_config(&client, block.header.hash).await?;
let mut shards_outcomes = fetch_outcomes(&client, block.header.hash).await?;
Expand Down Expand Up @@ -371,6 +381,7 @@ async fn find_local_receipt_by_id_in_block(
pub(crate) async fn start(
view_client: Addr<near_client::ViewClientActor>,
client: Addr<near_client::ClientActor>,
shard_tracker: ShardTracker,
indexer_config: IndexerConfig,
store_config: near_store::StoreConfig,
blocks_sink: mpsc::Sender<StreamerMessage>,
Expand Down Expand Up @@ -437,7 +448,7 @@ pub(crate) async fn start(
for block_height in start_syncing_block_height..=latest_block_height {
metrics::CURRENT_BLOCK_HEIGHT.set(block_height as i64);
if let Ok(block) = fetch_block_by_height(&view_client, block_height).await {
let response = build_streamer_message(&view_client, block).await;
let response = build_streamer_message(&view_client, block, &shard_tracker).await;

match response {
Ok(streamer_message) => {
Expand Down
5 changes: 4 additions & 1 deletion nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ pub struct NearNode {
pub resharding_handle: ReshardingHandle,
// The threads that state sync runs in.
pub state_sync_runtime: Arc<tokio::runtime::Runtime>,
/// Shard tracker, allows querying of which shards are tracked by this node.
pub shard_tracker: ShardTracker,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand Down Expand Up @@ -428,7 +430,7 @@ pub fn start_with_config_and_synchronization(
client_config: config.client_config.clone(),
chain_genesis,
epoch_manager,
shard_tracker,
shard_tracker: shard_tracker.clone(),
runtime,
validator: config.validator_signer.clone(),
dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(),
Expand Down Expand Up @@ -511,5 +513,6 @@ pub fn start_with_config_and_synchronization(
state_sync_dumper,
resharding_handle,
state_sync_runtime,
shard_tracker,
})
}

0 comments on commit fb444c6

Please sign in to comment.