From 732f4e7d708597b4f178fb9338492b6c1b61cf83 Mon Sep 17 00:00:00 2001 From: Diego Alonso Alvarez Date: Tue, 8 Oct 2024 14:46:28 +0100 Subject: [PATCH 1/6] Add migrations for station, formatting and measurements --- formatting/migrations/0007_alter_time_code.py | 18 +++++++++++++ ...surement_data_import_report_data_import.py | 25 +++++++++++++++++++ .../migrations/0017_alter_station_region.py | 19 ++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 formatting/migrations/0007_alter_time_code.py create mode 100644 measurement/migrations/0015_measurement_data_import_report_data_import.py create mode 100644 station/migrations/0017_alter_station_region.py diff --git a/formatting/migrations/0007_alter_time_code.py b/formatting/migrations/0007_alter_time_code.py new file mode 100644 index 00000000..244d7abc --- /dev/null +++ b/formatting/migrations/0007_alter_time_code.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.1 on 2024-10-08 12:50 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('formatting', '0006_alter_classification_accumulate_and_more'), + ] + + operations = [ + migrations.AlterField( + model_name='time', + name='code', + field=models.CharField(help_text='The code used to parse the date column, eg. `%H:%M:%S`', max_length=20, verbose_name='Code'), + ), + ] diff --git a/measurement/migrations/0015_measurement_data_import_report_data_import.py b/measurement/migrations/0015_measurement_data_import_report_data_import.py new file mode 100644 index 00000000..51741a13 --- /dev/null +++ b/measurement/migrations/0015_measurement_data_import_report_data_import.py @@ -0,0 +1,25 @@ +# Generated by Django 5.1.1 on 2024-10-08 12:51 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('importing', '0006_alter_dataimport_data_import_id_and_more'), + ('measurement', '0014_alter_report_completeness'), + ] + + operations = [ + migrations.AddField( + model_name='measurement', + name='data_import', + field=models.ForeignKey(blank=True, help_text='Data import this measurement belongs to.', null=True, on_delete=django.db.models.deletion.CASCADE, to='importing.dataimport'), + ), + migrations.AddField( + model_name='report', + name='data_import', + field=models.ForeignKey(blank=True, help_text='Data import this measurement belongs to.', null=True, on_delete=django.db.models.deletion.CASCADE, to='importing.dataimport'), + ), + ] diff --git a/station/migrations/0017_alter_station_region.py b/station/migrations/0017_alter_station_region.py new file mode 100644 index 00000000..abffeb15 --- /dev/null +++ b/station/migrations/0017_alter_station_region.py @@ -0,0 +1,19 @@ +# Generated by Django 5.1.1 on 2024-10-08 12:51 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('station', '0016_remove_station_delta_t_delete_deltat'), + ] + + operations = [ + migrations.AlterField( + model_name='station', + name='region', + field=models.ForeignKey(blank=True, help_text='Region within the Country where the station is located.', null=True, on_delete=django.db.models.deletion.PROTECT, to='station.region', verbose_name='Region'), + ), + ] From c2cac79abc7a0cf5898f1eb411b616813e0a7037 Mon Sep 17 00:00:00 2001 From: Diego Alonso Alvarez Date: Tue, 8 Oct 2024 14:48:58 +0100 Subject: [PATCH 2/6] :sparkles: Link data_import with measurements and reports. --- djangomain/settings/settings.py | 3 +++ importing/functions.py | 32 ++++++++++++++++++++++++++------ importing/tasks.py | 4 +--- measurement/models.py | 8 ++++++++ measurement/reporting.py | 12 ++++++++++++ 5 files changed, 50 insertions(+), 9 deletions(-) diff --git a/djangomain/settings/settings.py b/djangomain/settings/settings.py index fcc75774..abd34119 100644 --- a/djangomain/settings/settings.py +++ b/djangomain/settings/settings.py @@ -287,3 +287,6 @@ crossorigin="anonymous", ) ) + + +DATA_UPLOAD_MAX_NUMBER_FIELDS = None diff --git a/importing/functions.py b/importing/functions.py index 10806d0a..bf143439 100755 --- a/importing/functions.py +++ b/importing/functions.py @@ -23,7 +23,8 @@ from django.db.models import FileField from formatting.models import Classification, Format -from measurement.models import Measurement +from importing.models import DataImport +from measurement.models import Measurement, Report from station.models import Station unix_epoch = np.datetime64(0, "s") @@ -262,7 +263,7 @@ def standardise_datetime(date_time: Any, datetime_format: str) -> datetime: def save_temp_data_to_permanent( - station: Station, file_format: Format, file: FileField + data_import: DataImport, ) -> tuple[datetime, datetime, int]: """Function to pass the temporary import to the final table. @@ -278,14 +279,21 @@ def save_temp_data_to_permanent( Args: data_import_temp: DataImportTemp object. """ + station = data_import.station + file_format = data_import.format + file = data_import.rawfile + + # Delete exiting measurements and reports for the same data_import_id + Measurement.objects.filter(data_import_id=data_import.data_import_id).delete() + Report.objects.filter(data_import_id=data_import.data_import_id).delete() - all_data = construct_matrix(file, file_format, station) + all_data = construct_matrix(file, file_format, station, data_import) if not all_data: msg = "No data to import. Is the chosen format correct?" getLogger().error(msg) raise ValidationError(msg) - must_cols = ["station_id", "variable_id", "date", "value"] + must_cols = ["data_import_id", "station_id", "variable_id", "date", "value"] start_date = all_data[0]["date"].iloc[0] end_date = all_data[0]["date"].iloc[-1] num_records = len(all_data[0]) @@ -301,12 +309,18 @@ def save_temp_data_to_permanent( records = table.to_dict("records") variable_id = table["variable_id"].iloc[0] - # Delete existing data between the date ranges + # Delete existing data between the date ranges. Needed for data not linked + # to a data_import_id. Both measurements and reports are deleted. Measurement.timescale.filter( time__range=[start_date, end_date], station_id=station.station_id, variable_id=variable_id, ).delete() + Report.objects.filter( + time__range=[start_date, end_date], + station_id=station.station_id, + variable_id=variable_id, + ).delete() # Bulk add new data model_instances = [Measurement(**record) for record in records] @@ -321,7 +335,12 @@ def save_temp_data_to_permanent( return start_date, end_date, num_records -def construct_matrix(matrix_source, file_format, station) -> list[pd.DataFrame]: +def construct_matrix( + matrix_source: FileField, + file_format: Format, + station: Station, + data_import: DataImport, +) -> list[pd.DataFrame]: """Construct the "matrix" or results table. Does various cleaning / simple transformations depending on the date format, type of data (accumulated, incremental...) and deals with NANs. @@ -443,6 +462,7 @@ def construct_matrix(matrix_source, file_format, station) -> list[pd.DataFrame]: data["value"] = data["value"] * float(classification.resolution) data["station_id"] = station.station_id data["variable_id"] = classification.variable.variable_id + data["data_import_id"] = data_import.data_import_id # Add the data to the main list to_ingest.append(data) diff --git a/importing/tasks.py b/importing/tasks.py index 809d523f..bfcbb237 100644 --- a/importing/tasks.py +++ b/importing/tasks.py @@ -24,9 +24,7 @@ def ingest_data(data_import_pk: int) -> None: try: getLogger("huey").info("Ingesting data for %s", data_import) data_import.start_date, data_import.end_date, data_import.records = ( - save_temp_data_to_permanent( - data_import.station, data_import.format, data_import.rawfile - ) + save_temp_data_to_permanent(data_import) ) data_import.status = "C" data_import.log = "Data ingestion completed successfully" diff --git a/measurement/models.py b/measurement/models.py index 0853d43d..7f499182 100755 --- a/measurement/models.py +++ b/measurement/models.py @@ -17,6 +17,7 @@ from django.db import models from timescale.db.models.models import TimescaleDateTimeField, TimescaleModel +from importing.models import DataImport from management.models import apply_add_permissions_to_standard_group from station.models import Station from variable.models import Variable @@ -46,6 +47,13 @@ class MeasurementBase(TimescaleModel): time: TimescaleDateTimeField + data_import = models.ForeignKey( + DataImport, + on_delete=models.CASCADE, + null=True, + blank=True, + help_text="Data import this measurement belongs to.", + ) station = models.ForeignKey( Station, on_delete=models.PROTECT, diff --git a/measurement/reporting.py b/measurement/reporting.py index d2bb5a69..f1a72a8a 100644 --- a/measurement/reporting.py +++ b/measurement/reporting.py @@ -3,6 +3,7 @@ import pandas as pd +from importing.models import DataImport from measurement.models import Measurement, Report from station.models import Station from variable.models import Variable @@ -34,6 +35,16 @@ def calculate_reports( daily = hourly.resample("D").agg(operation) monthly = daily.resample("MS").agg(operation) + # Get the right data_import for each period. We use the mode to get the most common + # data_import value in the period. + def mode(x: pd.Series) -> str: + return x.mode().iloc[0] + + cols2 = ["time", "data_import_id"] + hourly["data_import_id"] = data[cols2].resample("H", on="time").agg(mode) + daily["data_import_id"] = data[cols2].resample("D", on="time").agg(mode) + monthly["data_import_id"] = data[cols2].resample("MS", on="time").agg(mode) + # Put everything together hourly["report_type"] = "hourly" daily["report_type"] = "daily" @@ -148,6 +159,7 @@ def save_report_data(data: pd.DataFrame) -> None: Report.objects.bulk_create( [ Report( + data_import=DataImport.objects.get(pk=row["data_import_id"]), station=Station.objects.get(station_code=row["station"]), variable=Variable.objects.get(variable_code=row["variable"]), time=time, From 01c833b97a1cff4057840895b1f645b7e42ddbc9 Mon Sep 17 00:00:00 2001 From: Diego Alonso Alvarez Date: Tue, 8 Oct 2024 15:19:25 +0100 Subject: [PATCH 3/6] :white_check_mark: Fix tests. --- measurement/reporting.py | 13 ++++++++++--- tests/importing/test_functions.py | 6 +++++- tests/measurement/test_reporting.py | 4 ++++ 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/measurement/reporting.py b/measurement/reporting.py index f1a72a8a..7f1358c6 100644 --- a/measurement/reporting.py +++ b/measurement/reporting.py @@ -37,8 +37,9 @@ def calculate_reports( # Get the right data_import for each period. We use the mode to get the most common # data_import value in the period. - def mode(x: pd.Series) -> str: - return x.mode().iloc[0] + def mode(x: pd.Series) -> str | None: + modes = x.mode() + return modes[0] if not modes.empty else None cols2 = ["time", "data_import_id"] hourly["data_import_id"] = data[cols2].resample("H", on="time").agg(mode) @@ -155,11 +156,17 @@ def save_report_data(data: pd.DataFrame) -> None: Args: data: The dataframe with the report data. """ + from logging import getLogger + + getLogger().warning(data.columns) data_ = data.dropna(axis=1, how="all").dropna(axis=0, subset=["value"]) + data_import_avail = "data_import_id" in data_.columns Report.objects.bulk_create( [ Report( - data_import=DataImport.objects.get(pk=row["data_import_id"]), + data_import=DataImport.objects.get(pk=row["data_import_id"]) + if data_import_avail and not pd.isna(row["data_import_id"]) + else None, station=Station.objects.get(station_code=row["station"]), variable=Variable.objects.get(variable_code=row["variable"]), time=time, diff --git a/tests/importing/test_functions.py b/tests/importing/test_functions.py index 6c6319d6..0163cf67 100755 --- a/tests/importing/test_functions.py +++ b/tests/importing/test_functions.py @@ -28,6 +28,7 @@ class TestMatrixFunctions(TestCase): def setUp(self): from formatting.models import Format + from importing.models import DataImport from station.models import TIMEZONES, Station self.file_format = Format.objects.get(format_id=45) @@ -36,6 +37,9 @@ def setUp(self): ) self.station = Station.objects.get(station_id=8) self.station.timezone = TIMEZONES[0][0] + self.data_import = DataImport.objects.create( + station=self.station, format=self.file_format, rawfile=self.data_file + ) def test_preformat_matrix(self): from importing.functions import read_data_to_import @@ -50,7 +54,7 @@ def test_construct_matrix(self): from variable.models import Variable variables_data = construct_matrix( - self.data_file, self.file_format, self.station + self.data_file, self.file_format, self.station, self.data_import ) self.assertEqual(len(variables_data), 2) vars = list( diff --git a/tests/measurement/test_reporting.py b/tests/measurement/test_reporting.py index 98945b9a..eed50bec 100644 --- a/tests/measurement/test_reporting.py +++ b/tests/measurement/test_reporting.py @@ -26,6 +26,7 @@ def test_calculate_reports(self): data = pd.DataFrame( { "time": time, + "data_import_id": None, "value": np.linspace(1, 5, len(time)), "maximum": np.linspace(1, 5, len(time)) + 1, "minimum": np.linspace(1, 5, len(time)) - 1, @@ -53,6 +54,7 @@ def test_calculate_reports(self): "value", "maximum", "minimum", + "data_import_id", "report_type", "station", "variable", @@ -181,6 +183,7 @@ def test_save_report_data(self): { "station": [self.station.station_code], "variable": [self.variable.variable_code], + "data_import_id": [None], "time": time, "value": [1.0], "report_type": ["hourly"], @@ -259,6 +262,7 @@ def test_get_report_data(self): [ "id", "time", + "data_import_id", "station", "variable", "value", From 36f9281c4880de8bdf5e8c1d87692ebca7e48198 Mon Sep 17 00:00:00 2001 From: Diego Alonso Alvarez Date: Tue, 8 Oct 2024 15:32:27 +0100 Subject: [PATCH 4/6] Remove debug code --- measurement/reporting.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/measurement/reporting.py b/measurement/reporting.py index 7f1358c6..9cfd0b42 100644 --- a/measurement/reporting.py +++ b/measurement/reporting.py @@ -156,9 +156,6 @@ def save_report_data(data: pd.DataFrame) -> None: Args: data: The dataframe with the report data. """ - from logging import getLogger - - getLogger().warning(data.columns) data_ = data.dropna(axis=1, how="all").dropna(axis=0, subset=["value"]) data_import_avail = "data_import_id" in data_.columns Report.objects.bulk_create( From 225b1eaa213fad9e81035a4fff4c1f57155a4faa Mon Sep 17 00:00:00 2001 From: Diego Alonso Alvarez Date: Mon, 14 Oct 2024 15:53:56 +0100 Subject: [PATCH 5/6] :bug: Remove duplicate migrations --- formatting/migrations/0007_alter_time_code.py | 18 ------------------ .../migrations/0017_alter_station_region.py | 19 ------------------- 2 files changed, 37 deletions(-) delete mode 100644 formatting/migrations/0007_alter_time_code.py delete mode 100644 station/migrations/0017_alter_station_region.py diff --git a/formatting/migrations/0007_alter_time_code.py b/formatting/migrations/0007_alter_time_code.py deleted file mode 100644 index 244d7abc..00000000 --- a/formatting/migrations/0007_alter_time_code.py +++ /dev/null @@ -1,18 +0,0 @@ -# Generated by Django 5.1.1 on 2024-10-08 12:50 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ('formatting', '0006_alter_classification_accumulate_and_more'), - ] - - operations = [ - migrations.AlterField( - model_name='time', - name='code', - field=models.CharField(help_text='The code used to parse the date column, eg. `%H:%M:%S`', max_length=20, verbose_name='Code'), - ), - ] diff --git a/station/migrations/0017_alter_station_region.py b/station/migrations/0017_alter_station_region.py deleted file mode 100644 index abffeb15..00000000 --- a/station/migrations/0017_alter_station_region.py +++ /dev/null @@ -1,19 +0,0 @@ -# Generated by Django 5.1.1 on 2024-10-08 12:51 - -import django.db.models.deletion -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ('station', '0016_remove_station_delta_t_delete_deltat'), - ] - - operations = [ - migrations.AlterField( - model_name='station', - name='region', - field=models.ForeignKey(blank=True, help_text='Region within the Country where the station is located.', null=True, on_delete=django.db.models.deletion.PROTECT, to='station.region', verbose_name='Region'), - ), - ] From 2986dd546d6c0a58eadc5169ced77466c02f0023 Mon Sep 17 00:00:00 2001 From: Diego Alonso Alvarez Date: Mon, 14 Oct 2024 15:59:35 +0100 Subject: [PATCH 6/6] :bug: Add owner to DataImport in test. --- tests/importing/test_functions.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/importing/test_functions.py b/tests/importing/test_functions.py index 58654434..9abcaef9 100755 --- a/tests/importing/test_functions.py +++ b/tests/importing/test_functions.py @@ -39,7 +39,10 @@ def setUp(self): self.station = Station.objects.get(station_id=8) self.station.timezone = TIMEZONES[0][0] self.data_import = DataImport.objects.create( - station=self.station, format=self.file_format, rawfile=self.data_file + owner=self.station.owner, + station=self.station, + format=self.file_format, + rawfile=self.data_file, ) def test_preformat_matrix(self):