Skip to content

Commit

Permalink
start refactoring transmission action handler
Browse files Browse the repository at this point in the history
  • Loading branch information
micielski committed Jul 5, 2024
1 parent eb953e5 commit 89186eb
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 108 deletions.
17 changes: 6 additions & 11 deletions rm-main/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@ use crate::{
};

use anyhow::{Error, Result};
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
Mutex,
};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use transmission_rpc::{types::SessionGet, TransClient};

#[derive(Clone)]
pub struct Ctx {
pub client: Arc<Mutex<TransClient>>,
pub config: Arc<Config>,
pub session_info: Arc<SessionGet>,
action_tx: UnboundedSender<Action>,
Expand All @@ -28,17 +24,16 @@ pub struct Ctx {

impl Ctx {
async fn new(
client: Arc<Mutex<TransClient>>,
client: &mut TransClient,
config: Config,
action_tx: UnboundedSender<Action>,
trans_tx: UnboundedSender<TorrentAction>,
) -> Result<Self> {
let response = client.lock().await.session_get().await;
let response = client.session_get().await;
match response {
Ok(res) => {
let session_info = Arc::new(res.arguments);
return Ok(Self {
client,
config: Arc::new(config),
action_tx,
trans_tx,
Expand Down Expand Up @@ -76,12 +71,12 @@ impl App {
pub async fn new(config: Config) -> Result<Self> {
let (action_tx, action_rx) = mpsc::unbounded_channel();

let client = Arc::new(Mutex::new(transmission::utils::client_from_config(&config)));
let mut client = transmission::utils::client_from_config(&config);

let (trans_tx, trans_rx) = mpsc::unbounded_channel();
let ctx = Ctx::new(client, config, action_tx, trans_tx).await?;
let ctx = Ctx::new(&mut client, config, action_tx.clone(), trans_tx).await?;

tokio::spawn(transmission::action_handler(ctx.clone(), trans_rx));
tokio::spawn(transmission::action_handler(client, trans_rx, action_tx));
Ok(Self {
should_quit: false,
main_window: MainWindow::new(ctx.clone()),
Expand Down
116 changes: 65 additions & 51 deletions rm-main/src/transmission/action.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::sync::{Arc, Mutex};

use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
use transmission_rpc::types::{
Id, SessionGet, Torrent, TorrentAction as RPCAction, TorrentAddArgs, TorrentSetArgs,
FreeSpace, Id, SessionGet, SessionStats, Torrent, TorrentAction as RPCAction, TorrentAddArgs,
TorrentGetField, TorrentSetArgs,
};
use transmission_rpc::TransClient;

use crate::app;
use rm_shared::action::Action;
use rm_shared::action::ErrorMessage;

#[derive(Debug)]
pub enum TorrentAction {
// Magnet/URL, Directory
Add(String, Option<String>),
Expand All @@ -22,10 +23,17 @@ pub enum TorrentAction {
SetArgs(Box<TorrentSetArgs>, Option<Vec<Id>>),
// Torrent ID, Directory to move to
Move(Vec<Id>, String),
GetSessionStats(oneshot::Sender<SessionStats>),
GetFreeSpace(String, oneshot::Sender<FreeSpace>),
GetTorrents(Vec<TorrentGetField>, oneshot::Sender<Vec<Torrent>>),
GetTorrentsById(Vec<Id>, oneshot::Sender<Vec<Torrent>>),
}

// TODO: make all the options use the same type of interface. Probably use a sender everywhere
pub async fn action_handler(ctx: app::Ctx, mut trans_rx: UnboundedReceiver<TorrentAction>) {
pub async fn action_handler(
mut client: TransClient,
mut trans_rx: UnboundedReceiver<TorrentAction>,
action_tx: UnboundedSender<Action>,
) {
while let Some(action) = trans_rx.recv().await {
match action {
TorrentAction::Add(ref url, directory) => {
Expand All @@ -41,9 +49,9 @@ pub async fn action_handler(ctx: app::Ctx, mut trans_rx: UnboundedReceiver<Torre
download_dir: directory,
..Default::default()
};
match ctx.client.lock().await.torrent_add(args).await {
match client.torrent_add(args).await {
Ok(_) => {
ctx.send_action(Action::TaskSuccess);
action_tx.send(Action::TaskSuccess).unwrap();
}
Err(e) => {
let error_title = "Failed to add a torrent";
Expand All @@ -55,49 +63,34 @@ pub async fn action_handler(ctx: app::Ctx, mut trans_rx: UnboundedReceiver<Torre
title: error_title.to_string(),
message: msg,
};
ctx.send_action(Action::Error(Box::new(error_message)));
action_tx
.send(Action::Error(Box::new(error_message)))
.unwrap();
}
}
}
TorrentAction::Stop(ids) => {
ctx.client
.lock()
.await
client
.torrent_action(RPCAction::Stop, ids.clone())
.await
.unwrap();
}
TorrentAction::Start(ids) => {
ctx.client
.lock()
.await
client
.torrent_action(RPCAction::Start, ids.clone())
.await
.unwrap();
}
TorrentAction::DeleteWithFiles(ids) => {
ctx.client
.lock()
.await
.torrent_remove(ids, true)
.await
.unwrap();
ctx.send_action(Action::TaskSuccess)
client.torrent_remove(ids, true).await.unwrap();
action_tx.send(Action::TaskSuccess).unwrap();
}
TorrentAction::DeleteWithoutFiles(ids) => {
ctx.client
.lock()
.await
.torrent_remove(ids, false)
.await
.unwrap();
ctx.send_action(Action::TaskSuccess)
client.torrent_remove(ids, false).await.unwrap();
action_tx.send(Action::TaskSuccess).unwrap();
}
TorrentAction::GetTorrentInfo(id, torrent_info) => {
let new_torrent_info = ctx
.client
.lock()
.await
let new_torrent_info = client
.torrent_get(None, Some(vec![id]))
.await
.unwrap()
Expand All @@ -108,29 +101,14 @@ pub async fn action_handler(ctx: app::Ctx, mut trans_rx: UnboundedReceiver<Torre
*torrent_info.lock().unwrap() = Some(new_torrent_info);
}
TorrentAction::SetArgs(args, ids) => {
ctx.client
.lock()
.await
.torrent_set(*args, ids)
.await
.unwrap();
client.torrent_set(*args, ids).await.unwrap();
}
TorrentAction::GetSessionGet(sender) => {
let session_get = ctx
.client
.lock()
.await
.session_get()
.await
.unwrap()
.arguments;
let session_get = client.session_get().await.unwrap().arguments;
sender.send(session_get).unwrap();
}
TorrentAction::Move(ids, new_directory) => {
if let Err(e) = ctx
.client
.lock()
.await
if let Err(e) = client
.torrent_set_location(ids, new_directory.clone(), Option::from(true))
.await
{
Expand All @@ -143,9 +121,45 @@ pub async fn action_handler(ctx: app::Ctx, mut trans_rx: UnboundedReceiver<Torre
title: error_title.to_string(),
message: msg,
};
ctx.send_action(Action::Error(Box::new(error_message)));
action_tx
.send(Action::Error(Box::new(error_message)))
.unwrap();
}
}
TorrentAction::GetSessionStats(sender) => {
sender
.send(client.session_stats().await.unwrap().arguments)
.unwrap();
}
TorrentAction::GetFreeSpace(path, sender) => {
sender
.send(client.free_space(path).await.unwrap().arguments)
.unwrap();
}
TorrentAction::GetTorrents(fields, sender) => {
sender
.send(
client
.torrent_get(Some(fields), None)
.await
.unwrap()
.arguments
.torrents,
)
.unwrap();
}
TorrentAction::GetTorrentsById(ids, sender) => {
sender
.send(
client
.torrent_get(None, Some(ids))
.await
.unwrap()
.arguments
.torrents,
)
.unwrap();
}
}
}
}
55 changes: 20 additions & 35 deletions rm-main/src/transmission/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
time::Duration,
};

use tokio::sync::oneshot;
use transmission_rpc::types::{FreeSpace, SessionStats, TorrentGetField};

use crate::{
Expand All @@ -11,43 +12,33 @@ use crate::{
};
use rm_shared::action::Action;

use super::TorrentAction;

pub async fn stats(ctx: app::Ctx, stats: Arc<Mutex<Option<SessionStats>>>) {
loop {
let new_stats = ctx
.client
.lock()
.await
.session_stats()
.await
.unwrap()
.arguments;
let (stats_tx, stats_rx) = oneshot::channel();
ctx.send_torrent_action(TorrentAction::GetSessionStats(stats_tx));
let new_stats = stats_rx.await.unwrap();

*stats.lock().unwrap() = Some(new_stats);
ctx.send_action(Action::Render);
tokio::time::sleep(Duration::from_secs(ctx.config.connection.stats_refresh)).await;
}
}

pub async fn free_space(ctx: app::Ctx, free_space: Arc<Mutex<Option<FreeSpace>>>) {
let download_dir = ctx
.client
.lock()
.await
.session_get()
.await
.unwrap()
.arguments
.download_dir
.leak();
let (sess_tx, sess_rx) = oneshot::channel();
ctx.send_torrent_action(TorrentAction::GetSessionGet(sess_tx));
let download_dir = sess_rx.await.unwrap().download_dir.leak();

loop {
let new_free_space = ctx
.client
.lock()
.await
.free_space(download_dir.to_string())
.await
.unwrap()
.arguments;
let (space_tx, space_rx) = oneshot::channel();
ctx.send_torrent_action(TorrentAction::GetFreeSpace(
download_dir.to_string(),
space_tx,
));
let new_free_space = space_rx.await.unwrap();

*free_space.lock().unwrap() = Some(new_free_space);
ctx.send_action(Action::Render);
tokio::time::sleep(Duration::from_secs(
Expand All @@ -73,15 +64,9 @@ pub async fn torrents(ctx: app::Ctx, table_manager: Arc<Mutex<TableManager>>) {
TorrentGetField::Status,
TorrentGetField::DownloadDir,
];
let rpc_response = ctx
.client
.lock()
.await
.torrent_get(Some(fields), None)
.await
.unwrap();

let new_torrents = rpc_response.arguments.torrents;
let (torrents_tx, torrents_rx) = oneshot::channel();
ctx.send_torrent_action(TorrentAction::GetTorrents(fields, torrents_tx));
let new_torrents = torrents_rx.await.unwrap();

{
let mut table_manager_lock = table_manager.lock().unwrap();
Expand Down
19 changes: 8 additions & 11 deletions rm-main/src/ui/tabs/torrents/popups/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use ratatui::{
Block, BorderType, Clear, Paragraph,
},
};
use tokio::sync::oneshot;
use transmission_rpc::types::{Id, Torrent, TorrentSetArgs};
use tui_tree_widget::{Tree, TreeItem, TreeState};

Expand Down Expand Up @@ -39,17 +40,13 @@ async fn fetch_new_files(
ctx: app::Ctx,
) {
loop {
let new_torrent = ctx
.client
.lock()
.await
.torrent_get(None, Some(vec![torrent_id.clone()]))
.await
.unwrap()
.arguments
.torrents
.pop()
.unwrap();
let (torrent_tx, torrent_rx) = oneshot::channel();
ctx.send_torrent_action(TorrentAction::GetTorrentsById(
vec![torrent_id.clone()],
torrent_tx,
));
let new_torrent = torrent_rx.await.unwrap().pop().unwrap();

let new_tree = Node::new_from_torrent(&new_torrent);
*torrent.lock().unwrap() = Some(new_torrent);
*tree.lock().unwrap() = new_tree;
Expand Down

0 comments on commit 89186eb

Please sign in to comment.