Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reverse sort file list and handle FileNotFound exceptions #415

Merged
merged 7 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/vxingest/builder_common/vx_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def get_file_list(self, df_query, directory, file_pattern, file_mask):
df_elements = list(result)
df_full_names = [element["url"] for element in df_elements]
if pathlib.Path(directory).exists() and pathlib.Path(directory).is_dir():
# the file list is sorted by getmtime so that the oldest files are processed first
file_list = sorted(
pathlib.Path(directory).glob(file_pattern), key=os.path.getmtime
)
Expand Down
2 changes: 1 addition & 1 deletion src/vxingest/ctc_to_cb/ctc_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CTCBuilder(Builder):
geo.top_left.lon as tl_lon
FROM `{self.bucket}`.{self.scope}.{self.collection}
WHERE type="MD" and docType="region" and subset='COMMON' and version='V01' and name="ALL_HRRR"
use the boubnding box to select stations for the region
use the bounding box to select stations for the region
[
{
"bottom_right": {
Expand Down
19 changes: 17 additions & 2 deletions src/vxingest/grib2_to_cb/grib_builder_parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import cProfile
import logging
import math
import os
import sys
from pathlib import Path
from pstats import Stats
Expand Down Expand Up @@ -51,6 +52,9 @@ def __init__(
self.ds_translate_item_variables_map = None

# self.do_profiling = False - in super
self.do_profiling = os.getenv("PROFILE")
if self.do_profiling:
self.profile_output_path = os.getenv("PROFILE_OUTPUT_DIR")
# set to True to enable build_document profiling

def get_proj_params_from_string(self, proj_string):
Expand Down Expand Up @@ -799,13 +803,15 @@ def build_document(self, queue_element):
if self.do_profiling:
with cProfile.Profile() as _pr:
self.handle_document()
with Path("profiling_stats.txt").open(
with Path(self.profile_output_path / "profiling_stats.txt").open(
"w", encoding="utf-8"
) as stream:
stats = Stats(_pr, stream=stream)
stats.strip_dirs()
stats.sort_stats("time")
stats.dump_stats("profiling_stats.prof")
stats.dump_stats(
self.profile_output_path / "profiling_stats.prof"
)
stats.print_stats()
else:
self.handle_document()
Expand All @@ -826,6 +832,15 @@ def build_document(self, queue_element):
document_map[data_file_doc["id"]] = data_file_doc
self.delete_idx_file(queue_element)
return document_map
except FileNotFoundError:
logger.error(
"%s: Exception with builder build_document: file_name: %s, error: file not found - skipping this file",
self.__class__.__name__,
queue_element,
)
# remove any idx file that may have been created
self.delete_idx_file(queue_element)
return {}
randytpierce marked this conversation as resolved.
Show resolved Hide resolved
except Exception as _e:
logger.exception(
"%s: Exception with builder build_document: file_name: %s, exception %s",
Expand Down
2 changes: 2 additions & 0 deletions src/vxingest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ def run_ingest() -> None:
log_queue,
args.log_dir / f"all_logs-{runtime.strftime('%Y-%m-%dT%H:%M:%S%z')}.log",
)
# set profiling output
os.environ["PROFILE_OUTPUT_DIR"] = args.log_dir

logger.info("Getting credentials")
creds = get_credentials(args.credentials_file)
Expand Down
8 changes: 8 additions & 0 deletions src/vxingest/netcdf_to_cb/netcdf_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,14 @@ def build_document(self, queue_element):
)
document_map[data_file_doc["id"]] = data_file_doc
return document_map

except FileNotFoundError:
logger.error(
"%s: Exception with builder build_document: file_name: %s, error: file not found - skipping this file",
self.__class__.__name__,
queue_element,
)
return {}
except Exception as _e:
logger.exception(
"%s: Exception with builder build_document: file_name: %s",
Expand Down
28 changes: 28 additions & 0 deletions tests/vxingest/builder_common/test_unit_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, ClusterTimeoutOptions, QueryOptions

from vxingest.builder_common.vx_ingest import CommonVxIngest


def connect_cb():
"""
Expand Down Expand Up @@ -43,6 +45,32 @@ def connect_cb():
return cb_connection


@pytest.mark.integration
def test_get_file_list(tmp_path):
    """Integration test for CommonVxIngest.get_file_list.

    Creates four files whose mtimes strictly increase and verifies that
    get_file_list returns them sorted by mtime, oldest first.

    Requires a CREDENTIALS environment variable pointing at a credentials
    file and a reachable Couchbase cluster.
    """
    vx_ingest = CommonVxIngest()
    vx_ingest.credentials_file = os.environ["CREDENTIALS"]
    vx_ingest.cb_credentials = vx_ingest.get_credentials(vx_ingest.load_spec)
    vx_ingest.connect_cb()
    testdata = Path("tests/vxingest/builder_common/testdata/get_file_list_grib2.n1ql")
    with testdata.open(mode="r", encoding="utf-8") as file:
        _statement = file.read()
    # Create the test files with explicit, strictly increasing mtimes.
    # Files written back-to-back can share the same timestamp, which would
    # make the mtime-based sort in get_file_list nondeterministic and this
    # test flaky.
    base_mtime = 1_000_000_000  # arbitrary fixed epoch seconds
    for offset, name in enumerate(
        ("2128723000010", "2128723000020", "2128723000030", "2128723000040")
    ):
        file_path = tmp_path / name
        file_path.write_text("test", encoding="utf-8")
        os.utime(file_path, (base_mtime + offset, base_mtime + offset))

    file_list = vx_ingest.get_file_list(
        _statement, tmp_path, "21287230000[0123456789]?", "%y%j%H%f"
    )
    assert file_list is not None
    assert len(file_list) > 0
    assert file_list[3] > file_list[2], "file_list is not sorted oldest-first"


@pytest.mark.integration
def test_stations_fcst_valid_epoch(request):
_expected_time = 10
Expand Down
186 changes: 186 additions & 0 deletions tests/vxingest/utilities/get_data_for_raobs_from_adpupa_dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import sys
from pathlib import Path


def main():
    """Extract mandatory-level values for one RAOB station from a prepbufr ADPUPA dump.

    Reads ``/opt/data/prepbufr_to_cb/test_artifacts/{wmoid}_mandatory_values.txt``
    (wmoid comes from ``sys.argv[1]``, default ``"65578"``), accumulates one row
    per standard pressure level, and prints the rows as a space-separated table
    preceded by a ``data-dump-{wmoid}`` header line.

    Input format: lines of ``MNEMONIC value ...`` grouped into subsets that are
    separated by blank lines.  A subset becomes "qualified" when its POB
    (pressure) line is at a standard level; later PQM/PPC lines can disqualify
    the whole subset, and per-field quality/program-code mnemonics can null out
    individual fields.  Wind values may arrive in a *later* subset for the same
    pressure, so rows are looked up by pressure and updated in place.
    """
    # Station id: first CLI argument, defaulting to 65578.
    # NOTE(review): sys.argv is always non-empty, so the truthiness check is
    # redundant but harmless.
    _wmoid = sys.argv[1] if sys.argv and len(sys.argv) > 1 else "65578"
    with Path(
        f"/opt/data/prepbufr_to_cb/test_artifacts/{_wmoid}_mandatory_values.txt"
    ).open() as _f:
        data = []  # accumulated rows, one dict per standard pressure level
        qualified = False  # True while the current subset has passed the POB checks
        row = {}  # fields gathered for the current subset
        row_index = None  # index of row within data when it pre-exists, else None
        while line := _f.readline():
            # skip empty lines - a qualified empty line means this subset is finished
            if not line.strip():
                if (
                    qualified
                    and row_index is None
                    and row
                    and row.get("press") != "null"
                ):
                    # if row doesn't exist in data it must be appended
                    # (row_index is not None means the row was fetched from
                    # data and was updated in place, so no append is needed)
                    data.append(row)
                row = {}
                qualified = False
                continue
            # first whitespace-separated token is the BUFR mnemonic
            mnemonic = line.split()[0]
            if mnemonic == "POB":
                qualified = True
                _press = round(float(line.split()[1]))
                # only the standard mandatory levels (hPa) are of interest
                if _press not in [
                    1000,
                    850,
                    700,
                    500,
                    400,
                    300,
                    250,
                    200,
                    150,
                    100,
                    70,
                    50,
                    30,
                    20,
                ]:
                    # not a standard level that we are interested in
                    continue
                # see if the row is already there (if this is a wind subset the row is already there).
                row_index = None
                for i, item in enumerate(data):
                    if "press" in item and item["press"] == _press:
                        row_index = i
                        break
                row = data[row_index] if row_index is not None else {"press": _press}
                continue
            if not qualified:
                # current subset was disqualified (or no POB seen yet) - skip
                continue
            else:  # still qualified
                try:
                    match mnemonic:
                        case "SID":
                            # station id - not captured in the row
                            continue
                        case "TYP":
                            # report type - not captured in the row
                            continue
                        case "PQM":
                            if round(float(line.split()[1])) not in [0, 1, 2]:
                                # disqualified because of quality marker
                                # go to next POB
                                qualified = False
                                row = {}
                                continue
                        case "PPC":
                            if round(float(line.split()[1])) != 1:
                                # disqualified because of program code
                                # go to next POB
                                qualified = False
                                row = {}
                                continue
                        case "QOB":
                            # specific humidity; qualified is always True here
                            # (disqualified subsets were skipped above)
                            if qualified:
                                row["sh"] = line.split()[1]
                            continue
                        case "QQM":
                            if round(float(line.split()[1])) not in [0, 1, 2, 9, 15]:
                                # disqualified because of quality marker
                                row["sh"] = None
                                continue
                        case "QPC":
                            if round(float(line.split()[1])) != 1:
                                # disqualified because of program code
                                row["sh"] = None
                                continue
                        case "ZOB":
                            # height
                            row["z"] = line.split()[1]
                            continue
                        case "ZQM":
                            if round(float(line.split()[1])) not in [0, 1, 2]:
                                # disqualified because of quality marker
                                row["z"] = None
                                continue
                        case "ZPC":
                            if round(float(line.split()[1])) != 1:
                                # disqualified because of program code
                                row["z"] = None
                                continue
                        case "TOB":
                            # temperature
                            row["t"] = line.split()[1]
                            continue
                        case "TQM":
                            if round(float(line.split()[1])) not in [0, 1, 2]:
                                # disqualified because of quality marker
                                row["t"] = None
                                continue
                        case "TPC":
                            if round(float(line.split()[1])) != 1:
                                # disqualified because of program code
                                row["t"] = None
                                continue
                        case "TDO":
                            # dewpoint - does not need to be qualified
                            row["dp"] = line.split()[1]
                            continue
                        case "DDO":
                            # wind direction
                            row["wd"] = line.split()[1]
                            continue
                        case "FFO":
                            # wind speed
                            row["ws"] = line.split()[1]
                            continue
                        case "DFQ":
                            if round(float(line.split()[1])) not in [0, 1, 2]:
                                # disqualified because of quality marker
                                row["wd"] = None
                                row["ws"] = None
                                continue
                        case "DFP":
                            if round(float(line.split()[1])) != 1:
                                # disqualified because of program code
                                row["wd"] = None
                                row["ws"] = None
                                continue
                        case _:
                            print(f"Unknown mnemonic {mnemonic}")
                            continue
                except Exception as e:
                    # NOTE(review): broad catch - a malformed value line is
                    # reported and the line is skipped
                    print(f"Error: {e}")
        # header row for the printed table; "ws(FFO)" holds the FFO-derived
        # wind speed while the "ws" column is printed as a "..." placeholder
        table = [
            [
                "press",
                "z",
                "t",
                "dp",
                "wd",
                "ws",
                "ws(FFO)",
            ]
        ]
        try:
            for row in data:
                if row.get("press") is None:
                    continue
                table.append(
                    [
                        row.get("press", "null"),
                        row.get("z", "null"),
                        row.get("t", "null"),
                        row.get("dp", "null"),
                        row.get("wd", "null"),
                        "...",
                        row.get("ws", "null"),
                    ]
                )
            # emit the station header followed by one space-joined line per row
            print(f"data-dump-{_wmoid}")
            for row in table:
                print(" ".join(list(map(str, row))))
        except Exception as e:
            print(f"Error: {e}")


# Run the dump extractor only when executed as a script.
if __name__ == "__main__":
    main()
Loading