improve merge performance with dictionary lookup
add BaseNetworkIndex class to handle base indexing and lookup
removes unused deps (numpy, pandarallel) from requirements and dev env
jurraca committed Nov 13, 2024
1 parent 8df428e commit d7b2e7b
Showing 5 changed files with 112 additions and 94 deletions.
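
Before the per-file diffs, here is a minimal standalone sketch of the approach the commit title describes, using illustrative prefixes rather than the committed code: base networks are bucketed in a plain dict keyed by their leading IPv4 octet, and each candidate prefix is checked with a bitwise AND against only its own bucket instead of against every base network.

```python
import ipaddress

# Bucket base networks by their leading IPv4 octet, as the new
# BaseNetworkIndex does. Prefix values here are illustrative only.
base_prefixes = ["10.0.0.0/8", "192.168.0.0/16"]
buckets = {}
for pfx in base_prefixes:
    net = ipaddress.ip_network(pfx)
    key = int(pfx.split(".", 1)[0])
    buckets.setdefault(key, []).append(
        (int(net.network_address), int(net.netmask))
    )

def is_covered(pfx):
    """Return True if pfx falls inside any base network in its bucket."""
    key = int(pfx.split(".", 1)[0])
    addr = int(ipaddress.ip_network(pfx).network_address)
    return any(addr & mask == net for net, mask in buckets.get(key, []))

print(is_covered("10.1.2.0/24"))    # True: inside 10.0.0.0/8
print(is_covered("172.16.0.0/12"))  # False: no bucket for leading octet 172
```

This replaces the previous numpy broadcast, which compared every candidate against every base network and relied on pandarallel to stay fast.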
23 changes: 0 additions & 23 deletions flake.nix
@@ -20,41 +20,18 @@
utils.lib.eachDefaultSystem (system: let
pkgs = nixpkgs.legacyPackages.${system};

# Custom derivation for pandarallel
pandarallel = pkgs.python3Packages.buildPythonPackage rec {
pname = "pandarallel";
version = "1.6.5";

src = pkgs.python311Packages.fetchPypi {
inherit pname version;
sha256 = "HC35j/ZEHorhP/QozuuqfsQtcx9/lyxBzk/e8dOt9kA=";
};

propagatedBuildInputs = with pkgs.python3Packages; [ pandas dill psutil ];

meta = with pkgs.lib; {
description = "An efficient parallel computing library for pandas";
homepage = "https://github.com/nalepae/pandarallel";
license = licenses.bsd3;
};
};

rpki-client = rpki-cli.defaultPackage.${system};
pythonBuildDeps = pkgs.python311.withPackages (ps: [
ps.beautifulsoup4
ps.numpy
ps.pandas
ps.requests
ps.tqdm
pandarallel
]);
pythonDevDeps = pkgs.python311.withPackages (ps: [
ps.beautifulsoup4
ps.numpy
ps.pandas
ps.requests
ps.tqdm
pandarallel
]);
kartografDeps = [
pythonBuildDeps
166 changes: 112 additions & 54 deletions kartograf/merge.py
@@ -1,20 +1,72 @@
import ipaddress
import shutil
import numpy as np
from pandarallel import pandarallel
import pandas as pd

from kartograf.timed import timed

class BaseNetworkIndex:
'''
Index the networks of a base AS file by their root network number (the
leading IPv4 octet or leading IPv6 hextet). _dict maps each root to the
(network address, netmask) pairs under it, so check_inclusion only compares
a candidate against the networks sharing its root instead of every network
in the base file.
'''


def __init__(self):
self._dict = {}
self._keys = self._dict.keys()
for i in range(0, 256):  # one bucket per possible IPv4 leading octet (0-255)
self._dict[i] = []

def update(self, pfx):
ipn = ipaddress.ip_network(pfx)
netw = int(ipn.network_address)
mask = int(ipn.netmask)
if ipn.version == 4:
root_net = int(str(pfx).split(".", maxsplit=1)[0])
current = self._dict[root_net]
self._dict[root_net] = current + [(netw, mask)]
else:
root_net = str(pfx).split(":", maxsplit=1)[0]
if root_net in self._keys:
current = self._dict[root_net]
self._dict[root_net] = current + [(netw, mask)]
else:
self._dict.update({root_net: [(netw, mask)]})

def check_inclusion(self, row, root_net):
"""
A network is a subnet of another if the bitwise AND of its IP and the base network's netmask
is equal to the base network IP.
"""
for net, mask in self._dict[root_net]:
if row[0] & mask == net:
return 1
return 0

def included(self, row, result_array):
root_net = row.PFXS_LEADING
if root_net in self._keys:
result = self.check_inclusion(row, root_net)
result_array.append(result)
else:
result_array.append(0)
return result_array


@timed
def merge_irr(context):
rpki_file = f'{context.out_dir_rpki}rpki_final.txt'
irr_file = f'{context.out_dir_irr}irr_final.txt'
irr_filtered_file = f'{context.out_dir_irr}irr_filtered.txt'
rpki_file = f"{context.out_dir_rpki}rpki_final.txt"
irr_file = f"{context.out_dir_irr}irr_final.txt"
irr_filtered_file = f"{context.out_dir_irr}irr_filtered.txt"
out_file = f"{context.out_dir}merged_file_rpki_irr.txt"

general_merge(rpki_file, irr_file, irr_filtered_file, out_file, context.args.silent, context.args.workers)
general_merge(
rpki_file,
irr_file,
irr_filtered_file,
out_file
)
shutil.copy2(out_file, context.final_result_file)


@@ -23,42 +75,42 @@ def merge_pfx2as(context):
# We are always doing RPKI but IRR is optional for now so depending on this
# we are working off of a different base file for the merge.
if context.args.irr:
base_file = f'{context.out_dir}merged_file_rpki_irr.txt'
base_file = f"{context.out_dir}merged_file_rpki_irr.txt"
out_file = f"{context.out_dir}merged_file_rpki_irr_rv.txt"
else:
base_file = f'{context.out_dir_rpki}rpki_final.txt'
base_file = f"{context.out_dir_rpki}rpki_final.txt"
out_file = f"{context.out_dir}merged_file_rpki_rv.txt"

rv_file = f'{context.out_dir_collectors}pfx2asn_clean.txt'
rv_filtered_file = f'{context.out_dir_collectors}pfx2asn_filtered.txt'
rv_file = f"{context.out_dir_collectors}pfx2asn_clean.txt"
rv_filtered_file = f"{context.out_dir_collectors}pfx2asn_filtered.txt"

general_merge(base_file, rv_file, rv_filtered_file, out_file, context.args.silent, context.args.workers)
general_merge(
base_file,
rv_file,
rv_filtered_file,
out_file
)
shutil.copy2(out_file, context.final_result_file)


def general_merge(base_file, extra_file, extra_filtered_file, out_file, silent=False, num_workers=0):
extra_kwargs = {'nb_workers': num_workers} if num_workers > 0 else {}
pandarallel.initialize(progress_bar=not silent, verbose=0, **extra_kwargs)

print("Parse base file to numpy arrays")
base_nets = []
base_masks = []
def general_merge(
base_file, extra_file, extra_filtered_file, out_file
):
"""
Merge lists of IP networks into a base file.
"""
print("Parse base file to dictionary")
base = BaseNetworkIndex()
with open(base_file, "r") as file:
for line in file:
pfx, asn = line.split(" ")
ipn = ipaddress.ip_network(pfx)
netw = int(ipn.network_address)
mask = int(ipn.netmask)
base_masks.append(mask)
base_nets.append(netw)

net_masks = np.array(base_masks)
network_addresses = np.array(base_nets)
base.update(pfx)

print("Parse extra file to Pandas DataFrame")
extra_nets_int = []
extra_asns = []
extra_pfxs = []
extra_pfxs_leading = []
with open(extra_file, "r") as file:
for line in file:
pfx, asn = line.split(" ")
@@ -67,46 +119,52 @@ def general_merge(base_file, extra_file, extra_filtered_file, out_file, silent=F
extra_nets_int.append(netw_int)
extra_asns.append(asn.strip())
extra_pfxs.append(pfx)
if ipn.version == 4:
root_net = int(pfx.split(".", maxsplit=1)[0])
else:
root_net = str(pfx).split(":", maxsplit=1)[0]
extra_pfxs_leading.append(root_net)

df_extra = pd.DataFrame()
df_extra['INETS'] = extra_nets_int
df_extra['ASNS'] = extra_asns
df_extra['PFXS'] = extra_pfxs

def check_inclusion(extra_net):
inclusion_list = int(extra_net) & net_masks == network_addresses
df_extra["INETS"] = extra_nets_int
df_extra["ASNS"] = extra_asns
df_extra["PFXS"] = extra_pfxs
df_extra["PFXS_LEADING"] = extra_pfxs_leading

if np.any(inclusion_list):
return 1
print("Merging extra prefixes that were not included in the base file:\n")

return 0
included = []
for row in df_extra.itertuples(index=False):
base.included(row, included)

print("Filtering extra prefixes that were already "
"included in the base file:\n")
df_extra['INCLUDED'] = df_extra.INETS.parallel_apply(check_inclusion)
df_extra["INCLUDED"] = included
df_filtered = df_extra[df_extra.INCLUDED == 0]
# We are stuck at the end of the progress bar after above finishes
print("\n")

print("Finished merging extra prefixes.")

if extra_filtered_file:
print(f"Finished filtering! Originally {len(df_extra.index)} "
f"entries filtered down to {len(df_filtered.index)}")
df_filtered.to_csv(extra_filtered_file,
sep=' ',
index=False,
columns=["PFXS", "ASNS"],
header=False)
print(
f"Finished filtering! Originally {len(df_extra.index)} "
f"entries filtered down to {len(df_filtered.index)}"
)
df_filtered.to_csv(
extra_filtered_file,
sep=" ",
index=False,
columns=["PFXS", "ASNS"],
header=False,
)

with open(extra_filtered_file, "r") as extra:
extra_contents = extra.read()
else:
print(f"Finished filtering! Originally {len(df_extra.index)} entries "
f"filtered down to {len(df_filtered.index)}")
extra_contents = df_filtered.to_csv(None,
sep=' ',
index=False,
columns=["PFXS", "ASNS"],
header=False)
print(
f"Finished filtering! Originally {len(df_extra.index)} entries "
f"filtered down to {len(df_filtered.index)}"
)
extra_contents = df_filtered.to_csv(
None, sep=" ", index=False, columns=["PFXS", "ASNS"], header=False
)

print("Merging base file with filtered extra file")
with open(base_file, "r") as base:
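
For reference, a usage sketch of the new BaseNetworkIndex as general_merge drives it. The Row namedtuple is a hypothetical stand-in for a df_extra.itertuples(index=False) row, and the prefixes and ASN are illustrative, not taken from the commit.

```python
from collections import namedtuple
import ipaddress

# Stand-in for an itertuples row; the real DataFrame columns are
# INETS, ASNS, PFXS, PFXS_LEADING.
Row = namedtuple("Row", ["INETS", "ASNS", "PFXS", "PFXS_LEADING"])

base = BaseNetworkIndex()
base.update("10.0.0.0/8")  # stored under bucket key 10

pfx = "10.1.2.0/24"
row = Row(
    INETS=int(ipaddress.ip_network(pfx).network_address),
    ASNS="64500",
    PFXS=pfx,
    PFXS_LEADING=int(pfx.split(".", 1)[0]),
)

included = []
base.included(row, included)  # appends 1: 10.1.2.0/24 falls inside 10.0.0.0/8
print(included)               # [1]
```

Under this index, each extra prefix is compared only against the base networks that share its leading octet or hextet, which is what lets a plain Python loop replace the earlier vectorized parallel_apply pass.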
9 changes: 0 additions & 9 deletions module.nix
@@ -18,15 +18,8 @@ in
options.services.kartograf = {
enable = mkEnableOption "kartograf";
clean = mkEnableOption "cleaning up of temporary artifacts after processing." // { default = true; };
silent = mkEnableOption "silencing output (suppresses pandarallel's progress_bar)." // { default = true; };
useIRR = mkEnableOption "using Internet Routing Registry (IRR) data" // { default = true; };
useRV = mkEnableOption "using RouteViews (RV) data" // { default = true; };
workers = mkOption {
type = types.int;
default = 0;
example = 4;
description = mdDoc "Number of workers to use for pandarallel (0 = use all available cores).";
};
schedule = mkOption {
type = types.str;
default = "*-*-01 00:00:00 UTC";
@@ -69,10 +62,8 @@ in
ExecStopPost = "${postScript}/bin/post-script";
ExecStart = ''${kartograf}/bin/kartograf map \
${optionalString cfg.clean "--cleanup" } \
${optionalString cfg.silent "--silent" } \
${optionalString cfg.useIRR "--irr" } \
${optionalString cfg.useRV "--routeviews" } \
${optionalString (cfg.workers != 0) "--workers=${toString cfg.workers}" } \
'';
MemoryDenyWriteExecute = true;
WorkingDirectory = cfg.resultPath;
2 changes: 0 additions & 2 deletions requirements.txt
@@ -1,6 +1,4 @@
beautifulsoup4==4.11.1
numpy==1.26.4
pandas==1.5.3
requests>=2.31.0
tqdm==4.66.3
pandarallel==1.6.5
6 changes: 0 additions & 6 deletions run
@@ -57,12 +57,6 @@ if __name__ == "__main__":
# services.
# parser_map.add_argument("-f", "--announced_filter", action="store_true", default=False)

# Reduce log output (for now, affects only progress_bar in general_merge)
parser_map.add_argument("-s", "--silent", action="store_true", default=False)

# Number of workers to use for pandarallel
parser_map.add_argument("-p", "--workers", type=int, default=0)

# TODO:
# Include multiple ASNs that validate correctly for the same prefix.
# parser_map.add_argument("-m", "--multi_map", action="store_true", default=False)
