Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Put entire core processing into a thread #49

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub struct CoreHandle {
}

pub fn create_handles() -> (UIHandle, CoreHandle) {
let (ui_to_core_tx, core_from_ui_rx) = mpsc::channel::<UICoreMsg>(1);
let (ui_to_core_tx, core_from_ui_rx) = mpsc::channel::<UICoreMsg>(50);

let ui_handle = UIHandle { ui_to_core_tx };

Expand Down
155 changes: 85 additions & 70 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::time::sleep;

use iced::{
futures::{channel::mpsc::Sender, SinkExt},
Expand Down Expand Up @@ -346,94 +347,108 @@ pub fn run_core() -> Subscription<Message> {
std::any::TypeId::of::<Connect>(),
100,
|mut tx: Sender<Message>| async move {
// Setup UI Handle
let (ui_handle, mut core_handle) = bridge::create_handles();
let arc_ui_handle = Arc::new(ui_handle);
tx.send(Message::UIHandlerLoaded(arc_ui_handle.clone()))
.await
.expect("should send");

let network = Network::Signet;

// Create the datadir if it doesn't exist
let path = PathBuf::from(&conf::data_dir(network));
std::fs::create_dir_all(path.clone()).expect("Could not create datadir");
log::info!("Using datadir: {path:?}");

loop {
let msg = core_handle.recv().await;
tokio::spawn(async move {
log::info!("doing tokio spawn things");
// Setup UI Handle
let (ui_handle, mut core_handle) = bridge::create_handles();
let arc_ui_handle = Arc::new(ui_handle);
tx.send(Message::UIHandlerLoaded(arc_ui_handle.clone()))
.await
.expect("should send");

match msg {
Some(UICoreMsg::Unlock(password)) => {
log::info!("Sending unlock message");
tx.send(Message::CoreMessage(CoreUIMsg::Unlocking))
.await
.expect("should send");
log::info!("Sent UI thread");

// attempting to unlock
let db_path = path.join("harbor.sqlite");
let db =
spawn_blocking(move || setup_db(db_path.to_str().unwrap(), password))
.await
.expect("Could not create join handle");
loop {
let msg = core_handle.recv().await;

if let Err(e) = db {
// probably invalid password
error!("error using password: {e}");
match msg {
Some(UICoreMsg::Unlock(password)) => {
log::info!("Sending unlock message");
tx.send(Message::CoreMessage(CoreUIMsg::Unlocking))
.await
.expect("should send");

tx.send(Message::CoreMessage(CoreUIMsg::UnlockFailed(
"Invalid Password".to_string(),
)))
.await
.expect("should send");
continue;
}
let db = db.expect("no error");

let mnemonic = get_mnemonic(db.clone()).expect("should get seed");

let stop = Arc::new(AtomicBool::new(false));

// check db for fedimints
let mut clients = HashMap::new();
let federation_ids = db
.list_federations()
.expect("should load initial fedimints");
for f in federation_ids {
let client = FedimintClient::new(
db.clone(),
FederationInviteOrId::Id(
FederationId::from_str(&f).expect("should parse federation id"),
),
&mnemonic,
network,
stop.clone(),
)
// attempting to unlock
let db_path = path.join("harbor.sqlite");
let db = spawn_blocking(move || {
setup_db(db_path.to_str().unwrap(), password)
})
.await
.expect("Could not create fedimint client");
.expect("Could not create join handle");

clients.insert(client.fedimint_client.federation_id(), client);
}
if let Err(e) = db {
// probably invalid password
error!("error using password: {e}");

let core = HarborCore {
storage: db.clone(),
tx: tx.clone(),
mnemonic,
network,
clients: Arc::new(RwLock::new(clients)),
stop,
};
tx.send(Message::CoreMessage(CoreUIMsg::UnlockFailed(
"Invalid Password".to_string(),
)))
.await
.expect("should send");
continue;
}
let db = db.expect("no error");

let mnemonic = get_mnemonic(db.clone()).expect("should get seed");

let stop = Arc::new(AtomicBool::new(false));

// check db for fedimints
let mut clients = HashMap::new();
let federation_ids = db
.list_federations()
.expect("should load initial fedimints");
for f in federation_ids {
let client = FedimintClient::new(
db.clone(),
FederationInviteOrId::Id(
FederationId::from_str(&f)
.expect("should parse federation id"),
),
&mnemonic,
network,
stop.clone(),
)
.await
.expect("Could not create fedimint client");

tx.send(Message::CoreMessage(CoreUIMsg::UnlockSuccess))
.await
.expect("should send");
clients.insert(client.fedimint_client.federation_id(), client);
}

process_core(&mut core_handle, &core).await;
}
_ => {
warn!("Ignoring unrelated message to locked core")
let core = HarborCore {
storage: db.clone(),
tx: tx.clone(),
mnemonic,
network,
clients: Arc::new(RwLock::new(clients)),
stop,
};

tx.send(Message::CoreMessage(CoreUIMsg::UnlockSuccess))
.await
.expect("should send");

process_core(&mut core_handle, &core).await;
}
_ => {
warn!("Ignoring unrelated message to locked core")
}
}
}
})
.await
.unwrap();

loop {
sleep(Duration::from_secs(5)).await;
log::trace!("Subscription thread still running");
}
},
)
Expand Down