Skip to content

Commit

Permalink
Put entire core processing into a thread
Browse files Browse the repository at this point in the history
  • Loading branch information
TonyGiorgio committed May 17, 2024
1 parent c7137b6 commit 7404357
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 71 deletions.
2 changes: 1 addition & 1 deletion src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub struct CoreHandle {
}

pub fn create_handles() -> (UIHandle, CoreHandle) {
let (ui_to_core_tx, core_from_ui_rx) = mpsc::channel::<UICoreMsg>(1);
let (ui_to_core_tx, core_from_ui_rx) = mpsc::channel::<UICoreMsg>(50);

let ui_handle = UIHandle { ui_to_core_tx };

Expand Down
155 changes: 85 additions & 70 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::time::sleep;

use iced::{
futures::{channel::mpsc::Sender, SinkExt},
Expand Down Expand Up @@ -346,94 +347,108 @@ pub fn run_core() -> Subscription<Message> {
std::any::TypeId::of::<Connect>(),
100,
|mut tx: Sender<Message>| async move {
// Setup UI Handle
let (ui_handle, mut core_handle) = bridge::create_handles();
let arc_ui_handle = Arc::new(ui_handle);
tx.send(Message::UIHandlerLoaded(arc_ui_handle.clone()))
.await
.expect("should send");

let network = Network::Signet;

// Create the datadir if it doesn't exist
let path = PathBuf::from(&conf::data_dir(network));
std::fs::create_dir_all(path.clone()).expect("Could not create datadir");
log::info!("Using datadir: {path:?}");

loop {
let msg = core_handle.recv().await;
tokio::spawn(async move {
log::info!("doing tokio spawn things");
// Setup UI Handle
let (ui_handle, mut core_handle) = bridge::create_handles();
let arc_ui_handle = Arc::new(ui_handle);
tx.send(Message::UIHandlerLoaded(arc_ui_handle.clone()))
.await
.expect("should send");

match msg {
Some(UICoreMsg::Unlock(password)) => {
log::info!("Sending unlock message");
tx.send(Message::CoreMessage(CoreUIMsg::Unlocking))
.await
.expect("should send");
log::info!("Sent UI thread");

// attempting to unlock
let db_path = path.join("harbor.sqlite");
let db =
spawn_blocking(move || setup_db(db_path.to_str().unwrap(), password))
.await
.expect("Could not create join handle");
loop {
let msg = core_handle.recv().await;

if let Err(e) = db {
// probably invalid password
error!("error using password: {e}");
match msg {
Some(UICoreMsg::Unlock(password)) => {
log::info!("Sending unlock message");
tx.send(Message::CoreMessage(CoreUIMsg::Unlocking))
.await
.expect("should send");

tx.send(Message::CoreMessage(CoreUIMsg::UnlockFailed(
"Invalid Password".to_string(),
)))
.await
.expect("should send");
continue;
}
let db = db.expect("no error");

let mnemonic = get_mnemonic(db.clone()).expect("should get seed");

let stop = Arc::new(AtomicBool::new(false));

// check db for fedimints
let mut clients = HashMap::new();
let federation_ids = db
.list_federations()
.expect("should load initial fedimints");
for f in federation_ids {
let client = FedimintClient::new(
db.clone(),
FederationInviteOrId::Id(
FederationId::from_str(&f).expect("should parse federation id"),
),
&mnemonic,
network,
stop.clone(),
)
// attempting to unlock
let db_path = path.join("harbor.sqlite");
let db = spawn_blocking(move || {
setup_db(db_path.to_str().unwrap(), password)
})
.await
.expect("Could not create fedimint client");
.expect("Could not create join handle");

clients.insert(client.fedimint_client.federation_id(), client);
}
if let Err(e) = db {
// probably invalid password
error!("error using password: {e}");

let core = HarborCore {
storage: db.clone(),
tx: tx.clone(),
mnemonic,
network,
clients: Arc::new(RwLock::new(clients)),
stop,
};
tx.send(Message::CoreMessage(CoreUIMsg::UnlockFailed(
"Invalid Password".to_string(),
)))
.await
.expect("should send");
continue;
}
let db = db.expect("no error");

let mnemonic = get_mnemonic(db.clone()).expect("should get seed");

let stop = Arc::new(AtomicBool::new(false));

// check db for fedimints
let mut clients = HashMap::new();
let federation_ids = db
.list_federations()
.expect("should load initial fedimints");
for f in federation_ids {
let client = FedimintClient::new(
db.clone(),
FederationInviteOrId::Id(
FederationId::from_str(&f)
.expect("should parse federation id"),
),
&mnemonic,
network,
stop.clone(),
)
.await
.expect("Could not create fedimint client");

tx.send(Message::CoreMessage(CoreUIMsg::UnlockSuccess))
.await
.expect("should send");
clients.insert(client.fedimint_client.federation_id(), client);
}

process_core(&mut core_handle, &core).await;
}
_ => {
warn!("Ignoring unrelated message to locked core")
let core = HarborCore {
storage: db.clone(),
tx: tx.clone(),
mnemonic,
network,
clients: Arc::new(RwLock::new(clients)),
stop,
};

tx.send(Message::CoreMessage(CoreUIMsg::UnlockSuccess))
.await
.expect("should send");

process_core(&mut core_handle, &core).await;
}
_ => {
warn!("Ignoring unrelated message to locked core")
}
}
}
})
.await
.unwrap();

loop {
sleep(Duration::from_secs(5)).await;
log::trace!("Subscription thread still running");
}
},
)
Expand Down

0 comments on commit 7404357

Please sign in to comment.