diff --git a/lib/portage/__init__.py b/lib/portage/__init__.py index 21bf993170..b56057a33a 100644 --- a/lib/portage/__init__.py +++ b/lib/portage/__init__.py @@ -226,13 +226,15 @@ def _decode_argv(argv): return [_unicode_decode(x.encode(fs_encoding, "surrogateescape")) for x in argv] -def _unicode_encode(s, encoding=_encodings["content"], errors="backslashreplace"): +def _unicode_encode( + s, encoding=_encodings["content"], errors="backslashreplace" +) -> bytes: if isinstance(s, str): s = s.encode(encoding, errors) return s -def _unicode_decode(s, encoding=_encodings["content"], errors="replace"): +def _unicode_decode(s, encoding=_encodings["content"], errors="replace") -> str: if isinstance(s, bytes): s = str(s, encoding=encoding, errors=errors) return s diff --git a/lib/portage/checksum.py b/lib/portage/checksum.py index b10643476b..773e9a60cc 100644 --- a/lib/portage/checksum.py +++ b/lib/portage/checksum.py @@ -10,7 +10,7 @@ import stat import subprocess import tempfile - +from typing import Any from portage import _encodings, _unicode_decode, _unicode_encode from portage import os from portage.const import HASHING_BLOCKSIZE, PRELINK_BINARY @@ -34,11 +34,11 @@ # Dict of all available hash functions -hashfunc_map = {} +hashfunc_map: dict[str, Any] = {} hashorigin_map = {} -def _open_file(filename): +def _open_file(filename: str | bytes): try: return open( _unicode_encode(filename, encoding=_encodings["fs"], errors="strict"), "rb" @@ -58,7 +58,7 @@ def _open_file(filename): class _generate_hash_function: __slots__ = ("_hashobject",) - def __init__(self, hashtype, hashobject, origin="unknown"): + def __init__(self, hashtype, hashobject, origin: str = "unknown"): self._hashobject = hashobject hashfunc_map[hashtype] = self hashorigin_map[hashtype] = origin @@ -75,7 +75,7 @@ def checksum_str(self, data): checksum.update(data) return checksum.hexdigest() - def checksum_file(self, filename): + def checksum_file(self, filename: str) -> tuple[Any, int]: """ Run a 
checksum against a file. @@ -177,7 +177,7 @@ def checksum_file(self, filename): # There is only one implementation for size class SizeHash: -    def checksum_file(self, filename): +    def checksum_file(self, filename: str) -> tuple[int, int]: size = os.stat(filename).st_size return (size, size) @@ -185,7 +185,7 @@ def checksum_file(self, filename): hashfunc_map["size"] = SizeHash() # cache all supported hash methods in a frozenset -hashfunc_keys = frozenset(hashfunc_map) +hashfunc_keys: frozenset[str] = frozenset(hashfunc_map) # end actual hash functions @@ -202,7 +202,7 @@ def checksum_file(self, filename): del cmd, proc, status -def is_prelinkable_elf(filename): +def is_prelinkable_elf(filename: bytes) -> bool: with _open_file(filename) as f: magic = f.read(17) return ( @@ -212,26 +212,26 @@ ) # 2=ET_EXEC, 3=ET_DYN -def perform_md5(x, calc_prelink=0): +def perform_md5(x: str | bytes, calc_prelink: int = 0) -> bytes: return perform_checksum(x, "MD5", calc_prelink)[0] -def _perform_md5_merge(x, **kwargs): +def _perform_md5_merge(x: str, **kwargs) -> bytes: return perform_md5( _unicode_encode(x, encoding=_encodings["merge"], errors="strict"), **kwargs ) -def perform_all(x, calc_prelink=0): +def perform_all(x: str, calc_prelink: int = 0) -> dict[str, bytes]: mydict = {k: perform_checksum(x, k, calc_prelink)[0] for k in hashfunc_keys} return mydict -def get_valid_checksum_keys(): +def get_valid_checksum_keys() -> frozenset[str]: return hashfunc_keys -def get_hash_origin(hashtype): +def get_hash_origin(hashtype: str): if hashtype not in hashfunc_keys: raise KeyError(hashtype) return hashorigin_map.get(hashtype, "unknown") @@ -266,7 +266,7 @@ class _hash_filter: "_tokens", ) - def __init__(self, filter_str): + def __init__(self, filter_str: str): tokens = filter_str.upper().split() if not tokens or tokens[-1] == "*": del tokens[:] @@ -274,7 +274,7 @@ def __init__(self, filter_str): tokens.reverse() self._tokens = tuple(tokens) - def __call__(self, 
hash_name): + def __call__(self, hash_name: str) -> bool: if self.transparent: return True matches = ("*", hash_name) @@ -286,7 +286,9 @@ def __call__(self, hash_name): return False -def _apply_hash_filter(digests, hash_filter): +def _apply_hash_filter( + digests: dict[str, str], hash_filter: _hash_filter +) -> dict[str, str]: """ Return a new dict containing the filtered digests, or the same dict if no changes are necessary. This will always preserve at @@ -321,7 +323,12 @@ def _apply_hash_filter(digests, hash_filter): return digests -def verify_all(filename, mydict, calc_prelink=0, strict=0): +def verify_all( + filename: str | bytes, + mydict: dict[str, Any], + calc_prelink: int = 0, + strict: int = 0, +): """ Verify all checksums against a file. @@ -388,7 +395,9 @@ def verify_all(filename, mydict, calc_prelink=0, strict=0): return file_is_ok, reason -def perform_checksum(filename, hashname="MD5", calc_prelink=0): +def perform_checksum( + filename: str | bytes, hashname: str = "MD5", calc_prelink: int = 0 +) -> tuple[bytes, int]: """ Run a specific checksum against a file. The filename can be either unicode or an encoded byte string. If filename @@ -450,7 +459,9 @@ def perform_checksum(filename, hashname="MD5", calc_prelink=0): del e -def perform_multiple_checksums(filename, hashes=["MD5"], calc_prelink=0): +def perform_multiple_checksums( + filename: str | bytes, hashes: list[str] = ["MD5"], calc_prelink: int = 0 +) -> dict[str, bytes]: """ Run a group of checksums against a file. @@ -475,7 +486,7 @@ def perform_multiple_checksums(filename, hashes=["MD5"], calc_prelink=0): return rVal -def checksum_str(data, hashname="MD5"): +def checksum_str(data: bytes, hashname: str = "MD5"): """ Run a specific checksum against a byte string. 
diff --git a/lib/portage/xpak.py b/lib/portage/xpak.py index 94a07a84cf..8c2295fc81 100644 --- a/lib/portage/xpak.py +++ b/lib/portage/xpak.py @@ -35,6 +35,7 @@ import array import errno +from typing import Optional, Any import portage from portage import os @@ -48,7 +49,7 @@ from portage.util.file_copy import copyfile -def addtolist(mylist, curdir): +def addtolist(mylist: list[str], curdir: str | bytes) -> None: """(list, dir) --- Takes an array(list) and appends all files from dir down the directory tree. Returns nothing. list is modified.""" curdir = normalize_path( @@ -73,7 +74,7 @@ def addtolist(mylist, curdir): mylist.append(os.path.join(parent, x)[len(curdir) + 1 :]) -def encodeint(myint): +def encodeint(myint: int) -> bytes: """Takes a 4 byte integer and converts it into a string of 4 characters. Returns the characters in a string.""" a = array.array("B") @@ -88,7 +89,7 @@ def encodeint(myint): return a.tostring() -def decodeint(mystring): +def decodeint(mystring: str | bytes) -> int: """Takes a 4 byte string and converts it into a 4 byte integer. Returns an integer.""" myint = 0 @@ -99,7 +100,7 @@ def decodeint(mystring): return myint -def xpak(rootdir, outfile=None): +def xpak(rootdir, outfile=None) -> bytes: """(rootdir, outfile) -- creates an xpak segment of the directory 'rootdir' and under the name 'outfile' if it is specified. Otherwise it returns the xpak segment.""" @@ -133,7 +134,7 @@ def xpak(rootdir, outfile=None): return xpak_segment -def xpak_mem(mydata): +def xpak_mem(mydata: dict) -> bytes: """Create an xpack segment from a map object.""" mydata_encoded = {} @@ -174,7 +175,7 @@ def xpak_mem(mydata): ) -def xsplit(infile): +def xsplit(infile: str | bytes) -> bool: """(infile) -- Splits the infile into two files. 'infile.index' contains the index segment. 
'infile.dat' contains the data segment.""" @@ -204,7 +205,7 @@ return True -def xsplit_mem(mydat): +def xsplit_mem(mydat: bytes) -> Optional[tuple[bytes, bytes]]: if mydat[0:8] != b"XPAKPACK": return None if mydat[-8:] != b"XPAKSTOP": @@ -213,7 +214,7 @@ return (mydat[16 : indexsize + 16], mydat[indexsize + 16 : -8]) -def getindex(infile): +def getindex(infile: str | bytes) -> Optional[bytes]: """(infile) -- grabs the index segment from the infile and returns it.""" myfile = open( _unicode_encode(infile, encoding=_encodings["fs"], errors="strict"), "rb" @@ -228,7 +229,7 @@ return myindex -def getboth(infile): +def getboth(infile: str | bytes) -> tuple[bytes, bytes]: """(infile) -- grabs the index and data segments from the infile. Returns an array [indexSegment, dataSegment]""" myfile = open( @@ -246,13 +247,13 @@ return myindex, mydata -def listindex(myindex): +def listindex(myindex) -> None: """Print to the terminal the filenames listed in the indexglob passed in.""" for x in getindex_mem(myindex): print(x) -def getindex_mem(myindex): +def getindex_mem(myindex) -> list[Any]: """Returns the filenames listed in the indexglob passed in.""" myindexlen = len(myindex) startpos = 0 @@ -264,7 +265,7 @@ return myret -def searchindex(myindex, myitem): +def searchindex(myindex, myitem) -> Optional[tuple[int, int]]: """(index, item) -- Finds the offset and length of the file 'item' in the datasegment via the index 'index' provided.""" myitem = _unicode_encode( @@ -288,7 +289,7 @@ startpos = startpos + mytestlen + 12 -def getitem(myid, myitem): +def getitem(myid, myitem) -> Optional[bytes]: myindex = myid[0] mydata = myid[1] myloc = searchindex(myindex, myitem) @@ -297,7 +298,7 @@ return mydata[myloc[0] : myloc[0] + myloc[1]] -def xpand(myid, mydest): +def xpand(myid, mydest) -> None: mydest = normalize_path(mydest) + os.sep myindex = myid[0] 
mydata = myid[1] @@ -340,7 +341,7 @@ def __init__(self, myfile): self.indexpos = None self.datapos = None - def decompose(self, datadir, cleanup=1): + def decompose(self, datadir, cleanup: int = 1) -> int: """Alias for unpackinfo() --- Complement to recompose() but optionally deletes the destination directory. Extracts the xpak from the tbz2 into the directory provided. Raises IOError if scan() fails. @@ -353,11 +354,13 @@ def decompose(self, datadir, cleanup=1): os.makedirs(datadir) return self.unpackinfo(datadir) - def compose(self, datadir, cleanup=0): + def compose(self, datadir, cleanup: int = 0) -> None: """Alias for recompose().""" return self.recompose(datadir, cleanup) - def recompose(self, datadir, cleanup=0, break_hardlinks=True): + def recompose( + self, datadir, cleanup: int = 0, break_hardlinks: bool = True + ) -> None: """Creates an xpak segment from the datadir provided, truncates the tbz2 to the end of regular data if an xpak segment already exists, and adds the new segment to the file with terminating info.""" @@ -366,7 +369,7 @@ def recompose(self, datadir, cleanup=0, break_hardlinks=True): if cleanup: self.cleanup(datadir) - def recompose_mem(self, xpdata, break_hardlinks=True): + def recompose_mem(self, xpdata, break_hardlinks: bool = True) -> int: """ Update the xpak segment. @param xpdata: A new xpak segment to be written, like that returned @@ -402,7 +405,7 @@ def recompose_mem(self, xpdata, break_hardlinks=True): myfile.close() return 1 - def cleanup(self, datadir): + def cleanup(self, datadir) -> None: datadir_split = os.path.split(datadir) if len(datadir_split) >= 2 and len(datadir_split[1]) > 0: # This is potentially dangerous, @@ -415,7 +418,7 @@ def cleanup(self, datadir): else: raise oe - def scan(self): + def scan(self) -> int: """Scans the tbz2 to locate the xpak segment and setup internal values. 
This function is called by relevant functions already.""" a = None @@ -480,7 +483,7 @@ def scan(self): if a is not None: a.close() - def filelist(self): + def filelist(self) -> Optional[list[Any]]: """Return an array of each file listed in the index.""" if not self.scan(): return None @@ -501,14 +504,14 @@ def getfile(self, myfile, mydefault=None): a.close() return myreturn - def getelements(self, myfile): + def getelements(self, myfile) -> list[bytes]: """A split/array representation of tbz2.getfile()""" mydat = self.getfile(myfile) if not mydat: return [] return mydat.split() - def unpackinfo(self, mydest): + def unpackinfo(self, mydest) -> int: """Unpacks all the files from the dataSegment into 'mydest'.""" if not self.scan(): return 0 @@ -551,7 +554,7 @@ def unpackinfo(self, mydest): a.close() return 1 - def get_data(self): + def get_data(self) -> dict[bytes, bytes]: """Returns all the files from the dataSegment as a map object.""" if not self.scan(): return {} @@ -575,7 +578,7 @@ def get_data(self): a.close() return mydata - def getboth(self): + def getboth(self) -> Optional[tuple[bytes, bytes]]: """Returns an array [indexSegment, dataSegment]""" if not self.scan(): return None