diff --git a/src/pkgcore/repository/util.py b/src/pkgcore/repository/util.py index 7a0157d9f..2f9be4b4f 100644 --- a/src/pkgcore/repository/util.py +++ b/src/pkgcore/repository/util.py @@ -14,6 +14,67 @@ from ..ebuild.cpv import VersionedCPV +class SimpleTreeFromPkgs(prototype.tree): + """Repository adapter that can work from a sequence of instantiated packages + + This is useful for generating a repository on the fly (package.provided) or for + test usage. + """ + + def __init__( + self, + packages: typing.Iterable, + livefs=False, + frozen=True, + repo_id: typing.Union[str, None] = None, + ): + """ + Args: + packages: sequence of instantiated packages to use. cpv.VersionedCPV for example, + or FakePkg. + """ + super().__init__(frozen=frozen) + # just return the pkg we already have. + self.package_class = lambda x: x + self.repo_id = repo_id + self.livefs = livefs + self._packages = {} # type: dict[str, dict[str, list[typing.Any]]] + for pkg in packages: + self.notify_add_package(pkg) + + def _get_categories(self, *arg: tuple[str, ...]): + if arg: + return () + return tuple(self._packages) + + def _get_packages(self, category: str) -> typing.Iterable[str]: + return tuple(self._packages[category]) + + def _get_versions(self, cp_key: tuple[str, str]) -> typing.Iterable[str]: + return tuple(pkg.fullver for pkg in self._packages[cp_key[0]][cp_key[1]]) + + def notify_add_package(self, pkg): + self._packages.setdefault(pkg.category, {}).setdefault(pkg.package, []).append( + pkg + ) + super().notify_add_package(pkg) + + def notify_remove_package(self, pkg): + try: + pkgs = [ + p + for p in self._packages[pkg.category][pkg.package] + if p.fullver != pkg.fullver + ] + if not pkgs: + del self._packages[pkg.category][pkg.package] + if not self._packages[pkg.category]: + del self._packages[pkg.category] + except KeyError: + pass + super().notify_remove_package(pkg) + + class SimpleTree(prototype.tree): """in-memory repository used for testing or simple shims."""