diff --git a/src/vxingest/builder_common/vx_ingest.py b/src/vxingest/builder_common/vx_ingest.py
index f4cfe892..fe843b6f 100644
--- a/src/vxingest/builder_common/vx_ingest.py
+++ b/src/vxingest/builder_common/vx_ingest.py
@@ -198,6 +198,7 @@ def get_file_list(self, df_query, directory, file_pattern, file_mask):
         df_elements = list(result)
         df_full_names = [element["url"] for element in df_elements]
         if pathlib.Path(directory).exists() and pathlib.Path(directory).is_dir():
+            # the file list is sorted by getmtime so that the oldest files are processed first
             file_list = sorted(
                 pathlib.Path(directory).glob(file_pattern), key=os.path.getmtime
             )
diff --git a/src/vxingest/ctc_to_cb/ctc_builder.py b/src/vxingest/ctc_to_cb/ctc_builder.py
index ba438054..97ce8ca2 100644
--- a/src/vxingest/ctc_to_cb/ctc_builder.py
+++ b/src/vxingest/ctc_to_cb/ctc_builder.py
@@ -42,7 +42,7 @@ class CTCBuilder(Builder):
     geo.top_left.lon as tl_lon
     FROM `{self.bucket}`.{self.scope}.{self.collection}
     WHERE type="MD" and docType="region" and subset='COMMON' and version='V01' and name="ALL_HRRR"
-    use the boubnding box to select stations for the region
+    use the bounding box to select stations for the region
     [
     {
         "bottom_right": {
diff --git a/src/vxingest/grib2_to_cb/grib_builder_parent.py b/src/vxingest/grib2_to_cb/grib_builder_parent.py
index e755f5c6..1e92b8fe 100644
--- a/src/vxingest/grib2_to_cb/grib_builder_parent.py
+++ b/src/vxingest/grib2_to_cb/grib_builder_parent.py
@@ -10,6 +10,7 @@
 import cProfile
 import logging
 import math
+import os
 import sys
 from pathlib import Path
 from pstats import Stats
@@ -51,6 +52,9 @@ def __init__(
         self.ds_translate_item_variables_map = None
         # self.do_profiling = False - in super
+        self.do_profiling = os.getenv("PROFILE")
+        if self.do_profiling:
+            self.profile_output_path = os.getenv("PROFILE_OUTPUT_DIR")
         # set to True to enable build_document profiling

     def get_proj_params_from_string(self, proj_string):
@@ -799,13 +803,15 @@ def build_document(self, queue_element):
             if self.do_profiling:
                 with cProfile.Profile() as _pr:
                     self.handle_document()
-                    with Path("profiling_stats.txt").open(
+                    with (Path(self.profile_output_path) / "profiling_stats.txt").open(
                         "w", encoding="utf-8"
                     ) as stream:
                         stats = Stats(_pr, stream=stream)
                         stats.strip_dirs()
                         stats.sort_stats("time")
-                        stats.dump_stats("profiling_stats.prof")
+                        stats.dump_stats(
+                            Path(self.profile_output_path) / "profiling_stats.prof"
+                        )
                         stats.print_stats()
             else:
                 self.handle_document()
@@ -826,6 +832,15 @@ def build_document(self, queue_element):
                 document_map[data_file_doc["id"]] = data_file_doc
             self.delete_idx_file(queue_element)
             return document_map
+        except FileNotFoundError:
+            logger.error(
+                "%s: Exception with builder build_document: file_name: %s, error: file not found - skipping this file",
+                self.__class__.__name__,
+                queue_element,
+            )
+            # remove any idx file that may have been created
+            self.delete_idx_file(queue_element)
+            return {}
         except Exception as _e:
             logger.exception(
                 "%s: Exception with builder build_document: file_name: %s, exception %s",
diff --git a/src/vxingest/main.py b/src/vxingest/main.py
index bf18c6c2..6af80386 100644
--- a/src/vxingest/main.py
+++ b/src/vxingest/main.py
@@ -507,6 +507,8 @@ def run_ingest() -> None:
         log_queue,
         args.log_dir / f"all_logs-{runtime.strftime('%Y-%m-%dT%H:%M:%S%z')}.log",
     )
+    # set profiling output
+    os.environ["PROFILE_OUTPUT_DIR"] = str(args.log_dir)

     logger.info("Getting credentials")
     creds = get_credentials(args.credentials_file)
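Taken together, the grib_builder_parent.py and main.py hunks wire profiling to two environment variables: PROFILE turns it on and PROFILE_OUTPUT_DIR (which main.py points at the log directory) says where the stats land. Below is a minimal, self-contained sketch of that flow; do_work() is a hypothetical stand-in for handle_document(), everything else mirrors the env vars and pstats calls in the diff:

    import cProfile
    import os
    from pathlib import Path
    from pstats import Stats

    def do_work():
        # placeholder for the real work (handle_document in the builders)
        sum(i * i for i in range(100_000))

    if os.getenv("PROFILE"):
        out_dir = Path(os.getenv("PROFILE_OUTPUT_DIR", "."))
        with cProfile.Profile() as _pr:
            do_work()
        # write human-readable stats alongside a binary .prof dump
        with (out_dir / "profiling_stats.txt").open("w", encoding="utf-8") as stream:
            stats = Stats(_pr, stream=stream)
            stats.strip_dirs()
            stats.sort_stats("time")
            stats.dump_stats(out_dir / "profiling_stats.prof")
            stats.print_stats()
    else:
        do_work()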
diff --git a/src/vxingest/netcdf_to_cb/netcdf_builder.py b/src/vxingest/netcdf_to_cb/netcdf_builder.py
index 2e3a446d..9f7fd9d5 100644
--- a/src/vxingest/netcdf_to_cb/netcdf_builder.py
+++ b/src/vxingest/netcdf_to_cb/netcdf_builder.py
@@ -382,6 +382,14 @@ def build_document(self, queue_element):
             )
             document_map[data_file_doc["id"]] = data_file_doc
             return document_map
+
+        except FileNotFoundError:
+            logger.error(
+                "%s: Exception with builder build_document: file_name: %s, error: file not found - skipping this file",
+                self.__class__.__name__,
+                queue_element,
+            )
+            return {}
         except Exception as _e:
             logger.exception(
                 "%s: Exception with builder build_document: file_name: %s",
diff --git a/tests/vxingest/builder_common/test_unit_queries.py b/tests/vxingest/builder_common/test_unit_queries.py
index 9659052f..0be8160e 100644
--- a/tests/vxingest/builder_common/test_unit_queries.py
+++ b/tests/vxingest/builder_common/test_unit_queries.py
@@ -8,6 +8,8 @@
 from couchbase.cluster import Cluster
 from couchbase.options import ClusterOptions, ClusterTimeoutOptions, QueryOptions

+from vxingest.builder_common.vx_ingest import CommonVxIngest
+

 def connect_cb():
     """
@@ -43,6 +45,32 @@ def connect_cb():
     return cb_connection


+@pytest.mark.integration
+def test_get_file_list(tmp_path):
+    vx_ingest = CommonVxIngest()
+    vx_ingest.credentials_file = os.environ["CREDENTIALS"]
+    vx_ingest.cb_credentials = vx_ingest.get_credentials(vx_ingest.load_spec)
+    vx_ingest.connect_cb()
+    testdata = Path("tests/vxingest/builder_common/testdata/get_file_list_grib2.n1ql")
+    with testdata.open(mode="r", encoding="utf-8") as file:
+        _statement = file.read()
+    with (tmp_path / "2128723000010").open("w") as f:
+        f.write("test")
+    with (tmp_path / "2128723000020").open("w") as f:
+        f.write("test")
+    with (tmp_path / "2128723000030").open("w") as f:
+        f.write("test")
+    with (tmp_path / "2128723000040").open("w") as f:
+        f.write("test")
+
+    file_list = vx_ingest.get_file_list(
+        _statement, tmp_path, "21287230000[0123456789]?", "%y%j%H%f"
+    )
+    assert file_list is not None
+    assert len(file_list) > 0
+    assert file_list[3] > file_list[2], "file_list is not sorted oldest first"
+
+
 @pytest.mark.integration
 def test_stations_fcst_valid_epoch(request):
     _expected_time = 10
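The new test leans on the ordering guarantee documented in the vx_ingest.py hunk above: files matched by the glob come back sorted by modification time, oldest first. A small sketch of that contract in isolation (the demo directory and file names here are made up for illustration):

    import os
    import time
    from pathlib import Path

    def oldest_first(directory, pattern):
        # same sort key get_file_list uses: modification time, ascending
        return sorted(Path(directory).glob(pattern), key=os.path.getmtime)

    demo = Path("/tmp/mtime_demo")
    demo.mkdir(exist_ok=True)
    for name in ("2128723000010", "2128723000020", "2128723000030"):
        (demo / name).write_text("test")
        time.sleep(0.01)  # guarantee distinct mtimes
    files = oldest_first(demo, "21287230000*")
    assert [f.name for f in files] == ["2128723000010", "2128723000020", "2128723000030"]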
diff --git a/tests/vxingest/utilities/get_data_for_raobs_from_adpupa_dump.py b/tests/vxingest/utilities/get_data_for_raobs_from_adpupa_dump.py
new file mode 100644
index 00000000..34e908d3
--- /dev/null
+++ b/tests/vxingest/utilities/get_data_for_raobs_from_adpupa_dump.py
@@ -0,0 +1,186 @@
+import sys
+from pathlib import Path
+
+
+def main():
+    _wmoid = sys.argv[1] if sys.argv and len(sys.argv) > 1 else "65578"
+    with Path(
+        f"/opt/data/prepbufr_to_cb/test_artifacts/{_wmoid}_mandatory_values.txt"
+    ).open() as _f:
+        data = []
+        qualified = False
+        row = {}
+        row_index = None
+        while line := _f.readline():
+            # skip empty lines - a qualified empty line means this subset is finished
+            if not line.strip():
+                if (
+                    qualified
+                    and row_index is None
+                    and row
+                    and row.get("press") != "null"
+                ):
+                    # if row doesn't exist in data it must be appended
+                    data.append(row)
+                row = {}
+                qualified = False
+                continue
+            mnemonic = line.split()[0]
+            if mnemonic == "POB":
+                qualified = True
+                _press = round(float(line.split()[1]))
+                if _press not in [
+                    1000,
+                    850,
+                    700,
+                    500,
+                    400,
+                    300,
+                    250,
+                    200,
+                    150,
+                    100,
+                    70,
+                    50,
+                    30,
+                    20,
+                ]:
+                    # not a standard level that we are interested in
+                    continue
+                # see if the row is already there (if this is a wind subset the row is already there).
+                row_index = None
+                for i, item in enumerate(data):
+                    if "press" in item and item["press"] == _press:
+                        row_index = i
+                        break
+                row = data[row_index] if row_index is not None else {"press": _press}
+                continue
+            if not qualified:
+                continue
+            else:  # still qualified
+                try:
+                    match mnemonic:
+                        case "SID":
+                            continue
+                        case "TYP":
+                            continue
+                        case "PQM":
+                            if round(float(line.split()[1])) not in [0, 1, 2]:
+                                # disqualified because of quality marker
+                                # go to next POB
+                                qualified = False
+                                row = {}
+                            continue
+                        case "PPC":
+                            if round(float(line.split()[1])) != 1:
+                                # disqualified because of program code
+                                # go to next POB
+                                qualified = False
+                                row = {}
+                            continue
+                        case "QOB":
+                            if qualified:
+                                row["sh"] = line.split()[1]
+                            continue
+                        case "QQM":
+                            if round(float(line.split()[1])) not in [0, 1, 2, 9, 15]:
+                                # disqualified because of quality marker
+                                row["sh"] = None
+                            continue
+                        case "QPC":
+                            if round(float(line.split()[1])) != 1:
+                                # disqualified because of program code
+                                row["sh"] = None
+                            continue
+                        case "ZOB":
+                            row["z"] = line.split()[1]
+                            continue
+                        case "ZQM":
+                            if round(float(line.split()[1])) not in [0, 1, 2]:
+                                # disqualified because of quality marker
+                                row["z"] = None
+                            continue
+                        case "ZPC":
+                            if round(float(line.split()[1])) != 1:
+                                # disqualified because of program code
+                                row["z"] = None
+                            continue
+                        case "TOB":
+                            row["t"] = line.split()[1]
+                            continue
+                        case "TQM":
+                            if round(float(line.split()[1])) not in [0, 1, 2]:
+                                # disqualified because of quality marker
+                                row["t"] = None
+                            continue
+                        case "TPC":
+                            if round(float(line.split()[1])) != 1:
+                                # disqualified because of program code
+                                row["t"] = None
+                            continue
+                        case "TDO":
+                            # does not need to be qualified
+                            row["dp"] = line.split()[1]
+                            continue
+                        case "DDO":
+                            row["wd"] = line.split()[1]
+                            continue
+                        case "FFO":
+                            row["ws"] = line.split()[1]
+                            continue
+                        case "DFQ":
+                            if round(float(line.split()[1])) not in [0, 1, 2]:
+                                # disqualified because of quality marker
+                                row["wd"] = None
+                                row["ws"] = None
+                            continue
+                        case "DFP":
+                            if round(float(line.split()[1])) != 1:
+                                # disqualified because of program code
+                                row["wd"] = None
+                                row["ws"] = None
+                            continue
+                        case _:
+                            print(f"Unknown mnemonic {mnemonic}")
+                            continue
+                except Exception as e:
+                    print(f"Error: {e}")
+    table = [
+        [
+            "press",
+            "z",
+            "t",
+            "dp",
+            "wd",
+            "ws",
+            "ws(FFO)",
+        ]
+    ]
+    try:
+        for row in data:
+            if row.get("press") is None:
+                continue
+            table.append(
+                [
+                    row.get("press", "null"),
+                    row.get("z", "null"),
+                    row.get("t", "null"),
+                    row.get("dp", "null"),
+                    row.get("wd", "null"),
+                    "...",
+                    row.get("ws", "null"),
+                ]
+            )
+        # out_table = tabulate(table, headers="firstrow", tablefmt="plain")
+        # # Split the table into separate columns with spaces
+        # columns = out_table.split('\t')
+        print(f"data-dump-{_wmoid}")
+        for row in table:
+            print(" ".join(list(map(str, row))))
+        # print(" ".join(columns))
+    except Exception as e:
+        print(f"Error: {e}")
+
+
+if __name__ == "__main__":
+    main()
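The two utility scripts are companions: the one above rebuilds the mandatory-level table from a prepbufr ADPUPA text dump, while the one below pulls the same levels out of Couchbase, so their outputs can be compared side by side. Assuming the dump file exists under /opt/data/prepbufr_to_cb/test_artifacts/, the dump-based script is run with a WMO station id (65578 is the built-in default):

    python tests/vxingest/utilities/get_data_for_raobs_from_adpupa_dump.py 65578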
diff --git a/tests/vxingest/utilities/get_data_for_raobs_from_cb.py b/tests/vxingest/utilities/get_data_for_raobs_from_cb.py
new file mode 100644
index 00000000..882a0b9a
--- /dev/null
+++ b/tests/vxingest/utilities/get_data_for_raobs_from_cb.py
@@ -0,0 +1,91 @@
+import os
+import sys
+from pathlib import Path
+
+import yaml
+from couchbase.auth import PasswordAuthenticator
+from couchbase.cluster import Cluster
+from couchbase.options import ClusterOptions
+from tabulate import tabulate
+
+
+def connect_cb():
+    """
+    create a couchbase connection and maintain the collection and cluster objects.
+    """
+    credentials_file = os.environ["CREDENTIALS"]
+    assert (
+        Path(credentials_file).is_file() is True
+    ), f"*** credentials_file file {credentials_file} can not be found!"
+    with Path(credentials_file).open(encoding="utf-8") as _f:
+        _yaml_data = yaml.load(_f, yaml.SafeLoader)
+    cb_connection = {}
+    cb_connection["host"] = _yaml_data["cb_host"]
+    cb_connection["user"] = _yaml_data["cb_user"]
+    cb_connection["password"] = _yaml_data["cb_password"]
+    cb_connection["bucket"] = _yaml_data["cb_bucket"]
+    cb_connection["collection"] = _yaml_data["cb_collection"]
+    cb_connection["scope"] = _yaml_data["cb_scope"]
+    # I really want the RAOB collection
+    cb_connection["collection"] = "RAOB"
+
+    options = ClusterOptions(
+        PasswordAuthenticator(cb_connection["user"], cb_connection["password"])
+    )
+    cb_connection["cluster"] = Cluster(cb_connection["host"], options)
+    cb_connection["collection"] = (
+        cb_connection["cluster"]
+        .bucket(cb_connection["bucket"])
+        .collection(cb_connection["collection"])
+    )
+    return cb_connection
+
+
+def main():
+    wmoid = sys.argv[1]
+    _statement = f"""SELECT
+        d.data.['{wmoid}'].['pressure'] press,
+        d.data.['{wmoid}'].['height'] z,
+        ROUND(d.data.['{wmoid}'].['temperature'],4) t,
+        ROUND(d.data.['{wmoid}'].['dewpoint'],4) dp,
+        ROUND(d.data.['{wmoid}'].['relative_humidity'],4) rh,
+        d.data.['{wmoid}'].['wind_direction'] wd,
+        d.data.['{wmoid}'].['wind_speed'] ws
+        FROM vxdata._default.RAOB AS d
+        WHERE type='DD'
+            AND subset='RAOB'
+            AND docType='obs'
+            AND subDocType = 'prepbufr'
+            AND fcstValidISO = '2024-07-31T00:00:00Z'
+            AND d.data.['{wmoid}'].['pressure'] IN [1000,850,700,500,400,300,250,200,150,100,70,50,30,20]
+        ORDER BY d.data.['{wmoid}'].['pressure'] DESC;"""
+    data_iter = connect_cb()["cluster"].query(_statement)
+    table = [
+        [
+            "press",
+            "z",
+            "t",
+            "dp",
+            "rh",
+            "wd",
+            "ws",
+        ]
+    ]
+
+    for row in data_iter.rows():
+        table.append(
+            [
+                row["press"] if row["press"] != "null" else "null",
+                row["z"] if row["z"] else "null",
+                row["t"] if row["t"] else "null",
+                row["dp"] if row["dp"] else "null",
+                row["rh"] if row["rh"] else "null",
+                row["wd"] if row["wd"] else "null",
+                row["ws"] if row["ws"] else "null",
+            ]
+        )
+    print(tabulate(table, headers="firstrow", tablefmt="plain"))
+
+
+if __name__ == "__main__":
+    main()
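The Couchbase variant takes the station id as a required argument (no default) and expects a CREDENTIALS environment variable pointing at the YAML file that connect_cb() reads. A typical invocation might look like this (the credentials path is hypothetical):

    CREDENTIALS=/path/to/credentials.yaml \
    python tests/vxingest/utilities/get_data_for_raobs_from_cb.py 65578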