Skip to content

Commit

Permalink
Add general_merge tests and a test data generator module
Browse files Browse the repository at this point in the history
the generate_data module has functions to create IP networks and addrs and
create AS files as we would get from RPKI or RIR.
  • Loading branch information
jurraca committed Nov 18, 2024
1 parent 6e9f0be commit f043983
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 0 deletions.
2 changes: 2 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
ps.pandas
ps.requests
ps.tqdm
ps.pytest
ps.pylint
]);
kartografDeps = [
pythonBuildDeps
Expand Down
105 changes: 105 additions & 0 deletions tests/generate_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
A module to generate IP networks and ASNs for testing purposes.
"""

import ipaddress
from random import randint

# Inclusive upper bound for the random AS numbers drawn by generate_asns().
# NOTE(review): the origin of this specific value is not documented in this
# module -- confirm it matches the ASN range the merge code expects.
MAX_ASN = 33521664

def generate_ip(ip_type="v4", subnet_size="16"):
    """
    Return a random IP network for a given IP address type and subnet size.

    Args:
        ip_type: "v4" to generate an IPv4 network, "v6" for an IPv6 network.
        subnet_size: prefix length of the generated network, as an int or a
            numeric string (e.g. 16 or "16" for a /16).

    Returns:
        An ipaddress.IPv4Network or ipaddress.IPv6Network whose network
        address is a random address masked down to the requested prefix.

    Raises:
        TypeError: if ip_type is neither "v4" nor "v6".
        ValueError: if subnet_size is not a valid prefix for ip_type (raised
            by the ipaddress module).
    """
    if ip_type == "v4":
        address_cls = ipaddress.IPv4Address
        network_cls = ipaddress.IPv4Network
        zero_addr = "0.0.0.0"
        max_addr = "255.255.255.255"
    elif ip_type == "v6":
        address_cls = ipaddress.IPv6Address
        network_cls = ipaddress.IPv6Network
        # "::" is the all-zero IPv6 address. A literal made only of colons
        # ("::::::::") is not valid IPv6 syntax and raises AddressValueError.
        zero_addr = "::"
        max_addr = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    else:
        raise TypeError(f"invalid IP address type provided: {ip_type}")

    # Build the netmask from the zero network so that the ipaddress module
    # validates subnet_size for us.
    subnet_mask = int(network_cls(f"{zero_addr}/{subnet_size}").netmask)

    # Pick any address in the whole address space, then clear its host bits
    # so that it is a valid network address for the requested prefix.
    random_ip_int = randint(0, int(address_cls(max_addr)))
    network_addr = address_cls(random_ip_int & subnet_mask)
    return network_cls(str(network_addr) + f"/{subnet_size}")

def generate_ip_networks(
    count, ip_type="v4", subnet_range_start=8, subnet_range_end=24
):
    """
    Return a set of `count` distinct random IP networks.

    Each network gets a random prefix length drawn (inclusively) from
    [subnet_range_start, subnet_range_end] and is produced by generate_ip()
    for the given ip_type. Duplicates are discarded and re-drawn until the
    set holds `count` networks.
    """
    networks = set()
    # A set ignores duplicates, so keep drawing until it is large enough.
    while len(networks) < count:
        prefix_len = randint(subnet_range_start, subnet_range_end)
        networks.add(generate_ip(ip_type=ip_type, subnet_size=prefix_len))
    return networks


def generate_asns(count):
    """
    Return a set of `count` distinct random AS numbers.

    Each number is drawn from the inclusive range [1, MAX_ASN]; duplicates
    are discarded and re-drawn until the set holds `count` values.
    """
    unique_asns = set()
    # A set ignores duplicates, so keep drawing until it is large enough.
    while len(unique_asns) < count:
        unique_asns.add(randint(1, MAX_ASN))
    return unique_asns


def generate_subnets_from_base(base_networks, count):
    """
    Return the network address of the first subnet of each base network.

    Only the first `count` entries of base_networks are used. For each one,
    the first subnet yielded by ipaddress' subnets() is taken and its
    network address is returned as a string. Note that the result is a bare
    address (e.g. "10.0.0.0"), without a prefix length.
    """
    return [
        str(next(ipaddress.ip_network(base).subnets()).network_address)
        for base in base_networks[:count]
    ]


def build_file_lines(ips, asns):
    """
    Pair each IP with an ASN and format the pairs as "<ip> AS<asn>\\n" lines.

    The two iterables are walked in lockstep with zip(), so the result has
    as many lines as the shorter of the two.
    """
    return [f"{ip} AS{asn}\n" for ip, asn in zip(ips, asns)]


def generate_file_items(count, ip_type="v4"):
    """
    Generate the lines for an AS file, such as would be received from RPKI
    or RIR: `count` random networks of the given ip_type, each paired with
    a random ASN.
    """
    networks = generate_ip_networks(count, ip_type)
    return build_file_lines(networks, generate_asns(count))


def generate_ip_file(file_name, lines):
    """
    Write the items to a local file.

    Args:
        file_name: path of the file to create or overwrite.
        lines: iterable of strings, written as-is (each one is expected to
            carry its own trailing newline).

    Returns:
        A short confirmation message naming the file that was written.
    """
    # An explicit encoding keeps the output independent of the host locale.
    with open(file_name, "w", encoding="utf-8") as f:
        f.writelines(lines)
    return f"Generated {file_name}"


def make_disjoint(base_items, extra_items):
    """
    Return the entries of extra_items that overlap no network in base_items.

    Args:
        base_items: iterable of networks (anything ipaddress.ip_network
            accepts) that the result must not overlap.
        extra_items: iterable of candidate networks.

    Returns:
        A list with the elements of extra_items, in their original order and
        original form, that do not overlap any base network.
    """
    # Parse the base networks once up front rather than once per candidate;
    # this also keeps the result correct if base_items is a one-shot iterator.
    base_networks = [ipaddress.ip_network(item) for item in base_items]

    disjoint = []
    for extra in extra_items:
        candidate = ipaddress.ip_network(extra)
        if not any(candidate.overlaps(base) for base in base_networks):
            disjoint.append(extra)
    return disjoint
77 changes: 77 additions & 0 deletions tests/merge_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from generate_data import (
build_file_lines,
generate_ip_file,
generate_file_items,
generate_asns,
generate_subnets_from_base,
make_disjoint
)

from kartograf.merge import general_merge

def __tmp_paths(tmp_path):
    """
    Return the RPKI input, IRR input and merge output paths (in that order)
    located under tmp_path.
    """
    file_names = ("rpki_final.txt", "irr_final.txt", "out.txt")
    return [tmp_path / name for name in file_names]

def test_merge(tmp_path):
    """
    Assert that merging two identical files is a no-op.
    """
    source_path, _, merged_path = __tmp_paths(tmp_path)
    expected_lines = generate_file_items(100)
    generate_ip_file(source_path, expected_lines)

    # Use the same file as both inputs of the merge.
    general_merge(source_path, source_path, None, merged_path)

    with open(merged_path, "r") as merged_file:
        merged_lines = merged_file.readlines()
    merged_networks = [line.split()[0] for line in merged_lines]

    assert merged_lines == expected_lines
    assert len(merged_networks) == len(expected_lines)


def test_merge_disjoint(tmp_path):
    """
    Test merging non-overlapping sets of IP networks.
    """
    generated = generate_file_items(100)
    all_networks = [line.split()[0] for line in generated]

    # First half is the RPKI set; from the second half keep only the
    # networks that do not overlap any RPKI network.
    rpki_ips = all_networks[:50]
    irr_ips = make_disjoint(rpki_ips, all_networks[50:])

    rpki_data = build_file_lines(rpki_ips, generate_asns(len(rpki_ips)))
    irr_data = build_file_lines(irr_ips, generate_asns(len(irr_ips)))

    rpki_path, irr_path, out_path = __tmp_paths(tmp_path)
    generate_ip_file(rpki_path, rpki_data)
    generate_ip_file(irr_path, irr_data)
    general_merge(rpki_path, irr_path, None, out_path)

    with open(out_path, "r") as merged_file:
        merged_lines = merged_file.readlines()
    final_ips = [line.split()[0] for line in merged_lines]

    # Every network from both inputs must be present in the merged output.
    assert set(final_ips) == (set(irr_ips) | set(rpki_ips))


def test_merge_joint(tmp_path):
    """
    Test merging overlapping sets of IP networks.
    """
    overlap = 10
    rpki_data = generate_file_items(100)
    rpki_ips = [line.split()[0] for line in rpki_data]

    # Derive IRR entries from the first `overlap` RPKI networks so that they
    # fall inside networks already present in the base file.
    irr_ips = generate_subnets_from_base(rpki_ips, overlap)
    irr_data = build_file_lines(irr_ips, generate_asns(len(irr_ips)))

    rpki_path, irr_path, out_path = __tmp_paths(tmp_path)
    generate_ip_file(rpki_path, rpki_data)
    generate_ip_file(irr_path, irr_data)
    general_merge(rpki_path, irr_path, None, out_path)

    with open(out_path, "r") as merged_file:
        merged_lines = merged_file.readlines()
    final_ips = [line.split()[0] for line in merged_lines]

    # No subnets from irr_ips are included in the final merged network list.
    # NOTE(review): irr_ips holds bare network addresses without a prefix
    # length, while the RPKI entries are CIDR strings -- confirm the merged
    # output could ever contain an identical string, otherwise this check
    # cannot fail.
    assert set(final_ips).isdisjoint(set(irr_ips))

0 comments on commit f043983

Please sign in to comment.