diff --git a/tests/test_cortex_endpoint.py b/tests/test_cortex_endpoint.py new file mode 100644 index 0000000..e3d0028 --- /dev/null +++ b/tests/test_cortex_endpoint.py @@ -0,0 +1,19 @@ +from thehive4py.client import TheHiveApi +from thehive4py.types.case import OutputCase + + +class TestCortexEndpoint: + def test_list_analyzers(self, thehive: TheHiveApi): + analyzers = thehive.cortex.list_analyzers() + assert analyzers == [] + + def test_list_analyzers_by_type(self, thehive: TheHiveApi): + data_type = "mail" + analyzers = thehive.cortex.list_analyzers_by_type(data_type=data_type) + assert analyzers == [] + + def test_list_responders(self, thehive: TheHiveApi, test_case: OutputCase): + responders = thehive.cortex.list_responders( + entity_type="case", entity_id=test_case["_id"] + ) + assert responders == [] diff --git a/thehive4py/endpoints/cortex.py b/thehive4py/endpoints/cortex.py index 8bf8a63..bda9755 100644 --- a/thehive4py/endpoints/cortex.py +++ b/thehive4py/endpoints/cortex.py @@ -1,29 +1,52 @@ from thehive4py.endpoints._base import EndpointBase +from thehive4py.types.cortex import ( + OutputAnalyzer, + OutputAnalyzerJob, + OutputResponder, + OutputResponderAction, + InputResponderAction, + InputAnalyzerJob, +) +from typing import Optional, List class CortexEndpoint(EndpointBase): - def create_analyzer_job( - self, cortex_id: str, analyzer_id: str, observable_id: str - ) -> dict: + def create_analyzer_job(self, job: InputAnalyzerJob) -> OutputAnalyzerJob: return self._session.make_request( - "POST", - path="/api/connector/cortex/job", - json={ - "analyzerId": analyzer_id, - "cortexId": cortex_id, - "artifactId": observable_id, - }, + "POST", path="/api/connector/cortex/job", json=job ) def create_responder_action( - self, object_id: str, object_type: str, responder_id: str - ) -> dict: + self, action: InputResponderAction + ) -> OutputResponderAction: return self._session.make_request( - "POST", - path="/api/connector/cortex/action", - json={ - "objectId": object_id, - "objectType": object_type, - "responderId": responder_id, - }, + "POST", path="/api/connector/cortex/action", json=action + ) + + def list_analyzers(self, range: Optional[str] = None) -> List[OutputAnalyzer]: + params = {"range": range} + return self._session.make_request( + "GET", path="/api/connector/cortex/analyzer", params=params + ) + + def list_analyzers_by_type(self, data_type: str) -> List[OutputAnalyzer]: + return self._session.make_request( + "GET", path=f"/api/connector/cortex/analyzer/type/{data_type}" + ) + + def get_analyzer(self, analyzer_id: str) -> OutputAnalyzer: + return self._session.make_request( + "GET", path=f"/api/connector/cortex/analyzer/{analyzer_id}" + ) + + def get_analyzer_job(self, job_id: str) -> OutputAnalyzerJob: + return self._session.make_request( + "GET", path=f"/api/connector/cortex/job/{job_id}" + ) + + def list_responders( + self, entity_type: str, entity_id: str + ) -> List[OutputResponder]: + return self._session.make_request( + "GET", f"/api/connector/cortex/responder/{entity_type}/{entity_id}" ) diff --git a/thehive4py/types/cortex.py b/thehive4py/types/cortex.py new file mode 100644 index 0000000..7ed7aa1 --- /dev/null +++ b/thehive4py/types/cortex.py @@ -0,0 +1,93 @@ +from typing import Any, TypedDict, List, Dict + + +class OutputAnalyzerRequired(TypedDict): + id: str + name: str + version: str + description: str + + +class OutputAnalyzer(OutputAnalyzerRequired, total=False): + dataTypeList: List[str] + cortexIds: List[str] + + +class OutputResponderRequired(TypedDict): + id: str + name: str + version: str + description: str + + +class OutputResponder(OutputResponderRequired, total=False): + dataTypeList: List[str] + cortexIds: List[str] + + +class OutputAnalyzerJobRequired(TypedDict): + _id: str + _type: str + _createdBy: str + _createdAt: str + analyzerId: str + analyzerName: str + analyzerDefinition: str + status: str + startDate: str + cortexId: str + cortexJobId: str + id: str + operations: str + + +class OutputAnalyzerJob(TypedDict, total=False): + _updatedBy: str + _updatedAt: str + endDate: str + report: Dict[str, Any] + case_artifact: Dict[str, Any] + + +class OutputResponderActionRequired(TypedDict): + _id: str + _type: str + _createdBy: str + _createdAt: str + responderId: str + status: str + startDate: str + cortexId: str + cortexJobId: str + id: str + operations: str + + +class OutputResponderAction(OutputResponderActionRequired, total=False): + _updatedBy: str + _updatedAt: str + endDate: str + report: Dict[str, Any] + responderName: str + responderDefinition: str + + +class InputResponderActionRequired(TypedDict): + objectId: str + objectType: str + responderId: str + + +class InputResponderAction(InputResponderActionRequired, total=False): + parameters: Dict[str, Any] + tlp: int + + +class InputAnalyzerJobRequired(TypedDict): + analyzerId: str + cortexId: str + observableId: str + + +class InputAnalyzerJob(InputAnalyzerJobRequired, total=False): + parameters: Dict[str, Any]