Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Add support for multi get operation for database queries #2396

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c77045f
feat: Multi-get with boxed iterators
netrome Oct 24, 2024
e66db04
feat: Multi get on bĺueprint
netrome Oct 24, 2024
9eace87
feat: get_multi on RocksDB
netrome Oct 24, 2024
8ba0c9d
feat: Use local fuel-vm
netrome Oct 24, 2024
76b919b
feat: Implementation for structured storage
netrome Oct 24, 2024
2333dfb
feat: Use multi-get
netrome Oct 25, 2024
49994b9
feat: Messages impl
netrome Oct 25, 2024
2fb9e88
feat: Don't rely on modified StorageInspect
netrome Oct 25, 2024
b2c2632
wip: Introduce specific StorageBatchInspect trait
netrome Oct 25, 2024
df63d48
wip: Use specific trait
netrome Oct 25, 2024
4d5ca7d
feat: Use boxed iterator
netrome Oct 25, 2024
36a50bb
feat: Use multi-get when getting full block
netrome Oct 25, 2024
6275b36
feat: Use multi-get when getting coins
netrome Oct 25, 2024
e1e50fa
feat: Use multi-get when getting transactions
netrome Oct 26, 2024
f76d210
refactor: Rename multi_get -> get_batch
netrome Oct 29, 2024
4feadfd
feat: Implement multi-get for more backends
netrome Oct 29, 2024
e141f56
chore: Clean up comments and add docstrings
netrome Oct 29, 2024
8e3fd0f
chore: Add changelog entry
netrome Oct 29, 2024
39664bb
fix: Take slice instead of Vec as input to function
netrome Oct 29, 2024
52eca64
test: Assert the returned tx is in the expected place in get_full_blo…
netrome Oct 29, 2024
5b0e975
fix: Typo
netrome Oct 29, 2024
bc955fd
fix: Typo
netrome Oct 29, 2024
491c452
feat: Add metrics for RocksDB get_batch implementation
netrome Oct 29, 2024
676a5d4
fix: Clippy
netrome Oct 29, 2024
8b35460
fix: Remove stale TODO comment
netrome Oct 31, 2024
f697707
Revert "feat: Add metrics for RocksDB get_batch implementation"
netrome Oct 31, 2024
3d0a71f
refactor: Return iterator over results in old multi_get function and …
netrome Oct 31, 2024
5de80b7
perf: Only fall back to fetch off chain transactions if any result is…
netrome Oct 31, 2024
d7a30c8
fix: Whitespace
netrome Oct 31, 2024
f27bdbb
feat: Take Cow as keys in `KeyValueInspect::get_batch`
netrome Oct 31, 2024
9ea8894
Merge branch 'master' into 2344-add-support-for-multi-get-operation-i…
netrome Oct 31, 2024
2c2d222
Proposals to multi get PR (#2419)
xgreenx Oct 31, 2024
df33ef2
feat: Simplify `DatabaseCoins` port
netrome Oct 31, 2024
79e42da
Another proposals to multi get PR (#2420)
xgreenx Oct 31, 2024
cbb6efc
feat: Don't require `Send` in BoxedIter
netrome Nov 1, 2024
07f9b94
Merge branch 'master' into 2344-add-support-for-multi-get-operation-i…
netrome Nov 1, 2024
75fedc7
fix: Cargo fmt
netrome Nov 1, 2024
7ae04bc
Update crates/fuel-core/src/graphql_api/database.rs
netrome Nov 15, 2024
c3b7264
fix: Remove redundant 'static bound
netrome Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future.
- [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit.
- [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer.
- [2396](https://github.com/FuelLabs/fuel-core/pull/2396): Add support for multi get operation for database queries.

### Fixed
- [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected.
Expand All @@ -29,6 +30,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Changed
- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.

#### Breaking
- [2396](https://github.com/FuelLabs/fuel-core/pull/2396): Return `StorageResult<Transaction>` in `OffChainDatabase::old_transaction`.

## [Version 0.40.0]

### Added
Expand Down
4 changes: 3 additions & 1 deletion benches/src/db_lookup_times_utils/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ fn get_block_multi_get_method(
.ok_or(anyhow!("empty raw block"))?;
let block: CompressedBlock = postcard::from_bytes(raw_block.as_slice())?;
let tx_ids = block.transactions().iter();
let raw_txs = database.multi_get(BenchDbColumn::Transactions.id(), tx_ids)?;
let raw_txs: Vec<_> = database
.multi_get(BenchDbColumn::Transactions.id(), tx_ids)
.try_collect()?;
let txs: Vec<Transaction> = raw_txs
.iter()
.flatten()
Expand Down
4 changes: 2 additions & 2 deletions crates/client/src/client/schema/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl LowerHex for TxPointer {
}
}

#[derive(cynic::Scalar, Debug, Clone)]
#[derive(cynic::Scalar, Debug, Clone, PartialEq, Eq)]
netrome marked this conversation as resolved.
Show resolved Hide resolved
pub struct HexString(pub Bytes);

impl From<HexString> for Vec<u8> {
Expand All @@ -194,7 +194,7 @@ impl Deref for HexString {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
rafal-ch marked this conversation as resolved.
Show resolved Hide resolved
pub struct Bytes(pub Vec<u8>);

impl FromStr for Bytes {
Expand Down
25 changes: 11 additions & 14 deletions crates/fuel-core/src/database/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
};
use fuel_core_storage::{
iter::{
IntoBoxedIterSend,
IterDirection,
IteratorOverTable,
},
Expand All @@ -23,6 +24,7 @@ use fuel_core_storage::{
Error as StorageError,
Result as StorageResult,
StorageAsRef,
StorageBatchInspect,
};
use fuel_core_types::{
blockchain::{
Expand All @@ -37,7 +39,6 @@ use fuel_core_types::{
fuel_types::BlockHeight,
};
use itertools::Itertools;
use std::borrow::Cow;

impl OffChainIterableKeyValueView {
pub fn get_block_height(&self, id: &BlockId) -> StorageResult<Option<BlockHeight>> {
Expand Down Expand Up @@ -65,20 +66,16 @@ impl OnChainIterableKeyValueView {
/// Retrieve the full block and all associated transactions
pub fn get_full_block(&self, height: &BlockHeight) -> StorageResult<Option<Block>> {
let db_block = self.storage::<FuelBlocks>().get(height)?;

if let Some(block) = db_block {
// fetch all the transactions
// TODO: Use multiget when it's implemented.
// https://github.com/FuelLabs/fuel-core/issues/2344
let txs = block
.transactions()
.iter()
.map(|tx_id| {
self.storage::<Transactions>()
.get(tx_id)
.and_then(|tx| tx.ok_or(not_found!(Transactions)))
.map(Cow::into_owned)
})
.try_collect()?;
let transaction_ids = block.transactions().iter().into_boxed_send();
let txs = <Self as StorageBatchInspect<Transactions>>::get_batch(
self,
transaction_ids,
)
.map(|res| res.and_then(|opt| opt.ok_or(not_found!(Transactions))))
.try_collect()?;

Ok(Some(block.into_owned().uncompress(txs)))
} else {
Ok(None)
Expand Down
72 changes: 50 additions & 22 deletions crates/fuel-core/src/graphql_api/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ use crate::fuel_core_graphql_api::{
use fuel_core_services::yield_stream::StreamYieldExt;
use fuel_core_storage::{
iter::{
BoxedIter,
BoxedIterSend,
IntoBoxedIter,
IntoBoxedIterSend,
IterDirection,
},
not_found,
tables::Transactions,
transactional::AtomicView,
Error as StorageError,
IsNotFound,
Expand Down Expand Up @@ -65,6 +64,7 @@ use fuel_core_types::{
use futures::Stream;
use std::{
borrow::Cow,
collections::BTreeMap,
sync::Arc,
};

Expand Down Expand Up @@ -141,29 +141,57 @@ impl ReadView {
pub fn transaction(&self, tx_id: &TxId) -> StorageResult<Transaction> {
let result = self.on_chain.transaction(tx_id);
if result.is_not_found() {
if let Some(tx) = self.off_chain.old_transaction(tx_id)? {
Ok(tx)
} else {
Err(not_found!(Transactions))
}
self.off_chain.old_transaction(tx_id)
} else {
result
}
}

pub async fn transactions(
pub async fn transactions(&self, tx_ids: &[TxId]) -> Vec<StorageResult<Transaction>> {
let transactions: Vec<_> = self
.on_chain
.transactions(tx_ids.iter().into_boxed())
.collect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect you can avoid collecting here since later you iterate on transactions again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unforutnately I need to both iterate over the transactions and zip them with the on_chain_results - so we need to iterate twice over the tersults. We could potentially do something fancy with Itertools::tee, but I think this solution is simpler and easier on the eyes.


// Give a chance for other tasks to run.
tokio::task::yield_now().await;

if transactions.iter().any(|result| result.is_not_found()) {
let on_chain_results = tx_ids.iter().enumerate().zip(transactions).collect();

self.extend_with_off_chain_results(on_chain_results).await
} else {
transactions
}
}

async fn extend_with_off_chain_results(
&self,
tx_ids: Vec<TxId>,
on_chain_results: BTreeMap<(usize, &TxId), StorageResult<Transaction>>,
) -> Vec<StorageResult<Transaction>> {
// TODO: Use multiget when it's implemented.
// https://github.com/FuelLabs/fuel-core/issues/2344
let result = tx_ids
let off_chain_indexed_txids: Vec<_> = on_chain_results
.iter()
.map(|tx_id| self.transaction(tx_id))
.collect::<Vec<_>>();
// Give a chance to other tasks to run.
.filter_map(|(indexed_tx_id, result)| {
result.is_not_found().then_some(*indexed_tx_id)
})
.collect();

let off_chain_results = off_chain_indexed_txids.iter().copied().zip(
self.off_chain.old_transactions(
off_chain_indexed_txids
.iter()
.map(|(_, tx_id)| *tx_id)
.into_boxed(),
),
);

let mut results = on_chain_results;
results.extend(off_chain_results);

// Give a chance for other tasks to run.
tokio::task::yield_now().await;
result

results.into_values().collect()
}

pub fn block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> {
Expand All @@ -178,7 +206,7 @@ impl ReadView {
&self,
height: Option<BlockHeight>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<CompressedBlock>> {
) -> BoxedIterSend<'_, StorageResult<CompressedBlock>> {
// Chain together blocks from the off-chain db and the on-chain db
// The blocks in off-chain db, if any, are from time before regenesis

Expand All @@ -191,12 +219,12 @@ impl ReadView {
.on_chain
.blocks(Some(height), direction)
.chain(self.off_chain.old_blocks(None, direction))
.into_boxed(),
.into_boxed_send(),
(false, IterDirection::Forward) => self
.off_chain
.old_blocks(Some(height), direction)
.chain(self.on_chain.blocks(None, direction))
.into_boxed(),
.into_boxed_send(),
(false, IterDirection::Reverse) => {
self.off_chain.old_blocks(Some(height), direction)
}
Expand All @@ -207,12 +235,12 @@ impl ReadView {
.off_chain
.old_blocks(None, direction)
.chain(self.on_chain.blocks(None, direction))
.into_boxed(),
.into_boxed_send(),
IterDirection::Reverse => self
.on_chain
.blocks(None, direction)
.chain(self.off_chain.old_blocks(None, direction))
.into_boxed(),
.into_boxed_send(),
}
}
}
Expand Down
55 changes: 41 additions & 14 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
BoxedIterSend,
IterDirection,
},
tables::{
Expand Down Expand Up @@ -30,12 +31,15 @@ use fuel_core_types::{
DaBlockHeight,
},
},
entities::relayer::{
message::{
MerkleProof,
Message,
entities::{
coins::coin::Coin,
relayer::{
message::{
MerkleProof,
Message,
},
transaction::RelayedTransactionStatus,
},
transaction::RelayedTransactionStatus,
},
fuel_tx::{
Bytes32,
Expand Down Expand Up @@ -76,21 +80,21 @@ pub trait OffChainDatabase: Send + Sync {
owner: &Address,
start_coin: Option<UtxoId>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<UtxoId>>;
) -> BoxedIterSend<'_, StorageResult<UtxoId>>;

fn owned_message_ids(
&self,
owner: &Address,
start_message_id: Option<Nonce>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<Nonce>>;
) -> BoxedIterSend<'_, StorageResult<Nonce>>;

fn owned_transactions_ids(
&self,
owner: Address,
start: Option<TxPointer>,
direction: IterDirection,
) -> BoxedIter<StorageResult<(TxPointer, TxId)>>;
) -> BoxedIterSend<StorageResult<(TxPointer, TxId)>>;

fn contract_salt(&self, contract_id: &ContractId) -> StorageResult<Salt>;

Expand All @@ -100,11 +104,16 @@ pub trait OffChainDatabase: Send + Sync {
&self,
height: Option<BlockHeight>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<CompressedBlock>>;
) -> BoxedIterSend<'_, StorageResult<CompressedBlock>>;

fn old_block_consensus(&self, height: &BlockHeight) -> StorageResult<Consensus>;

fn old_transaction(&self, id: &TxId) -> StorageResult<Option<Transaction>>;
fn old_transaction(&self, id: &TxId) -> StorageResult<Transaction>;

fn old_transactions<'a>(
&'a self,
ids: BoxedIter<'a, &'a TxId>,
) -> BoxedIter<'a, StorageResult<Transaction>>;

fn relayed_tx_status(
&self,
Expand All @@ -120,7 +129,7 @@ pub trait OnChainDatabase:
+ Sync
+ DatabaseBlocks
+ DatabaseMessages
+ StorageInspect<Coins, Error = StorageError>
+ DatabaseCoins
+ StorageRead<BlobData, Error = StorageError>
+ StorageInspect<StateTransitionBytecodeVersions, Error = StorageError>
+ StorageInspect<UploadedBytecodes, Error = StorageError>
Expand All @@ -135,14 +144,20 @@ pub trait DatabaseBlocks {
/// Get a transaction by its id.
fn transaction(&self, tx_id: &TxId) -> StorageResult<Transaction>;

/// Get a batch of transactions by their ids.
fn transactions<'a>(
&'a self,
tx_ids: BoxedIter<'a, &'a TxId>,
) -> BoxedIter<'a, StorageResult<Transaction>>;

/// Get a block by its height.
fn block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock>;

fn blocks(
&self,
height: Option<BlockHeight>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<CompressedBlock>>;
) -> BoxedIterSend<'_, StorageResult<CompressedBlock>>;

fn latest_height(&self) -> StorageResult<BlockHeight>;

Expand All @@ -164,7 +179,12 @@ pub trait DatabaseMessages: StorageInspect<Messages, Error = StorageError> {
&self,
start_message_id: Option<Nonce>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<Message>>;
) -> BoxedIterSend<'_, StorageResult<Message>>;

fn message_batch<'a>(
&'a self,
ids: BoxedIter<'a, &'a Nonce>,
) -> BoxedIter<'a, StorageResult<Message>>;

fn message_exists(&self, nonce: &Nonce) -> StorageResult<bool>;
}
Expand All @@ -176,6 +196,13 @@ pub trait DatabaseRelayedTransactions {
) -> StorageResult<Option<RelayedTransactionStatus>>;
}

/// Trait that specifies all the getters required for coins
pub trait DatabaseCoins: StorageInspect<Coins, Error = StorageError> {
fn coin(&self, utxo_id: UtxoId) -> StorageResult<Coin>;

fn coins<'a>(&'a self, utxo_ids: &'a [UtxoId]) -> BoxedIter<'a, StorageResult<Coin>>;
}

/// Trait that specifies all the getters required for contract.
pub trait DatabaseContracts:
StorageInspect<ContractsRawCode, Error = StorageError>
Expand All @@ -186,7 +213,7 @@ pub trait DatabaseContracts:
contract: ContractId,
start_asset: Option<AssetId>,
direction: IterDirection,
) -> BoxedIter<StorageResult<ContractBalance>>;
) -> BoxedIterSend<StorageResult<ContractBalance>>;
}

/// Trait that specifies all the getters required for chain metadata.
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/query/balance/asset_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<'a> AssetsQuery<'a> {
})
.try_filter_map(move |chunk| async move {
let chunk = database.messages(chunk).await;
Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
Ok::<_, StorageError>(Some(chunk))
})
.try_flatten()
.filter(|result| {
Expand Down
Loading