From cc785f81e4da12959cde58b7bdd173d2daa6b24e Mon Sep 17 00:00:00 2001 From: drosetti Date: Thu, 31 Oct 2024 09:47:08 +0100 Subject: [PATCH] added test for elastic cronjob and a collection to mark the last updated time --- .../0063_singleton_and_elastic_report.py | 39 ++ api_app/models.py | 26 + intel_owl/tasks.py | 119 +++-- tests/intel_owl/__init__.py | 0 tests/intel_owl/test_tasks.py | 475 ++++++++++++++++++ 5 files changed, 619 insertions(+), 40 deletions(-) create mode 100644 api_app/migrations/0063_singleton_and_elastic_report.py create mode 100644 tests/intel_owl/__init__.py create mode 100644 tests/intel_owl/test_tasks.py diff --git a/api_app/migrations/0063_singleton_and_elastic_report.py b/api_app/migrations/0063_singleton_and_elastic_report.py new file mode 100644 index 0000000000..3a46c575e7 --- /dev/null +++ b/api_app/migrations/0063_singleton_and_elastic_report.py @@ -0,0 +1,39 @@ +# Generated by Django 4.2.15 on 2024-10-29 10:48 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api_app", "0062_alter_parameter_python_module"), + ] + + operations = [ + migrations.CreateModel( + name="LastElasticReportUpdate", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("last_update_datetime", models.DateTimeField()), + ], + options={ + "abstract": False, + }, + ), + migrations.AddConstraint( + model_name="lastelasticreportupdate", + constraint=models.CheckConstraint( + check=models.Q(("pk", 1)), + name="singleton", + violation_error_message="This class is a singleton: only one object is allowed", + ), + ), + ] diff --git a/api_app/models.py b/api_app/models.py index 231203dd9b..d637d79681 100644 --- a/api_app/models.py +++ b/api_app/models.py @@ -1787,3 +1787,29 @@ def generate_health_check_periodic_task(self): )[0] self.health_check_task = periodic_task self.save() + + +class SingletonModel(models.Model): + """Singleton base class. + Singleton is a desing pattern that allow only one istance of a class. + """ + + def save(self, *args, **kwargs): + # check required to delete the singleton instance and create a new one + if type(self).objects.count() == 0: + self.pk = 1 + super().save(*args, **kwargs) + + class Meta: + abstract = True + constraints = [ + models.CheckConstraint( + check=Q(pk=1), + name="singleton", + violation_error_message="This class is a singleton: only one object is allowed", + ), + ] + + +class LastElasticReportUpdate(SingletonModel): + last_update_datetime = models.DateTimeField() diff --git a/intel_owl/tasks.py b/intel_owl/tasks.py index 7e002965c2..34bc703155 100644 --- a/intel_owl/tasks.py +++ b/intel_owl/tasks.py @@ -404,51 +404,90 @@ def send_plugin_report_to_elastic(max_timeout: int = 60, max_objects: int = 1000 from api_app.analyzers_manager.models import AnalyzerReport from api_app.connectors_manager.models import ConnectorReport from api_app.ingestors_manager.models import IngestorReport - from api_app.models import AbstractReport + from api_app.models import AbstractReport, LastElasticReportUpdate from api_app.pivots_manager.models import PivotReport from api_app.visualizers_manager.models import VisualizerReport - def _convert_report_to_elastic_document(_class: AbstractReport) -> List[Dict]: + if settings.ELASTIC_HOST: upper_threshold = now().replace(second=0, microsecond=0) - lower_threshold = upper_threshold - datetime.timedelta(minutes=5) - report_list: list(AbstractReport) = _class.objects.filter( - status__in=ReportStatus.final_statuses(), - end_time__lt=upper_threshold, - end_time__gte=lower_threshold, + try: + last_elastic_report_update = LastElasticReportUpdate.objects.get() + except LastElasticReportUpdate.DoesNotExist: + # first time is missing, update some days of reports + first_run_start_date = upper_threshold - datetime.timedelta(days=30) + logger.warning( + f"not stored last update time, create it from: {first_run_start_date}" + ) + last_elastic_report_update = LastElasticReportUpdate( + last_update_datetime=first_run_start_date + ) + last_elastic_report_update.save() + + lower_threshold = last_elastic_report_update.last_update_datetime + logger.info( + f"add to elastic reports from: {lower_threshold} to {upper_threshold}" ) - return [ - { - "_op_type": "index", - "_index": ( - "plugin-report-" - f"{inflection.underscore(_class.__name__).replace('_', '-')}-" - f"{datetime.date.today()}" - ), - "_source": { - "config": {"name": report.config.name}, - "job": {"id": report.job.id}, - "start_time": report.start_time, - "end_time": report.end_time, - "status": report.status, - "report": report.report, - }, - } - for report in report_list - ] - - # add document - document_list = ( - _convert_report_to_elastic_document(AnalyzerReport) - + _convert_report_to_elastic_document(ConnectorReport) - + _convert_report_to_elastic_document(IngestorReport) - + _convert_report_to_elastic_document(PivotReport) - + _convert_report_to_elastic_document(VisualizerReport) - ) - logger.info(f"documents to add to elastic: {len(document_list)}") - try: - bulk(connections.get_connection(), document_list) - except ApiError as error: - logger.critical(error) + + def _convert_report_to_elastic_document( + _class: AbstractReport, + start_time: datetime.datetime, + end_time: datetime.datetime, + ) -> List[Dict]: + report_list: list(AbstractReport) = _class.objects.filter( + status__in=ReportStatus.final_statuses(), + end_time__gte=start_time, + end_time__lt=end_time, + ) + report_document_list = [ + { + "_op_type": "index", + "_index": ( + "plugin-report-" + f"{inflection.underscore(_class.__name__).replace('_', '-')}-" + f"{now().date()}" + ), + "_source": { + "config": {"name": report.config.name}, + "job": {"id": report.job.id}, + "start_time": report.start_time, + "end_time": report.end_time, + "status": report.status, + "report": report.report, + }, + } + for report in report_list + ] + logger.info( + f"{_class.__name__} has {len(report_document_list)} new documents to upload" + ) + return report_document_list + + # add document + all_report_document_list = ( + _convert_report_to_elastic_document( + AnalyzerReport, lower_threshold, upper_threshold + ) + + _convert_report_to_elastic_document( + ConnectorReport, lower_threshold, upper_threshold + ) + + _convert_report_to_elastic_document( + IngestorReport, lower_threshold, upper_threshold + ) + + _convert_report_to_elastic_document( + PivotReport, lower_threshold, upper_threshold + ) + + _convert_report_to_elastic_document( + VisualizerReport, lower_threshold, upper_threshold + ) + ) + logger.info(f"documents to add to elastic: {len(all_report_document_list)}") + try: + bulk(connections.get_connection(), all_report_document_list) + except ApiError as error: + logger.critical(error) + else: + last_elastic_report_update.last_update_datetime = upper_threshold + last_elastic_report_update.save() @shared_task( diff --git a/tests/intel_owl/__init__.py b/tests/intel_owl/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/intel_owl/test_tasks.py b/tests/intel_owl/test_tasks.py new file mode 100644 index 0000000000..fb9fcd6d7a --- /dev/null +++ b/tests/intel_owl/test_tasks.py @@ -0,0 +1,475 @@ +import datetime +from unittest.mock import patch + +from django.test import override_settings +from kombu import uuid + +from api_app.analyzers_manager.models import AnalyzerConfig, AnalyzerReport +from api_app.choices import PythonModuleBasePaths +from api_app.connectors_manager.models import ConnectorConfig, ConnectorReport +from api_app.ingestors_manager.models import IngestorConfig, IngestorReport +from api_app.models import Job, LastElasticReportUpdate, PythonModule +from api_app.pivots_manager.models import PivotConfig, PivotReport +from api_app.visualizers_manager.models import VisualizerConfig, VisualizerReport +from certego_saas.apps.user.models import User +from intel_owl.tasks import send_plugin_report_to_elastic +from tests import CustomTestCase +from tests.mock_utils import MockResponseNoOp + +_now = datetime.datetime(2024, 10, 29, 11, tzinfo=datetime.UTC) + + +@patch("intel_owl.tasks.now", return_value=_now) +@patch("intel_owl.tasks.connections.get_connection") +class SendElasticTestCase(CustomTestCase): + + def setUp(self): + job = Job.objects.create( + observable_name="dns.google.com", tlp="AMBER", user=User.objects.first() + ) + AnalyzerReport.objects.create( # valid for initial, not for last 5 minutes + config=AnalyzerConfig.objects.get( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.ObservableAnalyzer.value, + module="dns.dns_malicious_detectors.cloudflare_malicious_detector.CloudFlareMaliciousDetector", + ) + ), + job=job, + start_time=datetime.datetime(2024, 10, 29, 4, 49, tzinfo=datetime.UTC), + end_time=datetime.datetime(2024, 10, 29, 4, 59, tzinfo=datetime.UTC), + status=AnalyzerReport.Status.SUCCESS, + report={"observable": "dns.google.com", "malicious": False}, + task_id=uuid(), + parameters={}, + ) + AnalyzerReport.objects.create( # valid + config=AnalyzerConfig.objects.get( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.ObservableAnalyzer.value, + module="dns.dns_malicious_detectors.dns0_eu_malicious_detector.DNS0EUMaliciousDetector", + ) + ), + job=job, + start_time=datetime.datetime(2024, 10, 29, 10, 49, tzinfo=datetime.UTC), + end_time=datetime.datetime(2024, 10, 29, 10, 59, tzinfo=datetime.UTC), + status=AnalyzerReport.Status.FAILED, + errors=["error"], + task_id=uuid(), + parameters={}, + ) + AnalyzerReport.objects.create( # valid + config=AnalyzerConfig.objects.get( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.ObservableAnalyzer.value, + module="dns.dns_malicious_detectors.quad9_malicious_detector.Quad9MaliciousDetector", + ) + ), + job=job, + start_time=datetime.datetime(2024, 10, 29, 10, 49, tzinfo=datetime.UTC), + end_time=datetime.datetime(2024, 10, 29, 10, 59, tzinfo=datetime.UTC), + status=AnalyzerReport.Status.KILLED, + task_id=uuid(), + parameters={}, + ) + AnalyzerReport.objects.create( # too old + config=AnalyzerConfig.objects.get( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.ObservableAnalyzer.value, + module="dns.dns_resolvers.classic_dns_resolver.ClassicDNSResolver", + ) + ), + job=job, + start_time=datetime.datetime( + 2024, 9, 29, 10, 58, 49, tzinfo=datetime.timezone.utc + ), + end_time=datetime.datetime( + 2024, 9, 29, 10, 58, 59, tzinfo=datetime.timezone.utc + ), + status=AnalyzerReport.Status.SUCCESS, + report={"observable": "dns.google.com", "malicious": False}, + task_id=uuid(), + parameters={}, + ) + AnalyzerReport.objects.create( # invalid status + config=AnalyzerConfig.objects.get( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.ObservableAnalyzer.value, + module="dns.dns_resolvers.cloudflare_dns_resolver.CloudFlareDNSResolver", + ) + ), + job=job, + status=AnalyzerReport.Status.RUNNING, + start_time=datetime.datetime(2024, 10, 29, 10, 49, tzinfo=datetime.UTC), + end_time=datetime.datetime(2024, 10, 29, 10, 59, tzinfo=datetime.UTC), + task_id=uuid(), + parameters={}, + ) + ConnectorReport.objects.create( + config=ConnectorConfig.objects.get( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.Connector.value, + module="abuse_submitter.AbuseSubmitter", + ) + ), + job=job, + start_time=datetime.datetime(2024, 10, 29, 10, 49, tzinfo=datetime.UTC), + end_time=datetime.datetime(2024, 10, 29, 10, 59, tzinfo=datetime.UTC), + status=ConnectorReport.Status.SUCCESS, + task_id=uuid(), + report={ + "subject": "Subject", + "from": "sender@gmail.com", + "to": "receiver@gmail.com", + "body": "hello world", + }, + parameters={}, + ) + IngestorReport.objects.create( + config=IngestorConfig.objects.get( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.Ingestor.value, + module="malware_bazaar.MalwareBazaar", + ) + ), + job=job, + start_time=datetime.datetime(2024, 10, 29, 10, 49, tzinfo=datetime.UTC), + end_time=datetime.datetime(2024, 10, 29, 10, 59, tzinfo=datetime.UTC), + status=IngestorReport.Status.SUCCESS, + task_id=uuid(), + report={}, + parameters={}, + ) + PivotReport.objects.create( + config=PivotConfig.objects.filter( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.Pivot.value, + module="compare.Compare", + ) + ).first(), + job=job, + start_time=datetime.datetime(2024, 10, 29, 10, 49, tzinfo=datetime.UTC), + end_time=datetime.datetime(2024, 10, 29, 10, 59, tzinfo=datetime.UTC), + status=PivotReport.Status.SUCCESS, + task_id=uuid(), + report={"job_id": [1], "created": True, "motivation": None}, + parameters={}, + ) + VisualizerReport.objects.create( + config=VisualizerConfig.objects.get( + python_module=PythonModule.objects.get( + base_path=PythonModuleBasePaths.Visualizer.value, + module="dns.DNS", + ) + ), + job=job, + start_time=datetime.datetime(2024, 10, 29, 10, 49, tzinfo=datetime.UTC), + end_time=datetime.datetime(2024, 10, 29, 10, 59, tzinfo=datetime.UTC), + status=VisualizerReport.Status.SUCCESS, + task_id=uuid(), + report={ + "level_position": 1, + "level_size": "3", + "elements": { + "type": "horizontal_list", + "alignment": "around", + "values": [], + }, + }, + parameters={}, + ) + + def tearDown(self): + AnalyzerReport.objects.all().delete() + ConnectorReport.objects.all().delete() + IngestorReport.objects.all().delete() + PivotReport.objects.all().delete() + VisualizerReport.objects.all().delete() + LastElasticReportUpdate.objects.all().delete() + + @override_settings(ELASTIC_HOST="https://elasticsearch:9200") + def test_initial(self, *args, **kwargs): + with patch( + "intel_owl.tasks.bulk", + return_value=MockResponseNoOp(json_data={}, status_code=200), + ) as mocked_elastic_bulk: + send_plugin_report_to_elastic() + self.assertTrue(mocked_elastic_bulk.assert_called_once) + mocked_bulk_param = mocked_elastic_bulk.call_args.args[1] + self.assertEqual( + mocked_bulk_param, + [ + { + "_op_type": "index", + "_index": "plugin-report-analyzer-report-2024-10-29", + "_source": { + "config": {"name": "CloudFlare_Malicious_Detector"}, + "job": {"id": 1}, + "start_time": datetime.datetime( + 2024, 10, 29, 4, 49, tzinfo=datetime.timezone.utc + ), + "end_time": datetime.datetime( + 2024, 10, 29, 4, 59, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + "report": { + "malicious": False, + "observable": "dns.google.com", + }, + }, + }, + { + "_op_type": "index", + "_index": "plugin-report-analyzer-report-2024-10-29", + "_source": { + "config": {"name": "DNS0_EU_Malicious_Detector"}, + "job": {"id": 1}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "status": "FAILED", + "report": {}, + }, + }, + { + "_op_type": "index", + "_index": "plugin-report-analyzer-report-2024-10-29", + "_source": { + "config": {"name": "Quad9_Malicious_Detector"}, + "job": {"id": 1}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "status": "KILLED", + "report": {}, + }, + }, + { + "_op_type": "index", + "_index": "plugin-report-connector-report-2024-10-29", + "_source": { + "config": {"name": "AbuseSubmitter"}, + "job": {"id": 1}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + "report": { + "to": "receiver@gmail.com", + "body": "hello world", + "from": "sender@gmail.com", + "subject": "Subject", + }, + }, + }, + { + "_op_type": "index", + "_index": "plugin-report-ingestor-report-2024-10-29", + "_source": { + "config": {"name": "MalwareBazaar"}, + "job": {"id": 1}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + "report": {}, + }, + }, + { + "_op_type": "index", + "_index": "plugin-report-pivot-report-2024-10-29", + "_source": { + "config": {"name": "AbuseIpToSubmission"}, + "job": {"id": 1}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + "report": { + "job_id": [1], + "created": True, + "motivation": None, + }, + }, + }, + { + "_op_type": "index", + "_index": "plugin-report-visualizer-report-2024-10-29", + "_source": { + "config": {"name": "DNS"}, + "job": {"id": 1}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + "report": { + "elements": { + "type": "horizontal_list", + "values": [], + "alignment": "around", + }, + "level_size": "3", + "level_position": 1, + }, + }, + }, + ], + ) + + self.assertEqual( + LastElasticReportUpdate.objects.get().last_update_datetime, + datetime.datetime(2024, 10, 29, 11, tzinfo=datetime.UTC), + ) + + @override_settings(ELASTIC_HOST="https://elasticsearch:9200") + def test_update(self, *args, **kwargs): + LastElasticReportUpdate.objects.create( + last_update_datetime=_now - datetime.timedelta(minutes=5) + ) + with patch( + "intel_owl.tasks.bulk", + return_value=MockResponseNoOp(json_data={}, status_code=200), + ) as mocked_elastic_bulk: + send_plugin_report_to_elastic() + self.assertTrue(mocked_elastic_bulk.assert_called_once) + mocked_bulk_param = mocked_elastic_bulk.call_args.args[1] + self.assertEqual( + mocked_bulk_param, + [ + { + "_index": "plugin-report-analyzer-report-2024-10-29", + "_op_type": "index", + "_source": { + "config": {"name": "DNS0_EU_Malicious_Detector"}, + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "job": {"id": 2}, + "report": {}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "status": "FAILED", + }, + }, + { + "_index": "plugin-report-analyzer-report-2024-10-29", + "_op_type": "index", + "_source": { + "config": {"name": "Quad9_Malicious_Detector"}, + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "job": {"id": 2}, + "report": {}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "status": "KILLED", + }, + }, + { + "_index": "plugin-report-connector-report-2024-10-29", + "_op_type": "index", + "_source": { + "config": {"name": "AbuseSubmitter"}, + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "job": {"id": 2}, + "report": { + "body": "hello world", + "from": "sender@gmail.com", + "subject": "Subject", + "to": "receiver@gmail.com", + }, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + }, + }, + { + "_index": "plugin-report-ingestor-report-2024-10-29", + "_op_type": "index", + "_source": { + "config": {"name": "MalwareBazaar"}, + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "job": {"id": 2}, + "report": {}, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + }, + }, + { + "_index": "plugin-report-pivot-report-2024-10-29", + "_op_type": "index", + "_source": { + "config": {"name": "AbuseIpToSubmission"}, + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "job": {"id": 2}, + "report": { + "created": True, + "job_id": [1], + "motivation": None, + }, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + }, + }, + { + "_index": "plugin-report-visualizer-report-2024-10-29", + "_op_type": "index", + "_source": { + "config": {"name": "DNS"}, + "end_time": datetime.datetime( + 2024, 10, 29, 10, 59, tzinfo=datetime.timezone.utc + ), + "job": {"id": 2}, + "report": { + "elements": { + "alignment": "around", + "type": "horizontal_list", + "values": [], + }, + "level_position": 1, + "level_size": "3", + }, + "start_time": datetime.datetime( + 2024, 10, 29, 10, 49, tzinfo=datetime.timezone.utc + ), + "status": "SUCCESS", + }, + }, + ], + ) + + self.assertEqual( + LastElasticReportUpdate.objects.get().last_update_datetime, + datetime.datetime(2024, 10, 29, 11, tzinfo=datetime.UTC), + )