Skip to content

Commit

Permalink
fix: refactor RPC implementations to use ApiTipsetKey(Option<TipsetKe…
Browse files Browse the repository at this point in the history
…y>) to not assume TipsetKey to be empty (#3934)
  • Loading branch information
hanabi1224 authored Feb 18, 2024
1 parent 6b3808b commit 65cf790
Show file tree
Hide file tree
Showing 16 changed files with 229 additions and 126 deletions.
37 changes: 17 additions & 20 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,14 @@ where

/// Returns the currently tracked heaviest tipset.
pub fn heaviest_tipset(&self) -> Arc<Tipset> {
self.load_required_tipset(
&self
.settings
.require_obj::<TipsetKey>(HEAD_KEY)
.expect("failed to load heaviest tipset"),
)
.expect("failed to load heaviest tipset")
self.chain_index
.load_required_tipset(
&self
.settings
.require_obj::<TipsetKey>(HEAD_KEY)
.expect("failed to load heaviest tipset"),
)
.expect("failed to load heaviest tipset")
}

/// Returns a reference to the publisher of head changes.
Expand All @@ -192,22 +193,18 @@ where
}

/// Returns Tipset from key-value store from provided CIDs
#[tracing::instrument(skip_all)]
pub fn load_tipset(&self, tsk: &TipsetKey) -> Result<Option<Arc<Tipset>>, Error> {
if tsk.cids.is_empty() {
return Ok(Some(self.heaviest_tipset()));
}
self.chain_index.load_tipset(tsk)
}

/// Returns Tipset from key-value store from provided CIDs.
/// or falls back to the heaviest tipset when no CIDs are provided.
/// This calls fails if the tipset is missing or invalid.
#[tracing::instrument(skip_all)]
pub fn load_required_tipset(&self, tsk: &TipsetKey) -> Result<Arc<Tipset>, Error> {
if tsk.cids.is_empty() {
return Ok(self.heaviest_tipset());
pub fn load_required_tipset_with_fallback(
&self,
tsk_opt: &Option<TipsetKey>,
) -> Result<Arc<Tipset>, Error> {
if let Some(tsk) = tsk_opt {
self.chain_index.load_required_tipset(tsk)
} else {
Ok(self.heaviest_tipset())
}
self.chain_index.load_required_tipset(tsk)
}

/// Determines if provided tipset is heavier than existing known heaviest
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ where
) -> Result<FullTipset, ChainMuxerError> {
let mut blocks = Vec::new();
// Retrieve tipset from store based on passed in TipsetKey
let ts = chain_store.load_required_tipset(&tipset_keys)?;
let ts = chain_store.chain_index.load_required_tipset(&tipset_keys)?;
for header in ts.block_headers() {
// Retrieve bls and secp messages from specified BlockHeader
let (bls_msgs, secp_msgs) =
Expand Down
16 changes: 11 additions & 5 deletions src/chain_sync/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,10 @@ async fn sync_headers_in_reverse<DB: Blockstore + Sync + Send + 'static>(
break;
}
// Attempt to load the parent tipset from local store
if let Ok(tipset) = chain_store.load_required_tipset(oldest_parent.parents()) {
if let Ok(tipset) = chain_store
.chain_index
.load_required_tipset(oldest_parent.parents())
{
parent_blocks.extend(tipset.cids());
parent_tipsets.push(tipset);
continue;
Expand Down Expand Up @@ -875,8 +878,9 @@ async fn sync_headers_in_reverse<DB: Blockstore + Sync + Send + 'static>(
.chain_exchange_headers(None, oldest_tipset.parents(), FORK_LENGTH_THRESHOLD)
.await
.map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?;
let mut potential_common_ancestor =
chain_store.load_required_tipset(current_head.parents())?;
let mut potential_common_ancestor = chain_store
.chain_index
.load_required_tipset(current_head.parents())?;
let mut i = 0;
let mut fork_length = 1;
while i < fork_tipsets.len() {
Expand Down Expand Up @@ -911,8 +915,9 @@ async fn sync_headers_in_reverse<DB: Blockstore + Sync + Send + 'static>(
if i == (fork_tipsets.len() - 1) {
return Err(TipsetRangeSyncerError::ChainForkLengthExceedsFinalityThreshold);
}
potential_common_ancestor =
chain_store.load_required_tipset(potential_common_ancestor.parents())?;
potential_common_ancestor = chain_store
.chain_index
.load_required_tipset(potential_common_ancestor.parents())?;
}
}
}
Expand Down Expand Up @@ -1187,6 +1192,7 @@ async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;

let base_tipset = chain_store
.chain_index
.load_required_tipset(&header.parents)
// The parent tipset will always be there when calling validate_block
// as part of the sync_tipset_range flow because all of the headers in the range
Expand Down
3 changes: 2 additions & 1 deletion src/cli/subcommands/snapshot_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::blocks::TipsetKey;
use crate::chain_sync::SyncConfig;
use crate::cli_shared::snapshot::{self, TrustedVendor};
use crate::rpc_api::chain_api::ChainExportParams;
use crate::rpc_api::data_types::ApiTipsetKey;
use crate::rpc_client::ApiInfo;
use anyhow::Context as _;
use chrono::NaiveDateTime;
Expand Down Expand Up @@ -81,7 +82,7 @@ impl SnapshotCommands {
epoch,
recent_roots: depth.unwrap_or(SyncConfig::default().recent_state_roots),
output_path: temp_path.to_path_buf(),
tipset_keys: chain_head.key().clone(),
tipset_keys: ApiTipsetKey(Some(chain_head.key().clone())),
skip_checksum,
dry_run,
};
Expand Down
1 change: 1 addition & 0 deletions src/fil_cns/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub(in crate::fil_cns) async fn validate_block<DB: Blockstore + Sync + Send + 's
block_sanity_checks(header).map_err(to_errs)?;

let base_tipset = chain_store
.chain_index
.load_required_tipset(&header.parents)
.map_err(to_errs)?;

Expand Down
5 changes: 4 additions & 1 deletion src/libp2p/chain_exchange/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ where
}

let inner = move || {
let root = match cs.load_tipset(&TipsetKey::from_iter(request.start.clone()))? {
let root = match cs
.chain_index
.load_tipset(&TipsetKey::from_iter(request.start.clone()))?
{
Some(tipset) => tipset,
None => {
return Ok(ChainExchangeResponse {
Expand Down
6 changes: 5 additions & 1 deletion src/message_pool/msgpool/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ where
}

fn load_tipset(&self, tsk: &TipsetKey) -> Result<Arc<Tipset>, Error> {
Ok(self.sm.chain_store().load_required_tipset(tsk)?)
Ok(self
.sm
.chain_store()
.chain_index
.load_required_tipset(tsk)?)
}

fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error> {
Expand Down
24 changes: 11 additions & 13 deletions src/rpc/chain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ use crate::chain::index::ResolveNullTipset;
use crate::cid_collections::CidHashSet;
use crate::lotus_json::LotusJson;
use crate::message::ChainMessage;
use crate::rpc_api::data_types::{ApiMessage, ApiReceipt};
use crate::rpc_api::{
chain_api::*,
data_types::{BlockMessages, RPCState},
};
use crate::rpc_api::{chain_api::*, data_types::*};
use crate::shim::clock::ChainEpoch;
use crate::shim::message::Message;
use crate::utils::io::VoidAsyncWriter;
Expand Down Expand Up @@ -134,7 +130,7 @@ pub async fn chain_export<DB>(
epoch,
recent_roots,
output_path,
tipset_keys: tsk,
tipset_keys: ApiTipsetKey(tsk),
skip_checksum,
dry_run,
}): Params<ChainExportParams>,
Expand All @@ -159,7 +155,7 @@ where
))?;
}

let head = data.chain_store.load_required_tipset(&tsk)?;
let head = data.chain_store.load_required_tipset_with_fallback(&tsk)?;
let start_ts =
data.chain_store
.chain_index
Expand Down Expand Up @@ -243,12 +239,12 @@ pub async fn chain_get_block_messages<DB: Blockstore>(

pub async fn chain_get_tipset_by_height<DB: Blockstore>(
data: Data<RPCState<DB>>,
Params(LotusJson((height, tsk))): Params<LotusJson<(ChainEpoch, TipsetKey)>>,
Params(LotusJson((height, ApiTipsetKey(tsk)))): Params<LotusJson<(ChainEpoch, ApiTipsetKey)>>,
) -> Result<LotusJson<Tipset>, JsonRpcError> {
let ts = data
.state_manager
.chain_store()
.load_required_tipset(&tsk)?;
.load_required_tipset_with_fallback(&tsk)?;
let tss = data
.state_manager
.chain_store()
Expand Down Expand Up @@ -285,25 +281,25 @@ pub async fn chain_get_block<DB: Blockstore>(

pub async fn chain_get_tipset<DB: Blockstore>(
data: Data<RPCState<DB>>,
Params(LotusJson((tsk,))): Params<LotusJson<(TipsetKey,)>>,
Params(LotusJson((ApiTipsetKey(tsk),))): Params<LotusJson<(ApiTipsetKey,)>>,
) -> Result<LotusJson<Tipset>, JsonRpcError> {
let ts = data
.state_manager
.chain_store()
.load_required_tipset(&tsk)?;
.load_required_tipset_with_fallback(&tsk)?;
Ok((*ts).clone().into())
}

// This is basically a port of the reference implementation at
// https://github.com/filecoin-project/lotus/blob/v1.23.0/node/impl/full/chain.go#L321
pub async fn chain_set_head<DB: Blockstore>(
data: Data<RPCState<DB>>,
Params(LotusJson((tsk,))): Params<LotusJson<(TipsetKey,)>>,
Params(LotusJson((ApiTipsetKey(tsk),))): Params<LotusJson<(ApiTipsetKey,)>>,
) -> Result<(), JsonRpcError> {
let new_head = data
.state_manager
.chain_store()
.load_required_tipset(&tsk)?;
.load_required_tipset_with_fallback(&tsk)?;
let mut current = data.state_manager.chain_store().heaviest_tipset();
while current.epoch() >= new_head.epoch() {
for cid in current.key().cids.clone() {
Expand All @@ -315,6 +311,7 @@ pub async fn chain_set_head<DB: Blockstore>(
current = data
.state_manager
.chain_store()
.chain_index
.load_required_tipset(parents)?;
}
data.state_manager
Expand All @@ -335,6 +332,7 @@ pub(crate) async fn chain_get_min_base_fee<DB: Blockstore>(
current = data
.state_manager
.chain_store()
.chain_index
.load_required_tipset(parents)?;

min_base_fee = min_base_fee.min(current.block_headers().first().parent_base_fee.to_owned());
Expand Down
15 changes: 7 additions & 8 deletions src/rpc/eth_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,15 @@ pub async fn eth_block_number<DB: Blockstore>(
}
// First non-null parent.
let effective_parent = heaviest.parents();
let parent = data
if let Ok(Some(parent)) = data
.state_manager
.chain_store()
.load_tipset(effective_parent);
match parent {
Ok(parent) => match parent {
Some(parent) => Ok(format!("0x{:x}", parent.epoch())),
None => Ok("0x0".to_string()),
},
Err(_) => Ok("0x0".to_string()),
.chain_index
.load_tipset(effective_parent)
{
Ok(format!("0x{:x}", parent.epoch()))
} else {
Ok("0x0".to_string())
}
}

Expand Down
1 change: 1 addition & 0 deletions src/rpc/gas_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub async fn estimate_gas_premium<DB: Blockstore>(
let pts = data
.state_manager
.chain_store()
.chain_index
.load_required_tipset(ts.parents())?;
blocks += pts.block_headers().len();
let msgs = crate::chain::messages_for_tipset(data.state_manager.blockstore_owned(), &pts)?;
Expand Down
9 changes: 4 additions & 5 deletions src/rpc/mpool_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

use std::convert::TryFrom;

use crate::blocks::TipsetKey;
use crate::lotus_json::LotusJson;
use crate::message::SignedMessage;
use crate::rpc_api::data_types::{MessageSendSpec, RPCState};
use crate::rpc_api::data_types::*;
use crate::shim::{
address::{Address, Protocol},
message::Message,
Expand All @@ -34,16 +33,15 @@ where
/// Return `Vec` of pending messages in `mpool`
pub async fn mpool_pending<DB>(
data: Data<RPCState<DB>>,
Params(LotusJson((cid_vec,))): Params<LotusJson<(Vec<Cid>,)>>,
Params(LotusJson((ApiTipsetKey(tsk),))): Params<LotusJson<(ApiTipsetKey,)>>,
) -> Result<LotusJson<Vec<SignedMessage>>, JsonRpcError>
where
DB: Blockstore + Send + Sync + 'static,
{
let tsk = TipsetKey::from_iter(cid_vec);
let mut ts = data
.state_manager
.chain_store()
.load_required_tipset(&tsk)?;
.load_required_tipset_with_fallback(&tsk)?;

let (mut pending, mpts) = data.mpool.pending()?;

Expand Down Expand Up @@ -94,6 +92,7 @@ where
ts = data
.state_manager
.chain_store()
.chain_index
.load_required_tipset(ts.parents())?;
}
Ok(pending.into_iter().collect::<Vec<_>>().into())
Expand Down
4 changes: 2 additions & 2 deletions src/rpc/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ pub async fn node_status<DB: Blockstore>(
for _ in 0..100 {
block_count += ts.block_headers().len();
let tsk = ts.parents();
ts = data.chain_store.load_required_tipset(tsk)?;
ts = data.chain_store.chain_index.load_required_tipset(tsk)?;
}

node_status.chain_status.blocks_per_tipset_last_100 = block_count as f64 / 100.;

for _ in 100..chain_finality {
block_count += ts.block_headers().len();
let tsk = ts.parents();
ts = data.chain_store.load_required_tipset(tsk)?;
ts = data.chain_store.chain_index.load_required_tipset(tsk)?;
}

node_status.chain_status.blocks_per_tipset_last_finality =
Expand Down
Loading

0 comments on commit 65cf790

Please sign in to comment.