From b371810be5d64bba0b1623297a36033e8740768d Mon Sep 17 00:00:00 2001 From: Sander Sweers Date: Sat, 21 Dec 2024 15:21:10 +0100 Subject: [PATCH] NetConf: Use pathlib to handle most path and file operations --- blueman/main/NetConf.py | 58 +++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/blueman/main/NetConf.py b/blueman/main/NetConf.py index 0b69e8f73..c5548648a 100644 --- a/blueman/main/NetConf.py +++ b/blueman/main/NetConf.py @@ -21,17 +21,16 @@ class NetworkSetupError(Exception): def _is_running(name: str, pid: int) -> bool: - if not os.path.exists(f"/proc/{pid}"): + path = pathlib.Path(f"/proc/{pid}") + if not path.exists(): return False - with open(f"/proc/{pid}/cmdline") as f: - return name in f.readline().replace("\0", " ") + return name in path.joinpath("cmdline").read_text() -def _read_pid_file(fname: str) -> int | None: +def _read_pid_file(fname: pathlib.Path) -> int | None: try: - with open(fname) as f: - return int(f.read()) + return int(fname.read_text()) except (OSError, ValueError): return None @@ -59,8 +58,8 @@ def _get_arguments(ip4_address: str) -> list[str]: return [] @property - def _pid_path(self) -> str: - return f"/var/run/{self._key}.pan1.pid" + def _pid_path(self) -> pathlib.Path: + return pathlib.Path(f"/var/run/{self._key}.pan1.pid") def apply(self, ip4_address: str, ip4_mask: str) -> None: error = self._start(_get_binary(*self._BINARIES), ip4_address, ip4_mask, @@ -68,8 +67,7 @@ def apply(self, ip4_address: str, ip4_mask: str) -> None: for addr in DNSServerProvider.get_servers()]) if error is None: logging.info(f"{self._key} started correctly") - with open(self._pid_path) as f: - self._pid = int(f.read()) + self._pid = _read_pid_file(self._pid_path) logging.info(f"pid {self._pid}") NetConf.lock("dhcp") else: @@ -188,7 +186,7 @@ def _start(self, binary: str, ip4_address: str, ip4_mask: str, dns_servers: list f.write(dhcp_config) f.write(subnet) - cmd = [binary, "-pf", self._pid_path, "pan1"] + cmd = [binary, "-pf", self._pid_path.as_posix(), "pan1"] p = Popen(cmd, stderr=PIPE) error = p.communicate()[1] @@ -213,6 +211,7 @@ def _clean_up_configuration(self) -> None: class UdhcpdHandler(DHCPHandler): _BINARIES = ["udhcpd"] + _config_file: pathlib.Path | None = None def _generate_config(self, ip4_address: str, ip4_mask: str, dns_servers: list[str]) -> str: ipiface = ipaddress.ip_interface('/'.join((ip4_address, ip4_mask))) @@ -225,12 +224,14 @@ def _generate_config(self, ip4_address: str, ip4_mask: str, dns_servers: list[st "pid_path": self._pid_path} def _start(self, binary: str, ip4_address: str, ip4_mask: str, dns_servers: list[str]) -> bytes | None: - config_file, self._config_path = mkstemp(prefix="udhcpd-") - with open(config_file, "w", encoding="utf8") as f: + _handle, config_path = mkstemp(prefix="udhcpd-") + self._config_file = pathlib.Path(config_path) + + with self._config_file.open("w", encoding="utf8") as f: f.write(self._generate_config(ip4_address, ip4_mask, dns_servers)) - logging.info(f"Running udhcpd with config file {self._config_path}") - cmd = [binary, "-S", self._config_path] + logging.info(f"Running udhcpd with config file {self._config_file}") + cmd = [binary, "-S", self._config_file.as_posix()] p = Popen(cmd, stderr=PIPE) error = p.communicate()[1] @@ -242,25 +243,24 @@ def _start(self, binary: str, ip4_address: str, ip4_mask: str, dns_servers: list return None if p.pid and pid is not None and _is_running("udhcpd", pid) else error def _clean_up_configuration(self) -> None: - if os.path.exists(self._config_path): - os.remove(self._config_path) + if self._config_file is None: + return + self._config_file.unlink(missing_ok=True) class NetConf: _dhcp_handler: DHCPHandler | None = None _ipt_rules: list[tuple[str, str, str]] = [] - _IPV4_SYS_PATH = "/proc/sys/net/ipv4" - _RUN_PATH = "/var/run" + _IPV4_SYS_PATH = pathlib.Path("/proc/sys/net/ipv4") + _RUN_PATH = pathlib.Path("/var/run") @classmethod def _enable_ip4_forwarding(cls) -> None: - with open(f"{cls._IPV4_SYS_PATH}/ip_forward", "w") as f: - f.write("1") + cls._IPV4_SYS_PATH.joinpath("ip_forward").write_text("1") - for d in os.listdir(f"{cls._IPV4_SYS_PATH}/conf"): - with open(f"{cls._IPV4_SYS_PATH}/conf/{d}/forwarding", "w") as f: - f.write("1") + for p in cls._IPV4_SYS_PATH.joinpath("conf").glob("**/forwarding"): + p.write_text("1") @classmethod def _add_ipt_rule(cls, table: str, chain: str, rule: str) -> None: @@ -345,16 +345,12 @@ def clean_up(cls) -> None: @classmethod def lock(cls, key: str) -> None: - with open(f"{cls._RUN_PATH}/blueman-{key}", "w"): - pass + cls._RUN_PATH.joinpath(f"blueman-{key}").touch() @classmethod def unlock(cls, key: str) -> None: - try: - os.unlink(f"{cls._RUN_PATH}/blueman-{key}") - except OSError: - pass + cls._RUN_PATH.joinpath(f"blueman-{key}").unlink(missing_ok=True) @classmethod def locked(cls, key: str) -> bool: - return os.path.exists(f"{cls._RUN_PATH}/blueman-{key}") + return cls._RUN_PATH.joinpath(f"blueman-{key}").exists()