Skip to content

Commit

Permalink
added test for elastic cronjob and a collection to mark the last upda…
Browse files Browse the repository at this point in the history
…ted time
  • Loading branch information
drosetti committed Oct 31, 2024
1 parent 3dd86cb commit cc785f8
Show file tree
Hide file tree
Showing 5 changed files with 619 additions and 40 deletions.
39 changes: 39 additions & 0 deletions api_app/migrations/0063_singleton_and_elastic_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Generated by Django 4.2.15 on 2024-10-29 10:48

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("api_app", "0062_alter_parameter_python_module"),
]

operations = [
migrations.CreateModel(
name="LastElasticReportUpdate",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("last_update_datetime", models.DateTimeField()),
],
options={
"abstract": False,
},
),
migrations.AddConstraint(
model_name="lastelasticreportupdate",
constraint=models.CheckConstraint(
check=models.Q(("pk", 1)),
name="singleton",
violation_error_message="This class is a singleton: only one object is allowed",
),
),
]
26 changes: 26 additions & 0 deletions api_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1787,3 +1787,29 @@ def generate_health_check_periodic_task(self):
)[0]
self.health_check_task = periodic_task
self.save()


class SingletonModel(models.Model):
"""Singleton base class.
Singleton is a desing pattern that allow only one istance of a class.
"""

def save(self, *args, **kwargs):
# check required to delete the singleton instance and create a new one
if type(self).objects.count() == 0:
self.pk = 1
super().save(*args, **kwargs)

class Meta:
abstract = True
constraints = [
models.CheckConstraint(
check=Q(pk=1),
name="singleton",
violation_error_message="This class is a singleton: only one object is allowed",
),
]


class LastElasticReportUpdate(SingletonModel):
last_update_datetime = models.DateTimeField()
119 changes: 79 additions & 40 deletions intel_owl/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,51 +404,90 @@ def send_plugin_report_to_elastic(max_timeout: int = 60, max_objects: int = 1000
from api_app.analyzers_manager.models import AnalyzerReport
from api_app.connectors_manager.models import ConnectorReport
from api_app.ingestors_manager.models import IngestorReport
from api_app.models import AbstractReport
from api_app.models import AbstractReport, LastElasticReportUpdate
from api_app.pivots_manager.models import PivotReport
from api_app.visualizers_manager.models import VisualizerReport

def _convert_report_to_elastic_document(_class: AbstractReport) -> List[Dict]:
if settings.ELASTIC_HOST:
upper_threshold = now().replace(second=0, microsecond=0)
lower_threshold = upper_threshold - datetime.timedelta(minutes=5)
report_list: list(AbstractReport) = _class.objects.filter(
status__in=ReportStatus.final_statuses(),
end_time__lt=upper_threshold,
end_time__gte=lower_threshold,
try:
last_elastic_report_update = LastElasticReportUpdate.objects.get()
except LastElasticReportUpdate.DoesNotExist:
# first time is missing, update some days of reports
first_run_start_date = upper_threshold - datetime.timedelta(days=30)
logger.warning(
f"not stored last update time, create it from: {first_run_start_date}"
)
last_elastic_report_update = LastElasticReportUpdate(
last_update_datetime=first_run_start_date
)
last_elastic_report_update.save()

lower_threshold = last_elastic_report_update.last_update_datetime
logger.info(
f"add to elastic reports from: {lower_threshold} to {upper_threshold}"
)
return [
{
"_op_type": "index",
"_index": (
"plugin-report-"
f"{inflection.underscore(_class.__name__).replace('_', '-')}-"
f"{datetime.date.today()}"
),
"_source": {
"config": {"name": report.config.name},
"job": {"id": report.job.id},
"start_time": report.start_time,
"end_time": report.end_time,
"status": report.status,
"report": report.report,
},
}
for report in report_list
]

# add document
document_list = (
_convert_report_to_elastic_document(AnalyzerReport)
+ _convert_report_to_elastic_document(ConnectorReport)
+ _convert_report_to_elastic_document(IngestorReport)
+ _convert_report_to_elastic_document(PivotReport)
+ _convert_report_to_elastic_document(VisualizerReport)
)
logger.info(f"documents to add to elastic: {len(document_list)}")
try:
bulk(connections.get_connection(), document_list)
except ApiError as error:
logger.critical(error)

def _convert_report_to_elastic_document(
_class: AbstractReport,
start_time: datetime.datetime,
end_time: datetime.datetime,
) -> List[Dict]:
report_list: list(AbstractReport) = _class.objects.filter(
status__in=ReportStatus.final_statuses(),
end_time__gte=start_time,
end_time__lt=end_time,
)
report_document_list = [
{
"_op_type": "index",
"_index": (
"plugin-report-"
f"{inflection.underscore(_class.__name__).replace('_', '-')}-"
f"{now().date()}"
),
"_source": {
"config": {"name": report.config.name},
"job": {"id": report.job.id},
"start_time": report.start_time,
"end_time": report.end_time,
"status": report.status,
"report": report.report,
},
}
for report in report_list
]
logger.info(
f"{_class.__name__} has {len(report_document_list)} new documents to upload"
)
return report_document_list

# add document
all_report_document_list = (
_convert_report_to_elastic_document(
AnalyzerReport, lower_threshold, upper_threshold
)
+ _convert_report_to_elastic_document(
ConnectorReport, lower_threshold, upper_threshold
)
+ _convert_report_to_elastic_document(
IngestorReport, lower_threshold, upper_threshold
)
+ _convert_report_to_elastic_document(
PivotReport, lower_threshold, upper_threshold
)
+ _convert_report_to_elastic_document(
VisualizerReport, lower_threshold, upper_threshold
)
)
logger.info(f"documents to add to elastic: {len(all_report_document_list)}")
try:
bulk(connections.get_connection(), all_report_document_list)
except ApiError as error:
logger.critical(error)
else:
last_elastic_report_update.last_update_datetime = upper_threshold
last_elastic_report_update.save()


@shared_task(
Expand Down
Empty file added tests/intel_owl/__init__.py
Empty file.
Loading

0 comments on commit cc785f8

Please sign in to comment.