Skip to content

Commit

Permalink
Merge pull request #13 from pszufe/validate_hif
Browse files Browse the repository at this point in the history
Rewrite the validate_hif.py to use the new schema
  • Loading branch information
colltoaction authored Aug 31, 2024
2 parents a8aa1fa + a95ebd3 commit 27bea43
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 240 deletions.
128 changes: 18 additions & 110 deletions scripts/hif.py
Original file line number Diff line number Diff line change
@@ -1,115 +1,25 @@
import json
from collections import defaultdict
from warnings import warn


def validate_hif(path):
with open(path) as file:
# load JSON file
data = json.loads(file.read())

# dictionary to store statuses
info = {}

# check that keys do not deviate from the standard field names
info["valid-field-names"] = 0
fields = {"network-type", "metadata", "nodes", "edges", "incidences"}
if not set(data).issubset(fields):
fields = ", ".join(fields)
data = ", ".join(set(data))
warn(
f"Acceptable field names are: {fields}\nand the field names are {data}"
)
info["valid-field-names"] = 1

# incidences are required; check that they exist
info["incidences-exist"] = 0

if "incidences" not in data:
warn(f"The file must contain an field for incidences.")
info["incidences-exist"] = 1

# check network type
info["valid-network-type"] = 0

network_types = {"asc", "undirected", "directed"}
if "network-type" in data:
if data["network-type"] not in network_types:
network_types = ", ".join(network_types)
warn(
f"Unsupported network type. Valid types are: {network_types}"
)
info["valid-network-type"] = 1

# check network metadata
info["metadata-dict"] = 0

if "metadata" in data:
if not isinstance(data["metadata"], dict):
warn(f"The metadata must be dict-like.")
info["metadata-dict"] = 1

# check node attributes
info["node-record-length"] = 0
info["node-attr-dict"] = 0

if "nodes" in data:
for i, record in enumerate(data["nodes"]):
if len(record) != 2:
warn(
f"Each node record must have two entries: an ID and the dictionary of corresponding attributes."
)
info["node-record-length"] = 1

if not isinstance(record[1], dict):
warn(f"The node attributes must be dict-like.")
info["node-attr-dict"] = 1

# check edge attributes
info["edge-record-length"] = 0
info["edge-attr-dict"] = 0

if "edges" in data:
for i, record in enumerate(data["edges"]):
if len(record) != 2:
warn(
f"Each edge record must have two entries: an ID and the dictionary of corresponding attributes."
)
info["edge-record-length"] = 1

if not isinstance(record[1], dict):
warn(f"The edge attributes must be dict-like.")
info["edge-attr-dict"] = 1

if "incidences" in data:
info["incidence-record-length"] = 0
info["incidence-attr-dict"] = 0

def validate_network_type(data, verbose):
"""
Custom validations for network types
"""
if (
"network-type" in data
and data["network-type"] == "directed"
and "incidences" in data
):
for i, record in enumerate(data["incidences"]):
if len(record) != 3:
warn(
f"Each incidence record must have three entries: an edge ID, a node ID, and the dictionary of corresponding attributes."
)
info["incidence-record-length"] = 1

if not isinstance(record[2], dict):
warn(f"The incidence attributes must be dict-like.")
info["incidence-attr-dict"] = 1

# in the case of directed hypergraphs, each incidence must
# have the "direction" attribute
if "network-type" in data and data["network-type"] == "directed":
data["direction-exists-for-directed"] = 0
for i, record in enumerate(data["edges"]):
if "direction" not in record[2]:
warn(
f"Each incidence record must have have the 'direction' attribute for directed hypergraphs."
)
data["direction-exists-for-directed"] = 1
status = 1
if verbose:
print(
f"Each incidence record must have have the 'direction' attribute for directed hypergraphs."
)

# in the case of simplicial complexes, make sure that the edges are maximal
if "network-type" in data and data["network-type"] == "asc":
data["maximal-edges-for-asc"] = 0
if "network-type" in data and data["network-type"] == "asc" and "incidences" in data:
edgedict = defaultdict(set)
for record in data["incidences"]:
e = record[0]
Expand All @@ -118,9 +28,7 @@ def validate_hif(path):
for e1, edge1 in edgedict.items():
for e2, edge2 in edgedict.items():
if e1 != e2 and edge1.issubset(edge2):
warn(
f"Only maximal faces should be stored for simplicial complexes."
if verbose:
print(
f"Only maximal faces should be stored for simplicial complexes."
)
data["maximal-edges-for-asc"] = 1

return info
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def bad_network_type():
return json.load(open(f"{json_dir}/bad_network_type.json", "r"))


@pytest.fixture
def bad_node_without_id():
    """Return the parsed contents of ``bad_node_without_id.json``.

    The file is looked up under ``json_dir`` (defined elsewhere in this
    module). It is opened in a ``with`` block so the handle is closed as
    soon as parsing finishes instead of being left to the garbage collector.
    """
    # JSON interchange is UTF-8; do not depend on the platform default encoding.
    with open(
        f"{json_dir}/bad_node_without_id.json", "r", encoding="utf-8"
    ) as handle:
        return json.load(handle)


@pytest.fixture
def metadata_as_list():
    """Return the parsed contents of ``metadata_as_list.json``.

    The file is looked up under ``json_dir`` (defined elsewhere in this
    module). It is opened in a ``with`` block so the handle is closed as
    soon as parsing finishes instead of being left to the garbage collector.
    """
    # JSON interchange is UTF-8; do not depend on the platform default encoding.
    with open(
        f"{json_dir}/metadata_as_list.json", "r", encoding="utf-8"
    ) as handle:
        return json.load(handle)
Expand Down
1 change: 1 addition & 0 deletions tests/test_files/bad_node_without_id.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"incidences": [], "nodes": [ { } ]}
17 changes: 17 additions & 0 deletions tests/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ def test_bad_network_type(validator, bad_network_type):
validator(bad_network_type)


def test_bad_node_without_id(validator, bad_node_without_id):
    """Expect ``validator`` to raise ``ValueError`` for ``bad_node_without_id``.

    Both arguments are pytest fixtures; ``bad_node_without_id`` is the parsed
    ``bad_node_without_id.json`` test file.
    """
    # The test fails unless the call inside the block raises ValueError.
    with pytest.raises(ValueError):
        validator(bad_node_without_id)


def test_single_node(validator, single_node):
    """Run ``validator`` on the ``single_node`` fixture.

    There is no explicit assertion: the test passes when the call returns
    without raising.
    """
    validator(single_node)


def test_single_edge(validator, single_edge):
    """Run ``validator`` on the ``single_edge`` fixture.

    There is no explicit assertion: the test passes when the call returns
    without raising.
    """
    validator(single_edge)


def test_single_incidence(validator, single_incidence):
    """Run ``validator`` on the ``single_incidence`` fixture.

    There is no explicit assertion: the test passes when the call returns
    without raising.
    """
    validator(single_incidence)


def test_metadata_as_list(validator, metadata_as_list):
    """Expect ``validator`` to raise ``ValueError`` for ``metadata_as_list``.

    Both arguments are pytest fixtures; ``metadata_as_list`` is the parsed
    ``metadata_as_list.json`` test file.
    """
    # The test fails unless the call inside the block raises ValueError.
    with pytest.raises(ValueError):
        validator(metadata_as_list)
Expand Down
134 changes: 4 additions & 130 deletions validate_hif.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import json
import sys
from collections import defaultdict

import fastjsonschema

# 0 - OK, 1 - bad JSON
status = 0
from scripts.hif import validate_network_type


if len(sys.argv) > 2 and sys.argv[2] == "--silent":
Expand All @@ -14,136 +12,12 @@
verbose = True

# network parameters
filename = "lesmis-hif.json"
schema_filename = "hif_schema.json"

filename = sys.argv[1]
schema_filename = "schemas/hif_schema_v0.1.0.json"

with open(filename) as file, open(schema_filename) as schema_file:
# load JSON file
validate_schema = fastjsonschema.compile(json.load(schema_file))
data = json.load(file)
validate_schema(data)

# check that keys do not deviate from the standard field names
# DESCRIPTIONS OF THE FIELDS
# "network-type": a string indicating what type of network the dataset is.
# Valid choices currently include:
# - "undirected" (undirected hypergraph with potential multiedges)
# - "asc" (simplicial complex where only the maximal faces are stored)
# - "directed" (directed hypergraph with potential multiedges)
# "metadata": any dataset-level attributes (e.g., name, author, etc.) which must be dict-like
# "nodes": a list of 2-entries, where the first entry of a record is the node ID
# and the second entry is dict-like and stores the associated attributes.
# "edges": a list of 2-entries, where the first entry of a record is the edge ID
# and the second entry is dict-like and stores the associated attributes.
# "incidences": a list of 3-entries, where the first entry of a record is the edge ID,
# the second entry is the node ID, and the third entry is dict-like and
# stores the associated attributes.
# **Note that this is the only required field.

fields = {"network-type", "metadata", "nodes", "edges", "incidences"}
if not set(data).issubset(fields):
status = 1
if verbose:
field_names = ", ".join(fields)
new_field_names = ", ".join(set(data))
print(
f"Acceptable field names are: {field_names}\nand the field names are {new_field_names}"
)

# incidences are required
if "incidences" not in data:
status = 1
if verbose:
print(f"The file must contain an field for incidences.")

# check network type
network_types = {"asc", "undirected", "directed"}
if "network-type" in data:
if data["network-type"] not in network_types:
status = 1

if verbose:
network_types = ", ".join(network_types)
print(f"Unsupported network type. Valid types are: {network_types}")

# check network metadata
if "metadata" in data:
if not isinstance(data["metadata"], dict):
status = 1
if verbose:
print(f"The metadata must be dict-like.")

# check node attributes
if "nodes" in data:
for i, record in enumerate(data["nodes"]):
if len(record) != 2:
status = 1
if verbose:
print(
f"Each node record must have two entries: an ID and the dictionary of corresponding attributes."
)
if not isinstance(record[1], dict):
status = 1
if verbose:
print(f"The node attributes must be dict-like.")

# check edge attributes
if "edges" in data:
for i, record in enumerate(data["edges"]):
if len(record) != 2:
status = 1
if verbose:
print(
f"Each edge record must have two entries: an ID and the dictionary of corresponding attributes."
)
if not isinstance(record[1], dict):
status = 1
if verbose:
print(f"The edge attributes must be dict-like.")

if "incidences" in data:
for i, record in enumerate(data["incidences"]):
if len(record) != 3:
status = 1
if verbose:
print(
f"Each incidence record must have three entries: an edge ID, a node ID, and the dictionary of corresponding attributes."
)
if not isinstance(record[2], dict):
status = 1
if verbose:
print(f"The incidence attributes must be dict-like.")

# in the case of directed hypergraphs, each incidence must
# have the "direction" attribute
if (
"network-type" in data
and data["network-type"] == "directed"
and "incidences" in data
):
for i, record in enumerate(data["incidences"]):
if "direction" not in record[2]:
status = 1
if verbose:
print(
f"Each incidence record must have have the 'direction' attribute for directed hypergraphs."
)

# in the case of simplicial complexes, make sure that the edges are maximal
if "network-type" in data and data["network-type"] == "asc" and "incidences" in data:
edgedict = defaultdict(set)
for record in data["incidences"]:
e = record[0]
n = record[1]
edgedict[e].add(n)
for e1, edge1 in edgedict.items():
for e2, edge2 in edgedict.items():
if e1 != e2 and edge1.issubset(edge2):
status = 1
if verbose:
print(
f"Only maximal faces should be stored for simplicial complexes."
)

print(f"Exit status {status}.")
validate_network_type(data, verbose)

0 comments on commit 27bea43

Please sign in to comment.