diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 25858be..0699698 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -16,17 +16,12 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: "3.9" + python-version: "3.12" - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 pytest + python -m pip install pytest python -m pip install -r requirements.txt - - name: Lint with flake8 - run: | - # stop the build if there are Python syntax errors or undefined names - flake8 src --select=E9,F63,F7,F82 --show-source --statistics - flake8 src --exit-zero --max-line-length=128 --statistics --ignore=F403,F405,W503 - name: Test with pytest run: | pytest diff --git a/.gitignore b/.gitignore index 96fe658..9ae7959 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ build/ dist/ venv/ +.tox/ *.egg-info/ _sharedmem.cpython* .ipynb_checkpoints/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c4f8893..9d7ae16 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,22 +1,8 @@ default_language_version: python: python3 repos: -- repo: https://github.com/pycqa/flake8 - rev: 7.1.0 +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.5.4 hooks: - - id: flake8 - pass_filenames: false - args: [ - src, - --select, 'E9,F63,F7,F82', - --show-source, - --statistics - ] - - id: flake8 - pass_filenames: false - args: [ - src, - --max-line-length=128, - --statistics, - --ignore, 'F403,F405,W503' - ] + - id: ruff + - id: ruff-format diff --git a/.ruff.toml b/.ruff.toml new file mode 100644 index 0000000..902e795 --- /dev/null +++ b/.ruff.toml @@ -0,0 +1,4 @@ +line-length = 128 + +[lint] +ignore = ["F403", "F405"] diff --git a/iproute4mac/__init__.py b/iproute4mac/__init__.py new file mode 100644 index 0000000..6c5007c --- /dev/null +++ b/iproute4mac/__init__.py @@ -0,0 +1 @@ +VERSION = "0.2.0" diff --git a/iproute4mac/bridge.py b/iproute4mac/bridge.py new file mode 100644 index 0000000..f161a8c --- /dev/null +++ b/iproute4mac/bridge.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 + +import sys + +import iproute4mac +from iproute4mac.utils import * +from iproute4mac.iplink import * +from iproute4mac.ipaddress import * + + +def do_help(argv=[], option={}): + usage() + + +def usage(): + stderr("""\ +Usage: bridge [ OPTIONS ] OBJECT { COMMAND | help } + bridge [ -force ] -batch filename +where OBJECT := { link | fdb | mdb | vlan | vni | monitor } + OPTIONS := { -V[ersion] | -s[tatistics] | -d[etails] | + -o[neline] | -t[imestamp] | -n[etns] name | + -com[pressvlans] -c[olor] -p[retty] -j[son] }""") + exit(-1) + + +""" Implemented objects """ +objs = [ + ("link", do_notimplemented), + ("fdb", do_notimplemented), + ("mdb", do_notimplemented), + ("vlan", do_notimplemented), + ("vni", do_notimplemented), + ("monitor", do_notimplemented), + ("help", do_help), +] + + +def do_obj(argv, option): + obj = argv.pop(0) + for o, f in objs: + if o.startswith(obj): + return f(argv, option) + + stderr(f'Object "{obj}" is unknown, try "bridge help".') + return EXIT_FAILURE + + +def main(): + batch_file = None + option = { + "preferred_family": AF_UNSPEC, + "show_stats": False, + "show_details": False, + "oneline": False, + "timestamp": False, + "compress_vlans": False, + "force": False, + "json": False, + "pretty": False, + "do_all": False, + } + + if sys.platform != "darwin": + stderr("Unupported OS.") + exit(-1) + + argv = sys.argv[1:] + while argv: + if argv[0] == "--": + argv.pop(0) + break + elif argv[0][0] != "-": + break + + opt = argv.pop(0) + if opt[1] == "-": + opt = opt[1:] + + if "-help".startswith(opt): + usage() + elif "-Version".startswith(opt): + print(f"bridge wrapper, iproute4mac-{iproute4mac.VERSION}") + exit(0) + elif "-stats".startswith(opt) or "-statistics".startswith(opt): + option["show_stats"] = True + elif "-details".startswith(opt): + option["show_details"] = True + elif "-oneline".startswith(opt): + option["oneline"] = True + elif "-timestamp".startswith(opt): + option["timestamp"] = True + elif "-family".startswith(opt): + try: + opt = argv.pop(0) + except IndexError: + missarg("family type") + if opt == "help": + usage() + option["preferred_family"] = read_family(opt) + if option["preferred_family"] == AF_UNSPEC: + invarg("invalid protocol family", opt) + elif opt == "-4": + option["preferred_family"] = AF_INET + elif opt == "-6": + option["preferred_family"] = AF_INET6 + elif "-netns".startswith(opt): + do_notimplemented() + elif matches_color(opt): + # Color option is not implemented + pass + elif "-compressvlans".startswith(opt): + option["compress_vlans"] = True + elif "-force".startswith(opt): + option["force"] = True + elif "-json".startswith(opt): + option["json"] = True + elif "-pretty".startswith(opt): + option["pretty"] = True + elif "-batch".startswith(opt): + try: + batch_file = argv.pop(0) + except IndexError: + missarg("batch file") + else: + stderr(f'Option "{opt}" is unknown, try "bridge help".') + exit(-1) + + if batch_file: + do_notimplemented() + + if argv: + return do_obj(argv, option) + + usage() + + +if __name__ == "__main__": + main() diff --git a/iproute4mac/ifconfig.py b/iproute4mac/ifconfig.py new file mode 100644 index 0000000..bfd4737 --- /dev/null +++ b/iproute4mac/ifconfig.py @@ -0,0 +1,285 @@ +import re + +from iproute4mac.utils import * + + +def dumps(links, option): + if option["json"]: + print(json_dumps(links, option["pretty"])) + return + + if not links: + return + + for link in links: + stdout(link["ifindex"], ": ", link["ifname"]) + if "link" in link: + stdout("@", link["link"]) + stdout(": <", ",".join(link["flags"]), "> mtu ", link["mtu"]) + if "master" in link: + stdout(" master ", link["master"]) + stdout(" state ", link["operstate"], end="\n") + + stdout(" link/", link["link_type"]) + if "address" in link: + stdout(" ", link["address"]) + if "broadcast" in link: + stdout(" brd ", link["broadcast"]) + stdout(end="\n") + + if "linkinfo" in link and "info_kind" in link["linkinfo"]: + info = link["linkinfo"] + if info["info_kind"] == "vlan": + data = info["info_data"] + stdout( + " ", + info["info_kind"], + " protocol ", + data["protocol"], + " id ", + data["id"], + end="\n", + ) + elif info["info_kind"] == "bridge": + data = info["info_data"] + stdout( + " bridge ", + " ".join([f"{key} {value}" for key, value in data.items()]), + end="\n", + ) + + for addr in link.get("addr_info", []): + stdout(" ", addr["family"]) + stdout(" ", addr["local"]) + if "address" in addr: + stdout(" peer ", addr["address"]) + stdout("/", addr["prefixlen"]) + if "broadcast" in addr: + stdout(" brd ", addr["broadcast"]) + if "scope" in addr: + stdout(" scope ", addr["scope"]) + stdout(end="\n") + if "valid_life_time" in addr and "preferred_life_time" in addr: + stdout( + " valid_lft ", + "forever" if addr["valid_life_time"] == ND6_INFINITE_LIFETIME else addr["valid_life_time"], + " preferred_lft ", + "forever" if addr["preferred_life_time"] == ND6_INFINITE_LIFETIME else addr["preferred_life_time"], + ) + stdout(end="\n") + + +class ifconfigRegEx: + _header = re.compile(r"(?P\w+):" r" flags=\w+<(?P.*)>" r" mtu (?P\d+)" r" index (?P\d+)") + _eflags = re.compile(r"\s+eflags=\w+<(?P.*)>") + _ether = re.compile(rf"\s+ether (?P{LLADDR})") + _inet = re.compile( + rf"\s+inet (?P{IPV4ADDR})" + rf"(?: --> (?P
{IPV4ADDR}))?" + rf" netmask (?P{IPV4MASK})" + rf"(?: broadcast (?P{IPV4ADDR}))?" + ) + _inet6 = re.compile( + rf"\s+inet6 (?P{IPV6ADDR})(?:%\w+)?" + r" prefixlen (?P\d+)" + r"(?: (?Pautoconf))?" + r"(?: (?Psecured))?" + r"(?: pltime (?P\d+))?" + r"(?: vltime (?P\d+))?" + r"(?: scopeid (?P0x[0-9a-fA-F]+))?" + ) + _state = re.compile(r"\s+status: (?P\w+)") + _vlan = re.compile(r"\s+vlan: (?P\d+) parent interface: (?P?)") + _bond = re.compile(r"\s+bond interfaces: (\w+(?: \w+)*)") + _bridge = re.compile(r"\s+Configuration:") + + def __init__(self, line): + self.header = self._header.match(line) + self.eflags = self._eflags.match(line) + self.ether = self._ether.match(line) + self.inet = self._inet.match(line) + self.inet6 = self._inet6.match(line) + self.state = self._state.match(line) + self.vlan = self._vlan.match(line) + self.bond = self._bond.match(line) + self.bridge = self._bridge.match(line) + + +class bridgeRegEx: + _id = re.compile( + r"\s+id (?P(?:[0-9a-fA-F]{1,2}:?){6})" + r" priority (?P\d+)" + r" hellotime (?P\d+)" + r" fwddelay (?P\d+)" + ) + _age = re.compile( + r"\s+maxage (?P\d+)" + r" holdcnt (?P\d+)" + r" proto (?P\w+)" + r" maxaddr (?P\d+)" + r" timeout (?P\d+)" + ) + _root = re.compile( + r"\s+root id (?P(?:[0-9a-fA-F]{1,2}:?){6})" + r" priority (?P\d+)" + r" ifcost (?P\d+)" + r" port (?P\d+)" + ) + _filter = re.compile(r"\s+ipfilter (?P\w+)" r" flags (?P0x[0-9a-fA-F]+)") + _member = re.compile(r"\s+member: (?P\w+)") + _cache = re.compile(r"\s+media:") + + def __init__(self, line): + self.id = self._id.match(line) + self.age = self._age.match(line) + self.root = self._root.match(line) + self.filter = self._filter.match(line) + self.member = self._member.match(line) + self.cache = self._cache.match(line) + + +def parse_bridge(lines, links, link): + info_data = {} + while line := next(lines): + match = bridgeRegEx(line) + + if match.id: + info_data["forward_delay"] = int(match.id.group("delay")) + info_data["hello_time"] = int(match.id.group("hello")) + elif match.age: + info_data["max_age"] = int(match.age.group("max_age")) + info_data["ageing_time"] = int(match.age.group("ageing")) + elif match.root: + info_data["priority"] = int(match.root.group("priority")) + info_data["root_id"] = match.root.group("id") + info_data["root_port"] = int(match.root.group("port")) + info_data["root_path_cost"] = int(match.root.group("cost")) + elif match.filter: + info_data["ipfilter"] = match.filter.group("filter") != "disabled" + elif match.member: + slave = next(item for item in links if item["ifname"] == match.member.group("member")) + slave["master"] = link["ifname"] + slave["linkinfo"] = {"info_slave_kind": "bridge"} + elif match.cache: + link["linkinfo"].update({"info_data": info_data}) + break + + +def parse(res, option): + links = [] + lines = iter(res.split("\n")) + while line := next(lines): + match = ifconfigRegEx(line) + + if match.header: + header = match.header.groupdict() + link = { + "ifindex": int(header["ifindex"]), + "ifname": header["ifname"], + "flags": header["flags"].split(",") if header["flags"] != "" else [], + "mtu": int(header["mtu"]), + "operstate": "UNKNOWN", + "link_type": "none", + } + + if "LOOPBACK" in link["flags"]: + link["link_type"] = "loopback" + link["address"] = "00:00:00:00:00:00" + link["broadcast"] = "00:00:00:00:00:00" + elif "POINTOPOINT" in link["flags"]: + link["link_pointtopoint"] = True + + if link["ifname"].startswith("bridge") or link["ifname"].startswith("bond") or link["ifname"].startswith("vlan"): + link["linkinfo"] = {"info_kind": re.sub(r"[0-9]+", "", link["ifname"])} + + links.append(link) + inet_count = 0 + inet6_count = 0 + continue + + if match.eflags: + link["eflags"] = match.eflags.group("eflags").split(",") + elif match.ether: + link["link_type"] = "ether" + link["address"] = match.ether.group("ether") + link["broadcast"] = "ff:ff:ff:ff:ff:ff" + elif match.state: + link["operstate"] = oper_states[match.state.group("state")] + elif match.inet and option["preferred_family"] in (AF_INET, AF_UNSPEC): + inet = match.inet.groupdict() + addr = {"family": "inet", "local": inet["local"]} + if inet["address"]: + addr["address"] = inet["address"] + addr["prefixlen"] = netmask_to_length(inet["netmask"]) + if inet["broadcast"]: + addr["broadcast"] = inet["broadcast"] + ip = Prefix(addr["local"]) + if ip.is_link: + addr["scope"] = "link" + elif ip.is_global: + # FIXME: may be Python ipaddress is_global() not compliant with iproute2 + addr["scope"] = "global" + else: + addr["scope"] = "host" + addr.update( + { + "valid_life_time": ND6_INFINITE_LIFETIME, + "preferred_life_time": ND6_INFINITE_LIFETIME, + } + ) + if inet_count + inet6_count > 0: + # Let IPv4 at beginning + link["addr_info"].insert(inet_count, addr) + else: + link["addr_info"] = [addr] + inet_count += 1 + elif match.inet6 and option["preferred_family"] in (AF_INET6, AF_UNSPEC): + inet6 = match.inet6.groupdict() + ip = Prefix(inet6["local"]) + if ip.is_link: + scope = "link" + elif ip.is_global: + # FIXME: may be Python ipaddress is_global() not compliant with iproute2 + scope = "global" + else: + scope = "host" + addr = { + "family": "inet6", + "local": inet6["local"], + "prefixlen": int(inet6["prefixlen"]), + "scope": scope, + "valid_life_time": int(inet6["vltime"]) if inet6["vltime"] else ND6_INFINITE_LIFETIME, + "preferred_life_time": int(inet6["pltime"]) if inet6["pltime"] else ND6_INFINITE_LIFETIME, + } + if inet_count + inet6_count > 0: + # Put IPv6 after IPv4 + link["addr_info"].append(addr) + else: + link["addr_info"] = [addr] + inet6_count += 1 + elif match.vlan: + parent = match.vlan.group("parent") + if parent != "": + link["link"] = parent + link["linkinfo"].update( + { + "info_data": { + "protocol": "802.1Q", + "id": int(match.vlan.group("vlanid")), + "flags": [], + } + } + ) + elif match.bond: + for ifname in match.bond.group(1).split(" "): + slave = next(item for item in links if item["ifname"] == ifname) + slave["master"] = link["ifname"] + slave["address"] = link["address"] + slave["linkinfo"] = { + "info_slave_kind": "bond", + "perm_hwaddr": slave["address"], + } + elif match.bridge: + parse_bridge(lines, links, link) + + return links diff --git a/iproute4mac/ip.py b/iproute4mac/ip.py new file mode 100644 index 0000000..32be48e --- /dev/null +++ b/iproute4mac/ip.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 + +import sys + +import iproute4mac +from iproute4mac.utils import * +from iproute4mac.iplink import do_iplink +from iproute4mac.ipaddress import do_ipaddr +from iproute4mac.iproute import do_iproute + + +def do_help(argv=[], option={}): + usage() + + +def usage(): + stderr("""\ +Usage: ip [ OPTIONS ] OBJECT { COMMAND | help } + ip [ -force ] -batch filename +where OBJECT := { address | addrlabel | fou | help | ila | ioam | l2tp | link | + macsec | maddress | monitor | mptcp | mroute | mrule | + neighbor | neighbour | netconf | netns | nexthop | ntable | + ntbl | route | rule | sr | stats | tap | tcpmetrics | + token | tunnel | tuntap | vrf | xfrm } + OPTIONS := { -V[ersion] | -s[tatistics] | -d[etails] | -r[esolve] | + -h[uman-readable] | -iec | -j[son] | -p[retty] | + -f[amily] { inet | inet6 | mpls | bridge | link } | + -4 | -6 | -M | -B | -0 | + -l[oops] { maximum-addr-flush-attempts } | -echo | -br[ief] | + -o[neline] | -t[imestamp] | -ts[hort] | -b[atch] [filename] | + -rc[vbuf] [size] | -n[etns] name | -N[umeric] | -a[ll] | + -c[olor]}""") + exit(-1) + + +""" Implemented objects """ +objs = [ + ("address", do_ipaddr), + ("addrlabel", do_notimplemented), + ("maddress", do_notimplemented), + ("route", do_iproute), + ("rule", do_notimplemented), + ("neighbor", do_notimplemented), + ("neighbour", do_notimplemented), + ("ntable", do_notimplemented), + ("ntbl", do_notimplemented), + ("link", do_iplink), + ("l2tp", do_notimplemented), + ("fou", do_notimplemented), + ("ila", do_notimplemented), + ("macsec", do_notimplemented), + ("tunnel", do_notimplemented), + ("tunl", do_notimplemented), + ("tuntap", do_notimplemented), + ("tap", do_notimplemented), + ("token", do_notimplemented), + ("tcpmetrics", do_notimplemented), + ("tcp_metrics", do_notimplemented), + ("monitor", do_notimplemented), + ("xfrm", do_notimplemented), + ("mroute", do_notimplemented), + ("mrule", do_notimplemented), + ("netns", do_notimplemented), + ("netconf", do_notimplemented), + ("vrf", do_notimplemented), + ("sr", do_notimplemented), + ("nexthop", do_notimplemented), + ("mptcp", do_notimplemented), + ("ioam", do_notimplemented), + ("help", do_help), + ("stats", do_notimplemented), +] + + +def do_obj(argv, option): + obj = argv.pop(0) + for o, f in objs: + if o.startswith(obj): + return f(argv, option) + + stderr(f'Object "{obj}" is unknown, try "ip help".') + return EXIT_FAILURE + + +def main(): + batch_file = None + option = { + "preferred_family": AF_UNSPEC, + "human_readable": False, + "use_iec": False, + "show_stats": False, + "show_details": False, + "oneline": False, + "brief": False, + "json": False, + "pretty": False, + "timestamp": False, + "timestamp_short": False, + "echo_request": False, + "force": False, + "max_flush_loops": 10, + "batch_mode": False, + "do_all": False, + } + + if sys.platform != "darwin": + stderr("Unupported OS.") + exit(-1) + + argv = sys.argv[1:] + while argv: + if argv[0] == "--": + argv.pop(0) + break + elif argv[0][0] != "-": + break + + opt = argv.pop(0) + if opt[1] == "-": + opt = opt[1:] + + if "-loops".startswith(opt): + try: + option["max_flush_loops"] = int(argv.pop(0)) + except IndexError: + missarg("loop count") + except ValueError: + error("loop count not a number") + elif "-family".startswith(opt): + try: + opt = argv.pop(0) + except IndexError: + missarg("family type") + if opt == "help": + usage() + option["preferred_family"] = read_family(opt) + if option["preferred_family"] == AF_UNSPEC: + invarg("invalid protocol family", opt) + elif opt == "-4": + option["preferred_family"] = AF_INET + elif opt == "-6": + option["preferred_family"] = AF_INET6 + elif opt == "-0": + option["preferred_family"] = AF_PACKET + elif opt == "-M": + option["preferred_family"] = AF_MPLS + elif opt == "-B": + option["preferred_family"] = AF_BRIDGE + elif "-human-readable".startswith(opt): + option["human_readable"] = True + elif "-iec".startswith(opt): + option["use_iec"] = True + elif "-stats".startswith(opt) or "-statistics".startswith(opt): + option["show_stats"] = True + elif "-details".startswith(opt): + option["show_details"] = True + elif "-resolve".startswith(opt): + option["resolve_hosts"] = True + elif "-oneline".startswith(opt): + option["oneline"] = True + elif "-timestamp".startswith(opt): + option["timestamp"] = True + elif "-tshort".startswith(opt): + option["timestamp"] = True + option["timestamp_short"] = True + elif "-Version".startswith(opt): + print(f"ip wrapper, iproute4mac-{iproute4mac.VERSION}") + exit(0) + elif "-force".startswith(opt): + option["force"] = True + elif "-batch".startswith(opt): + try: + batch_file = argv.pop(0) + except IndexError: + missarg("batch file") + elif "-brief".startswith(opt): + option["brief"] = True + elif "-json".startswith(opt): + option["json"] = True + elif "-pretty".startswith(opt): + option["pretty"] = True + elif "-rcvbuf".startswith(opt): + try: + option["rcvbuf"] = int(argv.pop(0)) + except IndexError: + missarg("rcvbuf size") + except ValueError: + error("rcvbuf size not a number") + elif matches_color(opt): + # Color option is not implemented + pass + elif "-help".startswith(opt): + usage() + elif "-netns".startswith(opt): + do_notimplemented() + elif "-Numeric".startswith(opt): + option["numeric"] = True + elif "-all".startswith(opt): + option["do_all"] = True + elif opt == "-echo": + option["echo_request"] = True + else: + stderr(f'Option "{opt}" is unknown, try "ip -help".') + exit(-1) + + if batch_file: + do_notimplemented() + + if argv: + return do_obj(argv, option) + + usage() + + +if __name__ == "__main__": + main() diff --git a/src/iproute4mac/ipaddress.py b/iproute4mac/ipaddress.py similarity index 67% rename from src/iproute4mac/ipaddress.py rename to iproute4mac/ipaddress.py index 64afe33..8f0e5dd 100644 --- a/src/iproute4mac/ipaddress.py +++ b/iproute4mac/ipaddress.py @@ -61,10 +61,12 @@ def usage(): # vcan | veth | vlan | vrf | vti | vxcan | vxlan | wwan | # xfrm } def ipaddr_list(argv, option, usage=usage): - cmd = subprocess.run(['ifconfig', '-v', '-L', '-a',], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8") + cmd = subprocess.run( + ["ifconfig", "-v", "-L", "-a"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) if cmd.returncode != 0: print(cmd.stderr) exit(cmd.returncode) @@ -72,73 +74,68 @@ def ipaddr_list(argv, option, usage=usage): links = ifconfig.parse(cmd.stdout, option=option) while argv: opt = argv.pop(0) - if strcmp(opt, 'to'): + if strcmp(opt, "to"): # to = next_arg(argv) # get_prefix(to, option['preferred_family']) do_notimplemented() - elif strcmp(opt, 'scope'): + elif strcmp(opt, "scope"): scope = next_arg(argv) - if (scope not in ('link', 'host', 'global', 'all') - and not scope.isdigit()): + if scope not in ("link", "host", "global", "all") and not scope.isdigit(): invarg('invalid "scope"', scope) - if scope == 'all': + if scope == "all": continue do_notimplemented() - elif strcmp(opt, 'up'): - links = [link for link in links if ('flags' in link and 'UP' in link['flags'])] + elif strcmp(opt, "up"): + links = [link for link in links if ("flags" in link and "UP" in link["flags"])] # TODO: elif get_filter(opt): - elif strcmp(opt, 'label'): + elif strcmp(opt, "label"): # label = next_opt(argv) do_notimplemented() - elif strcmp(opt, 'group'): + elif strcmp(opt, "group"): group = next_arg(argv) do_notimplemented() invarg('Invalid "group" value', group) - elif strcmp(opt, 'master'): + elif strcmp(opt, "master"): master = next_arg(argv) - if not any(link['ifname'] == master for link in links): - invarg('Device does not exist', master) - links = [link for link in links if ('master' in link and link['master'] == master)] - elif strcmp(opt, 'vrf'): + if not any(link["ifname"] == master for link in links): + invarg("Device does not exist", master) + links = [link for link in links if ("master" in link and link["master"] == master)] + elif strcmp(opt, "vrf"): vrf = next_arg(argv) - if not any(link['ifname'] == vrf for link in links): - invarg('Not a valid VRF name', vrf) + if not any(link["ifname"] == vrf for link in links): + invarg("Not a valid VRF name", vrf) # if not name_is_vrf(vrf): # invarg('Not a valid VRF name', vrf) # links = [link for link in links if ('master' in link and link['master'] == vrf)] # FIXME: https://wiki.netunix.net/freebsd/network/vrf/ do_notimplemented() - elif strcmp(opt, 'nomaster'): - links = [link for link in links if 'master' not in link] - elif strcmp(opt, 'type'): + elif strcmp(opt, "nomaster"): + links = [link for link in links if "master" not in link] + elif strcmp(opt, "type"): kind = next_arg(argv) - if kind.endswith('_slave'): - kind = kind.replace('_slave', '') - links = [link for link in links if recurse_in(link, ['linkinfo', 'info_slave_kind'], kind)] + if kind.endswith("_slave"): + kind = kind.replace("_slave", "") + links = [link for link in links if recurse_in(link, ["linkinfo", "info_slave_kind"], kind)] else: - links = [link for link in links if recurse_in(link, ['linkinfo', 'info_kind'], kind)] + links = [link for link in links if recurse_in(link, ["linkinfo", "info_kind"], kind)] else: - if strcmp(opt, 'dev'): + if strcmp(opt, "dev"): opt = next_arg(argv) - elif matches(opt, 'help'): + elif matches(opt, "help"): usage() - links = [link for link in links if link['ifname'] == opt] + links = [link for link in links if link["ifname"] == opt] if not links: stderr(f'Device "{opt}" does not exist.') exit(-1) - if not option['show_details']: - delete_keys(links, 'linkinfo') + if not option["show_details"]: + delete_keys(links, "linkinfo") - if option['preferred_family'] in (AF_INET, AF_INET6, AF_MPLS, AF_BRIDGE): - family = family_name(option['preferred_family']) - links = [ - link for link in links if 'addr_info' in link and any( - addr['family'] == family for addr in link['addr_info'] - ) - ] - elif option['preferred_family'] == AF_PACKET: - delete_keys(links, 'addr_info') + if option["preferred_family"] in (AF_INET, AF_INET6, AF_MPLS, AF_BRIDGE): + family = family_name(option["preferred_family"]) + links = [link for link in links if "addr_info" in link and any(addr["family"] == family for addr in link["addr_info"])] + elif option["preferred_family"] == AF_PACKET: + delete_keys(links, "addr_info") ifconfig.dumps(links, option) return EXIT_SUCCESS @@ -149,28 +146,25 @@ def do_ipaddr(argv, option): return ipaddr_list(argv, option) cmd = argv.pop(0) - if 'add'.startswith(cmd): + if "add".startswith(cmd): return do_notimplemented() - elif ('change'.startswith(cmd) - or 'chg'.startswith(cmd)): + elif "change".startswith(cmd) or "chg".startswith(cmd): return do_notimplemented() - elif 'replace'.startswith(cmd): + elif "replace".startswith(cmd): return do_notimplemented() - elif 'delete'.startswith(cmd): + elif "delete".startswith(cmd): return do_notimplemented() - elif ('show'.startswith(cmd) - or 'lst'.startswith(cmd) - or 'list'.startswith(cmd)): + elif "show".startswith(cmd) or "lst".startswith(cmd) or "list".startswith(cmd): return ipaddr_list(argv, option) - elif 'flush'.startswith(cmd): + elif "flush".startswith(cmd): return do_notimplemented() - elif 'save'.startswith(cmd): + elif "save".startswith(cmd): return do_notimplemented() - elif 'showdump'.startswith(cmd): + elif "showdump".startswith(cmd): return do_notimplemented() - elif 'restore'.startswith(cmd): + elif "restore".startswith(cmd): return do_notimplemented() - elif 'help'.startswith(cmd): + elif "help".startswith(cmd): return usage() stderr(f'Command "{cmd}" is unknown, try "ip address help".') diff --git a/src/iproute4mac/iplink.py b/iproute4mac/iplink.py similarity index 90% rename from src/iproute4mac/iplink.py rename to iproute4mac/iplink.py index 60f952e..d264343 100644 --- a/src/iproute4mac/iplink.py +++ b/iproute4mac/iplink.py @@ -86,7 +86,7 @@ def usage(): # | netdevsim | rmnet | xfrm ] # ETYPE := [ TYPE | bridge_slave | bond_slave ] def iplink_list(argv, option): - option['preferred_family'] = AF_PACKET + option["preferred_family"] = AF_PACKET return ipaddr_list(argv, option, usage) @@ -95,26 +95,23 @@ def do_iplink(argv, option): return iplink_list(argv, option) cmd = argv.pop(0) - if 'add'.startswith(cmd): + if "add".startswith(cmd): return do_notimplemented() - elif ('change'.startswith(cmd) - or 'set'.startswith(cmd)): + elif "change".startswith(cmd) or "set".startswith(cmd): return do_notimplemented() - elif 'replace'.startswith(cmd): + elif "replace".startswith(cmd): return do_notimplemented() - elif 'delete'.startswith(cmd): + elif "delete".startswith(cmd): return do_notimplemented() - elif ('show'.startswith(cmd) - or 'lst'.startswith(cmd) - or 'list'.startswith(cmd)): + elif "show".startswith(cmd) or "lst".startswith(cmd) or "list".startswith(cmd): return iplink_list(argv, option) - elif 'xstats'.startswith(cmd): + elif "xstats".startswith(cmd): return do_notimplemented() - elif 'afstats'.startswith(cmd): + elif "afstats".startswith(cmd): return do_notimplemented() - elif 'property'.startswith(cmd): + elif "property".startswith(cmd): return do_notimplemented() - elif 'help'.startswith(cmd): + elif "help".startswith(cmd): return usage() stderr(f'Command "{cmd}" is unknown, try "ip link help".') diff --git a/src/iproute4mac/iproute.py b/iproute4mac/iproute.py similarity index 62% rename from src/iproute4mac/iproute.py rename to iproute4mac/iproute.py index 57b697a..48a4ae0 100644 --- a/src/iproute4mac/iproute.py +++ b/iproute4mac/iproute.py @@ -68,6 +68,21 @@ def usage(): exit(-1) +def iproute_modify(argv, option): + while argv: + opt = argv.pop(0) + if strcmp(opt, "src"): + src = next_arg(argv) + do_notimplemented([src]) + elif strcmp(opt, "as"): + addr = next_arg(argv) + if strcmp(addr, "to"): + addr = next_arg(argv) + do_notimplemented([addr]) + + return EXIT_SUCCESS + + # ip route [ list [ SELECTOR ] ] # SELECTOR := [ root PREFIX ] [ match PREFIX ] [ exact PREFIX ] # [ table TABLE_ID ] [ vrf NAME ] [ proto RTPROTO ] @@ -78,117 +93,119 @@ def usage(): # SCOPE := [ host | link | global | NUMBER ] # RTPROTO := [ kernel | boot | static | NUMBER ] def iproute_list(argv, option): - cmd = subprocess.run(['netstat', '-n', '-r'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8") + cmd = subprocess.run( + ["netstat", "-n", "-r"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) if cmd.returncode != 0: stderr(cmd.stderr) exit(cmd.returncode) - if option['preferred_family'] == AF_UNSPEC: - option['preferred_family'] = AF_INET + if option["preferred_family"] == AF_UNSPEC: + option["preferred_family"] = AF_INET routes = netstat.parse(cmd.stdout, option) while argv: opt = argv.pop(0) - if matches(opt, 'table'): + if matches(opt, "table"): table = next_arg(argv) do_notimplemented([table]) - elif matches(opt, 'vrf'): + elif matches(opt, "vrf"): tid = next_arg(argv) do_notimplemented([tid]) - elif matches(opt, 'cached', 'cloned'): + elif matches(opt, "cached", "cloned"): do_notimplemented() - elif (strcmp(opt, 'tod') - or matches(opt, 'dsfield')): + elif strcmp(opt, "tod") or matches(opt, "dsfield"): tos = next_arg(argv) do_notimplemented([tos]) - elif matches(opt, 'protocol'): + elif matches(opt, "protocol"): protocol = next_arg(argv) - if protocol not in ('static', 'redirect', 'kernel', 'all'): + if protocol not in ("static", "redirect", "kernel", "all"): invarg('invalid "protocol"', protocol) - if protocol == 'all': + if protocol == "all": continue - routes = [route for route in routes if 'protocol' in route and route['protocol'] == protocol] - elif matches(opt, 'scope'): + routes = [route for route in routes if "protocol" in route and route["protocol"] == protocol] + elif matches(opt, "scope"): scope = next_arg(argv) - if (scope not in ('link', 'host', 'global', 'all') - and not scope.isdigit()): + if scope not in ("link", "host", "global", "all") and not scope.isdigit(): invarg('invalid "scope"', scope) - if scope == 'all': + if scope == "all": continue # FIXME: numeric scope? - routes = [route for route in routes if 'scope' in route and route['scope'] == scope] - delete_keys(routes, 'scope') - elif matches(opt, 'type'): + routes = [route for route in routes if "scope" in route and route["scope"] == scope] + delete_keys(routes, "scope") + elif matches(opt, "type"): addr_type = next_arg(argv) - if addr_type not in ('blackhole', 'broadcast', 'multicast', 'unicast'): - invarg('node type value is invalid', addr_type) - routes = [route for route in routes if (('type' in route and route['type'] == addr_type) - or ('type' not in route and addr_type == 'unicast'))] - elif strcmp(opt, 'dev', 'oif', 'iif'): + if addr_type not in ("blackhole", "broadcast", "multicast", "unicast"): + invarg("node type value is invalid", addr_type) + routes = [ + route + for route in routes + if (("type" in route and route["type"] == addr_type) or ("type" not in route and addr_type == "unicast")) + ] + elif strcmp(opt, "dev", "oif", "iif"): dev = next_arg(argv) - routes = [route for route in routes if 'dev' in route and route['dev'] == dev] - delete_keys(routes, 'dev') - elif strcmp(opt, 'mark'): + routes = [route for route in routes if "dev" in route and route["dev"] == dev] + delete_keys(routes, "dev") + elif strcmp(opt, "mark"): mark = next_arg(argv) do_notimplemented([mark]) - elif (matches(opt, 'metric', 'priority') - or strcmp(opt, 'preference')): + elif matches(opt, "metric", "priority") or strcmp(opt, "preference"): metric = next_arg(argv) try: metric = int(metric) except ValueError: invarg('"metric" value is invalid', metric) do_notimplemented() - elif strcmp(opt, 'via'): + elif strcmp(opt, "via"): via = next_arg(argv) family = read_family(via) if family == AF_UNSPEC: - family = option['preferred_family'] + family = option["preferred_family"] else: via = next_arg(argv) prefix = get_prefix(via, family) - routes = [route for route in routes if 'gateway' in route and prefix in Prefix(route['gateway'])] - delete_keys(routes, 'gateway') - elif strcmp(opt, 'src'): + routes = [route for route in routes if "gateway" in route and prefix in Prefix(route["gateway"])] + delete_keys(routes, "gateway") + elif strcmp(opt, "src"): src = next_arg(argv) do_notimplemented([src]) - elif matches(opt, 'realms'): + elif matches(opt, "realms"): realm = next_arg(argv) do_notimplemented([realm]) - elif matches(opt, 'from'): + elif matches(opt, "from"): opt = next_arg(argv) - if matches(opt, 'root'): + if matches(opt, "root"): opt = next_arg(argv) - prefix = get_prefix(opt, option['preferred_family']) + prefix = get_prefix(opt, option["preferred_family"]) do_notimplemented() - elif matches(opt, 'match'): + elif matches(opt, "match"): opt = next_arg(argv) - prefix = get_prefix(opt, option['preferred_family']) + prefix = get_prefix(opt, option["preferred_family"]) do_notimplemented() else: - if matches(opt, 'exact'): + if matches(opt, "exact"): opt = next_arg(argv) - prefix = get_prefix(opt, option['preferred_family']) + prefix = get_prefix(opt, option["preferred_family"]) do_notimplemented() else: - if matches(opt, 'to'): + if matches(opt, "to"): opt = next_arg(argv) - if matches(opt, 'root'): + if matches(opt, "root"): opt = next_arg(argv) - prefix = get_prefix(opt, option['preferred_family']) + prefix = get_prefix(opt, option["preferred_family"]) do_notimplemented() - elif matches(opt, 'match'): + elif matches(opt, "match"): opt = next_arg(argv) - prefix = get_prefix(opt, option['preferred_family']) - routes = [route for route in routes if 'dst' in route and prefix in Prefix(route['dst'])] + prefix = get_prefix(opt, option["preferred_family"]) + routes = [route for route in routes if "dst" in route and prefix in Prefix(route["dst"])] else: - if matches(opt, 'exact'): + if matches(opt, "exact"): opt = next_arg(argv) - prefix = get_prefix(opt, option['preferred_family']) - routes = [route for route in routes if 'dst' in route and prefix == Prefix(route['dst'])] + prefix = get_prefix(opt, option["preferred_family"]) + routes = [route for route in routes if "dst" in route and prefix == Prefix(route["dst"])] netstat.dumps(routes, option) return EXIT_SUCCESS @@ -199,35 +216,33 @@ def do_iproute(argv, option): return iproute_list(argv, option) cmd = argv.pop(0) - if 'add'.startswith(cmd): - return do_notimplemented() - elif 'change'.startswith(cmd): + if "add".startswith(cmd): + return iproute_modify(argv, option) + elif "change".startswith(cmd): return do_notimplemented() - elif 'replace'.startswith(cmd): + elif "replace".startswith(cmd): return do_notimplemented() - elif 'prepend'.startswith(cmd): + elif "prepend".startswith(cmd): return do_notimplemented() - elif 'append'.startswith(cmd): + elif "append".startswith(cmd): return do_notimplemented() - elif 'test'.startswith(cmd): + elif "test".startswith(cmd): return do_notimplemented() - elif 'delete'.startswith(cmd): + elif "delete".startswith(cmd): return do_notimplemented() - elif ('show'.startswith(cmd) - or 'lst'.startswith(cmd) - or 'list'.startswith(cmd)): + elif "show".startswith(cmd) or "lst".startswith(cmd) or "list".startswith(cmd): return iproute_list(argv, option) - elif 'get'.startswith(cmd): + elif "get".startswith(cmd): return do_notimplemented() - elif 'flush'.startswith(cmd): + elif "flush".startswith(cmd): return do_notimplemented() - elif 'save'.startswith(cmd): + elif "save".startswith(cmd): return do_notimplemented() - elif 'restore'.startswith(cmd): + elif "restore".startswith(cmd): return do_notimplemented() - elif 'showdump'.startswith(cmd): + elif "showdump".startswith(cmd): return do_notimplemented() - elif 'help'.startswith(cmd): + elif "help".startswith(cmd): return usage() stderr(f'Command "{cmd}" is unknown, try "ip route help".') diff --git a/iproute4mac/netstat.py b/iproute4mac/netstat.py new file mode 100644 index 0000000..d7f8a42 --- /dev/null +++ b/iproute4mac/netstat.py @@ -0,0 +1,147 @@ +import re + +from iproute4mac.utils import * + + +""" +https://docs.freebsd.org/en/books/handbook/advanced-networking/#routeflags +""" +RTF_PROTO1 = "1" # Protocol specific routing flag #1 +RTF_PROTO2 = "2" # Protocol specific routing flag #2 +RTF_PROTO3 = "3" # Protocol specific routing flag #3 +RTF_BLACKHOLE = "B" # Just discard packets (during updates) +RTF_BROADCAST = "b" # The route represents a broadcast address +RTF_CLONING = "C" # Generate new routes on use +RTF_PRCLONING = "c" # Protocol-specified generate new routes on use +RTF_DYNAMIC = "D" # Created dynamically (by redirect) +RTF_GATEWAY = "G" # Destination requires forwarding by intermediary +RTF_HOST = "H" # Host entry (net otherwise) +RTF_IFSCOPE = "I" # Route is associated with an interface scope +RTF_IFREF = "i" # Route is holding a reference to the interface +RTF_LLINFO = "L" # Valid protocol to link address translation +RTF_MODIFIED = "M" # Modified dynamically (by redirect) +RTF_MULTICAST = "m" # The route represents a multicast address +RTF_REJECT = "R" # Host or net unreachable +RTF_ROUTER = "r" # Host is a default router +RTF_STATIC = "S" # Manually added +RTF_UP = "U" # Route usable +RTF_WASCLONED = "W" # Route was generated as a result of cloning +RTF_XRESOLVE = "X" # External daemon translates proto to link address +RTF_PROXY = "Y" # Proxying; cloned routes will not be scoped +RTF_GLOBAL = "g" # Route to a destination of the global internet (policy hint) + + +def dumps(routes, option): + if option["json"]: + print(json_dumps(routes, option["pretty"])) + return + + if not routes: + return + + for route in routes: + if option["show_details"] or "type" in route: + stdout(route["type"] if "type" in route else "unicast", " ") + stdout(route["dst"]) + if "gateway" in route: + stdout(f" via {route['gateway']}") + if "dev" in route: + stdout(f" dev {route['dev']}") + if "protocol" in route: + stdout(f" proto {route['protocol']}") + if "scope" in route: + stdout(f" scope {route['scope']}") + if "prefsrc" in route: + stdout(f" src {route['src']}") + stdout(end="\n") + + +class netstatRegEx: + _ipv4 = re.compile( + rf"(?P(?:default|{IPV4ADDR}))(?:/(?P\d+))?" rf"\s+(?P{IPV4ADDR}|{LLADDR}|link#\d+)" + ) + _ipv6 = re.compile( + rf"(?P(?:default|{IPV6ADDR}))(?:%\w+)?(?:/(?P\d+))?" rf"\s+(?P{IPV6ADDR}|{LLADDR}|link#\d+)" + ) + _route = re.compile( + rf"(?P(?:default|{IPV4ADDR}|{IPV6ADDR}))(?:%\w+)?(?:/(?P\d+))?" + rf"\s+(?P{IPV4ADDR}|{IPV6ADDR}|{LLADDR}|link#\d+)" + r"\s+(?P\w+)" + r"\s+(?P\w+)" + r"\s+(?P\S+)?" + ) + + def __init__(self, line): + self.ipv4 = self._ipv4.match(line) + self.ipv6 = self._ipv6.match(line) + self.route = self._route.match(line) + + +def parse(res, option): + routes = [] + for line in iter(res.split("\n")): + match = netstatRegEx(line) + + if match.route: + dst, prefix, gateway, flags, dev, expire = match.route.groups() + + if any(flag in flags for flag in (RTF_WASCLONED, RTF_PROXY)): + continue + if match.ipv4 and option["preferred_family"] == AF_INET6: + continue + if match.ipv6 and option["preferred_family"] == AF_INET: + continue + + if dst != "default" and match.ipv4: + dots = dst.count(".") + if dots < 3: + dst = dst + ".0" * (3 - dots) + if not prefix: + prefix = 8 * (dots + 1) + if prefix: + dst = f"{dst}/{prefix}" + + # protocol + if RTF_STATIC in flags: + protocol = "static" + elif any(flag in flags for flag in (RTF_DYNAMIC, RTF_MODIFIED)): + protocol = "redirect" + else: + protocol = "kernel" + + # scope + if gateway.startswith("link#") or re.search(LLADDR, gateway): + scope = "link" + gateway = None + elif RTF_HOST in flags: + scope = "host" + elif option["show_details"]: + scope = "global" + else: + scope = None + + # address type + if RTF_BLACKHOLE in flags: + addr_type = "blackhole" + elif RTF_BROADCAST in flags: + addr_type = "broadcast" + elif RTF_MULTICAST in flags: + addr_type = "multicast" + elif option["show_details"]: + addr_type = "unicast" + else: + addr_type = None + + route = { + "type": addr_type, + "dst": dst, + "gateway": gateway, + "dev": dev, + "protocol": protocol, + "scope": scope, + "expire": int(expire) if expire and expire != "!" else None, + "flags": [], + } + routes.append({k: v for k, v in route.items() if v is not None}) + + return routes diff --git a/src/iproute4mac/utils.py b/iproute4mac/utils.py similarity index 71% rename from src/iproute4mac/utils.py rename to iproute4mac/utils.py index 65bdc2f..3c5895e 100644 --- a/src/iproute4mac/utils.py +++ b/iproute4mac/utils.py @@ -6,23 +6,23 @@ from _ctypes import PyObj_FromPtr -''' Costants ''' +""" Costants """ # socket.h AF_UNSPEC = 0 AF_UNIX = 1 AF_INET = 2 AF_BRIDGE = 7 AF_INET6 = 10 -AF_PACKET = 17 # not present in BSD +AF_PACKET = 17 # not present in BSD AF_MPLS = 28 address_families = [ - (AF_UNSPEC, 'none'), - (AF_INET, 'inet'), - (AF_INET6, 'inet6'), - (AF_PACKET, 'link'), - (AF_MPLS, 'mpls'), - (AF_BRIDGE, 'bridge') + (AF_UNSPEC, "none"), + (AF_INET, "inet"), + (AF_INET6, "inet6"), + (AF_PACKET, "link"), + (AF_MPLS, "mpls"), + (AF_BRIDGE, "bridge"), ] # libc @@ -30,51 +30,48 @@ EXIT_SUCCESS = 0 # map operstates -oper_states = { - 'active': 'UP', - 'inactive': 'DOWN' -} +oper_states = {"active": "UP", "inactive": "DOWN"} # MAC address RegEx -LLSEG = '[0-9a-fA-F]{1,2}' -LLADDR = '(?:%s(?::%s){5})' % (LLSEG, LLSEG) +LLSEG = "[0-9a-fA-F]{1,2}" +LLADDR = "(?:%s(?::%s){5})" % (LLSEG, LLSEG) # IPv4 RegEx -IPV4SEG = '(?:25[0-5]|2[0-4][0-9]|1{0,1}[0-9]{1,2})' -IPV4ADDR = r'(?:%s(?:\.%s){0,3})' % (IPV4SEG, IPV4SEG) -IPV4MASK = '(?:0x)?(?:[0-9a-fA-F]){8}' +IPV4SEG = "(?:25[0-5]|2[0-4][0-9]|1{0,1}[0-9]{1,2})" +IPV4ADDR = r"(?:%s(?:\.%s){0,3})" % (IPV4SEG, IPV4SEG) +IPV4MASK = "(?:0x)?(?:[0-9a-fA-F]){8}" # IPv6 RegEx -IPV6SEG = '(?:[0-9a-fA-F]{1,4})' +IPV6SEG = "(?:[0-9a-fA-F]{1,4})" IPV6GROUPS = ( - '::', - '(?:%s:){1,7}:' % (IPV6SEG), - ':(?::%s){1,7}' % (IPV6SEG), - '(?:%s:){1,6}:%s' % (IPV6SEG, IPV6SEG), - '%s:(?::%s){1,6}' % (IPV6SEG, IPV6SEG), - '(?:%s:){1,5}(?::%s){1,2}' % (IPV6SEG, IPV6SEG), - '(?:%s:){1,4}(?::%s){1,3}' % (IPV6SEG, IPV6SEG), - '(?:%s:){1,3}(?::%s){1,4}' % (IPV6SEG, IPV6SEG), - '(?:%s:){1,2}(?::%s){1,5}' % (IPV6SEG, IPV6SEG), - '(?:%s:){7,7}%s' % (IPV6SEG, IPV6SEG), + "::", + "(?:%s:){1,7}:" % (IPV6SEG), + ":(?::%s){1,7}" % (IPV6SEG), + "(?:%s:){1,6}:%s" % (IPV6SEG, IPV6SEG), + "%s:(?::%s){1,6}" % (IPV6SEG, IPV6SEG), + "(?:%s:){1,5}(?::%s){1,2}" % (IPV6SEG, IPV6SEG), + "(?:%s:){1,4}(?::%s){1,3}" % (IPV6SEG, IPV6SEG), + "(?:%s:){1,3}(?::%s){1,4}" % (IPV6SEG, IPV6SEG), + "(?:%s:){1,2}(?::%s){1,5}" % (IPV6SEG, IPV6SEG), + "(?:%s:){7,7}%s" % (IPV6SEG, IPV6SEG), ) -IPV6ADDR = '|'.join([f'(?:{group})' for group in IPV6GROUPS[::-1]]) -IPV6ADDR = f'(?:{IPV6ADDR})' +IPV6ADDR = "|".join([f"(?:{group})" for group in IPV6GROUPS[::-1]]) +IPV6ADDR = f"(?:{IPV6ADDR})" # nu -ND6_INFINITE_LIFETIME = 0xffffffff +ND6_INFINITE_LIFETIME = 0xFFFFFFFF -def stdout(*args, sep='', end=''): +def stdout(*args, sep="", end=""): print(*args, sep=sep, end=end) def stderr(text): - sys.stderr.write(text + '\n') + sys.stderr.write(text + "\n") def error(text): - stderr(f'Error: {text}') + stderr(f"Error: {text}") exit(-1) @@ -109,12 +106,12 @@ def family_name(family): for f, n in address_families: if family == f: return n - return '???' + return "???" def family_name_verbose(family): if family == AF_UNSPEC: - return 'any value' + return "any value" return family_name(family) @@ -134,22 +131,22 @@ def af_byte_len(af): def mask2bits(mask): - return sum([bit_count(int(octet)) for octet in mask.split('.')]) + return sum([bit_count(int(octet)) for octet in mask.split(".")]) def get_addr(name, family): if family == AF_MPLS: - error('MPLS protocol not supported.') - elif strcmp(name, 'default'): + error("MPLS protocol not supported.") + elif strcmp(name, "default"): if family == AF_INET: - return Prefix('0.0.0.0/0') + return Prefix("0.0.0.0/0") if family == AF_INET6: - return Prefix('::/0') - elif strcmp(name, 'any', 'all'): + return Prefix("::/0") + elif strcmp(name, "any", "all"): if family == AF_INET: - return Prefix('0.0.0.0') + return Prefix("0.0.0.0") if family == AF_INET6: - return Prefix('::') + return Prefix("::") else: prefix = Prefix(name) if family == AF_PACKET or prefix.family == family: @@ -209,7 +206,7 @@ def ref(obj_id): def json_dumps(data, pretty=False): if pretty: return json.dumps(data, cls=IpRouteJSON, indent=4) - return json.dumps(data, separators=(',', ':')) + return json.dumps(data, separators=(",", ":")) def json_unindent_list(obj): @@ -219,8 +216,8 @@ def json_unindent_list(obj): elif isinstance(obj, list): if all(isinstance(x, str) for x in obj): return NoIndent(obj) - for i, l in enumerate(obj): - obj[i] = json_unindent_list(l) + for index, entry in enumerate(obj): + obj[index] = json_unindent_list(entry) return obj @@ -231,13 +228,13 @@ def __init__(self, value): def __repr__(self): if self.value: reps = (repr(v) for v in self.value) - return '[ ' + ','.join(reps).replace("'", '"') + ' ]' - return '[ ]' + return "[ " + ",".join(reps).replace("'", '"') + " ]" + return "[ ]" class IpRouteJSON(json.JSONEncoder): - FORMAT_SPEC = '@@{}@@' - regex = re.compile(FORMAT_SPEC.format(r'(\d+)')) + FORMAT_SPEC = "@@{}@@" + regex = re.compile(FORMAT_SPEC.format(r"(\d+)")) def default(self, obj): if isinstance(obj, NoIndent): @@ -251,19 +248,19 @@ def encode(self, obj): for match in self.regex.finditer(json_repr): id = int(match.group(1)) json_repr = json_repr.replace(f'"{format_spec.format(id)}"', repr(ref(id))) - json_repr = re.sub(r'\[\n\s+{', '[ {', json_repr) - json_repr = re.sub(r'},\n\s+{', '},{', json_repr) - json_repr = re.sub(r'}\n\s*\]', '} ]', json_repr) + json_repr = re.sub(r"\[\n\s+{", "[ {", json_repr) + json_repr = re.sub(r"},\n\s+{", "},{", json_repr) + json_repr = re.sub(r"}\n\s*\]", "} ]", json_repr) return json_repr class Prefix: - __slots__ = ('_prefix') + __slots__ = "_prefix" def __init__(self, prefix): - if prefix == 'default': - prefix = '0.0.0.0/0' - if '/' in prefix: + if prefix == "default": + prefix = "0.0.0.0/0" + if "/" in prefix: self._prefix = ipaddress.ip_network(prefix) else: self._prefix = ipaddress.ip_address(prefix) @@ -288,9 +285,11 @@ def __contains__(self, other): return False def __repr__(self): - if (isinstance(self._prefix, ipaddress.IPv4Network | ipaddress.IPv6Network) - and self._prefix.network_address._ip + self._prefix.prefixlen == 0): - return 'default' + if ( + isinstance(self._prefix, ipaddress.IPv4Network | ipaddress.IPv6Network) + and self._prefix.network_address._ip + self._prefix.prefixlen == 0 + ): + return "default" return str(self._prefix) def __str__(self): @@ -352,12 +351,12 @@ def matches(opt, *args): def matches_color(opt): - if '=' in opt: - (opt, arg) = opt.split('=', 1) + if "=" in opt: + (opt, arg) = opt.split("=", 1) else: - arg = 'always' - return '-color'.startswith(opt) and arg in ['always', 'auto', 'never'] + arg = "always" + return "-color".startswith(opt) and arg in ["always", "auto", "never"] def do_notimplemented(argv=[], option={}): - error('function not implemented') + error("function not implemented") diff --git a/setup.cfg b/setup.cfg index d8ee453..9f14fd8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,6 @@ [metadata] license_files = LICENSE name = iproute4mac -version = 0.1.5 author = Ettore Simone author_email = ettore.simone@gmail.com description = iproute for Mac @@ -22,12 +21,8 @@ keywords = iproute2, ip, bridge [options] package_dir = - = src packages = find: -python_requires = >=3.6 - -[options.packages.find] -where = src +python_requires = >=3.10 [options.entry_points] console_scripts = diff --git a/setup.py b/setup.py index 83e4ecd..82d9165 100644 --- a/setup.py +++ b/setup.py @@ -1,39 +1,4 @@ -from setuptools import setup, find_packages +from iproute4mac import VERSION +from setuptools import setup -VERSION = '0.1.5' -DESCRIPTION = 'iproute for Mac' -with open('README.md', 'r') as f: - LONG_DESCRIPTION = f.read() - -setup( - name = 'iproute4mac', - version = VERSION, - author = 'Ettore Simone', - author_email = 'ettore.simone@gmail.com', - description = DESCRIPTION, - long_description = LONG_DESCRIPTION, - long_description_content_type = 'text/markdown', - license_files = ('LICENSE'), - packages = find_packages('src'), - install_requires = [], - keywords = ['iproute2', 'ip', 'bridge'], - url = 'https://github.com/signal-09/iproute4mac', - project_urls = { - 'Source': 'https://github.com/signal-09/iproute4mac', - }, - classifiers = [ - 'Development Status :: 3 - Alpha', - 'Environment :: Console', - 'Intended Audience :: System Administrators', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: MacOS :: MacOS X', - 'Programming Language :: Python :: 3.10', - 'Topic :: System :: Networking', - ], - entry_points = { - 'console_scripts': [ - 'ip = iproute4mac.ip:main', - 'bridge = iproute4mac.bridge:main', - ], - }, -) +setup(version=VERSION) diff --git a/src/iproute4mac/__init__.py b/src/iproute4mac/__init__.py deleted file mode 100644 index d3d645e..0000000 --- a/src/iproute4mac/__init__.py +++ /dev/null @@ -1 +0,0 @@ -VERSION = '0.1.5' diff --git a/src/iproute4mac/bridge.py b/src/iproute4mac/bridge.py deleted file mode 100644 index 266e6a2..0000000 --- a/src/iproute4mac/bridge.py +++ /dev/null @@ -1,138 +0,0 @@ -#!/usr/bin/env python3 - -import sys - -import iproute4mac -from iproute4mac.utils import * -from iproute4mac.iplink import * -from iproute4mac.ipaddress import * - - -def do_help(argv=[], option={}): - usage() - - -def usage(): - stderr("""\ -Usage: bridge [ OPTIONS ] OBJECT { COMMAND | help } - bridge [ -force ] -batch filename -where OBJECT := { link | fdb | mdb | vlan | vni | monitor } - OPTIONS := { -V[ersion] | -s[tatistics] | -d[etails] | - -o[neline] | -t[imestamp] | -n[etns] name | - -com[pressvlans] -c[olor] -p[retty] -j[son] }""") - exit(-1) - - -''' Implemented objects ''' -objs = [ - ('link', do_notimplemented), - ('fdb', do_notimplemented), - ('mdb', do_notimplemented), - ('vlan', do_notimplemented), - ('vni', do_notimplemented), - ('monitor', do_notimplemented), - ('help', do_help) -] - - -def do_obj(argv, option): - obj = argv.pop(0) - for o, f in objs: - if o.startswith(obj): - return f(argv, option) - - stderr(f'Object "{obj}" is unknown, try "bridge help".') - return EXIT_FAILURE - - -def main(): - batch_file = None - option = { - 'preferred_family': AF_UNSPEC, - 'show_stats': False, - 'show_details': False, - 'oneline': False, - 'timestamp': False, - 'compress_vlans': False, - 'force': False, - 'json': False, - 'pretty': False, - 'do_all': False - } - - if sys.platform != 'darwin': - stderr('Unupported OS.') - exit(-1) - - argv = sys.argv[1:] - while argv: - if argv[0] == '--': - argv.pop(0) - break - elif argv[0][0] != '-': - break - - opt = argv.pop(0) - if opt[1] == '-': - opt = opt[1:] - - if '-help'.startswith(opt): - usage() - elif '-Version'.startswith(opt): - print(f'bridge wrapper, iproute4mac-{iproute4mac.VERSION}') - exit(0) - elif '-stats'.startswith(opt) or '-statistics'.startswith(opt): - option['show_stats'] = True - elif '-details'.startswith(opt): - option['show_details'] = True - elif '-oneline'.startswith(opt): - option['oneline'] = True - elif '-timestamp'.startswith(opt): - option['timestamp'] = True - elif '-family'.startswith(opt): - try: - opt = argv.pop(0) - except IndexError: - missarg('family type') - if opt == 'help': - usage() - option['preferred_family'] = read_family(opt) - if option['preferred_family'] == AF_UNSPEC: - invarg('invalid protocol family', opt) - elif opt == '-4': - option['preferred_family'] = AF_INET - elif opt == '-6': - option['preferred_family'] = AF_INET6 - elif '-netns'.startswith(opt): - do_notimplemented() - elif matches_color(opt): - # Color option is not implemented - pass - elif '-compressvlans'.startswith(opt): - option['compress_vlans'] = True - elif '-force'.startswith(opt): - option['force'] = True - elif '-json'.startswith(opt): - option['json'] = True - elif '-pretty'.startswith(opt): - option['pretty'] = True - elif '-batch'.startswith(opt): - try: - batch_file = argv.pop(0) - except IndexError: - missarg('batch file') - else: - stderr(f'Option "{opt}" is unknown, try "bridge help".') - exit(-1) - - if batch_file: - do_notimplemented() - - if argv: - return do_obj(argv, option) - - usage() - - -if __name__ == '__main__': - main() diff --git a/src/iproute4mac/ifconfig.py b/src/iproute4mac/ifconfig.py deleted file mode 100644 index a73ce55..0000000 --- a/src/iproute4mac/ifconfig.py +++ /dev/null @@ -1,266 +0,0 @@ -import re - -from iproute4mac.utils import * - - -def dumps(links, option): - if option['json']: - print(json_dumps(links, option['pretty'])) - return - - if not links: - return - - for link in links: - stdout(link['ifindex'], ': ', link['ifname']) - if 'link' in link: - stdout('@', link['link']) - stdout(': <', ','.join(link['flags']), '> mtu ', link['mtu']) - if 'master' in link: - stdout(' master ', link['master']) - stdout(' state ', link['operstate'], end='\n') - - stdout(' link/', link['link_type']) - if 'address' in link: - stdout(' ', link['address']) - if 'broadcast' in link: - stdout(' brd ', link['broadcast']) - stdout(end='\n') - - if 'linkinfo' in link and 'info_kind' in link['linkinfo']: - info = link['linkinfo'] - if info['info_kind'] == 'vlan': - data = info['info_data'] - stdout(' ', info['info_kind'], ' protocol ', data['protocol'], ' id ', data['id'], end='\n') - elif info['info_kind'] == 'bridge': - data = info['info_data'] - stdout(' bridge ', ' '.join([f'{key} {value}' for key, value in data.items()]), end='\n') - - for addr in link.get('addr_info', []): - stdout(' ', addr['family']) - stdout(' ', addr['local']) - if 'address' in addr: - stdout(' peer ', addr['address']) - stdout('/', addr['prefixlen']) - if 'broadcast' in addr: - stdout(' brd ', addr['broadcast']) - if 'scope' in addr: - stdout(' scope ', addr['scope']) - stdout(end='\n') - if 'valid_life_time' in addr and 'preferred_life_time' in addr: - stdout(' valid_lft ', - 'forever' if addr['valid_life_time'] == ND6_INFINITE_LIFETIME else addr['valid_life_time'], - ' preferred_lft ', - 'forever' if addr['preferred_life_time'] == ND6_INFINITE_LIFETIME else addr['preferred_life_time']) - stdout(end='\n') - - -class ifconfigRegEx: - _header = re.compile(r'(?P\w+):' - r' flags=\w+<(?P.*)>' - r' mtu (?P\d+)' - r' index (?P\d+)') - _eflags = re.compile(r'\s+eflags=\w+<(?P.*)>') - _ether = re.compile(fr'\s+ether (?P{LLADDR})') - _inet = re.compile(fr'\s+inet (?P{IPV4ADDR})' - fr'(?: --> (?P
{IPV4ADDR}))?' - fr' netmask (?P{IPV4MASK})' - fr'(?: broadcast (?P{IPV4ADDR}))?') - _inet6 = re.compile(fr'\s+inet6 (?P{IPV6ADDR})(?:%\w+)?' - r' prefixlen (?P\d+)' - r'(?: (?Pautoconf))?' - r'(?: (?Psecured))?' - r'(?: pltime (?P\d+))?' - r'(?: vltime (?P\d+))?' - r'(?: scopeid (?P0x[0-9a-fA-F]+))?') - _state = re.compile(r'\s+status: (?P\w+)') - _vlan = re.compile(r'\s+vlan: (?P\d+) parent interface: (?P?)') - _bond = re.compile(r'\s+bond interfaces: (\w+(?: \w+)*)') - _bridge = re.compile(r'\s+Configuration:') - - def __init__(self, line): - self.header = self._header.match(line) - self.eflags = self._eflags.match(line) - self.ether = self._ether.match(line) - self.inet = self._inet.match(line) - self.inet6 = self._inet6.match(line) - self.state = self._state.match(line) - self.vlan = self._vlan.match(line) - self.bond = self._bond.match(line) - self.bridge = self._bridge.match(line) - - -class bridgeRegEx: - _id = re.compile(r'\s+id (?P(?:[0-9a-fA-F]{1,2}:?){6})' - r' priority (?P\d+)' - r' hellotime (?P\d+)' - r' fwddelay (?P\d+)') - _age = re.compile(r'\s+maxage (?P\d+)' - r' holdcnt (?P\d+)' - r' proto (?P\w+)' - r' maxaddr (?P\d+)' - r' timeout (?P\d+)') - _root = re.compile(r'\s+root id (?P(?:[0-9a-fA-F]{1,2}:?){6})' - r' priority (?P\d+)' - r' ifcost (?P\d+)' - r' port (?P\d+)') - _filter = re.compile(r'\s+ipfilter (?P\w+)' - r' flags (?P0x[0-9a-fA-F]+)') - _member = re.compile(r'\s+member: (?P\w+)') - _cache = re.compile(r'\s+media:') - - def __init__(self, line): - self.id = self._id.match(line) - self.age = self._age.match(line) - self.root = self._root.match(line) - self.filter = self._filter.match(line) - self.member = self._member.match(line) - self.cache = self._cache.match(line) - - -def parse_bridge(lines, links, link): - info_data = {} - while line := next(lines): - match = bridgeRegEx(line) - - if match.id: - info_data['forward_delay'] = int(match.id.group('delay')) - info_data['hello_time'] = int(match.id.group('hello')) - elif match.age: - info_data['max_age'] = int(match.age.group('max_age')) - info_data['ageing_time'] = int(match.age.group('ageing')) - elif match.root: - info_data['priority'] = int(match.root.group('priority')) - info_data['root_id'] = match.root.group('id') - info_data['root_port'] = int(match.root.group('port')) - info_data['root_path_cost'] = int(match.root.group('cost')) - elif match.filter: - info_data['ipfilter'] = match.filter.group('filter') != 'disabled' - elif match.member: - slave = next(item for item in links if item['ifname'] == match.member.group('member')) - slave['master'] = link['ifname'] - slave['linkinfo'] = {'info_slave_kind': 'bridge'} - elif match.cache: - link['linkinfo'].update({'info_data': info_data}) - break - - -def parse(res, option): - links = [] - lines = iter(res.split('\n')) - while line := next(lines): - match = ifconfigRegEx(line) - - if match.header: - header = match.header.groupdict() - link = { - 'ifindex': int(header['ifindex']), - 'ifname': header['ifname'], - 'flags': header['flags'].split(',') if header['flags'] != '' else [], - 'mtu': int(header['mtu']), - 'operstate': 'UNKNOWN', - 'link_type': 'none' - } - - if 'LOOPBACK' in link['flags']: - link['link_type'] = 'loopback' - link['address'] = '00:00:00:00:00:00' - link['broadcast'] = '00:00:00:00:00:00' - elif 'POINTOPOINT' in link['flags']: - link['link_pointtopoint'] = True - - if (link['ifname'].startswith('bridge') - or link['ifname'].startswith('bond') - or link['ifname'].startswith('vlan')): - link['linkinfo'] = {'info_kind': re.sub(r'[0-9]+', '', link['ifname'])} - - links.append(link) - inet_count = 0 - inet6_count = 0 - continue - - if match.eflags: - link['eflags'] = match.eflags.group('eflags').split(',') - elif match.ether: - link['link_type'] = 'ether' - link['address'] = match.ether.group('ether') - link['broadcast'] = 'ff:ff:ff:ff:ff:ff' - elif match.state: - link['operstate'] = oper_states[match.state.group('state')] - elif match.inet and option['preferred_family'] in (AF_INET, AF_UNSPEC): - inet = match.inet.groupdict() - addr = { - 'family': 'inet', - 'local': inet['local'] - } - if inet['address']: - addr['address'] = inet['address'] - addr['prefixlen'] = netmask_to_length(inet['netmask']) - if inet['broadcast']: - addr['broadcast'] = inet['broadcast'] - ip = Prefix(addr['local']) - if ip.is_link: - addr['scope'] = 'link' - elif ip.is_global: - # FIXME: may be Python ipaddress is_global() not compliant with iproute2 - addr['scope'] = 'global' - else: - addr['scope'] = 'host' - addr.update({ - 'valid_life_time': ND6_INFINITE_LIFETIME, - 'preferred_life_time': ND6_INFINITE_LIFETIME - }) - if inet_count + inet6_count > 0: - # Let IPv4 at beginning - link['addr_info'].insert(inet_count, addr) - else: - link['addr_info'] = [addr] - inet_count += 1 - elif match.inet6 and option['preferred_family'] in (AF_INET6, AF_UNSPEC): - inet6 = match.inet6.groupdict() - ip = Prefix(inet6['local']) - if ip.is_link: - scope = 'link' - elif ip.is_global: - # FIXME: may be Python ipaddress is_global() not compliant with iproute2 - scope = 'global' - else: - scope = 'host' - addr = { - 'family': 'inet6', - 'local': inet6['local'], - 'prefixlen': int(inet6['prefixlen']), - 'scope': scope, - 'valid_life_time': int(inet6['vltime']) if inet6['vltime'] else ND6_INFINITE_LIFETIME, - 'preferred_life_time': int(inet6['pltime']) if inet6['pltime'] else ND6_INFINITE_LIFETIME - } - if inet_count + inet6_count > 0: - # Put IPv6 after IPv4 - link['addr_info'].append(addr) - else: - link['addr_info'] = [addr] - inet6_count += 1 - elif match.vlan: - parent = match.vlan.group('parent') - if parent != '': - link['link'] = parent - link['linkinfo'].update({ - 'info_data': { - 'protocol': '802.1Q', - 'id': int(match.vlan.group('vlanid')), - 'flags': [] - } - }) - elif match.bond: - for ifname in match.bond.group(1).split(' '): - slave = next(item for item in links if item['ifname'] == ifname) - slave['master'] = link['ifname'] - slave['address'] = link['address'] - slave['linkinfo'] = { - 'info_slave_kind': 'bond', - 'perm_hwaddr': slave['address'] - } - elif match.bridge: - parse_bridge(lines, links, link) - - return links diff --git a/src/iproute4mac/ip.py b/src/iproute4mac/ip.py deleted file mode 100644 index 6d8bab0..0000000 --- a/src/iproute4mac/ip.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python3 - -import sys - -import iproute4mac -from iproute4mac.utils import * -from iproute4mac.iplink import do_iplink -from iproute4mac.ipaddress import do_ipaddr -from iproute4mac.iproute import do_iproute - - -def do_help(argv=[], option={}): - usage() - - -def usage(): - stderr("""\ -Usage: ip [ OPTIONS ] OBJECT { COMMAND | help } - ip [ -force ] -batch filename -where OBJECT := { address | addrlabel | fou | help | ila | ioam | l2tp | link | - macsec | maddress | monitor | mptcp | mroute | mrule | - neighbor | neighbour | netconf | netns | nexthop | ntable | - ntbl | route | rule | sr | stats | tap | tcpmetrics | - token | tunnel | tuntap | vrf | xfrm } - OPTIONS := { -V[ersion] | -s[tatistics] | -d[etails] | -r[esolve] | - -h[uman-readable] | -iec | -j[son] | -p[retty] | - -f[amily] { inet | inet6 | mpls | bridge | link } | - -4 | -6 | -M | -B | -0 | - -l[oops] { maximum-addr-flush-attempts } | -echo | -br[ief] | - -o[neline] | -t[imestamp] | -ts[hort] | -b[atch] [filename] | - -rc[vbuf] [size] | -n[etns] name | -N[umeric] | -a[ll] | - -c[olor]}""") - exit(-1) - - -''' Implemented objects ''' -objs = [ - ('address', do_ipaddr), - ('addrlabel', do_notimplemented), - ('maddress', do_notimplemented), - ('route', do_iproute), - ('rule', do_notimplemented), - ('neighbor', do_notimplemented), - ('neighbour', do_notimplemented), - ('ntable', do_notimplemented), - ('ntbl', do_notimplemented), - ('link', do_iplink), - ('l2tp', do_notimplemented), - ('fou', do_notimplemented), - ('ila', do_notimplemented), - ('macsec', do_notimplemented), - ('tunnel', do_notimplemented), - ('tunl', do_notimplemented), - ('tuntap', do_notimplemented), - ('tap', do_notimplemented), - ('token', do_notimplemented), - ('tcpmetrics', do_notimplemented), - ('tcp_metrics', do_notimplemented), - ('monitor', do_notimplemented), - ('xfrm', do_notimplemented), - ('mroute', do_notimplemented), - ('mrule', do_notimplemented), - ('netns', do_notimplemented), - ('netconf', do_notimplemented), - ('vrf', do_notimplemented), - ('sr', do_notimplemented), - ('nexthop', do_notimplemented), - ('mptcp', do_notimplemented), - ('ioam', do_notimplemented), - ('help', do_help), - ('stats', do_notimplemented) -] - - -def do_obj(argv, option): - obj = argv.pop(0) - for o, f in objs: - if o.startswith(obj): - return f(argv, option) - - stderr(f'Object "{obj}" is unknown, try "ip help".') - return EXIT_FAILURE - - -def main(): - batch_file = None - option = { - 'preferred_family': AF_UNSPEC, - 'human_readable': False, - 'use_iec': False, - 'show_stats': False, - 'show_details': False, - 'oneline': False, - 'brief': False, - 'json': False, - 'pretty': False, - 'timestamp': False, - 'timestamp_short': False, - 'echo_request': False, - 'force': False, - 'max_flush_loops': 10, - 'batch_mode': False, - 'do_all': False - } - - if sys.platform != 'darwin': - stderr('Unupported OS.') - exit(-1) - - argv = sys.argv[1:] - while argv: - if argv[0] == '--': - argv.pop(0) - break - elif argv[0][0] != '-': - break - - opt = argv.pop(0) - if opt[1] == '-': - opt = opt[1:] - - if '-loops'.startswith(opt): - try: - option['max_flush_loops'] = int(argv.pop(0)) - except IndexError: - missarg('loop count') - except ValueError: - error('loop count not a number') - elif '-family'.startswith(opt): - try: - opt = argv.pop(0) - except IndexError: - missarg('family type') - if opt == 'help': - usage() - option['preferred_family'] = read_family(opt) - if option['preferred_family'] == AF_UNSPEC: - invarg('invalid protocol family', opt) - elif opt == '-4': - option['preferred_family'] = AF_INET - elif opt == '-6': - option['preferred_family'] = AF_INET6 - elif opt == '-0': - option['preferred_family'] = AF_PACKET - elif opt == '-M': - option['preferred_family'] = AF_MPLS - elif opt == '-B': - option['preferred_family'] = AF_BRIDGE - elif '-human-readable'.startswith(opt): - option['human_readable'] = True - elif '-iec'.startswith(opt): - option['use_iec'] = True - elif '-stats'.startswith(opt) or '-statistics'.startswith(opt): - option['show_stats'] = True - elif '-details'.startswith(opt): - option['show_details'] = True - elif '-resolve'.startswith(opt): - option['resolve_hosts'] = True - elif '-oneline'.startswith(opt): - option['oneline'] = True - elif '-timestamp'.startswith(opt): - option['timestamp'] = True - elif '-tshort'.startswith(opt): - option['timestamp'] = True - option['timestamp_short'] = True - elif '-Version'.startswith(opt): - print(f'ip wrapper, iproute4mac-{iproute4mac.VERSION}') - exit(0) - elif '-force'.startswith(opt): - option['force'] = True - elif '-batch'.startswith(opt): - try: - batch_file = argv.pop(0) - except IndexError: - missarg('batch file') - elif '-brief'.startswith(opt): - option['brief'] = True - elif '-json'.startswith(opt): - option['json'] = True - elif '-pretty'.startswith(opt): - option['pretty'] = True - elif '-rcvbuf'.startswith(opt): - try: - option['rcvbuf'] = int(argv.pop(0)) - except IndexError: - missarg('rcvbuf size') - except ValueError: - error('rcvbuf size not a number') - elif matches_color(opt): - # Color option is not implemented - pass - elif '-help'.startswith(opt): - usage() - elif '-netns'.startswith(opt): - do_notimplemented() - elif '-Numeric'.startswith(opt): - option['numeric'] = True - elif '-all'.startswith(opt): - option['do_all'] = True - elif opt == '-echo': - option['echo_request'] = True - else: - stderr(f'Option "{opt}" is unknown, try "ip -help".') - exit(-1) - - if batch_file: - do_notimplemented() - - if argv: - return do_obj(argv, option) - - usage() - - -if __name__ == '__main__': - main() diff --git a/src/iproute4mac/netstat.py b/src/iproute4mac/netstat.py deleted file mode 100644 index c2f93ad..0000000 --- a/src/iproute4mac/netstat.py +++ /dev/null @@ -1,143 +0,0 @@ -import re - -from iproute4mac.utils import * - - -''' -https://docs.freebsd.org/en/books/handbook/advanced-networking/#routeflags -''' -RTF_PROTO1 = '1' # Protocol specific routing flag #1 -RTF_PROTO2 = '2' # Protocol specific routing flag #2 -RTF_PROTO3 = '3' # Protocol specific routing flag #3 -RTF_BLACKHOLE = 'B' # Just discard packets (during updates) -RTF_BROADCAST = 'b' # The route represents a broadcast address -RTF_CLONING = 'C' # Generate new routes on use -RTF_PRCLONING = 'c' # Protocol-specified generate new routes on use -RTF_DYNAMIC = 'D' # Created dynamically (by redirect) -RTF_GATEWAY = 'G' # Destination requires forwarding by intermediary -RTF_HOST = 'H' # Host entry (net otherwise) -RTF_IFSCOPE = 'I' # Route is associated with an interface scope -RTF_IFREF = 'i' # Route is holding a reference to the interface -RTF_LLINFO = 'L' # Valid protocol to link address translation -RTF_MODIFIED = 'M' # Modified dynamically (by redirect) -RTF_MULTICAST = 'm' # The route represents a multicast address -RTF_REJECT = 'R' # Host or net unreachable -RTF_ROUTER = 'r' # Host is a default router -RTF_STATIC = 'S' # Manually added -RTF_UP = 'U' # Route usable -RTF_WASCLONED = 'W' # Route was generated as a result of cloning -RTF_XRESOLVE = 'X' # External daemon translates proto to link address -RTF_PROXY = 'Y' # Proxying; cloned routes will not be scoped -RTF_GLOBAL = 'g' # Route to a destination of the global internet (policy hint) - - -def dumps(routes, option): - if option['json']: - print(json_dumps(routes, option['pretty'])) - return - - if not routes: - return - - for route in routes: - if option['show_details'] or 'type' in route: - stdout(route['type'] if 'type' in route else 'unicast', ' ') - stdout(route['dst']) - if 'gateway' in route: - stdout(f" via {route['gateway']}") - if 'dev' in route: - stdout(f" dev {route['dev']}") - if 'protocol' in route: - stdout(f" proto {route['protocol']}") - if 'scope' in route: - stdout(f" scope {route['scope']}") - if 'prefsrc' in route: - stdout(f" src {route['src']}") - stdout(end='\n') - - -class netstatRegEx: - _ipv4 = re.compile(fr'(?P(?:default|{IPV4ADDR}))(?:/(?P\d+))?' - fr'\s+(?P{IPV4ADDR}|{LLADDR}|link#\d+)') - _ipv6 = re.compile(fr'(?P(?:default|{IPV6ADDR}))(?:%\w+)?(?:/(?P\d+))?' - fr'\s+(?P{IPV6ADDR}|{LLADDR}|link#\d+)') - _route = re.compile(fr'(?P(?:default|{IPV4ADDR}|{IPV6ADDR}))(?:%\w+)?(?:/(?P\d+))?' - fr'\s+(?P{IPV4ADDR}|{IPV6ADDR}|{LLADDR}|link#\d+)' - r'\s+(?P\w+)' - r'\s+(?P\w+)' - r'\s+(?P\S+)?') - - def __init__(self, line): - self.ipv4 = self._ipv4.match(line) - self.ipv6 = self._ipv6.match(line) - self.route = self._route.match(line) - - -def parse(res, option): - routes = [] - for line in iter(res.split('\n')): - match = netstatRegEx(line) - - if match.route: - dst, prefix, gateway, flags, dev, expire = match.route.groups() - - if any(flag in flags for flag in (RTF_WASCLONED, RTF_PROXY)): - continue - if match.ipv4 and option['preferred_family'] == AF_INET6: - continue - if match.ipv6 and option['preferred_family'] == AF_INET: - continue - - if dst != 'default' and match.ipv4: - dots = dst.count('.') - if dots < 3: - dst = dst + '.0' * (3 - dots) - if not prefix: - prefix = 8 * (dots + 1) - if prefix: - dst = f'{dst}/{prefix}' - - # protocol - if RTF_STATIC in flags: - protocol = 'static' - elif any(flag in flags for flag in (RTF_DYNAMIC, RTF_MODIFIED)): - protocol = 'redirect' - else: - protocol = 'kernel' - - # scope - if gateway.startswith('link#') or re.search(LLADDR, gateway): - scope = 'link' - gateway = None - elif RTF_HOST in flags: - scope = 'host' - elif option['show_details']: - scope = 'global' - else: - scope = None - - # address type - if RTF_BLACKHOLE in flags: - addr_type = 'blackhole' - elif RTF_BROADCAST in flags: - addr_type = 'broadcast' - elif RTF_MULTICAST in flags: - addr_type = 'multicast' - elif option['show_details']: - addr_type = 'unicast' - else: - addr_type = None - - route = { - 'type': addr_type, - 'dst': dst, - 'gateway': gateway, - 'dev': dev, - 'protocol': protocol, - 'scope': scope, - 'expire': int(expire) if expire and expire != '!' else None, - 'flags': [] - } - routes.append({k: v for k, v in route.items() if v is not None}) - - return routes diff --git a/tests/test_with_unittest.py b/tests/test_with_unittest.py index e149c5f..210f083 100644 --- a/tests/test_with_unittest.py +++ b/tests/test_with_unittest.py @@ -2,6 +2,7 @@ from unittest import TestCase + class TryTesting(TestCase): def test_always_passes(self): self.assertTrue(True)