Skip to content

Commit

Permalink
re-format some code.
Browse files Browse the repository at this point in the history
  • Loading branch information
Abeautifulsnow committed Mar 9, 2024
1 parent 29b704d commit 5f635ce
Showing 1 changed file with 61 additions and 50 deletions.
111 changes: 61 additions & 50 deletions pkgu.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import sys
import time
import traceback
import warnings
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from collections.abc import Generator
from functools import lru_cache
from sqlite3 import Connection, Cursor, OperationalError
from typing import Any, AnyStr, Callable, NewType, Optional, Sequence, Union
Expand All @@ -29,18 +31,6 @@
from colorama import Fore, Style, init
from halo import Halo
from loguru import logger

try:
from pkg_resources import Distribution
from pkg_resources import DistributionNotFound as _DistributionNotFound
from pkg_resources import VersionConflict as _VersionConflict
from pkg_resources import get_distribution, working_set
except (DeprecationWarning, ModuleNotFoundError, ImportError):
# TODO: Need to develop a backward-compatible version and use other way instead.
# https://setuptools.pypa.io/en/latest/pkg_resources.html
pass


from prettytable import PrettyTable
from pydantic import VERSION, BaseModel

Expand Down Expand Up @@ -74,7 +64,19 @@
from typing import List, Tuple


# 变量赋值
warnings.filterwarnings("ignore", category=DeprecationWarning)
try:
from pkg_resources import Distribution
from pkg_resources import DistributionNotFound as _DistributionNotFound
from pkg_resources import ResolutionError
from pkg_resources import VersionConflict as _VersionConflict
from pkg_resources import get_distribution, working_set
except (DeprecationWarning, ModuleNotFoundError, ImportError):
# TODO: Need to develop a backward-compatible version and use other way instead.
# https://setuptools.pypa.io/en/latest/pkg_resources.html
pass

# Common variable
ENV = os.environ.copy()
ENV["PYTHONUNBUFFERED"] = "1"
__version__ = importlib_metadata.version("pkgu")
Expand Down Expand Up @@ -109,13 +111,6 @@ def clear_lines(num_lines: int):
##############################################
# Error Declare #
##############################################
class ResolutionError(Exception):
"""Abstract base for dependency resolution errors"""

def __repr__(self):
return self.__class__.__name__ + repr(self.args)


class VersionConflict(ResolutionError):
"""
An already-installed version conflicts with the requested version.
Expand Down Expand Up @@ -263,6 +258,11 @@ def run_subprocess_cmd(commands: Union[str, list]) -> Tuple[str, bool]:
sys.exit(1)


def format_color_text(fore_color: str, text: str, reset: str) -> str:
"""Output the text with color."""
return fore_color + text + reset


class PackageInfoBase(BaseModel):
"""The basic package infomation."""

Expand Down Expand Up @@ -386,7 +386,7 @@ def pretty_table(self):
if len(self.model.packages) != 0:
print(pretty_output)
else:
awesome = Fore.GREEN + "✔ Awesome!" + Style.RESET_ALL
awesome = format_color_text(Fore.GREEN, "✔ Awesome!", Style.RESET_ALL)
print(f"{awesome} All of your dependencies are up-to-date.")

def _upgrade_packages(
Expand Down Expand Up @@ -680,7 +680,7 @@ def create_cache_folder(self):
folder_path = os.path.dirname(self.db_file)
os.makedirs(folder_path, exist_ok=True)

def init_db(self) -> Connection:
def init_db(self) -> Generator[Connection]:
try:
yield sqlite3.connect(self.db_file)
except (OperationalError, Exception) as e:
Expand Down Expand Up @@ -757,9 +757,9 @@ def get_result_with_cache(self, cache_key: str):
def get_result(
self,
key: str,
nocache_fn: Callable[[Union[str, list]], tuple[str, bool]],
nocache_fn: Callable[[Union[str, list]], Tuple[str, bool]],
param: Union[str, list],
) -> tuple[str]:
) -> Tuple[str]:
cache_key = self.get_cache_key(key)

if self.no_cache:
Expand All @@ -781,23 +781,27 @@ def autoremove(names, yes=False):
remove_dists(dead)


def list_dead(names: list[str]):
def list_dead(names: List[str]):
start: set[Distribution] = set()
for name in names:
try:
start.add(get_distribution(name))
except _DistributionNotFound:
print(
Fore.RED
+ "🐙 「%s is not an installed pip module, skipping 」" % name
+ Style.RESET_ALL,
format_color_text(
Fore.RED,
"🐙 「%s is not an installed pip module, skipping 」" % name,
Style.RESET_ALL,
),
file=sys.stderr,
)
except _VersionConflict:
print(
Fore.RED
+ "🐙 「%s is not the currently installed version, skipping 」" % name
+ Style.RESET_ALL,
format_color_text(
Fore.RED,
"🐙 「%s is not the currently installed version, skipping 」" % name,
Style.RESET_ALL,
),
file=sys.stderr,
)

Expand All @@ -812,7 +816,7 @@ def list_dead(names: list[str]):
return dead


def exclude_whitelist(dists: list[Distribution]):
def exclude_whitelist(dists: List[Distribution]):
return {dist for dist in dists if dist.project_name not in WHITELIST}


Expand All @@ -823,7 +827,7 @@ def show_tree(dist: Distribution, dead, padding="", visited=None, last=False):
return
visited.add(dist)

if last: # 最后一个条目
if last: # Tha last item
prefix = "└─"
child_padding = padding + " "
else:
Expand Down Expand Up @@ -871,7 +875,7 @@ def show_dist(dist: Distribution):
"{} {} {}".format(
Fore.RED + dist.project_name,
Fore.MAGENTA + dist.version,
Fore.GREEN + "(" + dist.location + ")" + Style.RESET_ALL,
format_color_text(Fore.GREEN, "(" + dist.location + ")", Style.RESET_ALL),
),
file=sys.stderr,
)
Expand All @@ -881,8 +885,8 @@ def show_freeze(dist: Distribution):
print("🐳 ", dist.as_requirement())


def remove_dists(dists: list[Distribution]):
"""删除指定的包"""
def remove_dists(dists: List[Distribution]):
"""Delete the specified package."""
if py_exe := get_python():
pip_cmd = [py_exe, "-m", "pip"]
else:
Expand All @@ -905,7 +909,6 @@ def requires(dist: Distribution, output=True):
required = []
for pkg in dist.requires():
try:
# print(f"{pkg=}....")
required.append(get_distribution(pkg))
except _VersionConflict as e:
if output:
Expand Down Expand Up @@ -941,9 +944,11 @@ def remove_package_and_dependencies(args: "argparse.Namespace"):
if dead:
print(
"🔴",
Fore.BLUE
+ " | ".join([d.project_name for d in dead])
+ Style.RESET_ALL,
format_color_text(
Fore.BLUE,
" | ".join([d.project_name for d in dead]),
Style.RESET_ALL,
),
)
else:
autoremove(args.pkg_name, yes=args.yes)
Expand Down Expand Up @@ -974,7 +979,11 @@ def print_total_time_elapsed(start_time: float, time_end: Optional[float] = None
elapsed = time.time() - start_time

print(
Fore.MAGENTA + f"Total time elapsed: {Fore.CYAN}{elapsed} s." + Style.RESET_ALL
format_color_text(
Fore.MAGENTA,
f"Total time elapsed: {Fore.CYAN}{elapsed} s.",
Style.RESET_ALL,
)
)


Expand All @@ -989,6 +998,15 @@ def parse_args():
required=True,
)

# version
parse.add_argument(
"-v",
"--version",
help="Display %(prog)s version and information",
action="version",
version=f"%(prog)s {__version__}",
)

# Update the package.
parser_update = sub_parser.add_parser("update", help="Update python package.")
parser_update.add_argument(
Expand Down Expand Up @@ -1017,13 +1035,6 @@ def parse_args():
help="Whether to use db cache. Default: %(default)s",
action="store_true",
)
parser_update.add_argument(
"-v",
"--version",
help="Display %(prog)s version and information",
action="version",
version=f"%(prog)s {__version__}",
)

# Remove the package.
parser_remove = sub_parser.add_parser(
Expand Down Expand Up @@ -1095,7 +1106,7 @@ def update_packages(args: "argparse.Namespace"):
print_py_env_with_table(python_env)

if len(wdt.model.packages) == 0:
# 打印耗时总时间
# Print the total cost time.
print_total_time_elapsed(time_s)
return
else:
Expand Down Expand Up @@ -1135,7 +1146,7 @@ def update_packages(args: "argparse.Namespace"):
wdt.statistic_result()
else:
pass
# 打印耗时总时间
# Print the total cost time.
print_total_time_elapsed(time_s, time_e)


Expand Down

0 comments on commit 5f635ce

Please sign in to comment.