Skip to content

Commit

Permalink
Use rclone --stats command for all transfer operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Johannes11833 committed Nov 22, 2024
1 parent 1525ab9 commit f9d87e9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 84 deletions.
3 changes: 2 additions & 1 deletion rclone_python/rclone.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,13 +650,14 @@ def _rclone_transfer_operation(
# add global rclone flags
if ignore_existing:
command += " --ignore-existing"
command += " --progress"

# in path
command += f' "{in_path}"'
# out path
command += f' "{out_path}"'

command += " --stats 0.1s --stats-unit bytes --use-json-log -v"

# optional named arguments/flags
command += utils.args2string(args)

Expand Down
134 changes: 51 additions & 83 deletions rclone_python/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import re
import json
import subprocess
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from rich.progress import Progress, TaskID, Task
Expand Down Expand Up @@ -56,31 +56,6 @@ def shorten_filepath(in_path: str, max_length: int) -> str:
return in_path


def convert2bits(value: float, unit: str) -> float:
"""Returns the corresponding bit value to a value with a certain binary prefix (based on powers of 2) like KiB or MiB.
Args:
value (float): Bit value using a certain binary prefix like KiB or MiB.
unit (str): The binary prefix.
Returns:
float: The corresponding bit value.
"""
exp = {
"B": 0,
"KiB": 1,
"MiB": 2,
"GiB": 3,
"TiB": 4,
"PiB": 5,
"EiB": 6,
"ZiB": 7,
"YiB": 8,
}

return value * 1024 ** exp[unit]


# ---------------------------------------------------------------------------- #
# Progressbar related functions #
# ---------------------------------------------------------------------------- #
Expand Down Expand Up @@ -108,10 +83,12 @@ def rclone_progress(
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=stderr, shell=True
)
for line in iter(process.stdout.readline, b""):
var = line.decode()

valid, update_dict = extract_rclone_progress(buffer)
# rclone prints stats to stderr. each line is one update
for line in iter(process.stderr.readline, b""):
line = line.decode()

valid, update_dict = extract_rclone_progress(line)

if valid:
if show_progress:
Expand All @@ -124,12 +101,6 @@ def rclone_progress(
if debug:
pbar.log(buffer)

# reset the buffer
buffer = ""
else:
# buffer until we
buffer += var

if show_progress:
complete_task(total_progress_id, pbar)
for _, task_id in subprocesses.items():
Expand All @@ -140,48 +111,40 @@ def rclone_progress(
return process


def extract_rclone_progress(buffer: str) -> Tuple[bool, Union[Dict[str, Any], None]]:
# matcher that checks if the progress update block is completely buffered yet (defines start and stop)
# it gets the sent bits, total bits, progress, transfer-speed and eta
reg_transferred = re.findall(
r"Transferred:\s+(\d+.\d+ \w+) \/ (\d+.\d+ \w+), (\d{1,3})%, (\d+.\d+ \w+\/\w+), ETA (\S+)",
buffer,
)
def extract_rclone_progress(line: str) -> Tuple[bool, Union[Dict[str, Any], None]]:
try:
stats: Dict = json.loads(line).get("stats", None)
except ValueError:
stats = None

if reg_transferred: # transferred block is completely buffered
# wait until we know the total file size --> bytes > 0
if stats is not None and stats.get("bytes", 0) > 0:
# get the progress of the individual files
# matcher gets the currently transferring files and their individual progress
# returns list of tuples: (name, progress, file_size, unit)
prog_transferring = []
prog_regex = re.findall(
r"\* +(.+):[ ]+(\d{1,3})% \/(\d+.\d+)([a-zA-Z]+),", buffer
)
for item in prog_regex:
prog_transferring.append(
(
item[0],
int(item[1]),
float(item[2]),
# the suffix B of the unit is missing for subprocesses
item[3] + "B",
)
tasks = []
for t in stats.get("transferring", []):
tasks.append(
{
"name": t["name"],
"total": t["size"],
"sent": t["bytes"],
"progress": t["percentage"],
}
)

out = {"prog_transferring": prog_transferring}
sent_bits, total_bits, progress, transfer_speed_str, eta = reg_transferred[0]
out["progress"] = float(progress.strip())
out["total_bits"] = float(re.findall(r"\d+.\d+", total_bits)[0])
out["sent_bits"] = float(re.findall(r"\d+.\d+", sent_bits)[0])
out["unit_sent"] = re.findall(r"[a-zA-Z]+", sent_bits)[0]
out["unit_total"] = re.findall(r"[a-zA-Z]+", total_bits)[0]
out["transfer_speed"] = float(re.findall(r"\d+.\d+", transfer_speed_str)[0])
out["transfer_speed_unit"] = re.findall(
r"[a-zA-Z]+/[a-zA-Z]+", transfer_speed_str
)[0]
out["eta"] = eta
out = {
"tasks": tasks,
"total": stats["totalBytes"],
"sent": stats["bytes"],
"progress": (
stats["bytes"] / stats["totalBytes"]
if stats["bytes"] is not None
else 0
),
"transfer_speed": stats["speed"],
"rclone_output": stats,
}

return True, out

else:
return False, None

Expand Down Expand Up @@ -251,33 +214,38 @@ def update_tasks(

pbar.update(
total_progress,
completed=convert2bits(update_dict["sent_bits"], update_dict["unit_sent"]),
total=convert2bits(update_dict["total_bits"], update_dict["unit_total"]),
completed=update_dict["sent"],
total=update_dict["total"],
)

sp_names = set()
for sp_file_name, sp_progress, sp_size, sp_unit in update_dict["prog_transferring"]:
task_names = set()
for task in update_dict["tasks"]:
task_id = None
sp_names.add(sp_file_name)

if sp_file_name not in subprocesses:
task_name = task["name"]
task_size = task["total"]
task_progress = task["progress"]

task_names.add(task_name)

if task_name not in subprocesses:
task_id = pbar.add_task(" ", visible=False)
subprocesses[sp_file_name] = task_id
subprocesses[task_name] = task_id
else:
task_id = subprocesses[sp_file_name]
task_id = subprocesses[task_name]

pbar.update(
task_id,
# set the description every time to reset the '├'
description=f" ├─{sp_file_name}",
completed=convert2bits(sp_size, sp_unit) * sp_progress / 100.0,
total=convert2bits(sp_size, sp_unit),
description=f" ├─{task_name}",
completed=task_size * task_progress / 100.0,
total=task_size,
# hide subprocesses if we only upload a single file
visible=len(subprocesses) > 1,
)

# make all processes invisible that are no longer provided by rclone (bc. their upload completed)
missing = list(sorted(subprocesses.keys() - sp_names))
missing = list(sorted(subprocesses.keys() - task_names))
for missing_sp_id in missing:
pbar.update(subprocesses[missing_sp_id], visible=False)

Expand Down

0 comments on commit f9d87e9

Please sign in to comment.