diff --git a/debian/control b/debian/control index 0c0336e..05a584e 100644 --- a/debian/control +++ b/debian/control @@ -4,11 +4,12 @@ Priority: optional Maintainer: Jeremy Davis Build-Depends: debhelper (>= 10), - python3-all (>= 3.5~), + python3-all (>= 3.11~), python3-debian, + python3-click, dh-python Standards-Version: 4.0.0 -X-Python-Version: >= 3.5 +X-Python-Version: >= 3.11 Package: pool Architecture: any @@ -19,6 +20,7 @@ Depends: ${python3:Depends}, turnkey-gitwrapper, python3-debian, + python3-click, verseek, Recommends: chanko, diff --git a/debian/pool.links b/debian/pool.links index 3714cc4..07d95ba 100644 --- a/debian/pool.links +++ b/debian/pool.links @@ -1,10 +1,9 @@ -/usr/bin/pool_bin /usr/bin/pool -/usr/bin/pool_bin /usr/bin/pool-exists -/usr/bin/pool_bin /usr/bin/pool-gc -/usr/bin/pool_bin /usr/bin/pool-get -/usr/bin/pool_bin /usr/bin/pool-info -/usr/bin/pool_bin /usr/bin/pool-info-build -/usr/bin/pool_bin /usr/bin/pool-init -/usr/bin/pool_bin /usr/bin/pool-list -/usr/bin/pool_bin /usr/bin/pool-register -/usr/bin/pool_bin /usr/bin/pool-unregister +/usr/bin/pool /usr/bin/pool-exists +/usr/bin/pool /usr/bin/pool-gc +/usr/bin/pool /usr/bin/pool-get +/usr/bin/pool /usr/bin/pool-info +/usr/bin/pool /usr/bin/pool-info-build +/usr/bin/pool /usr/bin/pool-init +/usr/bin/pool /usr/bin/pool-list +/usr/bin/pool /usr/bin/pool-register +/usr/bin/pool /usr/bin/pool-unregister diff --git a/pool b/pool new file mode 100755 index 0000000..dc7e866 --- /dev/null +++ b/pool @@ -0,0 +1,305 @@ +#!/usr/bin/python3 +# Copyright (c) 2019-2024 TurnKey GNU/Linux - https://www.turnkeylinux.org +# +# This file is part of Pool +# +# Pool is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by the +# Free Software Foundation; either version 3 of the License, or (at your +# option) any later version. + +import sys +import click +from os.path import abspath, split +from typing import Optional, Callable + +from pool_lib import Pool, PoolKernel, PoolError, utils, pool_info + +exitcode = 0 +PROG = "pool" +DEBUG = False + + +@click.group() +@click.version_option() +def cli(): + """Maintain a pool of packages from source and binary stocks.""" + # process 'pool-COMMAND' symlinks to be 'pool COMMAND + + command = split(sys.argv[0])[-1] + sys.argv[0] = PROG + if command.startswith(PROG + "-"): + subcommand = command[len(PROG + "-"):] + sys.argv.insert(1, subcommand) + + +# pool exists +@cli.command() +@click.argument('package') +def exists(package: str) -> Optional[str]: + """Check if package exists in pool (Prints true/false; exit code 0/1 + respectively). 
+
+    PACKAGE Package to check for"""
+    istrue = False
+    try:
+        istrue = PoolKernel().exists(package)
+    except PoolError as e:
+        if not DEBUG:
+            utils.fatal(e)
+        else:
+            raise
+    if istrue:
+        print("true")
+    else:
+        print("false")
+        sys.exit(1)
+
+
+# pool gc
+@cli.command()
+@click.option('-R', '--disable-recursion',
+              is_flag=True,
+              help="Disable recursive garbage collection of subpools")
+def gc(disable_recursion: bool = False) -> None:
+    """Garbage collect stale data from the pool's caches."""
+    try:
+        Pool().gc(not disable_recursion)
+    except PoolError as e:
+        if not DEBUG:
+            utils.fatal(e)
+        else:
+            raise
+
+
+# pool get
+@cli.command()
+@click.option('-i', '--input', 'inputfile',
+              is_flag=True,
+              help="Read packages from file(s).")
+@click.option('-s', '--strict',
+              is_flag=True,
+              help="fatal error on missing packages")
+@click.option('-q', '--quiet',
+              is_flag=True,
+              help="suppress warnings about missing packages")
+@click.option('-t', '--tree',
+              is_flag=True,
+              help="output dir is in a package tree format (like a"
+                   " repository)")
+@click.option('-d', '--debug',
+              is_flag=True,
+              help="leave build chroot intact after build")
+@click.option('-o', '--source',
+              is_flag=True,
+              help="build source packages in addition to binary packages")
+@click.argument('outputdir')
+@click.argument('packages', nargs=-1)
+def get(outputdir: str,
+        packages: Optional[list[str] | Pool.PackageList] = None,
+        inputfile: Optional[str] = None,
+        strict: bool = False,
+        quiet: bool = False,
+        tree: bool = False,
+        debug: bool = False,
+        source: bool = False
+        ) -> None:
+    """Get packages from pool.
+
+    OUTPUTDIR /path/to/output/dir
+
+    PACKAGES packagename(s) to build
+    """
+    this_exitcode = exitcode
+    pool = Pool(debug=debug)
+    package_list = []
+
+    # XXX Needs work? (below seems wrong):
+    # XXX the case of inputfile: str & packages: None is not handled?!
+    # TODO Check code that that's legit...
+    if not packages and not inputfile:
+        # if no packages specified, get all the newest versions
+        packages = pool.list()
+    elif packages and inputfile:
+        # treat all "packages" as plan files
+        for plan_file in packages:
+            package_list.extend(utils.read_packages(plan_file))
+    else:
+        # assume that it's a list of package names
+        package_list = packages
+
+    try:
+        assert package_list is not None
+        packages = pool.get(
+            outputdir, package_list, tree_fmt=tree,
+            strict=strict, source=source
+        )
+    except PoolError as e:
+        if not DEBUG:
+            utils.fatal(e)
+        else:
+            raise e
+    if strict and packages.missing:
+        this_exitcode = 1
+
+    if not quiet:
+        for package in packages.missing:
+            utils.warn(f"no such package ({package})")
+
+    sys.exit(this_exitcode)
+
+
+# pool info-build
+@cli.command()
+@click.argument('package')
+def info_build(package: str) -> None:
+    """Prints source build log for package.
+
+    PACKAGE Package to show build info for"""
+    # TODO: not yet implemented (see pool_lib.extras.getpath_build_log)
+    raise NotImplementedError('Missing function...')
+
+
+# pool info
+@cli.command()
+@click.option('--registered', 'function',
+              default=True, flag_value='print_registered', type=str,
+              help="Prints list of registered stocks and subpools (default)")
+@click.option('--stocks', 'function',
+              flag_value='print_stocks', type=str,
+              help="Prints list of registered stocks")
+@click.option('--subpools', 'function', type=str,
+              flag_value='print_subpools',
+              help="Prints list of registered subpools")
+@click.option('--build-root', 'function',
+              flag_value='print_build_root', type=str,
+              help="Prints build-root")
+@click.option('--build-logs', 'function',
+              flag_value='print_build_logs', type=str,
+              help="Prints a list of build logs for source packages")
+@click.option('--pkgcache', 'function',
+              flag_value='print_pkgcache', type=str,
+              help="Prints list of cached packages")
+@click.option('--stock-sources', 'function',
+              flag_value='print_stock_sources', type=str,
+              help="Prints list of package sources in registered stocks")
+@click.option('--stock-binaries', 'function',
+              flag_value='print_stock_binaries', type=str,
+              help="Prints list of package binaries in registered stocks")
+@click.option('-r', '--recursive',
+              is_flag=True,
+              help="Lookup pool info recursively in subpools")
+def info(function: str,
+         recursive: bool = False,
+         pool: Optional[PoolKernel] = None
+         ) -> None:
+    """Prints pool info."""
+
+    try:
+        if pool is None:
+            pool = PoolKernel()
+            pool.drop_privileges()
+    except PoolError as e:
+        if not DEBUG:
+            utils.fatal(e)
+        else:
+            raise e
+    if recursive:
+        print("### POOL_DIR=" + pool.path)
+
+    if function:
+        assert isinstance(function, str)
+        func = getattr(pool_info, function)
+        func(pool)
+        if recursive:
+            for subpool in pool.subpools:
+                print()
+                info.callback(function, recursive, subpool)
+
+
+# pool init
+@cli.command()
+@click.argument('buildroot')
+def init(buildroot: str) -> None:
+    """Initialize a new pool.
+
+    BUILDROOT /path/to/build-chroot"""
+    try:
+        Pool.init_create(abspath(buildroot))
+    except PoolError as e:
+        if not DEBUG:
+            utils.fatal(e)
+        else:
+            raise
+
+
+# pool list
+@cli.command('list')
+@click.option('-a', '--all-versions',
+              is_flag=True,
+              help="print all available versions of a package in"
+                   " the pool (default: print the newest versions only)")
+@click.option('-v', '--verbose',
+              is_flag=True,
+              help="show warnings for skipped package versions")
+@click.option('-n', '--name-only',
+              is_flag=True,
+              help="print only the names of packages in the pool")
+@click.argument('globs', nargs=-1)
+def list_(globs: Optional[list] = None,
+          all_versions: bool = False,
+          name_only: bool = False,
+          verbose: bool = False
+          ) -> None:
+    """List packages in pool.
+
+    GLOBS all packagenames matching glob"""
+    if not globs:
+        globs = []
+
+    packages = Pool().list(all_versions, *globs, verbose=verbose)
+    for glob in packages.missing:
+        utils.warn(f"{glob}: no matching packages")
+
+    for package in packages:
+        if name_only:
+            print(Pool.parse_package_id(package)[0])
+        else:
+            print(package)
+
+
+# pool register
+@cli.command()
+@click.argument('stock')
+def register(stock: str) -> None:
+    """Register a package stock into the pool.
+ + STOCK /path/to/stock[#branch]""" + print(repr(stock)) + try: + Pool().register(stock) + except PoolError as e: + if not DEBUG: + utils.fatal(e) + else: + raise + + +# pool unregister +@cli.command() +@click.argument('stock') +def unregister(stock: str) -> None: + """Unregister a package stock from the pool. + + STOCK /path/to/stock[#branch]""" + try: + Pool().unregister(stock) + except PoolError as e: + if not DEBUG: + utils.fatal(e) + else: + raise + + +if __name__ == '__main__': + cli() diff --git a/pool_bin b/pool_bin deleted file mode 100755 index 208d92f..0000000 --- a/pool_bin +++ /dev/null @@ -1,608 +0,0 @@ -#!/usr/bin/python3 -# Copyright (c) TurnKey GNU/Linux - http://www.turnkeylinux.org -# -# This file is part of Pool -# -# Pool is free software; you can redistribute it and/or modify it -# under the terms of the GNU Affero General Public License as published by the -# Free Software Foundation; either version 3 of the License, or (at your -# option) any later version. - -import os -import sys -import argparse -import re -import subprocess -from os.path import basename, dirname, isdir -from os.path import exists as path_exists - -from debian import debfile, debian_support - -import pool_lib as pool -from pool_lib import Pool, PoolKernel, PoolError - -exitcode = 0 -PROG = "pool" -DEBUG = False - - -def warn(s): - global exitcode - exitcode = 1 - print("warning: " + str(s), file=sys.stderr) - - -def fatal(msg, help=None): - print("error: " + str(msg), file=sys.stderr) - if help: - help() - sys.exit(1) - - -def exists(package): - try: - istrue = pool.Pool().exists(package) - except PoolError as e: - if not DEBUG: - fatal(e) - else: - raise - - if istrue: - print("true") - else: - print("false") - sys.exit(1) - - -def gc(disable_recursion=False): - try: - pool.Pool().gc(not disable_recursion) - except PoolError as e: - if not DEBUG: - fatal(e) - else: - raise e - - -def read_packages(in_file): - packages = [] - try: - with open(in_file, "r") as fob: - for line in fob.readlines(): - line = line.split("#")[0].strip() - if not line: - continue - packages.append(line) - return packages - except FileNotFoundError as e: - if not DEBUG: - fatal(e) - else: - raise e - - -def get( - outputdir, - packages=None, - inputfile=None, - strict=False, - quiet=False, - tree=False, - debug=False, - source=False): - - this_exitcode = exitcode - pool = Pool(debug=debug) - package_list = [] - - if not packages and not inputfile: - # if no packages specified, get all the newest versions - packages = pool.list() - elif inputfile: - # treat all "packages" as plan files - for plan_file in packages: - package_list.extend(read_packages(plan_file)) - else: - # assume that it's a list of package names - package_list = packages - - try: - packages = pool.get( - outputdir, package_list, tree_fmt=tree, - strict=strict, source=source - ) - except PoolError as e: - if not DEBUG: - fatal(e) - else: - raise e - if strict and packages.missing: - this_exitcode = 1 - - if not quiet: - for package in packages.missing: - warn("no such package (%s)" % package) - - sys.exit(this_exitcode) - - -def init(pool_path): - try: - pool.Pool.init_create(os.path.abspath(buildroot)) - except PoolError as e: - if not DEBUG: - fatal(e) - else: - raise e - - -# # info_build -def extract_source_name(path): - deb = debfile.DebFile(path) - if "Source" in deb.debcontrol(): - return deb.debcontrol()["Source"] - - return None - - -def pkgcache_list_versions(pool, name): - versions = [ - pkgcache_version - for pkgcache_name, pkgcache_version in 
pool.pkgcache.list() - if pkgcache_name == name - ] - - for subpool in pool.subpools: - versions += pkgcache_list_versions(subpool, name) - - return versions - - -def pkgcache_getpath_newest(pool, name): - versions = pkgcache_list_versions(pool, name) - if not versions: - return None - - versions.sort(debian_support.version_compare) - version_newest = versions[-1] - - package = pool.fmt_package_id(name, version_newest) - return pool.getpath_deb(package, build=False) - - -def binary2source(pool, package): - """translate package from binary to source""" - name, version = pool.parse_package_id(package) - if version: - path = pool.getpath_deb(package, build=False) - if not path: - return None - - source_name = extract_source_name(path) - if not source_name: - return package - - return pool.fmt_package_id(source_name, version) - - # no version, extract source from the most recent binary - path = pkgcache_getpath_newest(pool, name) - if not path: - return None - - source_name = extract_source_name(path) - if not source_name: - return name - - return source_name - - -def getpath_build_log(package): - try: - pool = Pool() - except Pool.Error as e: - if not DEBUG: - fatal(e) - else: - raise e - - path = pool.getpath_build_log(package) - if path: - return path - - # maybe package is a binary name? - # try mapping it to a source name and trying again - - source_package = binary2source(pool, package) - if source_package: - path = pool.getpath_build_log(source_package) - - if not path: - package_desc = repr(package) - if source_package: - package_desc += " (%s)" % source_package - fatal("no build log for " + package_desc) - - return path - - -# # info -def print_registered(pool): - if pool.stocks: - print("# stocks") - print_stocks(pool) - - if pool.subpools: - if pool.stocks: - print() - print("# subpools") - print_subpools(pool) - - -def print_stocks(pool): - for stock in pool.stocks: - addr = stock.link - if stock.branch: - addr += "#" + stock.branch.replace('%2F', '/') - print(addr) - - -def print_subpools(pool): - for subpool in pool.subpools: - print(subpool.path) - - -def print_build_root(pool): - print(pool.buildroot) - - -def print_pkgcache(pool): - pool.sync() - for name, version in pool.pkgcache.list(): - print(name + "=" + version) - - -def print_stock_inventory(stock_inventory): - package_width = max([len(vals[0]) for vals in stock_inventory]) - stock_name_width = max([len(vals[1]) for vals in stock_inventory]) - - for package, stock_name, relative_path in stock_inventory: - print( - "%s %s %s" - % ( - package.ljust(package_width), - stock_name.ljust(stock_name_width), - relative_path, - ) - ) - - -def print_stock_sources(pool): - pool.sync() - - stock_inventory = [] - for stock in pool.stocks: - for path, versions in stock.sources: - for version in versions: - package = basename(path) + "=" + version - relative_path = dirname(path) - stock_inventory.append((package, stock.name, relative_path)) - - if stock_inventory: - print_stock_inventory(stock_inventory) - - -def print_stock_binaries(pool): - pool.sync() - - stock_inventory = [] - for stock in pool.stocks: - for path in stock.binaries: - package = basename(path) - relative_path = dirname(path) - stock_inventory.append((package, stock.name, relative_path)) - - if stock_inventory: - print_stock_inventory(stock_inventory) - - -def print_build_logs(pool): - for log_name, log_version in pool.build_logs: - print(log_name + "=" + log_version) - - -def info(function, recursive=False, pool=None): - try: - if pool is None: - pool = PoolKernel() 
- pool.drop_privileges() - except PoolError as e: - if not DEBUG: - fatal(e) - else: - raise e - - if recursive: - print("### POOL_DIR=" + pool.path) - - if function: - function(pool) - if recursive: - for subpool in pool.subpools: - print() - info(function, recursive, subpool) - - -# # init -def init(buildroot): - try: - pool.Pool.init_create(os.path.abspath(buildroot)) - except PoolError as e: - if not DEBUG: - fatal(e) - else: - raise e - - -# # list -def p_list(globs=[], all_versions=False, name_only=False, verbose=False): - packages = Pool().list(all_versions, *globs, verbose=verbose) - for glob in packages.missing: - print("warning: %s: no matching packages" % glob, file=sys.stderr) - - for package in packages: - if name_only: - print(Pool.parse_package_id(package)[0]) - else: - print(package) - - -# # register -def register(stock): - print(repr(stock)) - try: - pool.Pool().register(stock) - except PoolError as e: - if not DEBUG: - fatal(e) - else: - raise e - - -# # unregister -def unregister(stock): - try: - pool.Pool().unregister(stock) - except PoolError as e: - if not DEBUG: - fatal(e) - else: - raise e - - -def main(): - # process 'pool-COMMAND' symlinks to be 'pool COMMAND - - command = os.path.split(sys.argv[0])[-1] - sys.argv[0] = PROG - if command.startswith(PROG + "-"): - subcommand = command[len(PROG + "-"):] - sys.argv.insert(1, subcommand) - - parser = argparse.ArgumentParser( - prog=PROG, - description=("Maintain a pool of packages from source and binary" - " stocks") - ) - subparsers = parser.add_subparsers(dest='cmd') - - # pool-exists - parser_exists = subparsers.add_parser( - "exists", - help=("Check if package exists in pool (Prints true/false; exit" - " code 0/1 respectively)") - ) - parser_exists.add_argument("package", help="Package to check for") - parser_exists.set_defaults(func=exists) - - # pool-gc - parser_gc = subparsers.add_parser( - "gc", - help="Garbage collect stale data from the pool's caches" - ) - parser_gc.add_argument( - "-R", - "--disable-recursion", - action="store_true", - help="Disable recursive garbage collection of subpools", - ) - parser_gc.set_defaults(func=gc) - - # pool-get - parser_get = subparsers.add_parser("get", help="Get packages from pool") - parser_get.add_argument( - "-i", - "--input", - dest="inputfile", - action="store_true", - help="Read packages from file(s)." 
- ) - parser_get.add_argument( - "-s", "--strict", - action="store_true", - help="fatal error on missing packages" - ) - parser_get.add_argument( - "-q", - "--quiet", - action="store_true", - help="suppress warnings about missing packages", - ) - parser_get.add_argument( - "-t", - "--tree", - action="store_true", - help="output dir is in a package tree format (like a repository)", - ) - parser_get.add_argument( - "-d", - "--debug", - action="store_true", - help="leave build chroot intact after build", - ) - parser_get.add_argument( - "-o", - "--source", - action="store_true", - help="build source packages in addition to binary packages", - ) - parser_get.add_argument("outputdir", help="Output directory") - parser_get.add_argument( - "packages", nargs="+", - default=[], - help=("Package(s) or file containing packages (optionally" - " with versions)") - ) - parser_get.set_defaults(func=get) - - # pool-info-build - parser_info_build = subparsers.add_parser( - "info-get", help="Prints source build log" " for package" - ) - parser_info_build.add_argument("package", help="Package name") - parser_info_build.set_defaults(func=get) - - # pool-info - parser_info = subparsers.add_parser("info", help="Prints pool info") - # pool-info conflicting args - parser_info_conflicts = parser_info.add_mutually_exclusive_group() - parser_info_conflicts.add_argument( - "--registered", - dest="function", - action="store_const", - const=print_registered, - default=print_registered, - help="Prints list of registered stocks and subpools (default)", - ) - parser_info_conflicts.add_argument( - "--stocks", - dest="function", - action="store_const", - const=print_stocks, - help="Prints list of registered stocks", - ) - parser_info_conflicts.add_argument( - "--subpools", - dest="function", - action="store_const", - const=print_subpools, - help="Prints list of registered subpools", - ) - parser_info_conflicts.add_argument( - "--build-root", - dest="function", - action="store_const", - const=print_build_root, - help="Prints build-root", - ) - parser_info_conflicts.add_argument( - "--build-logs", - dest="function", - action="store_const", - const=print_build_logs, - help="Prints a list of build logs for source packages", - ) - parser_info_conflicts.add_argument( - "--pkgcache", - dest="function", - action="store_const", - const=print_pkgcache, - help="Prints list of cached packages", - ) - parser_info_conflicts.add_argument( - "--stock-sources", - dest="function", - action="store_const", - const=print_stock_sources, - help="Prints list of package sources in registered stocks", - ) - parser_info_conflicts.add_argument( - "--stock-binaries", - dest="function", - action="store_const", - const=print_stock_binaries, - help="Prints list of package binaries in registered stocks", - ) - # pool-info non-conflicting arg - parser_info.add_argument( - "-r", - "--recursive", - action="store_true", - help="Lookup pool info recursively in subpools", - ) - parser_info.set_defaults(func=info) - - # pool-init - parser_init = subparsers.add_parser("init", help="Initialize a new pool") - parser_init.add_argument("buildroot", help="/path/to/build-chroot") - parser_init.set_defaults(func=init) - - # pool-list - parser_list = subparsers.add_parser("list", help="List packages in pool") - parser_list.add_argument( - "-a", - "--all-versions", - action="store_true", - help="print all available versions of a package in the pool" - " (default: print the newest versions only)", - ) - parser_list.add_argument( - "-v", - "--verbose", - action="store_true", 
- help="show warnings for skipped package versions", - ) - parser_list.add_argument( - "-n", - "--name-only", - action="store_true", - help="print only the names of packages in the pool", - ) - parser_list.add_argument("globs", nargs="*", help="package-glob(s)") - parser_list.set_defaults(func=p_list) - - # pool-register - parser_register = subparsers.add_parser( - "register", help="Register a package stock into the pool" - ) - parser_register.add_argument("stock", help="/path/to/stock[#branch]") - parser_register.set_defaults(func=register) - - # pool-unregister - parser_unregister = subparsers.add_parser( - "unregister", help="Unregister a package stock from the pool" - ) - parser_unregister.add_argument("stock", help="/path/to/stock[#branch]") - parser_unregister.set_defaults(func=unregister) - - args = parser.parse_args() - if args.cmd: - func = args.func - args = vars(args) - # only pass debug for pool-get - if func.__name__ != 'get' and "debug" in args: - del args["debug"] - del args["cmd"] - del args["func"] - if 'outputdir' in args.keys(): - if not path_exists(args['outputdir']): - fatal(f"{args['outputdir']} does not exist") - elif not isdir(args['outputdir']): - fatal(f"{args['outputdir']} exists, but is not a directory") - func(**args) - else: - fatal("Subcommand required.", parser.print_help) - - -if __name__ == "__main__": - main() diff --git a/pool_lib/__init__.py b/pool_lib/__init__.py index f662672..ed0c311 100644 --- a/pool_lib/__init__.py +++ b/pool_lib/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) TurnKey GNU/Linux - http://www.turnkeylinux.org +# Copyright (c) TurnKey GNU/Linux - https//www.turnkeylinux.org # # This file is part of Pool # @@ -8,7 +8,9 @@ # option) any later version. import os -from os.path import * +from os.path import (join, splitext, isfile, basename, abspath, + isdir, relpath, dirname, islink, realpath, + exists as path_exists) import re import shlex @@ -19,18 +21,18 @@ import importlib from contextlib import contextmanager from typing import ( - Optional, Dict, List, Union, Tuple, Set, Generator, Type, Iterator, - TypeVar, Callable, Iterable, no_type_check, cast -) + # Note: List reqd to avoid list type clash with list() method + Optional, Union, Generator, List, Type, Iterator, + TypeVar, Iterable, no_type_check, cast + ) import logging from debian import debfile, debian_support -from functools import cmp_to_key import errno import verseek_lib as verseek -from gitwrapper import Git +from gitwrapper import Git, GitError from .forked import forked_constructor from fnmatch import fnmatch @@ -73,7 +75,7 @@ class CircularDependency(PoolError): pass -def deb_get_packages(srcpath: AnyPath) -> List[str]: +def deb_get_packages(srcpath: AnyPath) -> list[str]: path = str_path(srcpath) controlfile = join(path, "debian/control") @@ -84,11 +86,11 @@ def deb_get_packages(srcpath: AnyPath) -> List[str]: return lines -def parse_package_filename(filename: str) -> Tuple[str, str]: +def parse_package_filename(filename: str) -> tuple[str, str]: """Parses package filename -> (name, version)""" if not splitext(filename)[1] in (".deb", ".udeb"): - raise PoolError("not a package `%s'" % filename) + raise PoolError(f"not a package `{filename}'") name, version = filename.split("_")[:2] @@ -98,7 +100,7 @@ def parse_package_filename(filename: str) -> Tuple[str, str]: def hardlink_or_copy(src: AnyPath, dst: AnyPath) -> None: src = os.fspath(src) dst = os.fspath(dst) - if exists(dst): + if path_exists(dst): os.remove(dst) try: @@ -156,8 +158,8 @@ def _unregister(self, name: str, 
version: str) -> None: def __init__(self, path: AnyPath): self.path = str_path(path) - self.filenames: Dict[Tuple[str, str], str] = {} - self.namerefs: Dict[str, int] = {} + self.filenames: dict[tuple[str, str], str] = {} + self.namerefs: dict[str, int] = {} for filename in self._list_binaries(): self._register(filename) @@ -182,7 +184,7 @@ def exists(self, name: str, version: Optional[str] = None) -> bool: if name in self.namerefs: return True - return exists(join(self.path, basename(name))) + return path_exists(join(self.path, basename(name))) def add(self, path: AnyPath) -> None: """Add binary to cache. Hardlink if possible, copy otherwise.""" @@ -213,7 +215,7 @@ def remove(self, name: str, version: str) -> None: os.remove(path) self._unregister(name, version) - def list(self) -> List[Tuple[str, str]]: + def list(self) -> list[tuple[str, str]]: """List packages in package cache -> list of (package, version)""" return list(self.filenames.keys()) @@ -240,11 +242,11 @@ def create(cls, path: str, link: str) -> None: logger.debug(f'ln -s {link} {path}/link') @property - def sources(self) -> List[Tuple[str, List[str]]]: + def sources(self) -> list[tuple[str, list[str]]]: ... @property - def binaries(self) -> List[str]: + def binaries(self) -> list[str]: ... def sync(self) -> None: @@ -261,17 +263,19 @@ def sync(self) -> None: path_pool: str path_root: str + branch: Optional[str] + def _get_workdir(self) -> Optional[str]: ... - def __init__(self, path: AnyPath): + def __init__(self, path: AnyPath) -> None: logger.debug(f'StockBase(path={path!r})') path_ = str_path(path) self.path_root = path_ self.link_path = join(path_, 'link') self.name = basename(path_) - if not exists(self.link_path): + if not path_exists(self.link_path): raise StockBase.StockBaseError( f"stock link {self.link_path!r} doesn't exist") @@ -286,7 +290,7 @@ class StockPool(StockBase): def __init__(self, path: AnyPath, - recursed_paths: Optional[List[str]] = None): + recursed_paths: Optional[list[str]] = None): logger.debug( f'StockPool(path={path!r}, recursed_paths={recursed_paths!r})') super().__init__(path) @@ -328,7 +332,7 @@ def __get__(self, obj: 'StockBase', type: Type['StockBase']) -> Optional[str]: path = obj.path_sync_head - if exists(path): + if path_exists(path): with open(path) as fob: fob.read().rstrip() @@ -337,7 +341,7 @@ def __get__(self, def __set__(self, obj: 'StockBase', val: Optional[str]) -> None: path = obj.path_sync_head if val is None: - if exists(path): + if path_exists(path): os.remove(path) else: with open(path, 'w') as fob: @@ -364,7 +368,7 @@ def _get_workdir(self) -> str: orig = Git(self.link) checkout_path = self.path_checkout - if not exists(checkout_path): + if not path_exists(checkout_path): mkdir(checkout_path) checkout = Git.init_create(checkout_path) checkout.set_alternates(orig) @@ -381,7 +385,7 @@ def dup_branch(branch: str) -> None: dup_branch(self.branch) checkout.checkout("-q", "-f", self.branch) - if exists(join(checkout_path, "arena.internals")): + if path_exists(join(checkout_path, "arena.internals")): dup_branch(self.branch + "-thin") command = "cd {shlex.quote(checkout_path)} && sumo-open" @@ -407,7 +411,7 @@ def dup_branch(branch: str) -> None: _workdir: Optional[str] workdir = _Workdir() - def _init_read_versions(self) -> Dict[str, List[str]]: + def _init_read_versions(self) -> dict[str, list[str]]: source_versions = {} for dpath, dnames, fnames in os.walk(self.path_index_sources): relative_path = relpath(dpath, self.path_index_sources) @@ -486,10 +490,10 @@ def _sync(self, 
directory: Optional[str] = None) -> None: self._sync(fpath) @property - def binaries(self) -> List[str]: + def binaries(self) -> list[str]: """List package binaries for this stock -> [ relative/path/foo.deb, ... ]""" - relative_paths: List[str] = [] + relative_paths: list[str] = [] for dpath, dnames, fnames in os.walk(self.path_index_binaries): for fname in fnames: fpath = join(dpath, fname) @@ -498,7 +502,7 @@ def binaries(self) -> List[str]: return relative_paths @property - def sources(self) -> List[Tuple[str, List[str]]]: + def sources(self) -> list[tuple[str, list[str]]]: """List package sources for this stock -> [ (relative/path/foo, versions), ... ]""" return list(self.source_versions.items()) @@ -514,7 +518,7 @@ def sync(self) -> None: # delete old cached versions for path in (self.path_index_sources, self.path_index_binaries): - if exists(path): + if path_exists(path): shutil.rmtree(path) mkdir(path) @@ -532,6 +536,8 @@ class Stocks: Iterating an instance of this class produces all non-subpool type stocks. """ + subpools: dict[str, 'PoolKernel'] = {} + def _load_stock(self, path_stock: AnyPath) -> None: logger.debug(f'loading stock from {path_stock}') stock: Optional[StockBase] = None @@ -540,9 +546,8 @@ def _load_stock(self, path_stock: AnyPath) -> None: self.subpools[stock.name] = stock.pool except CircularDependency: raise - except PoolError as e: - pass - except StockError as e: + except PoolError: + # StockError is a subclass of PoolError pass if not stock: @@ -559,8 +564,7 @@ def _load_stock(self, path_stock: AnyPath) -> None: def _load_stocks(self) -> None: logger.debug('loading stocks') - self.stocks: Dict[str, StockBase] = {} - self.subpools: Dict[str, PoolKernel] = {} + self.stocks: dict[str, StockBase] = {} for stock_name in os.listdir(self.path): path_stock = join(self.path, stock_name) @@ -574,7 +578,8 @@ def _load_stocks(self) -> None: def __init__(self, path: AnyPath, pkgcache: PackageCache, - recursed_paths: Optional[List[str]] = None): + recursed_paths: Optional[list[str]] = None + ) -> None: if recursed_paths is None: recursed_paths = [] self.path = path @@ -598,7 +603,7 @@ def __len__(self) -> int: return len(self.stocks) - len(self.subpools) @staticmethod - def _parse_stock(stock: str) -> Tuple[str, Optional[str]]: + def _parse_stock(stock: str) -> tuple[str, Optional[str]]: branch: Optional[str] try: dir, branch = stock.split("#", 1) @@ -613,14 +618,14 @@ def _parse_stock(stock: str) -> Tuple[str, Optional[str]]: def register(self, stock_ref: str) -> None: logger.debug('Stocks.register') dir, branch = self._parse_stock(stock_ref) - logger.debug(f'parsed "{stock_ref}" -> dir={dir}, branch={branch}') + logger.debug(f'parsed "{stock_ref}" -> {dir=}, {branch=}') if not isdir(dir): - raise PoolError("not a directory `%s'" % dir) + raise PoolError(f"not a directory `{dir}'") git: Optional[Git] try: git = Git(dir) - except Git.GitError: + except GitError: git = None logger.debug(f'git = {git}') @@ -628,7 +633,7 @@ def register(self, stock_ref: str) -> None: if ((not git and branch) or (git and branch and not git.show_ref(branch.replace('%2F', '/')))): - raise PoolError("no such branch `%s' at `%s'" % (branch, dir)) + raise PoolError(f"no such branch `{branch}' at `{dir}'") if git and not branch: ref_path = git.symbolic_ref("HEAD") @@ -653,7 +658,7 @@ def unregister(self, stock_ref: str) -> None: if branch: stock_name += "#" + branch - matches: List[Stock] = [] + matches: list[Stock] = [] for stock in self.stocks.values(): if realpath(stock.link) == realpath(dir): if 
not isinstance(stock, Stock): @@ -675,14 +680,14 @@ def unregister(self, stock_ref: str) -> None: else: # close sumo arena if it exists checkout_path = stock.path_checkout - if exists(join(checkout_path, "arena.internals")): + if path_exists(join(checkout_path, "arena.internals")): command = f"cd {shlex.quote(checkout_path)} && sumo-close" error = os.system(command) if error: raise PoolError("failed command: " + command) # remove cached binaries compiled from this stock - blacklist: Set[Tuple[str, str]] = set() + blacklist: set[tuple[str, str]] = set() for path, versions in stock.sources: name = basename(path) blacklist |= set([(name, version) for version in versions]) @@ -717,7 +722,8 @@ def get_source_path(self, name: str, version: str) -> Optional[str]: def exists_source_version( self, name: str, - version: Optional[str] = None) -> bool: + version: Optional[str] = None + ) -> bool: """Returns true if the package source exists in any of the stocks. If version is None (default), any version will match""" @@ -732,7 +738,7 @@ def exists_source_version( return False - def get_subpools(self) -> List['PoolKernel']: + def get_subpools(self) -> list['PoolKernel']: return list(self.subpools.values()) @@ -754,16 +760,16 @@ class PoolKernel: PoolError = PoolError class Subpools: - def __get__( - self, obj: 'PoolKernel', - type: Type['PoolKernel'] - ) -> List['PoolKernel']: + def __get__(self, + obj: 'PoolKernel', + type: Type['PoolKernel'] + ) -> list['PoolKernel']: return obj.stocks.get_subpools() subpools = Subpools() @staticmethod - def parse_package_id(package: str) -> Tuple[str, Optional[str]]: + def parse_package_id(package: str) -> tuple[str, Optional[str]]: """Parse package_id string := package-name[=package-version] @@ -789,12 +795,12 @@ def fmt_package_id(name: str, version: Optional[str]) -> str: return name + "=" + version - def __init__( - self, - path: Optional[AnyPath] = None, - recursed_paths: Optional[List[str]] = None, - autosync: bool = True, - debug: bool = False): + def __init__(self, + path: Optional[AnyPath] = None, + recursed_paths: Optional[list[str]] = None, + autosync: bool = True, + debug: bool = False + ) -> None: """Initialize pool instance. 
        if <autosync> is False, the user is expected to control syncing
@@ -826,10 +832,13 @@ def __init__(
         self.full_path = spath
         self.path = dirname(spath)
-        if not exists(spath):
-            raise PoolError("no pool found (POOL_DIR=%s)" % self.path)
-
-        self.buildroot = os.readlink(self.path_build_root)
+        if not path_exists(spath):
+            raise PoolError(f"no pool found (POOL_DIR={self.path})")
+        try:
+            self.buildroot = os.readlink(self.path_build_root)
+        except OSError:
+            raise PoolError("buildroot symlink missing/incorrect:"
+                            f" {self.path_build_root}")
         self.pkgcache = PackageCache(self.path_pkgcache)
         self.stocks = Stocks(
             self.path_stocks, self.pkgcache, recursed_paths + [self.path]
@@ -862,9 +871,10 @@ def exists(self, package: str) -> bool:
     @sync
     def _list(self, all_versions: bool,
-              verbose: bool = False) -> List[Tuple[str, str]]:
+              verbose: bool = False
+              ) -> list[tuple[str, str]]:
         """List packages in pool -> list of (name, version) tuples."""
-        packages: Set[Tuple[str, str]] = set()
+        packages: set[tuple[str, str]] = set()
         for subpool in self.subpools:
             packages |= set(subpool._list(all_versions))
@@ -877,7 +887,7 @@ def _list(self, all_versions: bool,
         if all_versions:
             return list(packages)

-        newest: Dict[str, str] = {}
+        newest: dict[str, str] = {}
         for name, version in packages:
             try:
                 if name not in newest or \
@@ -894,7 +904,8 @@ def _list(self, all_versions: bool,
         return list(newest.items())

     def list(self, all_versions: bool = False,
-             verbose: bool = False) -> List[str]:
+             verbose: bool = False
+             ) -> list[str]:
         """List packages in pool -> list of packages.

         If all_versions is True, returns all versions of packages,
@@ -906,11 +917,11 @@ def list(self, all_versions: bool = False,
         return [
             self.fmt_package_id(name, version)
             for name, version in self._list(all_versions, verbose=verbose)
-        ]
+            ]

     RS = TypeVar('RS', str, List[str])

-    def resolve(self, unresolved: RS) -> RS:
+    def resolve(self, unresolved: RS) -> List[str] | RS:
         """Resolve a list of unresolved packages.
         If unresolved is a single unresolved package,
         return a single resolved package.
@@ -918,11 +929,11 @@ def resolve(self, unresolved: RS) -> RS:
         If unresolved is a list of unresolved packages,
         return a list of resolved packages"""

-        args: List[str]
+        args: list[str]
         if isinstance(unresolved, str):
             args = [unresolved]
         else:
-            args = unresolved
+            args = list(unresolved)

         packages = dict(self._list(all_versions=False))
         resolved = []
@@ -950,7 +961,8 @@ def _build_package_source(
             source_path: str,
             name: str,
             version: str,
-            source: bool = False) -> None:
+            source: bool = False
+            ) -> None:

         build_outputdir = tempfile.mkdtemp(
             dir=self.path_tmp, prefix=f"{name}-{version}."
@@ -1003,11 +1015,11 @@ def _build_package_source(
         shutil.rmtree(build_outputdir)

     @sync
-    def getpath_deb(
-            self,
-            package: str,
-            build: bool = True,
-            source: bool = False) -> Optional[str]:
+    def getpath_deb(self,
+                    package: str,
+                    build: bool = True,
+                    source: bool = False
+                    ) -> Optional[str]:
         """Get path to package in pool if it exists or None if it doesn't.

         By default if package exists only in source, build and cache it first.
@@ -1043,10 +1055,11 @@ def getpath_deb( return path - class BuildLogs(object): - def __get__( - self, obj: 'PoolKernel', - type: Type['PoolKernel']) -> List[Tuple[str, str]]: + class BuildLogs: + def __get__(self, + obj: 'PoolKernel', + type: Type['PoolKernel'] + ) -> list[tuple[str, str]]: arr = [] for fname in os.listdir(obj.path_build_logs): fpath = join(obj.path_build_logs, fname) @@ -1095,10 +1108,11 @@ def get_log_path(log_name: str, log_version: str) -> str: @sync def gc(self, recurse: bool = True, - verbose: bool = True) -> None: + verbose: bool = True + ) -> None: """Garbage collect stale data from the pool's caches""" - whitelist: Set[Tuple[str, str]] = set() + whitelist: set[tuple[str, str]] = set() for stock in self.stocks: for path, versions in stock.sources: name = basename(path) @@ -1154,14 +1168,15 @@ def get_treedir(pkgname: str) -> str: return join(pkgname[:1], pkgname) -class Pool(object): +class Pool: PoolError = PoolError class PackageList: def __init__(self, - sequence: Optional[Iterable[str]] = None): + sequence: Optional[Iterable[str]] = None + ) -> None: self.inner = [] if sequence is None else list(sequence) - self.missing: List[str] = [] + self.missing: list[str] = [] def __iter__(self): return iter(self.inner) @@ -1180,10 +1195,10 @@ def sort(self, key, reverse: bool = False): fmt_package_id = staticmethod(PoolKernel.fmt_package_id) @classmethod - def init_create( - cls: Type['Pool'], - buildroot: AnyPath, - path: Optional[AnyPath] = None) -> 'Pool': + def init_create(cls: Type['Pool'], + buildroot: AnyPath, + path: Optional[AnyPath] = None + ) -> 'Pool': if path is None: cwd = os.getcwd() @@ -1195,7 +1210,6 @@ def init_create( spath = join(realpath(str_path(path)), '.pool') path_pkgcache = join(spath, 'pkgcache') path_stocks = join(spath, 'stocks') - path_tmp = join(spath, 'tmp') path_build = join(spath, 'build') path_build_root = join(spath, 'build/root') path_build_logs = join(spath, 'build/logs') @@ -1206,15 +1220,17 @@ def init_create( raise PoolError("pool already initialized") if not isdir(buildroot): - raise PoolError("buildroot `%s' is not a directory" % buildroot) + raise PoolError(f"buildroot `{buildroot}' is not a directory") mkdir(path_stocks) - Git.set_gitignore(path_stocks, [ - 'index-sources', - 'index-binaries', - 'SYNC_HEAD', - 'checkout' - ]) + Git.set_gitignore( + path_stocks, + ['index-sources', + 'index-binaries', + 'SYNC_HEAD', + 'checkout' + ] + ) mkdir(path_pkgcache) Git.anchor(path_pkgcache) @@ -1243,7 +1259,8 @@ def init_create( def __init__(self, path: Optional[AnyPath] = None, - debug: bool = False): + debug: bool = False + ) -> None: kernel = PoolKernel(path, debug=debug) if kernel.drop_privileges(pretend=True): def f() -> PoolKernel: @@ -1257,8 +1274,11 @@ def f() -> PoolKernel: forked_constructor(f, print_traceback=True)()) self.kernel = kernel - def list(self, all_versions: bool = False, - *globs: str, verbose: bool = False) -> 'Pool.PackageList': + def list(self, + all_versions: bool = False, + *globs: str, + verbose: bool = False + ) -> 'Pool.PackageList': """List packages in pool (sorted) -> Pool.PackageList (list + .missing attr) @@ -1267,8 +1287,8 @@ def list(self, all_versions: bool = False, """ assert isinstance(all_versions, bool) - def filter_packages(packages: List[str], - globs: List[str]) -> 'Pool.PackageList': + def filter_packages(packages: list[str], + globs: list[str]) -> 'Pool.PackageList': filtered = Pool.PackageList() for glob in globs: matches = [] @@ -1289,11 +1309,6 @@ def filter_packages(packages: 
List[str], if globs: packages = filter_packages(list(packages), list(globs)) - def _cmp(a: str, b: str) -> int: - a = Pool.parse_package_id(a) - b = Pool.parse_package_id(b) - return debian_support.version_compare(a[1], b[1]) - packages.sort( key=(lambda p: debian_support.Version(Pool.parse_package_id(p)[1])), @@ -1306,12 +1321,12 @@ def register(self, stock: str) -> None: def unregister(self, stock: str) -> None: self.kernel.unregister(stock) - def get( - self, output_dir: str, - packages: List[str], - tree_fmt: bool = False, - strict: bool = False, - source: bool = False) -> 'Pool.PackageList': + def get(self, output_dir: str, + packages: List[str] | PackageList, + tree_fmt: bool = False, + strict: bool = False, + source: bool = False + ) -> PackageList: """get packages to output_dir -> resolved Pool.PackageList of packages If strict missing packages raise an exception, @@ -1343,7 +1358,8 @@ def get( try: for package in resolved: - path_from = self.kernel.getpath_deb(package, source=source) + path_from = str(self.kernel.getpath_deb(package, + source=source)) fname = basename(path_from) if tree_fmt: @@ -1354,7 +1370,7 @@ def get( else: path_to = join(output_dir, basename(path_from)) - if not exists(path_to): + if not path_exists(path_to): hardlink_or_copy(path_from, path_to) finally: self.kernel.autosync = True diff --git a/pool_lib/extras.py b/pool_lib/extras.py new file mode 100644 index 0000000..ce110d3 --- /dev/null +++ b/pool_lib/extras.py @@ -0,0 +1,112 @@ +from typing import Optional, NoReturn + +from debian import debfile, debian_support + +from . import Pool, PoolKernel, PoolError, utils + +PRE_RELEASE = ('alpha', 'beta', 'rc') + + +# binary2source dep +def extract_source_name(path: str) -> Optional[str]: + deb = debfile.DebFile(path) + if "Source" in deb.debcontrol(): + return deb.debcontrol()["Source"] + return None + + +# pkgcache_getpath_newest dep +def pkgcache_list_versions(pool: PoolKernel, name: str) -> list[str]: + versions = [ + pkgcache_version + for pkgcache_name, pkgcache_version in pool.pkgcache.list() + if pkgcache_name == name + ] + + for subpool in pool.subpools: + versions += pkgcache_list_versions(subpool, name) + + return versions + + +# pkgcache_getpath_newest dep +def has_pre_release(versions: list[str]) -> bool: + for version in versions: + for pre_release in PRE_RELEASE: + if pre_release in version.lower(): + return True + return False + + +# binary2source dep +def pkgcache_getpath_newest(pool: PoolKernel, name: str) -> Optional[str]: + versions = pkgcache_list_versions(pool, name) + if not versions: + return None + # Note this won't sort package pre-release & release versions properly + # But it is the sort algorithm used by Debian... 
+ if has_pre_release(versions): + utils.warn(f"pre-release version(s) found for {name} - version sorting" + " may be incorrect") + versions.sort(key=debian_support.Version) + version_newest = versions[-1] + + package = pool.fmt_package_id(name, version_newest) + return pool.getpath_deb(package, build=False) + + +# getpath_build_log +def binary2source(pool: PoolKernel, package: str) -> Optional[str]: + """translate package from binary to source""" + name, version = pool.parse_package_id(package) + if version: + path = pool.getpath_deb(package, build=False) + if not path: + return None + + source_name = extract_source_name(path) + if not source_name: + return package + + return pool.fmt_package_id(source_name, version) + + # no version, extract source from the most recent binary + path = pkgcache_getpath_newest(pool, name) + if not path: + return None + + source_name = extract_source_name(path) + if not source_name: + return name + + return source_name + + +def getpath_build_log(package: str, debug: bool = False) -> str | NoReturn: + try: + pool = Pool() + except PoolError as e: + #if not DEBUG: + if not debug: + utils.fatal(e) + else: + raise + + path = pool.kernel.getpath_build_log(package) + if path: + return path + + # maybe package is a binary name? + # try mapping it to a source name and trying again + + source_package = binary2source(pool.kernel, package) + if source_package: + path = pool.kernel.getpath_build_log(source_package) + + if not path: + package_desc = repr(package) + if source_package: + package_desc += f" ({source_package})" + utils.fatal("no build log for " + package_desc) + + return path diff --git a/pool_lib/forked.py b/pool_lib/forked.py index 6e187a5..afd17ab 100644 --- a/pool_lib/forked.py +++ b/pool_lib/forked.py @@ -92,35 +92,36 @@ class Error(Exception): def forked_func(func: Callable[..., Any], - print_traceback: bool = False) -> Callable[..., Any]: + print_traceback: bool = False + ) -> Callable[..., Any]: def wrapper(*args: Any, **kws: Any) -> Any: r_fd, w_fd = os.pipe() - r_fh = os.fdopen(r_fd, "r", 0) - w_fh = os.fdopen(w_fd, "w", 0) + rb_fh = os.fdopen(r_fd, "rb", 0) + wb_fh = os.fdopen(w_fd, "wb", 0) pid = os.fork() if pid == 0: # child - r_fh.close() + rb_fh.close() try: ret = func(*args, **kws) except Exception as e: if print_traceback: traceback.print_exc(file=sys.stderr) - pickle.dump(e, w_fh) + pickle.dump(e, wb_fh) os._exit(1) - pickle.dump(ret, w_fh) + pickle.dump(ret, wb_fh) os._exit(0) # parent - w_fh.close() + wb_fh.close() pid, status = os.waitpid(pid, 0) if not os.WIFEXITED(status): raise Error("child terminated unexpectedly") - val = pickle.load(r_fh) + val = pickle.load(rb_fh) error = os.WEXITSTATUS(status) if error: raise val @@ -132,8 +133,8 @@ def wrapper(*args: Any, **kws: Any) -> Any: class Pipe: def __init__(self) -> None: r, w = os.pipe() - self.r: BinaryIO = os.fdopen(r, "r", 0) - self.w: BinaryIO = os.fdopen(w, "w", 0) + self.r: BinaryIO = os.fdopen(r, "rb", 0) + self.w: BinaryIO = os.fdopen(w, "wb", 0) class ObjProxyBase: @@ -166,7 +167,7 @@ def run(self) -> None: elif op == self.OP_SET: op_handler = self._handle_op_set else: - raise Error("illegal ObjProxy operation (%s)" % op) + raise Error(f"illegal ObjProxy operation ({op})") op_handler(params) @@ -207,7 +208,18 @@ def _handle_op_set(self, params: Tuple[str, list]) -> Any: setattr(self.obj, attrname, val) -class ObjProxyClient(ObjProxyBase, object): +def _read_result(op_method: Callable[..., None]) -> Callable[..., Any]: + def wrapper(self, *args, **kws): + op_method(self, 
*args, **kws) + error, val = pickle.load(self._r) + if error: + raise val + return val + + return wrapper + + +class ObjProxyClient(ObjProxyBase): """Object proxy client. Transparently handles: @@ -223,17 +235,6 @@ def __init__(self, r: BinaryIO, w: BinaryIO): self._r = r self._w = w - def _read_result(op_method: Callable[..., None]) -> Callable[..., Any]: - @no_type_check - def wrapper(self, *args, **kws): - op_method(self, *args, **kws) - error, val = pickle.load(self._r) - if error: - raise val - return val - - return wrapper - @_read_result def _op_call(self, attrname: str, args: list, kws: dict) -> None: pickle.dump((self.OP_CALL, (attrname, args, kws)), self._w) diff --git a/pool_lib/pool_info.py b/pool_lib/pool_info.py new file mode 100644 index 0000000..ba090eb --- /dev/null +++ b/pool_lib/pool_info.py @@ -0,0 +1,95 @@ +from os.path import basename, dirname + +from debian import debfile + +from . import PoolKernel + +PRE_RELEASE = ('alpha', 'beta', 'rc') + + +# pool info --registered +def print_registered(pool: PoolKernel) -> None: + if pool.stocks: + print("# stocks") + print_stocks(pool) + + if pool.subpools: + if pool.stocks: + print() + print("# subpools") + print_subpools(pool) + + +# pool info --stocks +def print_stocks(pool: PoolKernel) -> None: + for stock in pool.stocks: + addr = stock.link + if stock.branch: + addr += "#" + stock.branch.replace('%2F', '/') + print(addr) + + +# pool info --subpools +def print_subpools(pool: PoolKernel) -> None: + for subpool in pool.subpools: + print(subpool.path) + + +# pool info --build-root +def print_build_root(pool: PoolKernel) -> None: + print(pool.buildroot) + + +# pool info --build-logs +def print_build_logs(pool: PoolKernel) -> None: + for log_name, log_version in pool.build_logs: + print(log_name + "=" + log_version) + + +# pool info --pkgcache +def print_pkgcache(pool: PoolKernel) -> None: + pool.sync() + for name, version in pool.pkgcache.list(): + print(name + "=" + version) + + +# support for pool info --stock-sources/--stock-binaries +def print_stock_inventory(stock_inventory: list[str]) -> None: + package_width = max([len(vals[0]) for vals in stock_inventory]) + stock_name_width = max([len(vals[1]) for vals in stock_inventory]) + + for package, stock_name, relative_path in stock_inventory: + print(f"{package.ljust(package_width)}" + f" {stock_name.ljust(stock_name_width)}" + f" {relative_path}") + + +# pool info --stock-sources +def print_stock_sources(pool: PoolKernel) -> None: + pool.sync() + + stock_inventory = [] + for stock in pool.stocks: + for path, versions in stock.sources: + for version in versions: + package = basename(path) + "=" + version + relative_path = dirname(path) + stock_inventory.append((package, stock.name, relative_path)) + + if stock_inventory: + print_stock_inventory(stock_inventory) + + +# pool info --stock-binaries +def print_stock_binaries(pool: PoolKernel) -> None: + pool.sync() + + stock_inventory = [] + for stock in pool.stocks: + for path in stock.binaries: + package = basename(path) + relative_path = dirname(path) + stock_inventory.append((package, stock.name, relative_path)) + + if stock_inventory: + print_stock_inventory(stock_inventory) diff --git a/pool_lib/utils.py b/pool_lib/utils.py new file mode 100644 index 0000000..3d5bba0 --- /dev/null +++ b/pool_lib/utils.py @@ -0,0 +1,47 @@ +import sys +from typing import Optional, Callable, NoReturn + +from . 
import PoolKernel, PoolError + +err_msg = str | PoolError | FileNotFoundError + + +def warn(s: err_msg) -> None: + print("warning: " + str(s), file=sys.stderr) + + +def fatal(msg: err_msg, help: Optional[Callable] = None) -> NoReturn: + print("error: " + str(msg), file=sys.stderr) + if help: + help() + sys.exit(1) + + +def read_packages(in_file: str, debug: bool = False) -> list[str]: + packages = [] + try: + with open(in_file, "r") as fob: + for line in fob.readlines(): + line = line.split("#")[0].strip() + if not line: + continue + packages.append(line) + return packages + except FileNotFoundError as e: + if not debug: + fatal(e) + else: + raise + + +def pkgcache_list_versions(pool: PoolKernel, name: str) -> list[str]: + versions = [ + pkgcache_version + for pkgcache_name, pkgcache_version in pool.pkgcache.list() + if pkgcache_name == name + ] + + for subpool in pool.subpools: + versions += pkgcache_list_versions(subpool, name) + + return versions diff --git a/setup.py b/setup.py index fa9d8ec..e189dc6 100644 --- a/setup.py +++ b/setup.py @@ -4,11 +4,10 @@ setup( name="pool", - version="2.0rc1", + version="1.1.x", author="Jeremy Davis", author_email="jeremy@turnkeylinux.org", url="https://github.com/turnkeylinux/pool", packages=["pool_lib"], - scripts=["pool_bin"] + scripts=["pool"] ) -