diff --git a/ufpy/__init__.py b/ufpy/__init__.py index bec85a4..57ce48c 100644 --- a/ufpy/__init__.py +++ b/ufpy/__init__.py @@ -19,5 +19,4 @@ # GitHub package __github_version__ = '0.1' from ufpy import github -from ufpy.github.download import UGithubDownloader -from ufpy.github import download_file, download_folder, download_repo, download +from ufpy.github import UGithubDownloader, download_file, download_folder, download_repo diff --git a/ufpy/github/__init__.py b/ufpy/github/__init__.py index 862cf1f..b0d148f 100644 --- a/ufpy/github/__init__.py +++ b/ufpy/github/__init__.py @@ -1,3 +1,6 @@ +# Download +import ufpy.github.download +from ufpy.github.download import UGithubDownloader from ufpy.github.download import file as download_file from ufpy.github.download import folder as download_folder from ufpy.github.download import repo as download_repo diff --git a/ufpy/github/download.py b/ufpy/github/download.py index ddd19c5..acd9543 100644 --- a/ufpy/github/download.py +++ b/ufpy/github/download.py @@ -1,6 +1,5 @@ import io import os -import warnings from shutil import copy, copytree, rmtree from tempfile import gettempdir from typing import Iterable, TypeAlias @@ -17,102 +16,23 @@ 'UGithubDownloader', ) -def file(repo: str, file_path: str, download_path: str, branch_name: str = 'main'): - file_path = file_path.replace('\\', '/') - - url = f'https://raw.githubusercontent.com/{repo}/{branch_name}/{file_path}' - r = get(url) - - if not r.ok: - raise Exception("Error with getting file from GitHub. Check that repo is public and that file path is correct.") - - download_path = download_path.replace('\\', '/') - path = f'{download_path}/{file_path}' - - directory = '/'.join(path.split('/')[:-1]) - - if not os.path.exists(directory): - os.makedirs(directory) - - with open(path, 'w+') as f: - f.write(r.text) +def file(repo: str, file_path: str | list[str], download_path: str, branch_name: str = 'main'): + with UGithubDownloader(repo, download_path, branch_name) as gd: + if isinstance(file_path, str): + gd.download_file(file_path) + else: + gd.download_files(file_path) def folder(repo: str, folder_path: str | list[str], download_path: str, branch_name: str = 'main'): - if isinstance(folder_path, str): - folder_path = [folder_path] - filename = f'{download_path}/repo_temp.zip' - url = f'https://github.com/{repo}/archive/{branch_name}.zip' - - with open(filename, 'wb') as f: - f.write(get(url).content) - - repo_name = repo.split('/')[-1] - main_directory_name = f'{repo_name}-{branch_name}' - - with ZipFile(filename) as archive: - for file in archive.namelist(): - if file.startswith(main_directory_name): - archive.extract(file, download_path) - - if os.path.exists(filename): - os.remove(filename) - - for fpath in folder_path: - folder_dir = f'{download_path}/{main_directory_name}/{fpath}' - new_folder_dir = f'{download_path}/{fpath}' - - if os.path.exists(new_folder_dir): - warnings.warn( - f"Warning ({new_folder_dir}): Currently we don't support editing recursive folders if it's exists" - "when repo directory was downloaded. Sorry, you can just delete all folders with same name as in" - "repo before you use this function instead." - ) + with UGithubDownloader(repo, download_path, branch_name) as gd: + if isinstance(folder_path, str): + gd.download_folder(folder_path) else: - copytree(folder_dir, new_folder_dir) - - if os.path.exists(f'{download_path}/{main_directory_name}'): - rmtree(f'{download_path}/{main_directory_name}') + gd.download_folders(folder_path) def repo(repo: str, download_path: str, branch_name: str = 'main'): - filename = f'{download_path}/repo_temp.zip' - url = f'https://github.com/{repo}/archive/{branch_name}.zip' - - with open(filename, 'wb') as f: - f.write(get(url).content) - - repo_name = repo.split('/')[-1] - main_directory_name = f'{repo_name}-{branch_name}' - - with ZipFile(filename) as archive: - for file in archive.namelist(): - if file.startswith(main_directory_name): - archive.extract(file, download_path) - - if os.path.exists(filename): - os.remove(filename) - - repo_dir = f'{download_path}/{main_directory_name}' - for file in os.listdir(repo_dir): - file_path = f'{repo_dir}/{file}' - new_file_path = f'{download_path}/{file}' - if os.path.isdir(file_path): - if os.path.exists(new_file_path): - print( - f"Warning ({new_file_path}): Currently we don't support editing recursive folders if it's exists" - "when repo directory was downloaded. Sorry, you can just delete all folders with same name as in" - "repo before you use this function instead." - ) - continue - copytree(file_path, new_file_path) - elif os.path.exists(new_file_path): - with open(new_file_path, 'w') as nf: - with open(file_path, 'r') as f: - nf.write(f.read()) - else: - copy(file_path, new_file_path) - - if os.path.exists(f'{download_path}/{main_directory_name}'): - rmtree(f'{download_path}/{main_directory_name}') + with UGithubDownloader(repo, download_path, branch_name) as gd: + gd.download_repo() def format_paths(*paths: str | list[str]) -> list[str] | list[list[str]] | list[str | list[str]] | str: @@ -183,8 +103,19 @@ def download_files(self, file_paths: Iterable[str], download_path: str = ''): for file_path in file_paths: self.download_file(file_path, download_path) - def download_folder(self, folder_path: str | list[str], download_path: str): - ... + def download_folder(self, folder_path: str, download_path: str = ''): + download_path = format_paths(download_path) + download_path = f'{self.__base_download_path}/{download_path}' + + src, dst = f'{self.__repo_path}/{folder_path}', f'{download_path}/{folder_path}' + if os.path.exists(dst): + rmtree(dst) + copytree(src, dst) + + def download_folders(self, folder_paths: Iterable[str], download_path: str = ''): + folder_paths, download_path = format_paths(list(folder_paths), download_path) + for folder_path in folder_paths: + self.download_folder(folder_path, download_path) def download_repo(self, download_path: str = ''): download_path = format_paths(download_path)