Skip to content

Commit

Permalink
Merge pull request #11 from secmerc/hackday_debugging
Browse files Browse the repository at this point in the history
slightly more robust parsing and error handling
  • Loading branch information
sfc-gh-gbutzi authored Nov 3, 2020
2 parents 663e30f + 98457fa commit f52287d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 58 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ These are just a few ideas.
* Processes inherit trust zones from the upstream entity
* Save it as a .drawio file in a convenient location

Some notes about how to make the diagram:
* You don't need to include a Trust Boundary - it won't get parsed and has no bearing on the threats which appear
* You don't need to include the STRIDE labels - we generate the threats for you!
* Bi-directional flows are currently not supported - flows move in one direction. In some cases, data might flow from Entity A to Entity B, while data also flows from Entity B to Entity A; model these as two separate flows and you'll be fine.
* Entities must talk to other entities through processes - the process is mandatory and must be included.

### Example
![](samples/bookface.png)

Expand Down
78 changes: 43 additions & 35 deletions materialize_threats/db/dbgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,37 @@ def node_is_user_object(node):

def get_node_trust_zones_from_graph(graph):
zones = dict()
nodes = set(graph.nodes.keys())
nodes = set(graph.nodes.keys())
edges = set()
for edge in graph.edges:
edges.add(edge.to)
edges.add(edge.fr)

orphans = nodes.difference(edges)
nodes = nodes.difference(orphans)

# For now, orphans are assumed to be the smaller inner objects
for node in nodes:
node = graph.get_node_by_sid(node)

for orphan in orphans:
node_type = UserObject.infer_type_from_node(node)

for orphan in orphans:
zone = None
orphan = graph.get_node_by_sid(orphan)


outer_rect = node.rect
inner_rect = orphan.rect

# here, both the element node and zone node could be wrapped user objects
if outer_rect.is_overlapping(inner_rect):

# maybe using our shape library
if node_is_user_object(node) and node_is_user_object(orphan):
assert(orphan.value.get_object_type() == 'trust zone')
zone = orphan.value.get_trust_zone()

# using the built-in shape library
else:
zone = UserObject.get_trust_zone_from_node_label(orphan.value)
Expand All @@ -50,7 +52,7 @@ def get_node_trust_zones_from_graph(graph):

def load_graph_into_db(graph, zones):
flows = graph.edges.copy()

remove_flow = lambda flow: flows.remove(flow)

# we only care about things that are connected, so we start by traversing all of the edges in the graph
Expand All @@ -60,13 +62,13 @@ def load_graph_into_db(graph, zones):

source = graph.nodes[flow.fr]
destination = graph.nodes[flow.to]

# Processes get special treatment in our scheme. They represent metadata about a given flow
process = None

pair = []

for node in (source, destination):
for node in (source, destination):
print("inspecting {} {}".format(node.label, node.sid))
element_type = None

Expand All @@ -76,47 +78,53 @@ def load_graph_into_db(graph, zones):
element_type = UserObject.infer_type_from_node(node)

if element_type == 'process':
process = Node().create(
zone=zones[node],
label=node.label,
identifier=node.sid,
data='test',
type=element_type
)
try:
process = Node().create(
zone=zones[node],
label=node.label,
identifier=node.sid,
data='test',
type=element_type
)
except:
print(f"Looks like {node.label} is missing something; does it have a trust zone?")
exit()

if node == source:
print("fix up source {} {}".format(node.label, node.sid))

inbound_flow, inbound_node = [(flow, flow.fr) for flow in flows if flow.to == node.sid][0]
node = graph.nodes[inbound_node]
dangling_flow = inbound_flow
dangling_flow = inbound_flow

else:
print("fix up destination {} {}".format(node.label, node.sid))

outbound_flow, outbound_node = [(flow, flow.to) for flow in flows if flow.fr == node.sid][0]
node = graph.nodes[outbound_node]
dangling_flow = outbound_flow

flows.remove(flow)
flows.remove(dangling_flow)

pair.append(
Node().create(
zone=zones[node],
label=node.label,
identifier=node.sid,
data='test',
type=element_type
if element_type is not None:
pair.append(
Node().create(
zone=zones[node],
label=node.label,
identifier=node.sid,
data='test',
type=element_type
)
)

if element_type is not None:
Edge.create(
source=pair[SOURCE],
destination=pair[DESTINATION],
process=process,
data='test'
)

Edge.create(
source=pair[SOURCE],
destination=pair[DESTINATION],
process=process,
data='test'
)

return True

22 changes: 11 additions & 11 deletions materialize_threats/materialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_flows_with_threats(cls):
DESTINATION_ZONE = 'destinationZone'
PROCESS = 'process'

threats = {
threats = {
SPOOFING: [],
TAMPERING: [],
REPUDIATION: [],
Expand All @@ -38,9 +38,9 @@ def get_flows_with_threats(cls):

edgequery = (
Edge.select(
Source.label.alias(SOURCE),
Source.label.alias(SOURCE),
Source.zone.alias(SOURCE_ZONE),
Destination.label.alias(DESTINATION),
Destination.label.alias(DESTINATION),
Destination.zone.alias(DESTINATION_ZONE),
Process.label.alias(PROCESS)
)
Expand All @@ -49,7 +49,7 @@ def get_flows_with_threats(cls):
.join(Process, on=(Process.id == Edge.process))
.switch(Destination)
.join(Destination, on=(Destination.id == Edge.destination))
)
)

threats[ELEVATION_OF_PRIVILEGE] = list(
edgequery.where(
Expand All @@ -59,7 +59,7 @@ def get_flows_with_threats(cls):

threats[SPOOFING] = list(
edgequery.where(
(Source.zone == 0) &
(Source.zone == 0) &
(Destination.zone == 1)
).dicts()
)
Expand All @@ -74,7 +74,7 @@ def get_flows_with_threats(cls):

threats[DENIAL_OF_SERVICE] = list(
edgequery.where(
(Source.zone == 0) &
(Source.zone == 0) &
(Destination.zone == 1)
).dicts()
)
Expand All @@ -98,13 +98,13 @@ def materialize(cls):

args = argparse.ArgumentParser(description="Enumerate STRIDE threats from a data flow diagram and create test case stubs")
args.add_argument(
"--diagram",
"--diagram",
default="samples/sample.drawio",
type=argparse.FileType('r'),
type=argparse.FileType('r'),
help="The draw.io data flow diagram filename"
)
filename = args.parse_args().diagram.name

args.add_argument(
"--featurefile",
default=os.path.basename(filename) + ".feature",
Expand All @@ -122,6 +122,6 @@ def materialize(cls):
feature_file = create_feature_file_for_gherkins(feature=filename, gherkins=gherkin_candidates)

args.parse_args().featurefile.write(feature_file)
cls.output_threats(threats)

cls.output_threats(threats)

31 changes: 19 additions & 12 deletions materialize_threats/mx/models/UserObject.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,27 @@ def text_to_mx_value(self):
if i != last_text:
value += "<hr size='1'/>"
return value

@classmethod
def get_trust_zone_from_node_label(cls, label):
# Sometimes we pass in a UserObject instead of a label
if type(label) != str:
label = label.label
if label is not None:
zone = label.lower().split(cls.ZONE_PREFIX)[cls.ZONE_INDEX]
if cls._is_valid_trust_zone(zone):
return zone

try:
zone = label.lower().split(cls.ZONE_PREFIX)[cls.ZONE_INDEX]
zone = zone.replace('</b>', '')
if cls._is_valid_trust_zone(zone):
return zone
except:
print(f'found unparsable object with label {label}, skipping')
return None
return None

def get_trust_zone(self):

zone = self.xml.get(self.LABEL)

if zone is not None:
try:
zone = zone.lower().split(self.ZONE_PREFIX)[self.ZONE_INDEX]
Expand All @@ -63,7 +70,7 @@ def get_object_type(self):
if type in self.TYPES:
return type

@classmethod
@classmethod
def infer_type_from_node(cls, node):

TRUST_ZONE = 'text;html=1;strokeColor=#82b366;fillColor=#d5e8d4;align=center;verticalAlign=middle;whiteSpace=wrap;overflow=hidden;'
Expand All @@ -76,12 +83,12 @@ def infer_type_from_node(cls, node):
'element': ELEMENT,
'process': PROCESS,
'data store': DATA_STORE

}

for text in node.texts:
for key, value in types.items():
if text.text == value:
if text.text == value:
return key

return None
return None

0 comments on commit f52287d

Please sign in to comment.