diff --git a/README.md b/README.md index e9a525d..51aeb4e 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,12 @@ These are just a few ideas. * Processes inherit trust zones from the upstream entity * Save it as a .drawio file in a convenient location +Some notes about how to make the diagram: +* You don't need to include a Trust Boundary - it won't get parsed and has no bearing on the threats which appear +* You don't need to include the STRIDE labels - we generate the threats for you! +* Bi-directional flows are currently not supported - flows move in one direction. In some cases, data might flow from Entity A to Entity B, while data also flows from Entity B to Entity A; model these as two separate flows and you'll be fine. +* Entities must talk to other entities through processes - the process is mandatory and must be included. + ### Example ![](samples/bookface.png) diff --git a/materialize_threats/db/dbgraph.py b/materialize_threats/db/dbgraph.py index c5a7a59..599015e 100644 --- a/materialize_threats/db/dbgraph.py +++ b/materialize_threats/db/dbgraph.py @@ -9,7 +9,7 @@ def node_is_user_object(node): def get_node_trust_zones_from_graph(graph): zones = dict() - nodes = set(graph.nodes.keys()) + nodes = set(graph.nodes.keys()) edges = set() for edge in graph.edges: edges.add(edge.to) @@ -17,27 +17,29 @@ def get_node_trust_zones_from_graph(graph): orphans = nodes.difference(edges) nodes = nodes.difference(orphans) - + # For now, orphans are assumed to be the smaller inner objects for node in nodes: node = graph.get_node_by_sid(node) - for orphan in orphans: + node_type = UserObject.infer_type_from_node(node) + + for orphan in orphans: zone = None orphan = graph.get_node_by_sid(orphan) - - + + outer_rect = node.rect inner_rect = orphan.rect - + # here, both the element node and zone node could be wrapped user objects if outer_rect.is_overlapping(inner_rect): - + # maybe using our shape library if node_is_user_object(node) and node_is_user_object(orphan): assert(orphan.value.get_object_type() == 'trust zone') zone = orphan.value.get_trust_zone() - + # using the built-in shape library else: zone = UserObject.get_trust_zone_from_node_label(orphan.value) @@ -50,7 +52,7 @@ def get_node_trust_zones_from_graph(graph): def load_graph_into_db(graph, zones): flows = graph.edges.copy() - + remove_flow = lambda flow: flows.remove(flow) # we only care about things that are connected, so we start by traversing all of the edges in the graph @@ -60,13 +62,13 @@ def load_graph_into_db(graph, zones): source = graph.nodes[flow.fr] destination = graph.nodes[flow.to] - + # Processes get special treatment in our scheme. They represent metadata about a given flow process = None - + pair = [] - for node in (source, destination): + for node in (source, destination): print("inspecting {} {}".format(node.label, node.sid)) element_type = None @@ -76,24 +78,28 @@ def load_graph_into_db(graph, zones): element_type = UserObject.infer_type_from_node(node) if element_type == 'process': - process = Node().create( - zone=zones[node], - label=node.label, - identifier=node.sid, - data='test', - type=element_type - ) + try: + process = Node().create( + zone=zones[node], + label=node.label, + identifier=node.sid, + data='test', + type=element_type + ) + except: + print(f"Looks like {node.label} is missing something; does it have a trust zone?") + exit() if node == source: print("fix up source {} {}".format(node.label, node.sid)) inbound_flow, inbound_node = [(flow, flow.fr) for flow in flows if flow.to == node.sid][0] node = graph.nodes[inbound_node] - dangling_flow = inbound_flow + dangling_flow = inbound_flow else: print("fix up destination {} {}".format(node.label, node.sid)) - + outbound_flow, outbound_node = [(flow, flow.to) for flow in flows if flow.fr == node.sid][0] node = graph.nodes[outbound_node] dangling_flow = outbound_flow @@ -101,22 +107,24 @@ def load_graph_into_db(graph, zones): flows.remove(flow) flows.remove(dangling_flow) - pair.append( - Node().create( - zone=zones[node], - label=node.label, - identifier=node.sid, - data='test', - type=element_type + if element_type is not None: + pair.append( + Node().create( + zone=zones[node], + label=node.label, + identifier=node.sid, + data='test', + type=element_type + ) ) + + if element_type is not None: + Edge.create( + source=pair[SOURCE], + destination=pair[DESTINATION], + process=process, + data='test' ) - Edge.create( - source=pair[SOURCE], - destination=pair[DESTINATION], - process=process, - data='test' - ) - return True diff --git a/materialize_threats/materialize.py b/materialize_threats/materialize.py index f9c2470..c807585 100644 --- a/materialize_threats/materialize.py +++ b/materialize_threats/materialize.py @@ -23,7 +23,7 @@ def get_flows_with_threats(cls): DESTINATION_ZONE = 'destinationZone' PROCESS = 'process' - threats = { + threats = { SPOOFING: [], TAMPERING: [], REPUDIATION: [], @@ -38,9 +38,9 @@ def get_flows_with_threats(cls): edgequery = ( Edge.select( - Source.label.alias(SOURCE), + Source.label.alias(SOURCE), Source.zone.alias(SOURCE_ZONE), - Destination.label.alias(DESTINATION), + Destination.label.alias(DESTINATION), Destination.zone.alias(DESTINATION_ZONE), Process.label.alias(PROCESS) ) @@ -49,7 +49,7 @@ def get_flows_with_threats(cls): .join(Process, on=(Process.id == Edge.process)) .switch(Destination) .join(Destination, on=(Destination.id == Edge.destination)) - ) + ) threats[ELEVATION_OF_PRIVILEGE] = list( edgequery.where( @@ -59,7 +59,7 @@ def get_flows_with_threats(cls): threats[SPOOFING] = list( edgequery.where( - (Source.zone == 0) & + (Source.zone == 0) & (Destination.zone == 1) ).dicts() ) @@ -74,7 +74,7 @@ def get_flows_with_threats(cls): threats[DENIAL_OF_SERVICE] = list( edgequery.where( - (Source.zone == 0) & + (Source.zone == 0) & (Destination.zone == 1) ).dicts() ) @@ -98,13 +98,13 @@ def materialize(cls): args = argparse.ArgumentParser(description="Enumerate STRIDE threats from a data flow diagram and create test case stubs") args.add_argument( - "--diagram", + "--diagram", default="samples/sample.drawio", - type=argparse.FileType('r'), + type=argparse.FileType('r'), help="The draw.io data flow diagram filename" ) filename = args.parse_args().diagram.name - + args.add_argument( "--featurefile", default=os.path.basename(filename) + ".feature", @@ -122,6 +122,6 @@ def materialize(cls): feature_file = create_feature_file_for_gherkins(feature=filename, gherkins=gherkin_candidates) args.parse_args().featurefile.write(feature_file) - - cls.output_threats(threats) + + cls.output_threats(threats) diff --git a/materialize_threats/mx/models/UserObject.py b/materialize_threats/mx/models/UserObject.py index 812703b..0714775 100644 --- a/materialize_threats/mx/models/UserObject.py +++ b/materialize_threats/mx/models/UserObject.py @@ -26,20 +26,27 @@ def text_to_mx_value(self): if i != last_text: value += "
" return value - + @classmethod def get_trust_zone_from_node_label(cls, label): + # Sometimes we pass in a UserObject instead of a label + if type(label) != str: + label = label.label if label is not None: - zone = label.lower().split(cls.ZONE_PREFIX)[cls.ZONE_INDEX] - if cls._is_valid_trust_zone(zone): - return zone - + try: + zone = label.lower().split(cls.ZONE_PREFIX)[cls.ZONE_INDEX] + zone = zone.replace('', '') + if cls._is_valid_trust_zone(zone): + return zone + except: + print(f'found unparsable object with label {label}, skipping') + return None return None def get_trust_zone(self): - + zone = self.xml.get(self.LABEL) - + if zone is not None: try: zone = zone.lower().split(self.ZONE_PREFIX)[self.ZONE_INDEX] @@ -63,7 +70,7 @@ def get_object_type(self): if type in self.TYPES: return type - @classmethod + @classmethod def infer_type_from_node(cls, node): TRUST_ZONE = 'text;html=1;strokeColor=#82b366;fillColor=#d5e8d4;align=center;verticalAlign=middle;whiteSpace=wrap;overflow=hidden;' @@ -76,12 +83,12 @@ def infer_type_from_node(cls, node): 'element': ELEMENT, 'process': PROCESS, 'data store': DATA_STORE - + } - + for text in node.texts: for key, value in types.items(): - if text.text == value: + if text.text == value: return key - return None \ No newline at end of file + return None