Skip to content

Commit

Permalink
Fixes + download_folder(s)()
Browse files Browse the repository at this point in the history
  • Loading branch information
bleudev committed Jun 28, 2024
1 parent 24776f2 commit 12f6fa1
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 96 deletions.
3 changes: 1 addition & 2 deletions ufpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@
# GitHub package
__github_version__ = '0.1'
from ufpy import github
from ufpy.github.download import UGithubDownloader
from ufpy.github import download_file, download_folder, download_repo, download
from ufpy.github import UGithubDownloader, download_file, download_folder, download_repo
3 changes: 3 additions & 0 deletions ufpy/github/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Download
import ufpy.github.download
from ufpy.github.download import UGithubDownloader
from ufpy.github.download import file as download_file
from ufpy.github.download import folder as download_folder
from ufpy.github.download import repo as download_repo
119 changes: 25 additions & 94 deletions ufpy/github/download.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import io
import os
import warnings
from shutil import copy, copytree, rmtree
from tempfile import gettempdir
from typing import Iterable, TypeAlias
Expand All @@ -17,102 +16,23 @@
'UGithubDownloader',
)

def file(repo: str, file_path: str, download_path: str, branch_name: str = 'main'):
file_path = file_path.replace('\\', '/')

url = f'https://raw.githubusercontent.com/{repo}/{branch_name}/{file_path}'
r = get(url)

if not r.ok:
raise Exception("Error with getting file from GitHub. Check that repo is public and that file path is correct.")

download_path = download_path.replace('\\', '/')
path = f'{download_path}/{file_path}'

directory = '/'.join(path.split('/')[:-1])

if not os.path.exists(directory):
os.makedirs(directory)

with open(path, 'w+') as f:
f.write(r.text)
def file(repo: str, file_path: str | list[str], download_path: str, branch_name: str = 'main'):
with UGithubDownloader(repo, download_path, branch_name) as gd:
if isinstance(file_path, str):
gd.download_file(file_path)
else:
gd.download_files(file_path)

def folder(repo: str, folder_path: str | list[str], download_path: str, branch_name: str = 'main'):
if isinstance(folder_path, str):
folder_path = [folder_path]
filename = f'{download_path}/repo_temp.zip'
url = f'https://github.com/{repo}/archive/{branch_name}.zip'

with open(filename, 'wb') as f:
f.write(get(url).content)

repo_name = repo.split('/')[-1]
main_directory_name = f'{repo_name}-{branch_name}'

with ZipFile(filename) as archive:
for file in archive.namelist():
if file.startswith(main_directory_name):
archive.extract(file, download_path)

if os.path.exists(filename):
os.remove(filename)

for fpath in folder_path:
folder_dir = f'{download_path}/{main_directory_name}/{fpath}'
new_folder_dir = f'{download_path}/{fpath}'

if os.path.exists(new_folder_dir):
warnings.warn(
f"Warning ({new_folder_dir}): Currently we don't support editing recursive folders if it's exists"
"when repo directory was downloaded. Sorry, you can just delete all folders with same name as in"
"repo before you use this function instead."
)
with UGithubDownloader(repo, download_path, branch_name) as gd:
if isinstance(folder_path, str):
gd.download_folder(folder_path)
else:
copytree(folder_dir, new_folder_dir)

if os.path.exists(f'{download_path}/{main_directory_name}'):
rmtree(f'{download_path}/{main_directory_name}')
gd.download_folders(folder_path)

def repo(repo: str, download_path: str, branch_name: str = 'main'):
filename = f'{download_path}/repo_temp.zip'
url = f'https://github.com/{repo}/archive/{branch_name}.zip'

with open(filename, 'wb') as f:
f.write(get(url).content)

repo_name = repo.split('/')[-1]
main_directory_name = f'{repo_name}-{branch_name}'

with ZipFile(filename) as archive:
for file in archive.namelist():
if file.startswith(main_directory_name):
archive.extract(file, download_path)

if os.path.exists(filename):
os.remove(filename)

repo_dir = f'{download_path}/{main_directory_name}'
for file in os.listdir(repo_dir):
file_path = f'{repo_dir}/{file}'
new_file_path = f'{download_path}/{file}'
if os.path.isdir(file_path):
if os.path.exists(new_file_path):
print(
f"Warning ({new_file_path}): Currently we don't support editing recursive folders if it's exists"
"when repo directory was downloaded. Sorry, you can just delete all folders with same name as in"
"repo before you use this function instead."
)
continue
copytree(file_path, new_file_path)
elif os.path.exists(new_file_path):
with open(new_file_path, 'w') as nf:
with open(file_path, 'r') as f:
nf.write(f.read())
else:
copy(file_path, new_file_path)

if os.path.exists(f'{download_path}/{main_directory_name}'):
rmtree(f'{download_path}/{main_directory_name}')
with UGithubDownloader(repo, download_path, branch_name) as gd:
gd.download_repo()


def format_paths(*paths: str | list[str]) -> list[str] | list[list[str]] | list[str | list[str]] | str:
Expand Down Expand Up @@ -183,8 +103,19 @@ def download_files(self, file_paths: Iterable[str], download_path: str = ''):
for file_path in file_paths:
self.download_file(file_path, download_path)

def download_folder(self, folder_path: str | list[str], download_path: str):
...
def download_folder(self, folder_path: str, download_path: str = ''):
download_path = format_paths(download_path)
download_path = f'{self.__base_download_path}/{download_path}'

src, dst = f'{self.__repo_path}/{folder_path}', f'{download_path}/{folder_path}'
if os.path.exists(dst):
rmtree(dst)
copytree(src, dst)

def download_folders(self, folder_paths: Iterable[str], download_path: str = ''):
folder_paths, download_path = format_paths(list(folder_paths), download_path)
for folder_path in folder_paths:
self.download_folder(folder_path, download_path)

def download_repo(self, download_path: str = ''):
download_path = format_paths(download_path)
Expand Down

0 comments on commit 12f6fa1

Please sign in to comment.