Merge pull request #371 from Anko59/369-add-more-cortex-queries
Add more cortex queries
Showing
3 changed files
with
154 additions
and
19 deletions.
@@ -0,0 +1,19 @@ | ||
from thehive4py.client import TheHiveApi | ||
from thehive4py.types.case import OutputCase | ||
|
||
|
||
class TestCortexEndpoint: | ||
def test_list_analyzers(self, thehive: TheHiveApi): | ||
analyzers = thehive.cortex.list_analyzers() | ||
assert analyzers == [] | ||
|
||
def test_list_analyzers_by_type(self, thehive: TheHiveApi): | ||
data_type = "mail" | ||
analyzers = thehive.cortex.list_analyzers_by_type(data_type=data_type) | ||
assert analyzers == [] | ||
|
||
def test_list_responders(self, thehive: TheHiveApi, test_case: OutputCase): | ||
responders = thehive.cortex.list_responders( | ||
entity_type="case", entity_id=test_case["_id"] | ||
) | ||
assert responders == [] |
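Review bot: Every assertion in this file pins an empty list (`assert analyzers == []`, `assert responders == []`), so the three tests only prove that the endpoints return without raising against a test instance with no Cortex connector; a regression in the request path or in the `data_type` / `entity_type` interpolation would still pass. The methods added in the same commit, `create_analyzer_job`, `create_responder_action`, `get_analyzer` and `get_analyzer_job`, have no test at all, so at least the request-body shape of the two POST methods is unverified. Even without a live Cortex, a test that sends an `InputAnalyzerJob` with a non-existent `cortexId` and asserts on the resulting error would pin the path and the JSON body. A one-line docstring on `TestCortexEndpoint` stating that the fixture instance has no Cortex configured would also explain why `[]` is the expected value.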
@@ -1,29 +1,52 @@ | ||
from thehive4py.endpoints._base import EndpointBase | ||
from thehive4py.types.cortex import ( | ||
OutputAnalyzer, | ||
OutputAnalyzerJob, | ||
OutputResponder, | ||
OutputResponderAction, | ||
InputResponderAction, | ||
InputAnalyzerJob, | ||
) | ||
from typing import Optional, List | ||
|
||
|
||
class CortexEndpoint(EndpointBase): | ||
def create_analyzer_job( | ||
self, cortex_id: str, analyzer_id: str, observable_id: str | ||
) -> dict: | ||
def create_analyzer_job(self, job: InputAnalyzerJob) -> OutputAnalyzerJob: | ||
return self._session.make_request( | ||
"POST", | ||
path="/api/connector/cortex/job", | ||
json={ | ||
"analyzerId": analyzer_id, | ||
"cortexId": cortex_id, | ||
"artifactId": observable_id, | ||
}, | ||
"POST", path="/api/connector/cortex/job", json=job | ||
) | ||
|
||
def create_responder_action( | ||
self, object_id: str, object_type: str, responder_id: str | ||
) -> dict: | ||
self, action: InputResponderAction | ||
) -> OutputResponderAction: | ||
return self._session.make_request( | ||
"POST", | ||
path="/api/connector/cortex/action", | ||
json={ | ||
"objectId": object_id, | ||
"objectType": object_type, | ||
"responderId": responder_id, | ||
}, | ||
"POST", path="/api/connector/cortex/action", json=action | ||
) | ||
|
||
def list_analyzers(self, range: Optional[str] = None) -> List[OutputAnalyzer]: | ||
params = {"range": range} | ||
return self._session.make_request( | ||
"GET", path="/api/connector/cortex/analyzer", params=params | ||
) | ||
|
||
def list_analyzers_by_type(self, data_type: str) -> List[OutputAnalyzer]: | ||
return self._session.make_request( | ||
"GET", path=f"/api/connector/cortex/analyzer/type/{data_type}" | ||
) | ||
|
||
def get_analyzer(self, analyzer_id: str) -> OutputAnalyzer: | ||
return self._session.make_request( | ||
"GET", path=f"/api/connector/cortex/analyzer/{analyzer_id}" | ||
) | ||
|
||
def get_analyzer_job(self, job_id: str) -> OutputAnalyzerJob: | ||
return self._session.make_request( | ||
"GET", path=f"/api/connector/cortex/job/{job_id}" | ||
) | ||
|
||
def list_responders( | ||
self, entity_type: str, entity_id: str | ||
) -> List[OutputResponder]: | ||
return self._session.make_request( | ||
"GET", f"/api/connector/cortex/responder/{entity_type}/{entity_id}" | ||
) |
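Review bot: `list_analyzers_by_type`, `get_analyzer`, `get_analyzer_job` and `list_responders` splice caller-supplied values straight into the request path with f-strings, so a `data_type`, `analyzer_id`, `job_id`, `entity_type` or `entity_id` containing `/`, `?`, `#` or `..` rewrites the path instead of being sent as an identifier; ids normally come back from API responses, but `data_type` is free text, so percent-encode each segment, e.g. `path=f"/api/connector/cortex/analyzer/{quote(analyzer_id, safe='')}"` with `from urllib.parse import quote` added to the import block. `list_analyzers` names its parameter `range`, shadowing the builtin (renaming it now would break keyword callers, so at most note it), and always sends `params={"range": None}` when the argument is omitted — whether `make_request` drops None-valued params is not visible here, so confirm the server tolerates it or build the dict only when `range` is set. `list_responders` passes its path positionally while every sibling uses `path=`; `path=f"/api/connector/cortex/responder/{entity_type}/{entity_id}"` keeps the style uniform. None of the seven methods has a docstring; a one-liner each naming the Cortex resource and the TypedDict it accepts or returns would make the public surface self-describing.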
@@ -0,0 +1,93 @@ | ||
from typing import Any, TypedDict, List, Dict | ||
|
||
|
||
class OutputAnalyzerRequired(TypedDict): | ||
id: str | ||
name: str | ||
version: str | ||
description: str | ||
|
||
|
||
class OutputAnalyzer(OutputAnalyzerRequired, total=False): | ||
dataTypeList: List[str] | ||
cortexIds: List[str] | ||
|
||
|
||
class OutputResponderRequired(TypedDict): | ||
id: str | ||
name: str | ||
version: str | ||
description: str | ||
|
||
|
||
class OutputResponder(OutputResponderRequired, total=False): | ||
dataTypeList: List[str] | ||
cortexIds: List[str] | ||
|
||
|
||
class OutputAnalyzerJobRequired(TypedDict): | ||
_id: str | ||
_type: str | ||
_createdBy: str | ||
_createdAt: str | ||
analyzerId: str | ||
analyzerName: str | ||
analyzerDefinition: str | ||
status: str | ||
startDate: str | ||
cortexId: str | ||
cortexJobId: str | ||
id: str | ||
operations: str | ||
|
||
|
||
class OutputAnalyzerJob(TypedDict, total=False): | ||
_updatedBy: str | ||
_updatedAt: str | ||
endDate: str | ||
report: Dict[str, Any] | ||
case_artifact: Dict[str, Any] | ||
|
||
|
||
class OutputResponderActionRequired(TypedDict): | ||
_id: str | ||
_type: str | ||
_createdBy: str | ||
_createdAt: str | ||
responderId: str | ||
status: str | ||
startDate: str | ||
cortexId: str | ||
cortexJobId: str | ||
id: str | ||
operations: str | ||
|
||
|
||
class OutputResponderAction(OutputResponderActionRequired, total=False): | ||
_updatedBy: str | ||
_updatedAt: str | ||
endDate: str | ||
report: Dict[str, Any] | ||
responderName: str | ||
responderDefinition: str | ||
|
||
|
||
class InputResponderActionRequired(TypedDict): | ||
objectId: str | ||
objectType: str | ||
responderId: str | ||
|
||
|
||
class InputResponderAction(InputResponderActionRequired, total=False): | ||
parameters: Dict[str, Any] | ||
tlp: int | ||
|
||
|
||
class InputAnalyzerJobRequired(TypedDict): | ||
analyzerId: str | ||
cortexId: str | ||
observableId: str | ||
|
||
|
||
class InputAnalyzerJob(InputAnalyzerJobRequired, total=False): | ||
parameters: Dict[str, Any] |
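Review bot: `OutputAnalyzerJob` derives from `TypedDict` directly rather than from `OutputAnalyzerJobRequired`, unlike `OutputAnalyzer`, `OutputResponder`, `OutputResponderAction` and the two `Input*` types, so the required job fields (`_id`, `analyzerId`, `status`, `cortexJobId`, …) are not part of the type that `create_analyzer_job` and `get_analyzer_job` return, and a type checker will reject `job["_id"]` as an unknown key. The fix is the class line alone: `class OutputAnalyzerJob(OutputAnalyzerJobRequired, total=False):`. `OutputResponderRequired`/`OutputResponder` also duplicate `OutputAnalyzerRequired`/`OutputAnalyzer` field for field; a shared base would keep the two from drifting, though that is a style choice. `operations: str` on both job and action types looks like it may really be a list in the API — worth confirming against a real response before callers depend on the annotation.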