Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:MISP/misp-stix
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisr3d committed Jan 4, 2024
2 parents 07caf0b + 43a5989 commit 941f1ca
Show file tree
Hide file tree
Showing 29 changed files with 2,897 additions and 4,021 deletions.
99 changes: 47 additions & 52 deletions misp_stix_converter/misp2stix/misp_to_stix2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import Generator, Optional, Tuple, Union

_label_fields = ('type', 'category', 'to_ids')
_labelled_object_types = ('malware', 'threat-actor', 'tool')
_misp_time_fields = ('first_seen', 'last_seen')
_object_attributes_additional_fields = ('category', 'comment', 'to_ids', 'uuid')
_object_attributes_fields = ('type', 'object_relation', 'value')
Expand Down Expand Up @@ -2367,10 +2368,6 @@ def _handle_galaxy_matching(self, object_type: str, stix_object: dict):
)
)

def _handle_meta_mapping(self, object_type: str, key: str) -> Union[str, None]:
if hasattr(self._mapping, f'{object_type}_meta_mapping'):
return getattr(self._mapping, f'{object_type}_meta_mapping')(key)

def _handle_object_refs(self, object_refs: list):
for object_ref in object_refs:
if object_ref not in self.__object_refs:
Expand Down Expand Up @@ -2443,10 +2440,6 @@ def _parse_attack_pattern_galaxy(self, galaxy: Union[MISPGalaxy, dict],
attack_pattern_args = self._create_galaxy_args(
cluster, galaxy['name'], attack_pattern_id, timestamp
)
if cluster.get('meta'):
attack_pattern_args.update(
self._parse_meta_fields(cluster['meta'], 'attack_pattern')
)
self._append_SDO_without_refs(
self._create_attack_pattern(attack_pattern_args)
)
Expand Down Expand Up @@ -2479,10 +2472,6 @@ def _parse_course_of_action_galaxy(self, galaxy: Union[MISPGalaxy, dict],
course_of_action_args = self._create_galaxy_args(
cluster, galaxy['name'], course_of_action_id, timestamp
)
if cluster.get('meta'):
course_of_action_args.update(
self._parse_meta_fields(cluster['meta'], 'course_of_action')
)
course_of_action = self._create_course_of_action(course_of_action_args)
self._append_SDO_without_refs(course_of_action)
object_refs.append(course_of_action_id)
Expand All @@ -2499,7 +2488,7 @@ def _parse_external_id(self, external_id: str) -> dict:
'external_id': external_id
}

def _parse_external_reference(
def _parse_external_references(
self, meta_args: dict, values: Union[list, str],
feature: Optional[str] = '_parse_external_id'):
if isinstance(values, list):
Expand Down Expand Up @@ -2536,10 +2525,6 @@ def _parse_intrusion_set_galaxy(self, galaxy: Union[MISPGalaxy, dict],
intrusion_set_args = self._create_galaxy_args(
cluster, galaxy['name'], intrusion_set_id, timestamp
)
if cluster.get('meta'):
intrusion_set_args.update(
self._parse_meta_fields(cluster['meta'], 'intrusion_set')
)
intrusion_set = self._create_intrusion_set(intrusion_set_args)
self._append_SDO_without_refs(intrusion_set)
object_refs.append(intrusion_set_id)
Expand Down Expand Up @@ -2577,11 +2562,6 @@ def _parse_malware_galaxy(self, galaxy: Union[MISPGalaxy, dict],
malware_args = self._create_galaxy_args(
cluster, galaxy['name'], malware_id, timestamp
)
if cluster.get('meta'):
meta_args = self._parse_meta_fields(cluster['meta'], 'malware')
if 'labels' in meta_args:
malware_args['labels'].extend(meta_args.pop('labels'))
malware_args.update(meta_args)
malware = self._create_malware(malware_args)
self._append_SDO_without_refs(malware)
object_refs.append(malware_id)
Expand All @@ -2596,32 +2576,54 @@ def _parse_malware_types(self, meta_args: dict, values: Union[list, str]):
feature = 'malware_types' if self._version == '2.1' else 'labels'
meta_args[feature] = values if isinstance(values, list) else [values]

def _parse_meta_fields(self, cluster_meta: dict, object_type: str) -> dict:
def _parse_meta_custom_fields(self, cluster_meta: dict) -> dict:
meta_args = defaultdict(list)
for key, values in cluster_meta.items():
feature = self._mapping.external_references_fields(key)
if feature is not None:
self._parse_external_reference(meta_args, values, feature)
continue
feature = self._handle_meta_mapping(object_type, key)
self._parse_external_references(meta_args, values, feature)
else:
meta_args[f"x_misp_{self._sanitise_meta_field(key)}"] = values
if any(key.startswith('x_misp_') for key in meta_args.keys()):
meta_args['allow_custom'] = True
return meta_args

def _parse_meta_fields(
self, cluster_meta: dict, object_type: str, value: str) -> dict:
meta_args = defaultdict(list)
field = f"{object_type.replace('-', '_')}_meta_mapping"
for key, values in cluster_meta.items():
feature = self._mapping.external_references_fields(key)
if feature is not None:
getattr(self, feature)(
self._parse_external_references(meta_args, values, feature)
continue
to_call = getattr(self._mapping, field)(key)
if to_call is not None:
args = [
meta_args, values if isinstance(values, list) else [values]
)
]
if 'synonyms' in to_call:
args.append(value)
getattr(self, to_call)(*args)
else:
feature = self._sanitise_meta_field(key)
meta_args[f"x_misp_{feature}"] = values
meta_args[f"x_misp_{self._sanitise_meta_field(key)}"] = values
if any(key.startswith('x_misp_') for key in meta_args.keys()):
meta_args['allow_custom'] = True
return meta_args

def _parse_synonyms_21_meta_field(self, meta_args: dict, values: list):
feature = 'aliases' if self._version == '2.1' else 'x_misp_synonyms'
meta_args[feature] = values
def _parse_synonyms_21_meta_field(
self, meta_args: dict, values: list, cluster_value: str):
aliases = [value for value in values if value != cluster_value]
if aliases:
feature = 'aliases' if self._version == '2.1' else 'x_misp_synonyms'
meta_args[feature] = aliases

@staticmethod
def _parse_synonyms_meta_field(meta_args: dict, values: list):
meta_args['aliases'] = values
def _parse_synonyms_meta_field(
meta_args: dict, values: list, cluster_value: str):
aliases = [value for value in values if value != cluster_value]
if aliases:
meta_args['aliases'] = aliases

def _parse_sector_galaxy(self, galaxy: Union[MISPGalaxy, dict],
timestamp: Union[datetime, None]) -> list:
Expand Down Expand Up @@ -2698,13 +2700,6 @@ def _parse_threat_actor_galaxy(self, galaxy: Union[MISPGalaxy, dict],
threat_actor_args = self._create_galaxy_args(
cluster, galaxy['name'], threat_actor_id, timestamp
)
if cluster.get('meta'):
meta_args = self._parse_meta_fields(
cluster['meta'], 'threat_actor'
)
if 'labels' in meta_args:
threat_actor_args['labels'].extend(meta_args.pop('labels'))
threat_actor_args.update(meta_args)
threat_actor = self._create_threat_actor(threat_actor_args)
self._append_SDO_without_refs(threat_actor)
object_refs.append(threat_actor_id)
Expand Down Expand Up @@ -2739,11 +2734,6 @@ def _parse_tool_galaxy(self, galaxy: Union[MISPGalaxy, dict],
tool_args = self._create_galaxy_args(
cluster, galaxy['name'], tool_id, timestamp
)
if cluster.get('meta'):
meta_args = self._parse_meta_fields(cluster['meta'], 'tool')
if 'labels' in meta_args:
tool_args['labels'].extend(meta_args.pop('labels'))
tool_args.update(meta_args)
tool = self._create_tool(tool_args)
self._append_SDO_without_refs(tool)
object_refs.append(tool_id)
Expand Down Expand Up @@ -2795,10 +2785,6 @@ def _parse_vulnerability_galaxy(self, galaxy: Union[MISPGalaxy, dict],
vulnerability_args = self._create_galaxy_args(
cluster, galaxy['name'], vulnerability_id, timestamp
)
if cluster.get('meta'):
vulnerability_args.update(
self._parse_meta_fields(cluster['meta'], 'vulnerability')
)
vulnerability = self._create_vulnerability(vulnerability_args)
self._append_SDO_without_refs(vulnerability)
object_refs.append(vulnerability_id)
Expand Down Expand Up @@ -2856,7 +2842,6 @@ def _create_galaxy_args(
self, cluster: Union[MISPGalaxyCluster, dict], name: str,
object_id: str, timestamp: Optional[datetime] = None) -> dict:
object_type = object_id.split('--')[0]

value = cluster['value']
if cluster['type'].startswith('mitre-') and ' - ' in value:
value = value.split(' - ')[0].strip()
Expand All @@ -2879,6 +2864,16 @@ def _create_galaxy_args(
'modified': timestamp
}
)
if cluster.get('meta'):
feature = f"{object_type.replace('-', '_')}_meta_mapping"
meta_args = (
self._parse_meta_fields(cluster['meta'], object_type, value)
if hasattr(self._mapping, feature) else
self._parse_meta_custom_fields(cluster['meta'])
)
if object_type in _labelled_object_types and 'labels' in meta_args:
galaxy_args['labels'].extend(meta_args.pop('labels'))
galaxy_args.update(meta_args)
return galaxy_args

@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions misp_stix_converter/misp2stix/misp_to_stix21.py
Original file line number Diff line number Diff line change
Expand Up @@ -1485,7 +1485,7 @@ def _parse_country_galaxy(self, galaxy: Union[MISPGalaxy, dict],
}
location_args['country'] = cluster['meta']['ISO']
location_args.update(
self._parse_meta_fields(cluster['meta'], 'location')
self._parse_meta_custom_fields(cluster['meta'])
)
if timestamp is None:
if not cluster.get('timestamp'):
Expand Down Expand Up @@ -1540,7 +1540,7 @@ def _parse_region_galaxy(self, galaxy: Union[MISPGalaxy, dict],
cluster, galaxy['description'], galaxy['name'], timestamp
)
location_args.update(
self._parse_meta_fields(cluster['meta'], 'location')
self._parse_meta_custom_fields(cluster['meta'])
)
location = self._create_location(location_args)
self._append_SDO_without_refs(location)
Expand Down
4 changes: 2 additions & 2 deletions misp_stix_converter/misp2stix/stix2_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class MISPtoSTIX2Mapping(MISPtoSTIXMapping):
)
__vulnerability_meta_mapping = Mapping(
**{
'aliases': '_parse_external_reference'
'aliases': '_parse_external_references'
}
)

Expand Down Expand Up @@ -1304,4 +1304,4 @@ def x509_object_mapping(cls, field: str) -> Union[dict, None]:

@classmethod
def x509_single_fields(cls) -> tuple:
return cls.__x509_single_fields
return cls.__x509_single_fields
2 changes: 2 additions & 0 deletions misp_stix_converter/stix2misp/converters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
ExternalSTIX2MalwareConverter, InternalSTIX2MalwareConverter)
from .stix2_note_converter import STIX2NoteConverter # noqa
from .stix2_observable_objects_converter import STIX2ObservableObjectConverter # noqa
from .stix2_observed_data_converter import ( # noqa
InternalSTIX2ObservedDataConverter)
from .stix2_threat_actor_converter import( # noqa
ExternalSTIX2ThreatActorConverter, InternalSTIX2ThreatActorConverter)
from .stix2_tool_converter import ( # noqa
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

from ... import Mapping
from ..exceptions import UnknownParsingFunctionError
from .stix2converter import (
ExternalSTIX2Converter, InternalSTIX2Converter, STIX2Converter,
_MAIN_PARSER_TYPING)
from .stix2converter import ExternalSTIX2Converter, InternalSTIX2Converter
from .stix2mapping import (
ExternalSTIX2Mapping, InternalSTIX2Mapping, STIX2Mapping)
from abc import ABCMeta
Expand Down Expand Up @@ -38,15 +36,24 @@ def attack_pattern_meta_mapping(cls) -> dict:
return cls.__attack_pattern_meta_mapping


class STIX2AttackPatternConverter(STIX2Converter, metaclass=ABCMeta):
def __init__(self, main: _MAIN_PARSER_TYPING):
class ExternalSTIX2AttackPatternMapping(
STIX2AttackPatternMapping, ExternalSTIX2Mapping):
pass


class ExternalSTIX2AttackPatternConverter(ExternalSTIX2Converter):
def __init__(self, main: 'ExternalSTIX2toMISPParser'):
self._set_main_parser(main)
self._mapping = ExternalSTIX2AttackPatternMapping

def parse(self, attack_pattern_ref: str):
attack_pattern = self.main_parser._get_stix_object(attack_pattern_ref)
self._parse_galaxy(attack_pattern)

def _create_cluster(self, attack_pattern: _ATTACK_PATTERN_TYPING,
description: Optional[str] = None,
galaxy_type: Optional[str] = None) -> MISPGalaxyCluster:
attack_pattern_args = self._create_cluster_args(
attack_pattern, galaxy_type, description=description
attack_pattern, galaxy_type
)
meta = self._handle_meta_fields(attack_pattern)
if hasattr(attack_pattern, 'external_references'):
Expand All @@ -64,22 +71,6 @@ def _create_cluster(self, attack_pattern: _ATTACK_PATTERN_TYPING,
return self._create_misp_galaxy_cluster(attack_pattern_args)


class ExternalSTIX2AttackPatternMapping(
STIX2AttackPatternMapping, ExternalSTIX2Mapping):
pass


class ExternalSTIX2AttackPatternConverter(
STIX2AttackPatternConverter, ExternalSTIX2Converter):
def __init__(self, main: 'ExternalSTIX2toMISPParser'):
super().__init__(main)
self._mapping = ExternalSTIX2AttackPatternMapping

def parse(self, attack_pattern_ref: str):
attack_pattern = self.main_parser._get_stix_object(attack_pattern_ref)
self._parse_galaxy(attack_pattern)


class InternalSTIX2AttackPatternMapping(
STIX2AttackPatternMapping, InternalSTIX2Mapping):
__attack_pattern_id_attribute = Mapping(
Expand All @@ -106,10 +97,9 @@ def attack_pattern_object_mapping(cls) -> dict:
return cls.__attack_pattern_object_mapping


class InternalSTIX2AttackPatternConverter(
STIX2AttackPatternConverter, InternalSTIX2Converter):
class InternalSTIX2AttackPatternConverter(InternalSTIX2Converter):
def __init__(self, main: 'InternalSTIX2toMISPParser'):
super().__init__(main)
self._set_main_parser(main)
self._mapping = InternalSTIX2AttackPatternMapping

def parse(self, attack_pattern_ref: str):
Expand All @@ -126,6 +116,29 @@ def parse(self, attack_pattern_ref: str):
except Exception as exception:
self.main_parser._attack_pattern_error(attack_pattern.id, exception)

def _create_cluster(self, attack_pattern: _ATTACK_PATTERN_TYPING,
description: Optional[str] = None,
galaxy_type: Optional[str] = None) -> MISPGalaxyCluster:
attack_pattern_args = self._create_cluster_args(
attack_pattern, galaxy_type, description=description
)
meta = self._handle_meta_fields(attack_pattern)
if hasattr(attack_pattern, 'external_references'):
meta.update(
self._handle_external_references(
attack_pattern.external_references
)
)
if meta.get('external_id'):
self._handle_cluster_value(attack_pattern_args, meta['external_id'])
if hasattr(attack_pattern, 'kill_chain_phases'):
meta['kill_chain'] = self._handle_kill_chain_phases(
attack_pattern.kill_chain_phases
)
if meta:
attack_pattern_args['meta'] = meta
return self._create_misp_galaxy_cluster(attack_pattern_args)

def _parse_attack_pattern_object(
self, attack_pattern: _ATTACK_PATTERN_TYPING):
misp_object = self._create_misp_object('attack-pattern', attack_pattern)
Expand Down
Loading

0 comments on commit 941f1ca

Please sign in to comment.