SERL Compatible Weather CSVs (#20)
* fix string format for regex
* SERL Compatible Weather CSVs #17
* Filter by grid
* make the CSV more like the SERL reference
cdinu authored Jan 10, 2025
1 parent c597840 commit cc3a9ac
Showing 8 changed files with 297 additions and 101 deletions.
6 changes: 3 additions & 3 deletions clients/copernicus_cds/copernicus_cds/geo_to_grid.py
@@ -1,5 +1,5 @@
-from dataclasses import dataclass
 import math
+from dataclasses import dataclass


def magic_transform_coorinates_to_grid(
@@ -25,10 +25,10 @@ def __init__(self, xx: int, yy: int):
self.yy = yy

def __str__(self):
-        return f"{self.xx}_{self.yy}"
+        return f"{self.xx:02d}_{self.yy:02d}"

def __repr__(self):
-        return f"{self.xx}_{self.yy}"
+        return f"{self.xx:02d}_{self.yy:02d}"

def __eq__(self, other):
return self.xx == other.xx and self.yy == other.yy
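For context, a minimal sketch of what the zero-padding change does to grid-cell names, assuming EdolGridCell is constructed directly from grid indices as in the __init__ above:

    from copernicus_cds.geo_to_grid import EdolGridCell

    cell = EdolGridCell(3, 7)
    print(cell)  # before this commit: 3_7; after: 03_07

Fixed-width ids sort predictably as plain strings, which matters once the output CSV is sorted by grid_cell in netcdf_utils.py below.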
82 changes: 28 additions & 54 deletions clients/copernicus_cds/copernicus_cds/netcdf_utils.py
@@ -1,17 +1,17 @@
import logging
-import sys
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import IO

import pandas as pd
import xarray as xr
from copernicus_cds.edol_config import config as edol_config
from copernicus_cds.geo_to_grid import EdolGridCell

REQUIRED_COLUMNS = {"latitude", "longitude", "valid_time"}

+logger = logging.getLogger(__name__)


@dataclass
class ProcessingConfig:
@@ -29,13 +29,6 @@ class NetCDFProcessor:
def __init__(self, config: ProcessingConfig = ProcessingConfig()):
self.dataframe = None
self.config = config
-        self.logger = logging.getLogger("NetCDFProcessor")
-        if self.config.debug:
-            self.log("Debug mode enabled")
-
-    def log(self, message: str) -> None:
-        if self.logger:
-            self.logger.info(message)

def validate_dataset(self, ds: xr.Dataset) -> None:
"""Validate required columns in dataset."""
@@ -72,6 +65,14 @@ def process_dataset(self, ds: xr.Dataset) -> pd.DataFrame:
)
df = ds.to_dataframe().reset_index()

        # Filter to the grid cells of interest.
        # Filtering on the xarray Dataset directly crashed the process,
        # so the filter is applied to the DataFrame instead.
if self.config.grid_cells_of_interest:
logger.debug(
f"Filtering {len(self.config.grid_cells_of_interest)} grid cells of interest"
)
df = df[df["grid_cell"].isin(self.config.grid_cells_of_interest)]

# drop columns that are not in the default output columns
if self.config.output_columns:
available_columns = ["valid_time"]
@@ -93,20 +94,25 @@ def process_zip_file(self, input_path: Path, output_path: Path) -> None:
try:
with zipfile.ZipFile(input_path, "r") as zip_ref:
file_list = zip_ref.namelist()
-                self.log(f"Files in archive: {file_list}")
+                logger.debug(f"Files in archive: {file_list}")

netcdf_files = [f for f in file_list if f.endswith(".nc")]
if not netcdf_files:
raise ValueError("No NetCDF files found in archive")

# Process each NC file
for file_name in netcdf_files:
-                    self.log(f"Processing {file_name}...")
+                    logger.debug(f"Processing {file_name}...")
with zip_ref.open(file_name) as nc_file:
df = self.netcdf_to_df(nc_file)
logger.debug(
f"Processed {file_name}, columns: {list(df.columns)}"
)
if self.dataframe is None:
self.dataframe = df
else:
logger.debug(f"Merging {file_name} with existing data...")

# merge with suffix to avoid column name conflicts
self.dataframe = pd.merge(
self.dataframe,
@@ -128,51 +134,19 @@ def process_zip_file(self, input_path: Path, output_path: Path) -> None:
if self.config.output_columns:
self.dataframe = self.dataframe[self.config.output_columns]

-                    self.dataframe.to_csv(output_path, index=False)
-                    self.log(f"Saved combined data to {output_path}")
+                    # sort by grid_cell, then analysis_date, ascending
+                    self.dataframe.sort_values(
+                        by=["grid_cell", "analysis_date"], inplace=True
+                    )
+
+                    # write timestamps as ISO 8601, with T separator and Z suffix
+                    self.dataframe.to_csv(
+                        output_path, index=False, date_format="%Y-%m-%dT%H:%M:%SZ"
+                    )
+                    logger.debug(f"Saved combined data to {output_path}")

except zipfile.BadZipFile:
raise ValueError(f"Invalid or corrupted zip file: {input_path}")
except Exception as e:
-            self.log(f"Error processing zip file: {e}")
+            logger.debug(f"Error processing zip file: {e}", exc_info=True)
raise e


-def main():
-    """Main execution function."""
-    logging.basicConfig(
-        level=logging.DEBUG,
-        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
-    )
-    logger = logging.getLogger(__name__)
-
-    if len(sys.argv) != 3:
-        logger.error("Usage: script.py input_file output_file")
-        sys.exit(1)
-
-    input_path = Path(sys.argv[1])
-    output_path = Path(sys.argv[2])
-
-    if not input_path.exists():
-        logger.error(f"Input file not found: {input_path}")
-        sys.exit(1)
-
-    try:
-        edol_processor_config = ProcessingConfig(
-            output_columns=edol_config["output_columns"],
-            netcdf_field_mapping=edol_config["netcdf_field_mapping"],
-            grid_cells_of_interest=edol_config["grid_cells_of_interest"],
-            debug=True,
-        )
-        processor = NetCDFProcessor(edol_processor_config)
-
-        processor.process_zip_file(input_path, output_path)
-        logger.info("Processing completed successfully")
-    except Exception as e:
-        logger.error(f"Error during processing: {e}")
-        raise e
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()
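A hedged usage sketch of the processor as wired up in this commit; the ProcessingConfig fields mirror the removed main() above, and the archive and CSV paths are illustrative:

    from pathlib import Path

    from copernicus_cds.edol_config import config as edol_config
    from copernicus_cds.netcdf_utils import NetCDFProcessor, ProcessingConfig

    config = ProcessingConfig(
        output_columns=edol_config["output_columns"],
        netcdf_field_mapping=edol_config["netcdf_field_mapping"],
        grid_cells_of_interest=edol_config["grid_cells_of_interest"],
    )
    processor = NetCDFProcessor(config)

    # Unpacks each .nc file in the archive, merges the frames, filters to the
    # configured grid cells, sorts by grid_cell and analysis_date, and writes
    # a CSV with ISO 8601 timestamps (YYYY-MM-DDTHH:MM:SSZ).
    processor.process_zip_file(Path("era5_download.zip"), Path("weather.csv"))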
71 changes: 43 additions & 28 deletions clients/copernicus_cds/copernicus_cds/request.py
@@ -1,13 +1,12 @@
import datetime
from calendar import monthrange
from pathlib import Path
-from typing import List, get_args
+from typing import List, Tuple, get_args

import cdsapi
-from copernicus_cds.schemas import CDS_Request_Variable
+from copernicus_cds.schemas import Default_CDS_Request_Variables

-DEFAULT_VARIABLES = list(get_args(CDS_Request_Variable))
-UK_BOUNDING_BOX = [61, -8, 49.9, 2]
+UK_BOUNDING_BOX = (61, -8, 49.9, 2)


def get_month_days(year: int, month: int) -> List[str]:
@@ -16,7 +15,7 @@ def get_month_days(year: int, month: int) -> List[str]:
return [f"{day:02d}" for day in range(1, num_days + 1)]


-def validate_bounding_box(area):
+def validate_bounding_box(area: Tuple[int, int, int, int]):
if len(area) != 4:
raise ValueError("Area should be a list of 4 integers")

@@ -39,11 +38,12 @@ def validate_bounding_box(area):
raise ValueError("Longitude should be in ascending order")


-def get_era5_month_request_dict(
+def get_era5_request_dict(
year: int,
month: int,
-    area: list[int] = UK_BOUNDING_BOX,
-    variables: list[str] = DEFAULT_VARIABLES,
+    day: List[int] | int | None = None,
+    area: Tuple[float, float, float, float] = UK_BOUNDING_BOX,
+    variables: list[str] = Default_CDS_Request_Variables,
) -> dict:
"""
Returns a dictionary to be used as the request body
@@ -62,17 +62,34 @@

# validate variables
for variable in variables:
-        if variable not in DEFAULT_VARIABLES:
+        if variable not in Default_CDS_Request_Variables:
raise ValueError(f"Variable {variable} is not supported")

if day is None:
# default to all days in the month
day_list = get_month_days(year, month)
elif isinstance(day, int):
# one day
if day < 1 or day > 31:
raise ValueError("Day should be between 1 and 31")
day_list = [f"{day:02d}"]
elif isinstance(day, list):
# validate the day list
for d in day:
if d < 1 or d > 31:
raise ValueError("Day should be between 1 and 31")
day_list = [f"{d:02d}" for d in day]
else:
raise ValueError("Day should be an integer or a list of integers or None")

    return {
-        "product_type": "reanalysis",
+        "product_type": ["reanalysis"],
        "data_format": "netcdf",
        "download_format": "unarchived",
        "variable": variables,
        "year": year,
        "month": month,
-        "day": get_month_days(year, month),
+        "day": day_list,
"time": [
"00:00",
"01:00",
@@ -103,32 +120,30 @@ def get_era5_month_request_dict(
}


-def request_era5_montly_files(
+def request_era5_files(
path: Path,
year: int,
month: int,
-    area: list[int] = UK_BOUNDING_BOX,
-    variables: list[str] = DEFAULT_VARIABLES,
+    day: List[int] | int | None = None,
+    area: Tuple[float, float, float, float] = UK_BOUNDING_BOX,
+    variables: list[str] = Default_CDS_Request_Variables,
) -> None:
"""
    Request ERA5 data files from the Copernicus Climate Data Store API
    and save them to the output directory
Args:
path (Path): Path to save the output files
year (int): Year to request data for
month (int): Month to request data for
        day (List[int] | int | None): Days to request data for. Defaults to None, which requests all days in the month
        area (Tuple[float, float, float, float]): Bounding box for the data retrieval, in the CDS order (north, west, south, east). Defaults to the UK bounding box
        variables (list[str]): Variables to retrieve from the CDS API. Defaults to the list of default variables in the copernicus_cds.schemas module
"""

c = cdsapi.Client()

-    request_dict = get_era5_month_request_dict(year, month, area, variables)
+    request_dict = get_era5_request_dict(
+        year=year, month=month, day=day, area=area, variables=variables
+    )
c.retrieve("reanalysis-era5-single-levels", request_dict, path)


-if __name__ == "__main__":
-    # Request ERA5 monthly files for the UK for the previous month
-    last_month = datetime.date.today().replace(day=1) - datetime.timedelta(1)
-    year = last_month.year
-    month = last_month.month
-
-    # time the request
-    start = datetime.datetime.now()
-    request_era5_montly_files(f"uk_{year}_{month}.nc", year, month)
-    print("Saved files to disk", f"uk_{year}_{month}.nc")
-    print(f"Request took {datetime.datetime.now() - start}")
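A short sketch of the new day parameter (the output file name is illustrative, and the request_era5_files call needs CDS API credentials and network access):

    from pathlib import Path

    from copernicus_cds.request import get_era5_request_dict, request_era5_files

    # One day: request only 2025-01-05.
    request_era5_files(Path("uk_2025_01_05.nc"), year=2025, month=1, day=5)

    # A list of days: integers are zero-padded into the form the CDS API expects.
    request_dict = get_era5_request_dict(year=2025, month=1, day=[1, 2, 3])
    assert request_dict["day"] == ["01", "02", "03"]

Leaving day as None keeps the old behaviour and requests every day in the month.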
17 changes: 15 additions & 2 deletions clients/copernicus_cds/copernicus_cds/schemas.py
@@ -1,6 +1,9 @@
-from typing import Literal
+from typing import List, Literal, get_args

-CDS_Request_Variable = Literal[
+"""
+Type for a default variable to request from the CDS API.
+"""
+Default_CDS_Request_Variable = Literal[
"mean_total_precipitation_rate",
"precipitation_type",
"surface_pressure",
@@ -26,3 +29,13 @@
"10m_u_component_of_wind",
"10m_v_component_of_wind",
]

"""
Type for a list of default variables to request from the CDS API.
"""
Default_CDS_Request_Variable_List = List[Default_CDS_Request_Variable]

"""
The list of default variables to request from the CDS API.
"""
Default_CDS_Request_Variables = list(get_args(Default_CDS_Request_Variable))
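Because the runtime list is derived from the Literal type via get_args, the membership check in request.py cannot drift out of sync with the type. A small sketch:

    from typing import get_args

    from copernicus_cds.schemas import (
        Default_CDS_Request_Variable,
        Default_CDS_Request_Variables,
    )

    # The runtime list and the Literal type always agree.
    assert "surface_pressure" in Default_CDS_Request_Variables
    assert set(Default_CDS_Request_Variables) == set(
        get_args(Default_CDS_Request_Variable)
    )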
13 changes: 7 additions & 6 deletions clients/hpmorg/hpmorg/semi-manual-fetch.py
@@ -24,13 +24,14 @@
"""

-import requests
-import pandas as pd
-from datetime import datetime, timedelta
-from typing import Iterator, Tuple
 import logging
-from pathlib import Path
 import time
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Iterator, Tuple
+
+import pandas as pd
+import requests

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -45,7 +46,7 @@ def __init__(
):
self.base_url = base_url
self.users_data = pd.read_csv(
-            input_file, sep="\s+", names=["user_id", "start_time", "end_time"]
+            input_file, sep=r"\s+", names=["user_id", "start_time", "end_time"]
)

def generate_date_ranges(
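The sep fix matters because "\s+" in a plain string literal contains an invalid escape sequence (a DeprecationWarning, and a SyntaxWarning on Python 3.12+), while the raw string r"\s+" expresses the same regex explicitly. A minimal sketch with made-up data:

    import io

    import pandas as pd

    data = io.StringIO("user1 2024-01-01 2024-06-30\nuser2  2024-02-01  2024-07-31")
    df = pd.read_csv(data, sep=r"\s+", names=["user_id", "start_time", "end_time"])
    print(df.shape)  # (2, 3): any run of whitespace acts as one separator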
4 changes: 1 addition & 3 deletions edol-cli/edol_cli/cli.py
@@ -37,15 +37,13 @@ def main(
EDOL Glowmarkt CLI tool.
"""
logging.basicConfig(
-        level=logging.DEBUG,
+        level=logging.DEBUG if debug else logging.INFO,
format="%(asctime)s %(levelname)s\t| %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)

if debug:
logger.debug("Debug mode enabled.")
-    else:
-        logger.setLevel(logging.INFO)

c = Config(config_toml_path=config)

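Setting the level once in basicConfig makes every logger in the process honour the --debug flag; the old code configured the root logger at DEBUG and then only raised the CLI module's own logger to INFO, so third-party loggers still emitted DEBUG records. A minimal standalone sketch of the pattern (logger name illustrative):

    import logging

    def setup_logging(debug: bool) -> None:
        # The root logger's level follows the flag directly, so it applies
        # to every logger that propagates to the root handler.
        logging.basicConfig(
            level=logging.DEBUG if debug else logging.INFO,
            format="%(asctime)s %(levelname)s\t| %(name)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

    setup_logging(debug=False)
    logging.getLogger("edol_cli").debug("suppressed")  # below INFO, not shown
    logging.getLogger("edol_cli").info("shown")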
