Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
fix: trigger kill switch when end block is met (#1353)
Browse files Browse the repository at this point in the history
  • Loading branch information
deekerno authored Sep 13, 2023
1 parent 985d862 commit 322a2e7
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,17 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
(block_info, next_cursor, has_next_page)
}
Err(e) => {
error!("Indexer({indexer_uid}) failed to fetch blocks: {e:?}",);
sleep(Duration::from_secs(DELAY_FOR_SERVICE_ERROR)).await;
continue;
if let IndexerError::EndBlockMet = e {
info!("Indexer({indexer_uid}) has met its end block; beginning indexer shutdown process.");
executor.kill_switch().store(true, Ordering::SeqCst);
continue;
} else {
error!(
"Indexer({indexer_uid}) failed to fetch blocks: {e:?}",
);
sleep(Duration::from_secs(DELAY_FOR_SERVICE_ERROR)).await;
continue;
}
}
};

Expand Down Expand Up @@ -247,6 +255,21 @@ pub async fn retrieve_blocks_from_node(
end_block: Option<u32>,
indexer_uid: &str,
) -> IndexerResult<(Vec<BlockData>, Option<String>, bool)> {
// Let's check if we need less blocks than block_page_size.
let page_size = if let (Some(start), Some(end)) = (cursor, end_block) {
if let Ok(start) = start.parse::<u32>() {
if start >= end {
return Err(IndexerError::EndBlockMet);
}

std::cmp::min((end - start) as usize, block_page_size)
} else {
block_page_size
}
} else {
block_page_size
};

debug!("Fetching paginated results from {cursor:?}");

let PaginatedResult {
Expand All @@ -257,7 +280,7 @@ pub async fn retrieve_blocks_from_node(
} = client
.full_blocks(PaginationRequest {
cursor: cursor.clone(),
results: block_page_size,
results: page_size,
direction: PageDirection::Forward,
})
.await
Expand All @@ -276,12 +299,6 @@ pub async fn retrieve_blocks_from_node(

let mut block_info = Vec::new();
for block in results.into_iter() {
if let Some(end_block) = end_block {
if block.header.height.0 > end_block {
return Err(IndexerError::EndBlockMet);
}
}

let producer: Option<Bytes32> = block.block_producer().map(|pk| pk.hash());

let mut transactions = Vec::new();
Expand Down

0 comments on commit 322a2e7

Please sign in to comment.