Skip to content

Commit

Permalink
Merge pull request zingolabs#1589 from Oscar-Pepper/move_mempool_to_s…
Browse files Browse the repository at this point in the history
…eparate_task

Move mempool to separate task
  • Loading branch information
AloeareV authored Dec 18, 2024
2 parents 8ac0137 + 01c20a9 commit f9f3e5f
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 130 deletions.
2 changes: 1 addition & 1 deletion libtonode-tests/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn sync_mainnet_test() {
let mut lightclient = LightClient::create_from_wallet_base_async(
WalletBase::from_string(HOSPITAL_MUSEUM_SEED.to_string()),
&config,
2_715_150,
2_650_318,
true,
)
.await
Expand Down
18 changes: 7 additions & 11 deletions zingo-sync/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use zcash_client_backend::{
data_api::chain::ChainState,
proto::{
compact_formats::CompactBlock,
service::{BlockId, GetAddressUtxosReply, RawTransaction, TreeState},
service::{
compact_tx_streamer_client::CompactTxStreamerClient, BlockId, GetAddressUtxosReply,
RawTransaction, TreeState,
},
},
};
use zcash_primitives::{
Expand Down Expand Up @@ -41,8 +44,6 @@ pub enum FetchRequest {
oneshot::Sender<Vec<(BlockHeight, Transaction)>>,
(String, Range<BlockHeight>),
),
/// Get a stream of mempool transactions until a new block is mined.
MempoolStream(oneshot::Sender<tonic::Streaming<RawTransaction>>),
}

/// Gets the height of the blockchain from the server.
Expand Down Expand Up @@ -154,16 +155,11 @@ pub async fn get_transparent_address_transactions(
}

/// Gets stream of mempool transactions until the next block is mined.
///
/// Requires [`crate::client::fetch::fetch`] to be running concurrently, connected via the `fetch_request` channel.
pub async fn get_mempool_transaction_stream(
fetch_request_sender: UnboundedSender<FetchRequest>,
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
) -> Result<tonic::Streaming<RawTransaction>, ()> {
let (reply_sender, reply_receiver) = oneshot::channel();
fetch_request_sender
.send(FetchRequest::MempoolStream(reply_sender))
.unwrap();
let mempool_stream = reply_receiver.await.unwrap();
tracing::info!("Fetching mempool stream");
let mempool_stream = fetch::get_mempool_stream(client).await.unwrap();

Ok(mempool_stream)
}
11 changes: 4 additions & 7 deletions zingo-sync/src/client/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,6 @@ async fn fetch_from_server(
.unwrap();
sender.send(transactions).unwrap();
}

FetchRequest::MempoolStream(sender) => {
tracing::info!("Fetching mempool stream");
let mempool_stream = get_mempool_stream(client).await.unwrap();
sender.send(mempool_stream).unwrap();
}
}

Ok(())
Expand Down Expand Up @@ -295,7 +289,10 @@ async fn get_taddress_txs(
Ok(transactions)
}

async fn get_mempool_stream(
/// Call `GetMempoolStream` client gPRC
///
/// This is not called from the fetch request framework and is intended to be called independently.
pub(super) async fn get_mempool_stream(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
) -> Result<tonic::Streaming<RawTransaction>, ()> {
let request = tonic::Request::new(Empty {});
Expand Down
5 changes: 4 additions & 1 deletion zingo-sync/src/scan/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ where
/// If verification is still in progress, do not create scan tasks.
/// If there is an idle worker, create a new scan task and add to worker.
/// If there are no more range available to scan, shutdown the idle workers.
pub(crate) async fn update<W>(&mut self, wallet: &mut W)
pub(crate) async fn update<W>(&mut self, wallet: &mut W, shutdown_mempool: Arc<AtomicBool>)
where
W: SyncWallet + SyncBlocks,
{
Expand Down Expand Up @@ -193,6 +193,9 @@ where
}
}
ScannerState::Shutdown => {
// shutdown mempool
shutdown_mempool.store(true, atomic::Ordering::Relaxed);

// shutdown idle workers
while let Some(worker) = self.idle_worker() {
self.shutdown_worker(worker.id)
Expand Down
Loading

0 comments on commit f9f3e5f

Please sign in to comment.