diff --git a/djangomain/settings/settings.py b/djangomain/settings/settings.py index fcc75774..abd34119 100644 --- a/djangomain/settings/settings.py +++ b/djangomain/settings/settings.py @@ -287,3 +287,6 @@ crossorigin="anonymous", ) ) + + +DATA_UPLOAD_MAX_NUMBER_FIELDS = None diff --git a/importing/functions.py b/importing/functions.py index 10806d0a..bf143439 100755 --- a/importing/functions.py +++ b/importing/functions.py @@ -23,7 +23,8 @@ from django.db.models import FileField from formatting.models import Classification, Format -from measurement.models import Measurement +from importing.models import DataImport +from measurement.models import Measurement, Report from station.models import Station unix_epoch = np.datetime64(0, "s") @@ -262,7 +263,7 @@ def standardise_datetime(date_time: Any, datetime_format: str) -> datetime: def save_temp_data_to_permanent( - station: Station, file_format: Format, file: FileField + data_import: DataImport, ) -> tuple[datetime, datetime, int]: """Function to pass the temporary import to the final table. @@ -278,14 +279,21 @@ def save_temp_data_to_permanent( Args: data_import_temp: DataImportTemp object. """ + station = data_import.station + file_format = data_import.format + file = data_import.rawfile + + # Delete existing measurements and reports for the same data_import_id + Measurement.objects.filter(data_import_id=data_import.data_import_id).delete() + Report.objects.filter(data_import_id=data_import.data_import_id).delete() - all_data = construct_matrix(file, file_format, station) + all_data = construct_matrix(file, file_format, station, data_import) if not all_data: msg = "No data to import. Is the chosen format correct?"
getLogger().error(msg) raise ValidationError(msg) - must_cols = ["station_id", "variable_id", "date", "value"] + must_cols = ["data_import_id", "station_id", "variable_id", "date", "value"] start_date = all_data[0]["date"].iloc[0] end_date = all_data[0]["date"].iloc[-1] num_records = len(all_data[0]) @@ -301,12 +309,18 @@ def save_temp_data_to_permanent( records = table.to_dict("records") variable_id = table["variable_id"].iloc[0] - # Delete existing data between the date ranges + # Delete existing data between the date ranges. Needed for data not linked + # to a data_import_id. Both measurements and reports are deleted. Measurement.timescale.filter( time__range=[start_date, end_date], station_id=station.station_id, variable_id=variable_id, ).delete() + Report.objects.filter( + time__range=[start_date, end_date], + station_id=station.station_id, + variable_id=variable_id, + ).delete() # Bulk add new data model_instances = [Measurement(**record) for record in records] @@ -321,7 +335,12 @@ def save_temp_data_to_permanent( return start_date, end_date, num_records -def construct_matrix(matrix_source, file_format, station) -> list[pd.DataFrame]: +def construct_matrix( + matrix_source: FileField, + file_format: Format, + station: Station, + data_import: DataImport, +) -> list[pd.DataFrame]: """Construct the "matrix" or results table. Does various cleaning / simple transformations depending on the date format, type of data (accumulated, incremental...) and deals with NANs. 
@@ -443,6 +462,7 @@ def construct_matrix(matrix_source, file_format, station) -> list[pd.DataFrame]: data["value"] = data["value"] * float(classification.resolution) data["station_id"] = station.station_id data["variable_id"] = classification.variable.variable_id + data["data_import_id"] = data_import.data_import_id # Add the data to the main list to_ingest.append(data) diff --git a/importing/tasks.py b/importing/tasks.py index 809d523f..bfcbb237 100644 --- a/importing/tasks.py +++ b/importing/tasks.py @@ -24,9 +24,7 @@ def ingest_data(data_import_pk: int) -> None: try: getLogger("huey").info("Ingesting data for %s", data_import) data_import.start_date, data_import.end_date, data_import.records = ( - save_temp_data_to_permanent( - data_import.station, data_import.format, data_import.rawfile - ) + save_temp_data_to_permanent(data_import) ) data_import.status = "C" data_import.log = "Data ingestion completed successfully" diff --git a/measurement/migrations/0015_measurement_data_import_report_data_import.py b/measurement/migrations/0015_measurement_data_import_report_data_import.py new file mode 100644 index 00000000..51741a13 --- /dev/null +++ b/measurement/migrations/0015_measurement_data_import_report_data_import.py @@ -0,0 +1,25 @@ +# Generated by Django 5.1.1 on 2024-10-08 12:51 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('importing', '0006_alter_dataimport_data_import_id_and_more'), + ('measurement', '0014_alter_report_completeness'), + ] + + operations = [ + migrations.AddField( + model_name='measurement', + name='data_import', + field=models.ForeignKey(blank=True, help_text='Data import this measurement belongs to.', null=True, on_delete=django.db.models.deletion.CASCADE, to='importing.dataimport'), + ), + migrations.AddField( + model_name='report', + name='data_import', + field=models.ForeignKey(blank=True, help_text='Data import this measurement belongs 
to.', null=True, on_delete=django.db.models.deletion.CASCADE, to='importing.dataimport'), + ), + ] diff --git a/measurement/models.py b/measurement/models.py index 0853d43d..7f499182 100755 --- a/measurement/models.py +++ b/measurement/models.py @@ -17,6 +17,7 @@ from django.db import models from timescale.db.models.models import TimescaleDateTimeField, TimescaleModel +from importing.models import DataImport from management.models import apply_add_permissions_to_standard_group from station.models import Station from variable.models import Variable @@ -46,6 +47,13 @@ class MeasurementBase(TimescaleModel): time: TimescaleDateTimeField + data_import = models.ForeignKey( + DataImport, + on_delete=models.CASCADE, + null=True, + blank=True, + help_text="Data import this measurement belongs to.", + ) station = models.ForeignKey( Station, on_delete=models.PROTECT, diff --git a/measurement/reporting.py b/measurement/reporting.py index d2bb5a69..9cfd0b42 100644 --- a/measurement/reporting.py +++ b/measurement/reporting.py @@ -3,6 +3,7 @@ import pandas as pd +from importing.models import DataImport from measurement.models import Measurement, Report from station.models import Station from variable.models import Variable @@ -34,6 +35,17 @@ def calculate_reports( daily = hourly.resample("D").agg(operation) monthly = daily.resample("MS").agg(operation) + # Get the right data_import for each period. We use the mode to get the most common + # data_import value in the period. 
+ def mode(x: pd.Series) -> str | None: + modes = x.mode() + return modes[0] if not modes.empty else None + + cols2 = ["time", "data_import_id"] + hourly["data_import_id"] = data[cols2].resample("H", on="time").agg(mode) + daily["data_import_id"] = data[cols2].resample("D", on="time").agg(mode) + monthly["data_import_id"] = data[cols2].resample("MS", on="time").agg(mode) + # Put everything together hourly["report_type"] = "hourly" daily["report_type"] = "daily" @@ -145,9 +157,13 @@ def save_report_data(data: pd.DataFrame) -> None: data: The dataframe with the report data. """ data_ = data.dropna(axis=1, how="all").dropna(axis=0, subset=["value"]) + data_import_avail = "data_import_id" in data_.columns Report.objects.bulk_create( [ Report( + data_import=DataImport.objects.get(pk=row["data_import_id"]) + if data_import_avail and not pd.isna(row["data_import_id"]) + else None, station=Station.objects.get(station_code=row["station"]), variable=Variable.objects.get(variable_code=row["variable"]), time=time, diff --git a/tests/importing/test_functions.py b/tests/importing/test_functions.py index ee1ed226..9abcaef9 100755 --- a/tests/importing/test_functions.py +++ b/tests/importing/test_functions.py @@ -29,6 +29,7 @@ class TestMatrixFunctions(TestCase): def setUp(self): from formatting.models import Format + from importing.models import DataImport from station.models import TIMEZONES, Station self.file_format = Format.objects.get(format_id=45) @@ -37,6 +38,12 @@ def setUp(self): ) self.station = Station.objects.get(station_id=8) self.station.timezone = TIMEZONES[0][0] + self.data_import = DataImport.objects.create( + owner=self.station.owner, + station=self.station, + format=self.file_format, + rawfile=self.data_file, + ) def test_preformat_matrix(self): from importing.functions import read_data_to_import @@ -51,7 +58,7 @@ def test_construct_matrix(self): from variable.models import Variable variables_data = construct_matrix( - self.data_file, self.file_format, 
self.station + self.data_file, self.file_format, self.station, self.data_import ) self.assertEqual(len(variables_data), 2) vars = list( diff --git a/tests/measurement/test_reporting.py b/tests/measurement/test_reporting.py index 98945b9a..eed50bec 100644 --- a/tests/measurement/test_reporting.py +++ b/tests/measurement/test_reporting.py @@ -26,6 +26,7 @@ def test_calculate_reports(self): data = pd.DataFrame( { "time": time, + "data_import_id": None, "value": np.linspace(1, 5, len(time)), "maximum": np.linspace(1, 5, len(time)) + 1, "minimum": np.linspace(1, 5, len(time)) - 1, @@ -53,6 +54,7 @@ def test_calculate_reports(self): "value", "maximum", "minimum", + "data_import_id", "report_type", "station", "variable", @@ -181,6 +183,7 @@ def test_save_report_data(self): { "station": [self.station.station_code], "variable": [self.variable.variable_code], + "data_import_id": [None], "time": time, "value": [1.0], "report_type": ["hourly"], @@ -259,6 +262,7 @@ def test_get_report_data(self): [ "id", "time", + "data_import_id", "station", "variable", "value",