Skip to content

Commit

Permalink
NetConf: Use pathlib to handle most path and file operations
Browse files Browse the repository at this point in the history
  • Loading branch information
infirit committed Dec 23, 2024
1 parent 645ba99 commit b371810
Showing 1 changed file with 27 additions and 31 deletions.
58 changes: 27 additions & 31 deletions blueman/main/NetConf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ class NetworkSetupError(Exception):


def _is_running(name: str, pid: int) -> bool:
if not os.path.exists(f"/proc/{pid}"):
path = pathlib.Path(f"/proc/{pid}")
if not path.exists():
return False

with open(f"/proc/{pid}/cmdline") as f:
return name in f.readline().replace("\0", " ")
return name in path.joinpath("cmdline").read_text()


def _read_pid_file(fname: str) -> int | None:
def _read_pid_file(fname: pathlib.Path) -> int | None:
try:
with open(fname) as f:
return int(f.read())
return int(fname.read_text())
except (OSError, ValueError):
return None

Expand Down Expand Up @@ -59,17 +58,16 @@ def _get_arguments(ip4_address: str) -> list[str]:
return []

@property
def _pid_path(self) -> str:
return f"/var/run/{self._key}.pan1.pid"
def _pid_path(self) -> pathlib.Path:
return pathlib.Path(f"/var/run/{self._key}.pan1.pid")

def apply(self, ip4_address: str, ip4_mask: str) -> None:
error = self._start(_get_binary(*self._BINARIES), ip4_address, ip4_mask,
[ip4_address if addr.is_loopback else str(addr)
for addr in DNSServerProvider.get_servers()])
if error is None:
logging.info(f"{self._key} started correctly")
with open(self._pid_path) as f:
self._pid = int(f.read())
self._pid = _read_pid_file(self._pid_path)
logging.info(f"pid {self._pid}")
NetConf.lock("dhcp")
else:
Expand Down Expand Up @@ -188,7 +186,7 @@ def _start(self, binary: str, ip4_address: str, ip4_mask: str, dns_servers: list
f.write(dhcp_config)
f.write(subnet)

cmd = [binary, "-pf", self._pid_path, "pan1"]
cmd = [binary, "-pf", self._pid_path.as_posix(), "pan1"]
p = Popen(cmd, stderr=PIPE)

error = p.communicate()[1]
Expand All @@ -213,6 +211,7 @@ def _clean_up_configuration(self) -> None:

class UdhcpdHandler(DHCPHandler):
_BINARIES = ["udhcpd"]
_config_file: pathlib.Path | None = None

def _generate_config(self, ip4_address: str, ip4_mask: str, dns_servers: list[str]) -> str:
ipiface = ipaddress.ip_interface('/'.join((ip4_address, ip4_mask)))
Expand All @@ -225,12 +224,14 @@ def _generate_config(self, ip4_address: str, ip4_mask: str, dns_servers: list[st
"pid_path": self._pid_path}

def _start(self, binary: str, ip4_address: str, ip4_mask: str, dns_servers: list[str]) -> bytes | None:
config_file, self._config_path = mkstemp(prefix="udhcpd-")
with open(config_file, "w", encoding="utf8") as f:
_handle, config_path = mkstemp(prefix="udhcpd-")
self._config_file = pathlib.Path(config_path)

with self._config_file.open("w", encoding="utf8") as f:
f.write(self._generate_config(ip4_address, ip4_mask, dns_servers))

logging.info(f"Running udhcpd with config file {self._config_path}")
cmd = [binary, "-S", self._config_path]
logging.info(f"Running udhcpd with config file {self._config_file}")
cmd = [binary, "-S", self._config_file.as_posix()]
p = Popen(cmd, stderr=PIPE)
error = p.communicate()[1]

Expand All @@ -242,25 +243,24 @@ def _start(self, binary: str, ip4_address: str, ip4_mask: str, dns_servers: list
return None if p.pid and pid is not None and _is_running("udhcpd", pid) else error

def _clean_up_configuration(self) -> None:
if os.path.exists(self._config_path):
os.remove(self._config_path)
if self._config_file is None:
return
self._config_file.unlink(missing_ok=True)


class NetConf:
_dhcp_handler: DHCPHandler | None = None
_ipt_rules: list[tuple[str, str, str]] = []

_IPV4_SYS_PATH = "/proc/sys/net/ipv4"
_RUN_PATH = "/var/run"
_IPV4_SYS_PATH = pathlib.Path("/proc/sys/net/ipv4")
_RUN_PATH = pathlib.Path("/var/run")

@classmethod
def _enable_ip4_forwarding(cls) -> None:
with open(f"{cls._IPV4_SYS_PATH}/ip_forward", "w") as f:
f.write("1")
cls._IPV4_SYS_PATH.joinpath("ip_forward").write_text("1")

for d in os.listdir(f"{cls._IPV4_SYS_PATH}/conf"):
with open(f"{cls._IPV4_SYS_PATH}/conf/{d}/forwarding", "w") as f:
f.write("1")
for p in cls._IPV4_SYS_PATH.joinpath("conf").glob("**/forwarding"):
p.write_text("1")

@classmethod
def _add_ipt_rule(cls, table: str, chain: str, rule: str) -> None:
Expand Down Expand Up @@ -345,16 +345,12 @@ def clean_up(cls) -> None:

@classmethod
def lock(cls, key: str) -> None:
with open(f"{cls._RUN_PATH}/blueman-{key}", "w"):
pass
cls._RUN_PATH.joinpath(f"blueman-{key}").touch()

@classmethod
def unlock(cls, key: str) -> None:
try:
os.unlink(f"{cls._RUN_PATH}/blueman-{key}")
except OSError:
pass
cls._RUN_PATH.joinpath(f"blueman-{key}").unlink(missing_ok=True)

@classmethod
def locked(cls, key: str) -> bool:
return os.path.exists(f"{cls._RUN_PATH}/blueman-{key}")
return cls._RUN_PATH.joinpath(f"blueman-{key}").exists()

0 comments on commit b371810

Please sign in to comment.