From 346f9c00f509f3ea181b4d7a58b82f9ae06915c3 Mon Sep 17 00:00:00 2001 From: gfrebello Date: Thu, 2 Mar 2023 08:54:36 -0300 Subject: [PATCH 1/7] timemachine: Add support for reconstruction/filtering of missing edges. --- lntopo/timemachine.py | 103 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 5 deletions(-) diff --git a/lntopo/timemachine.py b/lntopo/timemachine.py index 120587f..7df538e 100644 --- a/lntopo/timemachine.py +++ b/lntopo/timemachine.py @@ -8,7 +8,7 @@ from datetime import datetime import json from networkx.readwrite import json_graph - +import requests @click.group() def timemachine(): @@ -19,7 +19,8 @@ def timemachine(): @click.argument("dataset", type=DatasetFile()) @click.argument("timestamp", type=int, required=False) @click.option('--fmt', type=click.Choice(['dot', 'gml', 'graphml', 'json'], case_sensitive=False)) -def restore(dataset, timestamp=None, fmt='dot'): +@click.option('--fix_missing', type=click.Choice(['recover','filter'], case_sensitive=False)) +def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): """Restore reconstructs the network topology at a specific time in the past. Restore replays gossip messages from a dataset and reconstructs @@ -123,15 +124,107 @@ def restore(dataset, timestamp=None, fmt='dot'): for scid in todelete: del channels[scid] - nodes = [n for n in nodes.values() if n["in_degree"] > 0 or n['out_degree'] > 0] - if len(channels) == 0: print( "ERROR: no channels are left after pruning, make sure to select a" "timestamp that is covered by the dataset." ) sys.exit(1) - + + if fix_missing is not None: + # If fix_missing is set, find channels that don't have edge data for both directions + unmatched = [] + removed = [] + for scid, chan in tqdm(channels.items(), desc="Finding unmatched channels"): + + if scid[-2:] == "/0": + opposite_scid = scid[:-2] + "/1" + elif scid[-2:] == "/1": + opposite_scid = scid[:-2] + "/0" + else: + raise Exception("ERROR: unknown scid format.") + + if opposite_scid not in channels: + unmatched.append(scid) + + if fix_missing == "recover": + # Attempt to recover the missing edges using a Lightning explorer + for scid in tqdm(unmatched, desc="Retrieving information for missing edges from 1ML.com"): + scid_elements = [ int(i) for i in scid[:-2].split("x") ] + converted_scid = scid_elements[0] << 40 | scid_elements[1] << 16 | scid_elements[2] + url = "https://1ml.com/channel/" + str(converted_scid) + "/json" + resp = requests.get(url) + + if resp.status_code == 200: + chan_info = resp.json() + else: + raise Exception("ERROR: unable to retrieve channel.") + + direction = int(not bool(int(scid[-1:]))) + + if direction == 0: + recovered_data = chan_info["node1_policy"] + else: + recovered_data = chan_info["node2_policy"] + + chan = channels.get(scid, None) + + if not all(recovered_data.values()): + # If no useful data could be found, remove the channel + node = nodes.get(chan["source"], None) + if node is None: + continue + node["out_degree"] -= 1 + node = nodes.get(chan["destination"], None) + if node is None: + continue + node["in_degree"] -= 1 + removed.append(channels[scid]) + del channels[scid] + + else: + # Add recovered edge to the graph + channels[scid[:-1] + str(direction)] = { + "source": chan["destination"], + "destination": chan["source"], + "timestamp": chan["timestamp"], + "features": chan["features"], + "fee_base_msat": recovered_data["fee_base_msat"], + "fee_proportional_millionths": recovered_data["fee_rate_milli_msat"], + "htlc_minimum_msat": 
recovered_data["min_htlc"], + "cltv_expiry_delta": recovered_data["time_lock_delta"] } + + node = nodes.get(chan["destination"], None) + if node is None: + continue + node["out_degree"] += 1 + node = nodes.get(chan["source"], None) + if node is None: + continue + node["in_degree"] += 1 + + if fix_missing == "filter": + # Remove channels that don"t have edge data for both directions + for scid in tqdm(unmatched, desc="Removing unmatched edges from the graph"): + chan = channels.get(scid, None) + node = nodes.get(chan["source"], None) + if node is None: + continue + node["out_degree"] -= 1 + node = nodes.get(chan["destination"], None) + if node is None: + continue + node["in_degree"] -= 1 + removed.append(channels[scid]) + del channels[scid] + + + print('WARNING:', len(removed), "channels were removed from the graph due to missing edges") + + + nodes = [n for n in nodes.values() if n["in_degree"] > 0 or n["out_degree"] > 0] + + # Export graph g = nx.DiGraph() for n in nodes: g.add_node(n["id"], **n) From 3aecbf4d809fbfb3bbf5d27db644760dfe1a9c5e Mon Sep 17 00:00:00 2001 From: Gabriel Rebello Date: Thu, 2 Mar 2023 12:29:02 -0300 Subject: [PATCH 2/7] timemachine: Corrected a small indentation issue. --- lntopo/timemachine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lntopo/timemachine.py b/lntopo/timemachine.py index 7df538e..1a08398 100644 --- a/lntopo/timemachine.py +++ b/lntopo/timemachine.py @@ -144,7 +144,7 @@ def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): else: raise Exception("ERROR: unknown scid format.") - if opposite_scid not in channels: + if opposite_scid not in channels: unmatched.append(scid) if fix_missing == "recover": From 1b3bbce0b71c2164cf42d32f1bce487cd82abad2 Mon Sep 17 00:00:00 2001 From: gfrebello Date: Fri, 3 Mar 2023 00:04:39 -0300 Subject: [PATCH 3/7] timemachine: Add cache support for retrieved edges. --- lntopo/timemachine.py | 51 ++++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/lntopo/timemachine.py b/lntopo/timemachine.py index 7df538e..c5fd540 100644 --- a/lntopo/timemachine.py +++ b/lntopo/timemachine.py @@ -9,6 +9,8 @@ import json from networkx.readwrite import json_graph import requests +import os +import csv @click.group() def timemachine(): @@ -35,6 +37,7 @@ def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): cutoff = timestamp - 2 * 7 * 24 * 3600 channels = {} nodes = {} + cache_file = "./data/channels_cache.csv" # Some target formats do not suport UTF-8 aliases. codec = 'UTF-8' if fmt in ['dot'] else 'ASCII' @@ -130,7 +133,7 @@ def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): "timestamp that is covered by the dataset." 
) sys.exit(1) - + if fix_missing is not None: # If fix_missing is set, find channels that don't have edge data for both directions unmatched = [] @@ -144,28 +147,46 @@ def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): else: raise Exception("ERROR: unknown scid format.") - if opposite_scid not in channels: + if opposite_scid not in channels: unmatched.append(scid) if fix_missing == "recover": - # Attempt to recover the missing edges using a Lightning explorer - for scid in tqdm(unmatched, desc="Retrieving information for missing edges from 1ML.com"): - scid_elements = [ int(i) for i in scid[:-2].split("x") ] - converted_scid = scid_elements[0] << 40 | scid_elements[1] << 16 | scid_elements[2] - url = "https://1ml.com/channel/" + str(converted_scid) + "/json" - resp = requests.get(url) - - if resp.status_code == 200: - chan_info = resp.json() + # Attempt to recover missing edges + if os.path.exists(cache_file) and os.stat(cache_file).st_size > 0: + with open(cache_file, 'r') as f: + reader = csv.reader(f) + channels_cache = {rows[0]:json.loads(rows[1]) for rows in reader} + else: + channels_cache = dict() + + for scid in tqdm(unmatched, desc="Attempting to recover missing edges"): + undirected_scid = scid[:-2] + if undirected_scid in channels_cache: + # If possible, retrieve edge data from the cache file + recovered_chan = channels_cache[undirected_scid] else: - raise Exception("ERROR: unable to retrieve channel.") - + # Else, request edge data from a LN explorer and save it in the cache file + scid_elements = [ int(i) for i in undirected_scid.split("x") ] + converted_scid = scid_elements[0] << 40 | scid_elements[1] << 16 | scid_elements[2] + url = "https://1ml.com/channel/" + str(converted_scid) + "/json" + resp = requests.get(url) + + if resp.status_code == 200: + recovered_chan = resp.json() + else: + raise Exception("ERROR: unable to retrieve channel.") + + os.makedirs(os.path.dirname(cache_file), exist_ok=True) + with open(cache_file, 'w+') as f: + writer = csv.writer(f) + writer.writerow([undirected_scid, json.dumps(recovered_chan)]) + direction = int(not bool(int(scid[-1:]))) if direction == 0: - recovered_data = chan_info["node1_policy"] + recovered_data = recovered_chan["node1_policy"] else: - recovered_data = chan_info["node2_policy"] + recovered_data = recovered_chan["node2_policy"] chan = channels.get(scid, None) From cf7ae4126aa33cdfae2807caff86a417957772c4 Mon Sep 17 00:00:00 2001 From: gfrebello Date: Fri, 3 Mar 2023 09:52:18 -0300 Subject: [PATCH 4/7] timemachine: Fix an issue that would overwrite the cache file. 
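Note (not part of the patch): this and the two follow-up fixes converge on a simple cache discipline — load ./data/channels_cache.csv (one "undirected_scid,json" row per recovered channel) into a dict once, then append new rows instead of rewriting the file on every lookup. A minimal sketch of that pattern, with illustrative helper names:

    import csv
    import json
    import os

    def load_cache(cache_file):
        """Load the 'undirected scid -> explorer JSON' cache; {} if absent or empty."""
        if not os.path.exists(cache_file) or os.stat(cache_file).st_size == 0:
            return {}
        with open(cache_file, 'r') as f:
            # One row per channel: undirected scid, JSON blob returned by the explorer.
            return {row[0]: json.loads(row[1]) for row in csv.reader(f)}

    def append_to_cache(cache_file, undirected_scid, recovered_chan):
        """Append a single recovered channel; mode 'a' keeps earlier rows intact."""
        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
        with open(cache_file, 'a') as f:
            csv.writer(f).writerow([undirected_scid, json.dumps(recovered_chan)])

The reader also has to keep csv's default quoting (the QUOTE_NONE reader is reverted later in the series), presumably because the JSON blob contains commas and relies on the writer's quoting to survive the round trip.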
--- lntopo/timemachine.py | 120 +++++++++++++++++++++--------------------- 1 file changed, 60 insertions(+), 60 deletions(-) diff --git a/lntopo/timemachine.py b/lntopo/timemachine.py index c5fd540..7952105 100644 --- a/lntopo/timemachine.py +++ b/lntopo/timemachine.py @@ -154,75 +154,75 @@ def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): # Attempt to recover missing edges if os.path.exists(cache_file) and os.stat(cache_file).st_size > 0: with open(cache_file, 'r') as f: - reader = csv.reader(f) + reader = csv.reader(f, quoting=csv.QUOTE_NONE) channels_cache = {rows[0]:json.loads(rows[1]) for rows in reader} else: channels_cache = dict() - for scid in tqdm(unmatched, desc="Attempting to recover missing edges"): - undirected_scid = scid[:-2] - if undirected_scid in channels_cache: - # If possible, retrieve edge data from the cache file - recovered_chan = channels_cache[undirected_scid] - else: - # Else, request edge data from a LN explorer and save it in the cache file - scid_elements = [ int(i) for i in undirected_scid.split("x") ] - converted_scid = scid_elements[0] << 40 | scid_elements[1] << 16 | scid_elements[2] - url = "https://1ml.com/channel/" + str(converted_scid) + "/json" - resp = requests.get(url) - - if resp.status_code == 200: - recovered_chan = resp.json() + os.makedirs(os.path.dirname(cache_file), exist_ok=True) + with open(cache_file, 'w') as f: + for scid in tqdm(unmatched, desc="Attempting to recover missing edges"): + undirected_scid = scid[:-2] + if undirected_scid in channels_cache: + # If possible, retrieve edge data from the cache file + recovered_chan = channels_cache[undirected_scid] else: - raise Exception("ERROR: unable to retrieve channel.") - - os.makedirs(os.path.dirname(cache_file), exist_ok=True) - with open(cache_file, 'w+') as f: + # Else, request edge data from a LN explorer and save it in the cache file + scid_elements = [ int(i) for i in undirected_scid.split("x") ] + converted_scid = scid_elements[0] << 40 | scid_elements[1] << 16 | scid_elements[2] + url = "https://1ml.com/channel/" + str(converted_scid) + "/json" + resp = requests.get(url) + + if resp.status_code == 200: + recovered_chan = resp.json() + else: + raise Exception("ERROR: unable to retrieve channel.") + writer = csv.writer(f) writer.writerow([undirected_scid, json.dumps(recovered_chan)]) - direction = int(not bool(int(scid[-1:]))) - - if direction == 0: - recovered_data = recovered_chan["node1_policy"] - else: - recovered_data = recovered_chan["node2_policy"] - - chan = channels.get(scid, None) + direction = int(not bool(int(scid[-1:]))) - if not all(recovered_data.values()): - # If no useful data could be found, remove the channel - node = nodes.get(chan["source"], None) - if node is None: - continue - node["out_degree"] -= 1 - node = nodes.get(chan["destination"], None) - if node is None: - continue - node["in_degree"] -= 1 - removed.append(channels[scid]) - del channels[scid] - - else: - # Add recovered edge to the graph - channels[scid[:-1] + str(direction)] = { - "source": chan["destination"], - "destination": chan["source"], - "timestamp": chan["timestamp"], - "features": chan["features"], - "fee_base_msat": recovered_data["fee_base_msat"], - "fee_proportional_millionths": recovered_data["fee_rate_milli_msat"], - "htlc_minimum_msat": recovered_data["min_htlc"], - "cltv_expiry_delta": recovered_data["time_lock_delta"] } - - node = nodes.get(chan["destination"], None) - if node is None: - continue - node["out_degree"] += 1 - node = nodes.get(chan["source"], None) - 
if node is None: - continue - node["in_degree"] += 1 + if direction == 0: + recovered_data = recovered_chan["node1_policy"] + else: + recovered_data = recovered_chan["node2_policy"] + + chan = channels.get(scid, None) + + if not all(recovered_data.values()): + # If no useful data could be found, remove the channel + node = nodes.get(chan["source"], None) + if node is None: + continue + node["out_degree"] -= 1 + node = nodes.get(chan["destination"], None) + if node is None: + continue + node["in_degree"] -= 1 + removed.append(channels[scid]) + del channels[scid] + + else: + # Add recovered edge to the graph + channels[scid[:-1] + str(direction)] = { + "source": chan["destination"], + "destination": chan["source"], + "timestamp": chan["timestamp"], + "features": chan["features"], + "fee_base_msat": recovered_data["fee_base_msat"], + "fee_proportional_millionths": recovered_data["fee_rate_milli_msat"], + "htlc_minimum_msat": recovered_data["min_htlc"], + "cltv_expiry_delta": recovered_data["time_lock_delta"] } + + node = nodes.get(chan["destination"], None) + if node is None: + continue + node["out_degree"] += 1 + node = nodes.get(chan["source"], None) + if node is None: + continue + node["in_degree"] += 1 if fix_missing == "filter": # Remove channels that don"t have edge data for both directions From a99caa292d7cff220f2dfcab091eb4b74ff58e30 Mon Sep 17 00:00:00 2001 From: gfrebello Date: Fri, 3 Mar 2023 19:17:32 -0300 Subject: [PATCH 5/7] timemachine: Fix quoting issue when writing into the cache file. --- lntopo/timemachine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lntopo/timemachine.py b/lntopo/timemachine.py index 7952105..b9083d2 100644 --- a/lntopo/timemachine.py +++ b/lntopo/timemachine.py @@ -154,7 +154,7 @@ def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): # Attempt to recover missing edges if os.path.exists(cache_file) and os.stat(cache_file).st_size > 0: with open(cache_file, 'r') as f: - reader = csv.reader(f, quoting=csv.QUOTE_NONE) + reader = csv.reader(f) channels_cache = {rows[0]:json.loads(rows[1]) for rows in reader} else: channels_cache = dict() From 8cae2bf723f5ba06c98762522981746ff218e51f Mon Sep 17 00:00:00 2001 From: gfrebello Date: Mon, 6 Mar 2023 16:45:27 -0300 Subject: [PATCH 6/7] timemachine: Fix an issue that would overwrite the cache file. --- lntopo/timemachine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lntopo/timemachine.py b/lntopo/timemachine.py index b9083d2..2667043 100644 --- a/lntopo/timemachine.py +++ b/lntopo/timemachine.py @@ -160,7 +160,7 @@ def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): channels_cache = dict() os.makedirs(os.path.dirname(cache_file), exist_ok=True) - with open(cache_file, 'w') as f: + with open(cache_file, 'a') as f: for scid in tqdm(unmatched, desc="Attempting to recover missing edges"): undirected_scid = scid[:-2] if undirected_scid in channels_cache: From 9df071522ae813b18453b34762780b0f296c6832 Mon Sep 17 00:00:00 2001 From: gfrebello Date: Wed, 8 Mar 2023 11:25:29 -0300 Subject: [PATCH 7/7] timemachine: Fix a bug that would interpret all recovered data as strings. 
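Note (not part of the patch): 1ML appears to return the policy fields as JSON strings, so without the int() casts in the diff below the recovered channels carry string-typed fees and CLTV deltas into the exported graph. A tiny helper equivalent to the change, using the same key mapping as the diff:

    def parse_policy(policy):
        """Coerce the string-valued explorer policy into the integer fields
        the channel dict expects (keys mirror the diff below)."""
        return {
            "fee_base_msat": int(policy["fee_base_msat"]),
            "fee_proportional_millionths": int(policy["fee_rate_milli_msat"]),
            "htlc_minimum_msat": int(policy["min_htlc"]),
            "cltv_expiry_delta": int(policy["time_lock_delta"]),
        }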
--- lntopo/timemachine.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lntopo/timemachine.py b/lntopo/timemachine.py index 2667043..1479bcf 100644 --- a/lntopo/timemachine.py +++ b/lntopo/timemachine.py @@ -210,10 +210,11 @@ def restore(dataset, timestamp=None, fmt='dot', fix_missing=None): "destination": chan["source"], "timestamp": chan["timestamp"], "features": chan["features"], - "fee_base_msat": recovered_data["fee_base_msat"], - "fee_proportional_millionths": recovered_data["fee_rate_milli_msat"], - "htlc_minimum_msat": recovered_data["min_htlc"], - "cltv_expiry_delta": recovered_data["time_lock_delta"] } + "fee_base_msat": int(recovered_data["fee_base_msat"]), + "fee_proportional_millionths": int(recovered_data["fee_rate_milli_msat"]), + "htlc_minimum_msat": int(recovered_data["min_htlc"]), + "cltv_expiry_delta": int(recovered_data["time_lock_delta"]) + } node = nodes.get(chan["destination"], None) if node is None:
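Taken together, the recovery path added by this series boils down to: take an unmatched "block x txindex x output/direction" scid from the dataset, pack it into the integer id 1ML expects (block << 40 | txindex << 16 | output), fetch https://1ml.com/channel/<id>/json, and use node1_policy or node2_policy for the missing direction. A standalone sketch of that lookup (not part of the patches; the example scid is made up):

    import requests

    def fetch_missing_edge(scid):
        """scid is the dataset form, e.g. '577154x1463x0/1'.

        Returns the explorer's channel JSON and the direction suffix; the
        caller picks node1_policy or node2_policy for the missing edge.
        """
        undirected, _, suffix = scid.partition("/")
        block, txindex, output = (int(x) for x in undirected.split("x"))
        packed = block << 40 | txindex << 16 | output
        resp = requests.get("https://1ml.com/channel/" + str(packed) + "/json")
        resp.raise_for_status()
        return resp.json(), int(suffix)

With the click option added in patch 1, this behaviour is selected via --fix_missing recover (query 1ML, dropping the channel when the returned policy is empty) or --fix_missing filter (simply remove channels that only gossiped one direction).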