Merge pull request #30 from jurraca/improve-merge-perf
improve merge performance with dictionary lookup
fjahr authored Nov 16, 2024
2 parents 8df428e + d7b2e7b commit 6e9f0be
Showing 5 changed files with 112 additions and 94 deletions.
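
This commit replaces the numpy broadcast and pandarallel parallel_apply previously used for the subnet-inclusion check with a plain dictionary keyed by each prefix's leading IPv4 octet (or leading IPv6 hextet), so every extra prefix is only compared against the base prefixes that share that leading group. Below is a minimal, self-contained sketch of the subnet test both code paths rely on; the example prefixes are illustrative and not taken from this diff.

import ipaddress

# Subnet test: a candidate prefix lies inside a base prefix when ANDing its
# network address with the base netmask reproduces the base network address.
base = ipaddress.ip_network("10.0.0.0/16")
candidate = ipaddress.ip_network("10.0.1.0/24")

base_net = int(base.network_address)   # 167772160
base_mask = int(base.netmask)          # 0xFFFF0000
cand_net = int(candidate.network_address)

print(cand_net & base_mask == base_net)  # True: 10.0.1.0/24 is inside 10.0.0.0/16

# The speedup comes from grouping base prefixes by their leading octet/hextet,
# so this test only runs against the matching group instead of being broadcast
# across every base prefix.
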
23 changes: 0 additions & 23 deletions flake.nix
@@ -20,41 +20,18 @@
utils.lib.eachDefaultSystem (system: let
pkgs = nixpkgs.legacyPackages.${system};

# Custom derivation for pandarallel
pandarallel = pkgs.python3Packages.buildPythonPackage rec {
pname = "pandarallel";
version = "1.6.5";

src = pkgs.python311Packages.fetchPypi {
inherit pname version;
sha256 = "HC35j/ZEHorhP/QozuuqfsQtcx9/lyxBzk/e8dOt9kA=";
};

propagatedBuildInputs = with pkgs.python3Packages; [ pandas dill psutil ];

meta = with pkgs.lib; {
description = "An efficient parallel computing library for pandas";
homepage = "https://github.com/nalepae/pandarallel";
license = licenses.bsd3;
};
};

rpki-client = rpki-cli.defaultPackage.${system};
pythonBuildDeps = pkgs.python311.withPackages (ps: [
ps.beautifulsoup4
ps.numpy
ps.pandas
ps.requests
ps.tqdm
pandarallel
]);
pythonDevDeps = pkgs.python311.withPackages (ps: [
ps.beautifulsoup4
ps.numpy
ps.pandas
ps.requests
ps.tqdm
pandarallel
]);
kartografDeps = [
pythonBuildDeps
166 changes: 112 additions & 54 deletions kartograf/merge.py
@@ -1,20 +1,72 @@
import ipaddress
import shutil
import numpy as np
from pandarallel import pandarallel
import pandas as pd

from kartograf.timed import timed

class BaseNetworkIndex:
"""
Maps each root network number (the leading IPv4 octet or leading IPv6
hextet) to the list of (network, netmask) pairs from the base AS file
that fall under it. To check whether a prefix is covered by the base AS
file, check_inclusion only compares it against the networks sharing its
root network number instead of against every network in the base file.
"""

def __init__(self):
self._dict = {}
self._keys = self._dict.keys()
for i in range(0, 256):
self._dict[i] = []

def update(self, pfx):
ipn = ipaddress.ip_network(pfx)
netw = int(ipn.network_address)
mask = int(ipn.netmask)
if ipn.version == 4:
root_net = int(str(pfx).split(".", maxsplit=1)[0])
current = self._dict[root_net]
self._dict[root_net] = current + [(netw, mask)]
else:
root_net = str(pfx).split(":", maxsplit=1)[0]
if root_net in self._keys:
current = self._dict[root_net]
self._dict[root_net] = current + [(netw, mask)]
else:
self._dict.update({root_net: [(netw, mask)]})

def check_inclusion(self, row, root_net):
"""
A network is a subnet of another if the bitwise AND of its IP and the base network's netmask
is equal to the base network IP.
"""
for net, mask in self._dict[root_net]:
if row[0] & mask == net:
return 1
return 0

def included(self, row, result_array):
root_net = row.PFXS_LEADING
if root_net in self._keys:
result = self.check_inclusion(row, root_net)
result_array.append(result)
else:
result_array.append(0)
return result_array


@timed
def merge_irr(context):
rpki_file = f'{context.out_dir_rpki}rpki_final.txt'
irr_file = f'{context.out_dir_irr}irr_final.txt'
irr_filtered_file = f'{context.out_dir_irr}irr_filtered.txt'
rpki_file = f"{context.out_dir_rpki}rpki_final.txt"
irr_file = f"{context.out_dir_irr}irr_final.txt"
irr_filtered_file = f"{context.out_dir_irr}irr_filtered.txt"
out_file = f"{context.out_dir}merged_file_rpki_irr.txt"

general_merge(rpki_file, irr_file, irr_filtered_file, out_file, context.args.silent, context.args.workers)
general_merge(
rpki_file,
irr_file,
irr_filtered_file,
out_file
)
shutil.copy2(out_file, context.final_result_file)


@@ -23,42 +75,42 @@ def merge_pfx2as(context):
# RPKI is always processed, but IRR is optional for now, so the base file
# for the merge depends on whether IRR data was used.
if context.args.irr:
base_file = f'{context.out_dir}merged_file_rpki_irr.txt'
base_file = f"{context.out_dir}merged_file_rpki_irr.txt"
out_file = f"{context.out_dir}merged_file_rpki_irr_rv.txt"
else:
base_file = f'{context.out_dir_rpki}rpki_final.txt'
base_file = f"{context.out_dir_rpki}rpki_final.txt"
out_file = f"{context.out_dir}merged_file_rpki_rv.txt"

rv_file = f'{context.out_dir_collectors}pfx2asn_clean.txt'
rv_filtered_file = f'{context.out_dir_collectors}pfx2asn_filtered.txt'
rv_file = f"{context.out_dir_collectors}pfx2asn_clean.txt"
rv_filtered_file = f"{context.out_dir_collectors}pfx2asn_filtered.txt"

general_merge(base_file, rv_file, rv_filtered_file, out_file, context.args.silent, context.args.workers)
general_merge(
base_file,
rv_file,
rv_filtered_file,
out_file
)
shutil.copy2(out_file, context.final_result_file)


def general_merge(base_file, extra_file, extra_filtered_file, out_file, silent=False, num_workers=0):
extra_kwargs = {'nb_workers': num_workers} if num_workers > 0 else {}
pandarallel.initialize(progress_bar=not silent, verbose=0, **extra_kwargs)

print("Parse base file to numpy arrays")
base_nets = []
base_masks = []
def general_merge(
base_file, extra_file, extra_filtered_file, out_file
):
"""
Merge lists of IP networks into a base file.
"""
print("Parse base file to dictionary")
base = BaseNetworkIndex()
with open(base_file, "r") as file:
for line in file:
pfx, asn = line.split(" ")
ipn = ipaddress.ip_network(pfx)
netw = int(ipn.network_address)
mask = int(ipn.netmask)
base_masks.append(mask)
base_nets.append(netw)

net_masks = np.array(base_masks)
network_addresses = np.array(base_nets)
base.update(pfx)

print("Parse extra file to Pandas DataFrame")
extra_nets_int = []
extra_asns = []
extra_pfxs = []
extra_pfxs_leading = []
with open(extra_file, "r") as file:
for line in file:
pfx, asn = line.split(" ")
@@ -67,46 +119,52 @@ def general_merge(base_file, extra_file, extra_filtered_file, out_file, silent=F
extra_nets_int.append(netw_int)
extra_asns.append(asn.strip())
extra_pfxs.append(pfx)
if ipn.version == 4:
root_net = int(pfx.split(".", maxsplit=1)[0])
else:
root_net = str(pfx).split(":", maxsplit=1)[0]
extra_pfxs_leading.append(root_net)

df_extra = pd.DataFrame()
df_extra['INETS'] = extra_nets_int
df_extra['ASNS'] = extra_asns
df_extra['PFXS'] = extra_pfxs

def check_inclusion(extra_net):
inclusion_list = int(extra_net) & net_masks == network_addresses
df_extra["INETS"] = extra_nets_int
df_extra["ASNS"] = extra_asns
df_extra["PFXS"] = extra_pfxs
df_extra["PFXS_LEADING"] = extra_pfxs_leading

if np.any(inclusion_list):
return 1
print("Merging extra prefixes that were not included in the base file:\n")

return 0
included = []
for row in df_extra.itertuples(index=False):
base.included(row, included)

print("Filtering extra prefixes that were already "
"included in the base file:\n")
df_extra['INCLUDED'] = df_extra.INETS.parallel_apply(check_inclusion)
df_extra["INCLUDED"] = included
df_filtered = df_extra[df_extra.INCLUDED == 0]
# We are stuck at the end of the progress bar after above finishes
print("\n")

print("Finished merging extra prefixes.")

if extra_filtered_file:
print(f"Finished filtering! Originally {len(df_extra.index)} "
f"entries filtered down to {len(df_filtered.index)}")
df_filtered.to_csv(extra_filtered_file,
sep=' ',
index=False,
columns=["PFXS", "ASNS"],
header=False)
print(
f"Finished filtering! Originally {len(df_extra.index)} "
f"entries filtered down to {len(df_filtered.index)}"
)
df_filtered.to_csv(
extra_filtered_file,
sep=" ",
index=False,
columns=["PFXS", "ASNS"],
header=False,
)

with open(extra_filtered_file, "r") as extra:
extra_contents = extra.read()
else:
print(f"Finished filtering! Originally {len(df_extra.index)} entries "
f"filtered down to {len(df_filtered.index)}")
extra_contents = df_filtered.to_csv(None,
sep=' ',
index=False,
columns=["PFXS", "ASNS"],
header=False)
print(
f"Finished filtering! Originally {len(df_extra.index)} entries "
f"filtered down to {len(df_filtered.index)}"
)
extra_contents = df_filtered.to_csv(
None, sep=" ", index=False, columns=["PFXS", "ASNS"], header=False
)

print("Merging base file with filtered extra file")
with open(base_file, "r") as base:
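
A hypothetical usage sketch of the BaseNetworkIndex introduced above: the Row namedtuple stands in for the DataFrame rows that general_merge builds (INETS first so row[0] is the integer network address, PFXS_LEADING last), and the prefixes and ASNs are made-up examples rather than values from this diff.

from collections import namedtuple
import ipaddress

from kartograf.merge import BaseNetworkIndex

# Stand-in for a row of df_extra (the real rows come from itertuples(index=False)).
Row = namedtuple("Row", ["INETS", "ASNS", "PFXS", "PFXS_LEADING"])

index = BaseNetworkIndex()
index.update("10.0.0.0/16")  # prefix read from the base file

covered = Row(int(ipaddress.ip_network("10.0.1.0/24").network_address),
              "AS65001", "10.0.1.0/24", 10)
unrelated = Row(int(ipaddress.ip_network("192.0.2.0/24").network_address),
                "AS65002", "192.0.2.0/24", 192)

results = []
index.included(covered, results)    # appends 1: covered by 10.0.0.0/16
index.included(unrelated, results)  # appends 0: no base prefix under 192
print(results)                      # [1, 0]
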
9 changes: 0 additions & 9 deletions module.nix
@@ -18,15 +18,8 @@ in
options.services.kartograf = {
enable = mkEnableOption "kartograf";
clean = mkEnableOption "cleaning up of temporary artifacts after processing." // { default = true; };
silent = mkEnableOption "silencing output (suppresses pandarallel's progress_bar)." // { default = true; };
useIRR = mkEnableOption "using Internet Routing Registry (IRR) data" // { default = true; };
useRV = mkEnableOption "using RouteViews (RV) data" // { default = true; };
workers = mkOption {
type = types.int;
default = 0;
example = 4;
description = mdDoc "Number of workers to use for pandarallel (0 = use all available cores).";
};
schedule = mkOption {
type = types.str;
default = "*-*-01 00:00:00 UTC";
@@ -69,10 +62,8 @@ in
ExecStopPost = "${postScript}/bin/post-script";
ExecStart = ''${kartograf}/bin/kartograf map \
${optionalString cfg.clean "--cleanup" } \
${optionalString cfg.silent "--silent" } \
${optionalString cfg.useIRR "--irr" } \
${optionalString cfg.useRV "--routeviews" } \
${optionalString (cfg.workers != 0) "--workers=${toString cfg.workers}" } \
'';
MemoryDenyWriteExecute = true;
WorkingDirectory = cfg.resultPath;
2 changes: 0 additions & 2 deletions requirements.txt
@@ -1,6 +1,4 @@
beautifulsoup4==4.11.1
numpy==1.26.4
pandas==1.5.3
requests>=2.31.0
tqdm==4.66.3
pandarallel==1.6.5
6 changes: 0 additions & 6 deletions run
@@ -57,12 +57,6 @@ if __name__ == "__main__":
# services.
# parser_map.add_argument("-f", "--announced_filter", action="store_true", default=False)

# Reduce log output (for now, affects only progress_bar in general_merge)
parser_map.add_argument("-s", "--silent", action="store_true", default=False)

# Number of workers to use for pandarallel
parser_map.add_argument("-p", "--workers", type=int, default=0)

# TODO:
# Include multiple ASNs that validate correctly for the same prefix.
# parser_map.add_argument("-m", "--multi_map", action="store_true", default=False)
