diff --git a/flake.nix b/flake.nix
index e628079..7867a3c 100644
--- a/flake.nix
+++ b/flake.nix
@@ -20,41 +20,18 @@
     utils.lib.eachDefaultSystem (system:
       let
         pkgs = nixpkgs.legacyPackages.${system};
-        # Custom derivation for pandarallel
-        pandarallel = pkgs.python3Packages.buildPythonPackage rec {
-          pname = "pandarallel";
-          version = "1.6.5";
-
-          src = pkgs.python311Packages.fetchPypi {
-            inherit pname version;
-            sha256 = "HC35j/ZEHorhP/QozuuqfsQtcx9/lyxBzk/e8dOt9kA=";
-          };
-
-          propagatedBuildInputs = with pkgs.python3Packages; [ pandas dill psutil ];
-
-          meta = with pkgs.lib; {
-            description = "An efficient parallel computing library for pandas";
-            homepage = "https://github.com/nalepae/pandarallel";
-            license = licenses.bsd3;
-          };
-        };
-
         rpki-client = rpki-cli.defaultPackage.${system};

         pythonBuildDeps = pkgs.python311.withPackages (ps: [
           ps.beautifulsoup4
-          ps.numpy
           ps.pandas
           ps.requests
           ps.tqdm
-          pandarallel
         ]);

         pythonDevDeps = pkgs.python311.withPackages (ps: [
           ps.beautifulsoup4
-          ps.numpy
           ps.pandas
           ps.requests
           ps.tqdm
-          pandarallel
         ]);

         kartografDeps = [
           pythonBuildDeps
diff --git a/kartograf/merge.py b/kartograf/merge.py
index f5aaabe..a71f1f4 100644
--- a/kartograf/merge.py
+++ b/kartograf/merge.py
@@ -1,20 +1,72 @@
 import ipaddress
 import shutil

-import numpy as np
-from pandarallel import pandarallel
 import pandas as pd

 from kartograf.timed import timed


+class BaseNetworkIndex:
+    '''
+    A class whose _dict maps a root network number to the IP networks filed under that root for a given AS file.
+
+    To check whether a given IP network is included in the base AS file, we can compare it (see check_inclusion) against just the networks under its root network number instead of against every network in the base file.
+    '''
+
+    def __init__(self):
+        self._dict = {}
+        self._keys = self._dict.keys()
+        for i in range(0, 255):
+            self._dict[i] = []
+
+    def update(self, pfx):
+        ipn = ipaddress.ip_network(pfx)
+        netw = int(ipn.network_address)
+        mask = int(ipn.netmask)
+        if ipn.version == 4:
+            root_net = int(str(pfx).split(".", maxsplit=1)[0])
+            current = self._dict[root_net]
+            self._dict[root_net] = current + [(netw, mask)]
+        else:
+            root_net = str(pfx).split(":", maxsplit=1)[0]
+            if root_net in self._keys:
+                current = self._dict[root_net]
+                self._dict[root_net] = current + [(netw, mask)]
+            else:
+                self._dict.update({root_net: [(netw, mask)]})
+
+    def check_inclusion(self, row, root_net):
+        """
+        A network is a subnet of another if the bitwise AND of its IP and the base network's netmask
+        is equal to the base network IP.
+ """ + for net, mask in self._dict[root_net]: + if row[0] & mask == net: + return 1 + return 0 + + def included(self, row, result_array): + root_net = row.PFXS_LEADING + if root_net in self._keys: + result = self.check_inclusion(row, root_net) + result_array.append(result) + else: + result_array.append(0) + return result_array + @timed def merge_irr(context): - rpki_file = f'{context.out_dir_rpki}rpki_final.txt' - irr_file = f'{context.out_dir_irr}irr_final.txt' - irr_filtered_file = f'{context.out_dir_irr}irr_filtered.txt' + rpki_file = f"{context.out_dir_rpki}rpki_final.txt" + irr_file = f"{context.out_dir_irr}irr_final.txt" + irr_filtered_file = f"{context.out_dir_irr}irr_filtered.txt" out_file = f"{context.out_dir}merged_file_rpki_irr.txt" - general_merge(rpki_file, irr_file, irr_filtered_file, out_file, context.args.silent, context.args.workers) + general_merge( + rpki_file, + irr_file, + irr_filtered_file, + out_file + ) shutil.copy2(out_file, context.final_result_file) @@ -23,42 +75,42 @@ def merge_pfx2as(context): # We are always doing RPKI but IRR is optional for now so depending on this # we are working off of a different base file for the merge. if context.args.irr: - base_file = f'{context.out_dir}merged_file_rpki_irr.txt' + base_file = f"{context.out_dir}merged_file_rpki_irr.txt" out_file = f"{context.out_dir}merged_file_rpki_irr_rv.txt" else: - base_file = f'{context.out_dir_rpki}rpki_final.txt' + base_file = f"{context.out_dir_rpki}rpki_final.txt" out_file = f"{context.out_dir}merged_file_rpki_rv.txt" - rv_file = f'{context.out_dir_collectors}pfx2asn_clean.txt' - rv_filtered_file = f'{context.out_dir_collectors}pfx2asn_filtered.txt' + rv_file = f"{context.out_dir_collectors}pfx2asn_clean.txt" + rv_filtered_file = f"{context.out_dir_collectors}pfx2asn_filtered.txt" - general_merge(base_file, rv_file, rv_filtered_file, out_file, context.args.silent, context.args.workers) + general_merge( + base_file, + rv_file, + rv_filtered_file, + out_file + ) shutil.copy2(out_file, context.final_result_file) -def general_merge(base_file, extra_file, extra_filtered_file, out_file, silent=False, num_workers=0): - extra_kwargs = {'nb_workers': num_workers} if num_workers > 0 else {} - pandarallel.initialize(progress_bar=not silent, verbose=0, **extra_kwargs) - - print("Parse base file to numpy arrays") - base_nets = [] - base_masks = [] +def general_merge( + base_file, extra_file, extra_filtered_file, out_file +): + """ + Merge lists of IP networks into a base file. 
+ """ + print("Parse base file to dictionary") + base = BaseNetworkIndex() with open(base_file, "r") as file: for line in file: pfx, asn = line.split(" ") - ipn = ipaddress.ip_network(pfx) - netw = int(ipn.network_address) - mask = int(ipn.netmask) - base_masks.append(mask) - base_nets.append(netw) - - net_masks = np.array(base_masks) - network_addresses = np.array(base_nets) + base.update(pfx) print("Parse extra file to Pandas DataFrame") extra_nets_int = [] extra_asns = [] extra_pfxs = [] + extra_pfxs_leading = [] with open(extra_file, "r") as file: for line in file: pfx, asn = line.split(" ") @@ -67,46 +119,52 @@ def general_merge(base_file, extra_file, extra_filtered_file, out_file, silent=F extra_nets_int.append(netw_int) extra_asns.append(asn.strip()) extra_pfxs.append(pfx) + if ipn.version == 4: + root_net = int(pfx.split(".", maxsplit=1)[0]) + else: + root_net = str(pfx).split(":", maxsplit=1)[0] + extra_pfxs_leading.append(root_net) df_extra = pd.DataFrame() - df_extra['INETS'] = extra_nets_int - df_extra['ASNS'] = extra_asns - df_extra['PFXS'] = extra_pfxs - - def check_inclusion(extra_net): - inclusion_list = int(extra_net) & net_masks == network_addresses + df_extra["INETS"] = extra_nets_int + df_extra["ASNS"] = extra_asns + df_extra["PFXS"] = extra_pfxs + df_extra["PFXS_LEADING"] = extra_pfxs_leading - if np.any(inclusion_list): - return 1 + print("Merging extra prefixes that were not included in the base file:\n") - return 0 + included = [] + for row in df_extra.itertuples(index=False): + base.included(row, included) - print("Filtering extra prefixes that were already " - "included in the base file:\n") - df_extra['INCLUDED'] = df_extra.INETS.parallel_apply(check_inclusion) + df_extra["INCLUDED"] = included df_filtered = df_extra[df_extra.INCLUDED == 0] - # We are stuck at the end of the progress bar after above finishes - print("\n") + + print("Finished merging extra prefixes.") if extra_filtered_file: - print(f"Finished filtering! Originally {len(df_extra.index)} " - f"entries filtered down to {len(df_filtered.index)}") - df_filtered.to_csv(extra_filtered_file, - sep=' ', - index=False, - columns=["PFXS", "ASNS"], - header=False) + print( + f"Finished filtering! Originally {len(df_extra.index)} " + f"entries filtered down to {len(df_filtered.index)}" + ) + df_filtered.to_csv( + extra_filtered_file, + sep=" ", + index=False, + columns=["PFXS", "ASNS"], + header=False, + ) with open(extra_filtered_file, "r") as extra: extra_contents = extra.read() else: - print(f"Finished filtering! Originally {len(df_extra.index)} entries " - f"filtered down to {len(df_filtered.index)}") - extra_contents = df_filtered.to_csv(None, - sep=' ', - index=False, - columns=["PFXS", "ASNS"], - header=False) + print( + f"Finished filtering! Originally {len(df_extra.index)} entries " + f"filtered down to {len(df_filtered.index)}" + ) + extra_contents = df_filtered.to_csv( + None, sep=" ", index=False, columns=["PFXS", "ASNS"], header=False + ) print("Merging base file with filtered extra file") with open(base_file, "r") as base: diff --git a/module.nix b/module.nix index 413bf58..8ab2b03 100644 --- a/module.nix +++ b/module.nix @@ -18,15 +18,8 @@ in options.services.kartograf = { enable = mkEnableOption "kartograf"; clean = mkEnableOption "cleaning up of temporary artifacts after processing." // { default = true; }; - silent = mkEnableOption "silencing output (suppresses pandarallel's progress_bar)." 
     useIRR = mkEnableOption "using Internet Routing Registry (IRR) data" // { default = true; };
     useRV = mkEnableOption "using RouteViews (RV) data" // { default = true; };
-    workers = mkOption {
-      type = types.int;
-      default = 0;
-      example = 4;
-      description = mdDoc "Number of workers to use for pandarallel (0 = use all available cores).";
-    };
     schedule = mkOption {
       type = types.str;
       default = "*-*-01 00:00:00 UTC";
@@ -69,10 +62,8 @@ in
         ExecStopPost = "${postScript}/bin/post-script";
         ExecStart = ''${kartograf}/bin/kartograf map \
           ${optionalString cfg.clean "--cleanup" } \
-          ${optionalString cfg.silent "--silent" } \
           ${optionalString cfg.useIRR "--irr" } \
           ${optionalString cfg.useRV "--routeviews" } \
-          ${optionalString (cfg.workers != 0) "--workers=${toString cfg.workers}" } \
         '';
         MemoryDenyWriteExecute = true;
         WorkingDirectory = cfg.resultPath;
diff --git a/requirements.txt b/requirements.txt
index 39f98d1..fc34f56 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,4 @@
 beautifulsoup4==4.11.1
-numpy==1.26.4
 pandas==1.5.3
 requests>=2.31.0
 tqdm==4.66.3
-pandarallel==1.6.5
diff --git a/run b/run
index cdf8100..f7c3d1d 100755
--- a/run
+++ b/run
@@ -57,12 +57,6 @@ if __name__ == "__main__":
     # services.
     # parser_map.add_argument("-f", "--announced_filter", action="store_true", default=False)

-    # Reduce log output (for now, affects only progress_bar in general_merge)
-    parser_map.add_argument("-s", "--silent", action="store_true", default=False)
-
-    # Number of workers to use for pandarallel
-    parser_map.add_argument("-p", "--workers", type=int, default=0)
-
     # TODO:
     # Include multiple ASNs that validate correctly for the same prefix.
     # parser_map.add_argument("-m", "--multi_map", action="store_true", default=False)
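
Note (reviewer sketch, not part of the patch): the replacement for the pandarallel-based filter hinges on the subnet test documented in BaseNetworkIndex.check_inclusion, namely that a candidate prefix lies inside a base prefix exactly when the candidate's integer network address ANDed with the base netmask equals the base network address. The standalone Python sketch below illustrates that comparison; the prefix values are made up for illustration and do not come from the Kartograf data.

    import ipaddress

    # Hypothetical base prefix, stored by BaseNetworkIndex.update() as an
    # (int network address, int netmask) pair under its root network number.
    base = ipaddress.ip_network("10.1.0.0/16")
    base_net = int(base.network_address)
    base_mask = int(base.netmask)

    # Candidate prefixes as they would be parsed from the extra file.
    inside = int(ipaddress.ip_network("10.1.2.0/24").network_address)
    outside = int(ipaddress.ip_network("10.2.0.0/24").network_address)

    # Same comparison as check_inclusion(): AND the candidate's network
    # address with the base netmask and compare against the base address.
    print(inside & base_mask == base_net)    # True  -> already covered, filtered out
    print(outside & base_mask == base_net)   # False -> kept and merged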