Link data import objects with Measurements and Reports #384

Merged · 7 commits · Oct 14, 2024
3 changes: 3 additions & 0 deletions djangomain/settings/settings.py
@@ -287,3 +287,6 @@
crossorigin="anonymous",
)
)


DATA_UPLOAD_MAX_NUMBER_FIELDS = None
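Note on the settings change above: Django's DATA_UPLOAD_MAX_NUMBER_FIELDS caps how many parameters a single request may carry (the framework default is 1000), and None disables the check entirely so that large imports are not rejected. If an upper bound were preferred, a hedged alternative would be a generous finite limit:

# Alternative sketch, not what this PR does: keep a finite but generous cap.
DATA_UPLOAD_MAX_NUMBER_FIELDS = 100_000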
32 changes: 26 additions & 6 deletions importing/functions.py
@@ -23,7 +23,8 @@
from django.db.models import FileField

from formatting.models import Classification, Format
from measurement.models import Measurement
from importing.models import DataImport
from measurement.models import Measurement, Report
from station.models import Station

unix_epoch = np.datetime64(0, "s")
@@ -262,7 +263,7 @@ def standardise_datetime(date_time: Any, datetime_format: str) -> datetime:


def save_temp_data_to_permanent(
station: Station, file_format: Format, file: FileField
data_import: DataImport,
) -> tuple[datetime, datetime, int]:
"""Function to pass the temporary import to the final table.

@@ -278,14 +279,21 @@
Args:
data_import_temp: DataImportTemp object.
"""
station = data_import.station
file_format = data_import.format
file = data_import.rawfile

# Delete existing measurements and reports for the same data_import_id
Measurement.objects.filter(data_import_id=data_import.data_import_id).delete()
Report.objects.filter(data_import_id=data_import.data_import_id).delete()

all_data = construct_matrix(file, file_format, station)
all_data = construct_matrix(file, file_format, station, data_import)
if not all_data:
msg = "No data to import. Is the chosen format correct?"
getLogger().error(msg)
raise ValidationError(msg)

must_cols = ["station_id", "variable_id", "date", "value"]
must_cols = ["data_import_id", "station_id", "variable_id", "date", "value"]
start_date = all_data[0]["date"].iloc[0]
end_date = all_data[0]["date"].iloc[-1]
num_records = len(all_data[0])
@@ -301,12 +309,18 @@
records = table.to_dict("records")
variable_id = table["variable_id"].iloc[0]

# Delete existing data between the date ranges
# Delete existing data between the date ranges. Needed for data not linked
# to a data_import_id. Both measurements and reports are deleted.
Measurement.timescale.filter(
time__range=[start_date, end_date],
station_id=station.station_id,
variable_id=variable_id,
).delete()
Report.objects.filter(
time__range=[start_date, end_date],
station_id=station.station_id,
variable_id=variable_id,
).delete()

# Bulk add new data
model_instances = [Measurement(**record) for record in records]
@@ -321,7 +335,12 @@
return start_date, end_date, num_records


def construct_matrix(matrix_source, file_format, station) -> list[pd.DataFrame]:
def construct_matrix(
matrix_source: FileField,
file_format: Format,
station: Station,
data_import: DataImport,
) -> list[pd.DataFrame]:
"""Construct the "matrix" or results table. Does various cleaning / simple
transformations depending on the date format, type of data (accumulated,
incremental...) and deals with NANs.
@@ -443,6 +462,7 @@ def construct_matrix(matrix_source, file_format, station) -> list[pd.DataFrame]:
data["value"] = data["value"] * float(classification.resolution)
data["station_id"] = station.station_id
data["variable_id"] = classification.variable.variable_id
data["data_import_id"] = data_import.data_import_id

# Add the data to the main list
to_ingest.append(data)
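To summarise the refactor above: save_temp_data_to_permanent now takes a single DataImport instead of the (station, format, file) triple, and it first deletes any Measurement and Report rows carrying that import's data_import_id before re-inserting. A minimal usage sketch, mirroring the call site in importing/tasks.py (the pk value is illustrative only):

from importing.functions import save_temp_data_to_permanent
from importing.models import DataImport

data_import = DataImport.objects.get(pk=1)  # pk=1 is a placeholder
start_date, end_date, num_records = save_temp_data_to_permanent(data_import)
# Re-running this for the same DataImport does not duplicate data: rows tagged
# with its data_import_id are deleted before the new bulk insert.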
4 changes: 1 addition & 3 deletions importing/tasks.py
@@ -24,9 +24,7 @@ def ingest_data(data_import_pk: int) -> None:
try:
getLogger("huey").info("Ingesting data for %s", data_import)
data_import.start_date, data_import.end_date, data_import.records = (
save_temp_data_to_permanent(
data_import.station, data_import.format, data_import.rawfile
)
save_temp_data_to_permanent(data_import)
)
data_import.status = "C"
data_import.log = "Data ingestion completed successfully"
25 changes: 25 additions & 0 deletions — new migration in measurement/migrations (filename not shown in this view)
@@ -0,0 +1,25 @@
# Generated by Django 5.1.1 on 2024-10-08 12:51

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('importing', '0006_alter_dataimport_data_import_id_and_more'),
('measurement', '0014_alter_report_completeness'),
]

operations = [
migrations.AddField(
model_name='measurement',
name='data_import',
field=models.ForeignKey(blank=True, help_text='Data import this measurement belongs to.', null=True, on_delete=django.db.models.deletion.CASCADE, to='importing.dataimport'),
),
migrations.AddField(
model_name='report',
name='data_import',
field=models.ForeignKey(blank=True, help_text='Data import this measurement belongs to.', null=True, on_delete=django.db.models.deletion.CASCADE, to='importing.dataimport'),
),
]
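Because the new ForeignKey is declared with null=True and blank=True, rows ingested before this migration simply keep data_import = NULL rather than being backfilled. A hedged query sketch for finding such unlinked rows:

from measurement.models import Measurement, Report

unlinked_measurements = Measurement.objects.filter(data_import__isnull=True)
unlinked_reports = Report.objects.filter(data_import__isnull=True)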
8 changes: 8 additions & 0 deletions measurement/models.py
@@ -17,6 +17,7 @@
from django.db import models
from timescale.db.models.models import TimescaleDateTimeField, TimescaleModel

from importing.models import DataImport
from management.models import apply_add_permissions_to_standard_group
from station.models import Station
from variable.models import Variable
@@ -46,6 +47,13 @@ class MeasurementBase(TimescaleModel):

time: TimescaleDateTimeField

data_import = models.ForeignKey(
DataImport,
on_delete=models.CASCADE,
null=True,
blank=True,
help_text="Data import this measurement belongs to.",
)
station = models.ForeignKey(
Station,
on_delete=models.PROTECT,
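With the ForeignKey defined on MeasurementBase, both Measurement and Report gain a data_import column, and each DataImport can reach its rows through Django's reverse accessors. A minimal sketch (the default related names are assumed here, since no related_name is set):

from importing.models import DataImport

data_import = DataImport.objects.first()
if data_import is not None:
    n_measurements = data_import.measurement_set.count()  # default related_name assumed
    n_reports = data_import.report_set.count()
# on_delete=CASCADE: deleting a DataImport also deletes its linked measurements and reports.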
16 changes: 16 additions & 0 deletions measurement/reporting.py
@@ -3,6 +3,7 @@

import pandas as pd

from importing.models import DataImport
from measurement.models import Measurement, Report
from station.models import Station
from variable.models import Variable
@@ -34,6 +35,17 @@ def calculate_reports(
daily = hourly.resample("D").agg(operation)
monthly = daily.resample("MS").agg(operation)

# Get the right data_import for each period. We use the mode to get the most common
# data_import value in the period.
def mode(x: pd.Series) -> str | None:
modes = x.mode()
return modes[0] if not modes.empty else None

cols2 = ["time", "data_import_id"]
hourly["data_import_id"] = data[cols2].resample("H", on="time").agg(mode)
daily["data_import_id"] = data[cols2].resample("D", on="time").agg(mode)
monthly["data_import_id"] = data[cols2].resample("MS", on="time").agg(mode)

# Put everything together
hourly["report_type"] = "hourly"
daily["report_type"] = "daily"
@@ -145,9 +157,13 @@ def save_report_data(data: pd.DataFrame) -> None:
data: The dataframe with the report data.
"""
data_ = data.dropna(axis=1, how="all").dropna(axis=0, subset=["value"])
data_import_avail = "data_import_id" in data_.columns
Report.objects.bulk_create(
[
Report(
data_import=DataImport.objects.get(pk=row["data_import_id"])
if data_import_avail and not pd.isna(row["data_import_id"])
else None,
station=Station.objects.get(station_code=row["station"]),
variable=Variable.objects.get(variable_code=row["variable"]),
time=time,
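The mode aggregation in calculate_reports assigns each hourly/daily/monthly report the data_import_id that occurs most often among the measurements in that period. A small standalone pandas sketch of the same idea (illustrative data only):

import pandas as pd

def mode(x: pd.Series):
    modes = x.mode()
    return modes[0] if not modes.empty else None

data = pd.DataFrame(
    {
        "time": pd.date_range("2024-01-01 00:00", periods=6, freq="10min"),
        "data_import_id": [1, 1, 2, 2, 2, 2],
    }
)
hourly_ids = data[["time", "data_import_id"]].resample("H", on="time").agg(mode)
# One hourly bucket covers 00:00-00:50; its data_import_id is 2 (the mode).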
9 changes: 8 additions & 1 deletion tests/importing/test_functions.py
@@ -29,6 +29,7 @@ class TestMatrixFunctions(TestCase):

def setUp(self):
from formatting.models import Format
from importing.models import DataImport
from station.models import TIMEZONES, Station

self.file_format = Format.objects.get(format_id=45)
@@ -37,6 +38,12 @@ def setUp(self):
)
self.station = Station.objects.get(station_id=8)
self.station.timezone = TIMEZONES[0][0]
self.data_import = DataImport.objects.create(
owner=self.station.owner,
station=self.station,
format=self.file_format,
rawfile=self.data_file,
)

def test_preformat_matrix(self):
from importing.functions import read_data_to_import
@@ -51,7 +58,7 @@ def test_construct_matrix(self):
from variable.models import Variable

variables_data = construct_matrix(
self.data_file, self.file_format, self.station
self.data_file, self.file_format, self.station, self.data_import
)
self.assertEqual(len(variables_data), 2)
vars = list(
4 changes: 4 additions & 0 deletions tests/measurement/test_reporting.py
@@ -26,6 +26,7 @@ def test_calculate_reports(self):
data = pd.DataFrame(
{
"time": time,
"data_import_id": None,
"value": np.linspace(1, 5, len(time)),
"maximum": np.linspace(1, 5, len(time)) + 1,
"minimum": np.linspace(1, 5, len(time)) - 1,
@@ -53,6 +54,7 @@
"value",
"maximum",
"minimum",
"data_import_id",
"report_type",
"station",
"variable",
@@ -181,6 +183,7 @@ def test_save_report_data(self):
{
"station": [self.station.station_code],
"variable": [self.variable.variable_code],
"data_import_id": [None],
"time": time,
"value": [1.0],
"report_type": ["hourly"],
@@ -259,6 +262,7 @@ def test_get_report_data(self):
[
"id",
"time",
"data_import_id",
"station",
"variable",
"value",