Merge pull request #70 from enram/SVH-key-based
Add CLI option to use the S3 file listing instead of the modified date to create daily/monthly files
peterdesmet authored Jan 22, 2024
2 parents 3c4234d + 99a3e97 commit 6173b18
Showing 3 changed files with 96 additions and 34 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
@@ -59,7 +59,7 @@ exclude =
[options.extras_require]
# Requirements to work with the transfer functionalities (FTP/S3)
transfer =
s3fs[boto3]
s3fs[boto3]==2023.5.0
paramiko
fsspec
pyarrow
101 changes: 69 additions & 32 deletions src/vptstools/bin/vph5_to_vpts.py
@@ -12,7 +12,7 @@
import pandas as pd

from vptstools.vpts import vpts, vpts_to_csv
from vptstools.s3 import handle_manifest, OdimFilePath
from vptstools.s3 import handle_manifest, OdimFilePath, extract_daily_group_from_path
from vptstools.bin.click_exception import catch_all_exceptions, report_click_exception_to_sns

# Load environmental variables from file in dev
@@ -48,7 +48,14 @@
help="Range of HDF5 VP files to include, i.e. files modified between now and N"
"modified-days-ago. If 0, all HDF5 files in the bucket will be included.",
)
def cli(modified_days_ago):
@click.option(
"--path-s3-folder",
"path_s3_folder",
type=str,
help="Apply the conversion to VPTS to all files within a S3 sub-folders instead "
"of using the modified date of the files. This option does not use the inventory files."
)
def cli(modified_days_ago, path_s3_folder=None):
"""Convert and aggregate HDF5 VP files to daily and monthly VPTS CSV files on S3 bucket
Check the latest modified
@@ -59,16 +66,25 @@ def cli(modified_days_ago):
HDF5 files were recently added and convert those files from ODIM bird profile to the
`VPTS CSV format <https://github.com/enram/vpts-csv>`_. Finally, upload the generated daily/monthly VPTS files to S3.
When using the `path_s3_folder` option, the modified date is not used, but a recursive search within the given s3
path is applied to define the daily/monthly files to recreate.
E.g. `vph5_to_vpts --path-s3-folder uva/hdf5/nldhl/2019` or
`vph5_to_vpts --path-s3-folder baltrad/hdf5/bejab/2022/10`.
In addition, while scanning the S3 inventory to define the files to convert,
the CLI routine creates the ``coverage.csv`` file and uploads it to the bucket.
Configuration is loaded from the following environmental variables:
- ``S3_BUCKET``: AWS S3 bucket to read and write data to
- ``INVENTORY_BUCKET``: AWS S3 bucket configured as `s3 inventory bucket <https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html>`_ for the S3_BUCKET.
\b
- ``DESTINATION_BUCKET``: AWS S3 bucket to read and write data to
- ``INVENTORY_BUCKET``: AWS S3 bucket configured as `s3 inventory bucket
<https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html>`_
for the S3_BUCKET.
- ``SNS_TOPIC``: AWS SNS topic to report when routine fails
- ``AWS_REGION``: AWS region where the SNS alerting is defined
- ``AWS_PROFILE``: AWS profile (mainly useful for local development when working with multiple AWS profiles)
- ``AWS_PROFILE``: AWS profile (mainly useful for local development when
working with multiple AWS profiles)
"""
if AWS_PROFILE:
storage_options = {"profile": AWS_PROFILE}
@@ -77,36 +93,57 @@ def cli(modified_days_ago):
storage_options = dict()
boto3_options = dict()

# Load the S3 manifest of today
click.echo(f"Load the S3 manifest of {date.today()}.")

manifest_parent_key = (
pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
# define manifest of today
s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"

click.echo(f"Extract coverage and days to recreate from manifest {s3_url}.")
if modified_days_ago == 0:
modified_days_ago = (pd.Timestamp.now(tz="utc") - S3_BUCKET_CREATION).days + 1
click.echo(
f"Recreate the full set of bucket files (files "
f"modified since {modified_days_ago}days). "
f"This will take a while!"
if path_s3_folder:
click.echo(f"Applying the vpts conversion to all files within {path_s3_folder}. "
f"Ignoring the modified date of the files.")

inbo_s3 = s3fs.S3FileSystem(**storage_options)
odim5_files = inbo_s3.glob(f"{S3_BUCKET}/{path_s3_folder}/**/*.h5")

days_to_create_vpts = (
pd.DataFrame(odim5_files, columns=["file"])
.set_index("file")
.groupby(extract_daily_group_from_path).size().reset_index()
.rename(
columns={
"index": "directory",
0: "file_count",
}
)
)

df_cov, days_to_create_vpts = handle_manifest(
s3_url,
modified_days_ago=f"{modified_days_ago}day",
storage_options=storage_options,
)
else:
# Load the S3 manifest of today
click.echo(f"Load the S3 manifest of {date.today()} to rerun only files modified "
f"since {modified_days_ago} days ago.")

manifest_parent_key = (
pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
# define manifest of today
s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"

click.echo(f"Extract coverage and days to recreate from manifest {s3_url}.")
if modified_days_ago == 0:
modified_days_ago = (pd.Timestamp.now(tz="utc") - S3_BUCKET_CREATION).days + 1
click.echo(
f"Recreate the full set of bucket files (files "
f"modified since {modified_days_ago}days). "
f"This will take a while!"
)

# Save coverage file to S3 bucket
click.echo("Save coverage file to S3.")
df_cov["directory"] = df_cov["directory"].str.join("/")
df_cov.to_csv(
f"s3://{S3_BUCKET}/coverage.csv", index=False, storage_options=storage_options
)
df_cov, days_to_create_vpts = handle_manifest(
s3_url,
modified_days_ago=f"{modified_days_ago}day",
storage_options=storage_options,
)

# Save coverage file to S3 bucket
click.echo("Save coverage file to S3.")
df_cov["directory"] = df_cov["directory"].str.join("/")
df_cov.to_csv(
f"s3://{S3_BUCKET}/coverage.csv", index=False, storage_options=storage_options
)

# Run VPTS daily conversion for each radar-day with modified files
inbo_s3 = s3fs.S3FileSystem(**storage_options)
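As an aside, here is a minimal, self-contained sketch of the grouping step behind the new --path-s3-folder branch. The bucket name and file paths are hypothetical, and daily_group is a simplified stand-in for extract_daily_group_from_path (added in src/vptstools/s3.py below):

import pandas as pd

# Hypothetical listing as returned by s3fs.S3FileSystem.glob(); assumed layout:
# <bucket>/<source>/hdf5/<radar code>/<year>/<month>/<day>/<file>.h5
odim5_files = [
    "aloft/baltrad/hdf5/bejab/2022/10/01/bejab_vp_20221001T000000Z.h5",
    "aloft/baltrad/hdf5/bejab/2022/10/01/bejab_vp_20221001T001500Z.h5",
    "aloft/baltrad/hdf5/bejab/2022/10/02/bejab_vp_20221002T000000Z.h5",
]

def daily_group(file_path):
    # Simplified stand-in for extract_daily_group_from_path: group on the
    # folder levels up to the day; the real helper returns a tuple of components.
    return "/".join(file_path.split("/")[1:7])

# One row per radar-day with the number of HDF5 files found for that day.
days_to_create_vpts = (
    pd.DataFrame(odim5_files, columns=["file"])
    .set_index("file")
    .groupby(daily_group)
    .size()
    .reset_index()
    .rename(columns={"index": "directory", 0: "file_count"})
)
print(days_to_create_vpts)

Each resulting radar-day is then fed to the same daily/monthly VPTS conversion as in the modified-date flow.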
27 changes: 26 additions & 1 deletion src/vptstools/s3.py
@@ -68,7 +68,7 @@ def from_s3fs_enlisting(cls, h5_file_path):
return cls(
h5_file_path.split("/")[1],
*cls.parse_file_name(str(h5_file_path)),
h5_file_path.split("/")[1],
h5_file_path.split("/")[2],
)

@staticmethod
@@ -199,6 +199,31 @@ def extract_daily_group_from_inventory(file_path):
path_info.day,
)

def extract_daily_group_from_path(file_path):
"""Extract file name components to define a group
The coverage file counts the number of files available
per group (e.g. daily files per radar). This function is passed
to the Pandas ``groupby`` to translate the file path to a
countable set (e.g. source, radar-code, year month and day for
daily files per radar).
Parameters
----------
file_path : str
File path of the ODIM HDF5 file as returned by the s3fs
listing; both the folder path and the file name are used to derive the group.
"""
path_info = OdimFilePath.from_s3fs_enlisting(file_path)
return (
path_info.source,
path_info.file_type,
path_info.radar_code,
path_info.year,
path_info.month,
path_info.day,
)


def _last_modified_from_inventory(df, modified_days_ago="2day"):
"""Filter manifest files on last modified
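For illustration only, a hedged usage sketch of the new helper; the example path and the exact contents of the returned tuple are assumptions based on the docstring and on OdimFilePath.from_s3fs_enlisting:

from vptstools.s3 import extract_daily_group_from_path

# Hypothetical s3fs-style path:
# <bucket>/<source>/<file type>/<radar>/<year>/<month>/<day>/<file>.h5
group = extract_daily_group_from_path(
    "aloft/baltrad/hdf5/bejab/2022/10/02/bejab_vp_20221002T000000Z.h5"
)
# group is a per-radar-day key (source, file type, radar code, year, month, day);
# here the source is "baltrad" and the date components correspond to 2022-10-02.
# Passing the function to a pandas groupby therefore counts files per radar-day.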
