Commit

Add new cli option to use an s3 path instead of the modified date
stijnvanhoey committed Sep 18, 2023
1 parent a375d7a commit 7da453c
Showing 1 changed file with 63 additions and 30 deletions.
93 changes: 63 additions & 30 deletions src/vptstools/bin/vph5_to_vpts.py
@@ -12,7 +12,7 @@
 import pandas as pd
 
 from vptstools.vpts import vpts, vpts_to_csv
-from vptstools.s3 import handle_manifest, OdimFilePath
+from vptstools.s3 import handle_manifest, OdimFilePath, extract_daily_group_from_path
 from vptstools.bin.click_exception import catch_all_exceptions, report_click_exception_to_sns
 
 # Load environmental variables from file in dev
@@ -48,7 +48,14 @@
     help="Range of HDF5 VP files to include, i.e. files modified between now and N"
     "modified-days-ago. If 0, all HDF5 files in the bucket will be included.",
 )
-def cli(modified_days_ago):
+@click.option(
+    "--path-s3-folder",
+    "path_s3_folder",
+    type=str,
+    help="Apply the conversion to VPTS to all files within an S3 sub-folder instead "
+    "of using the modified date of the files. This option does not use the inventory files."
+)
+def cli(modified_days_ago, path_s3_folder=None):
     """Convert and aggregate HDF5 VP files to daily and monthly VPTS CSV files on S3 bucket
 
     Check the latest modified
@@ -59,12 +66,17 @@ def cli(modified_days_ago):
     HDF5 files were recently added and convert those files from ODIM bird profile to the
     `VPTS CSV format <https://github.com/enram/vpts-csv>`_. Finally, upload the generated daily/monthly VPTS files to S3.
 
+    When using the `path_s3_folder` option, the modified date is not used, but a recursive search within the given S3
+    path is applied to define the daily/monthly files to recreate.
+    E.g. `vph5_to_vpts --path-s3-folder uva/hdf5/nldhl/2019` or
+    `vph5_to_vpts --path-s3-folder baltrad/hdf5/bejab/2022/10`.
+
     Besides, while scanning the S3 inventory to define the files to convert,
     the CLI routine creates the ``coverage.csv`` file and uploads it to the bucket.
 
     Configuration is loaded from the following environmental variables:
 
-    - ``S3_BUCKET``: AWS S3 bucket to read and write data to
+    - ``DESTINATION_BUCKET``: AWS S3 bucket to read and write data to
     - ``INVENTORY_BUCKET``: AWS S3 bucket configured as `s3 inventory bucket <https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html>`_ for the S3_BUCKET.
     - ``SNS_TOPIC``: AWS SNS topic to report when routine fails
     - ``AWS_REGION``: AWS region where the SNS alerting is defined
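
The helper extract_daily_group_from_path, imported in the first hunk, is used as the groupby key in the last hunk below, but its implementation (in vptstools.s3) is not part of this diff. A minimal sketch of what such a helper plausibly does, assuming HDF5 keys of the form source/hdf5/radar/yyyy/mm/dd/file.h5 (the real function may differ, e.g. by returning the path parts as a tuple rather than a string):

    # Hypothetical sketch, not the vptstools.s3 implementation.
    def extract_daily_group_from_path(file_path: str) -> str:
        """Map an ODIM HDF5 key to its daily folder, e.g.
        'baltrad/hdf5/bejab/2022/10/05/bejab_vp_x.h5' -> 'baltrad/hdf5/bejab/2022/10/05'.
        """
        return file_path.rsplit("/", 1)[0]  # strip the file name, keep the daily directory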
@@ -77,36 +89,57 @@ def cli(modified_days_ago):
     storage_options = dict()
     boto3_options = dict()
 
-    # Load the S3 manifest of today
-    click.echo(f"Load the S3 manifest of {date.today()}.")
-
-    manifest_parent_key = (
-        pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
-    ).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
-    # define manifest of today
-    s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"
-
-    click.echo(f"Extract coverage and days to recreate from manifest {s3_url}.")
-    if modified_days_ago == 0:
-        modified_days_ago = (pd.Timestamp.now(tz="utc") - S3_BUCKET_CREATION).days + 1
-        click.echo(
-            f"Recreate the full set of bucket files (files "
-            f"modified since {modified_days_ago}days). "
-            f"This will take a while!"
-        )
-
-    df_cov, days_to_create_vpts = handle_manifest(
-        s3_url,
-        modified_days_ago=f"{modified_days_ago}day",
-        storage_options=storage_options,
-    )
-
-    # Save coverage file to S3 bucket
-    click.echo("Save coverage file to S3.")
-    df_cov["directory"] = df_cov["directory"].str.join("/")
-    df_cov.to_csv(
-        f"s3://{S3_BUCKET}/coverage.csv", index=False, storage_options=storage_options
-    )
+    if path_s3_folder:
+        click.echo(f"Applying the vpts conversion to all files within {path_s3_folder}. "
+                   f"Ignoring the modified date of the files.")
+
+        inbo_s3 = s3fs.S3FileSystem(**storage_options)
+        odim5_files = inbo_s3.glob(f"{S3_BUCKET}/{path_s3_folder}/**/*.h5")
+
+        days_to_create_vpts = (
+            pd.DataFrame(odim5_files, columns=["file"])
+            .set_index("file")
+            .groupby(extract_daily_group_from_path).size().reset_index()
+            .rename(
+                columns={
+                    "index": "directory",
+                    0: "file_count",
+                }
+            )
+        )
+    else:
+        # Load the most recent S3 inventory manifest
+        click.echo(f"Load the S3 manifest of {date.today()} to rerun only files modified "
+                   f"since {modified_days_ago} days ago.")
+
+        manifest_parent_key = (
+            pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
+        ).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
+        # the manifest is keyed on yesterday's date at the configured delivery hour
+        s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"
+
+        click.echo(f"Extract coverage and days to recreate from manifest {s3_url}.")
+        if modified_days_ago == 0:
+            modified_days_ago = (pd.Timestamp.now(tz="utc") - S3_BUCKET_CREATION).days + 1
+            click.echo(
+                f"Recreate the full set of bucket files (files "
+                f"modified since {modified_days_ago} days). "
+                f"This will take a while!"
+            )
+
+        df_cov, days_to_create_vpts = handle_manifest(
+            s3_url,
+            modified_days_ago=f"{modified_days_ago}day",
+            storage_options=storage_options,
+        )
+
+        # Save coverage file to S3 bucket
+        click.echo("Save coverage file to S3.")
+        df_cov["directory"] = df_cov["directory"].str.join("/")
+        df_cov.to_csv(
+            f"s3://{S3_BUCKET}/coverage.csv", index=False, storage_options=storage_options
+        )
 
     # Run VPTS daily conversion for each radar-day with modified files
     inbo_s3 = s3fs.S3FileSystem(**storage_options)
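
For illustration, the grouping step in the new `if path_s3_folder` branch can be reproduced with plain pandas. The file keys below are made up, and the lambda stands in for extract_daily_group_from_path:

    import pandas as pd

    odim5_files = [  # made-up keys; the CLI gets these from inbo_s3.glob(...)
        "my-bucket/baltrad/hdf5/bejab/2022/10/05/bejab_vp_20221005T0000.h5",
        "my-bucket/baltrad/hdf5/bejab/2022/10/05/bejab_vp_20221005T0015.h5",
        "my-bucket/baltrad/hdf5/bejab/2022/10/06/bejab_vp_20221006T0000.h5",
    ]
    days_to_create_vpts = (
        pd.DataFrame(odim5_files, columns=["file"])
        .set_index("file")
        .groupby(lambda path: path.rsplit("/", 1)[0])  # stand-in for extract_daily_group_from_path
        .size()
        .reset_index()
        .rename(columns={"index": "directory", 0: "file_count"})
    )
    print(days_to_create_vpts)
    #                                  directory  file_count
    # 0  my-bucket/baltrad/hdf5/bejab/2022/10/05           2
    # 1  my-bucket/baltrad/hdf5/bejab/2022/10/06           1

Each resulting row is a radar-day for which the daily (and monthly) VPTS files get recreated, mirroring the (directory, file_count) output of handle_manifest.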
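The else branch keeps the original inventory-based flow. As a worked example of the manifest key construction (the MANIFEST_HOUR_OF_DAY and MANIFEST_URL values below are assumed, not taken from the module):

    import pandas as pd

    MANIFEST_HOUR_OF_DAY = "01-00"  # assumed; defined earlier in the module
    MANIFEST_URL = "s3://my-inventory-bucket/my-bucket/full-inventory"  # hypothetical

    manifest_parent_key = (
        pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
    ).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
    s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"
    # run on 2023-09-18 this yields:
    # s3://my-inventory-bucket/my-bucket/full-inventory/2023-09-17T01-00Z/manifest.json

S3 inventory reports are delivered at most daily, so the routine reads the report keyed on yesterday's date.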
