Skip to content

Commit

Permalink
221-investigate-replacing-pygrib-with-cfgribprobably-with-xarray-for-…
Browse files Browse the repository at this point in the history
…the-ingest-processes (#230)

Implement cfgrib and do some refactoring. Remove tests that are no
longer in context. Prepare to be able to ingest Specific humidity and
vegetation type. Add some more intense grib_to_cb tests and utilities
for data integrity.
  • Loading branch information
randytpierce authored Sep 19, 2023
2 parents d9cc19d + 2090a00 commit 9bcda74
Show file tree
Hide file tree
Showing 155 changed files with 2,986 additions and 382,941 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ This is a sample cron entry for the import process.
*/2 * * * * /home/amb-verif/VxIngest/scripts/VXingest_utilities/run-import.sh -c /home/amb-verif/adb-cb1-credentials -d /home/amb-verif/VxIngest -l /data/temp -m /data/common/job_metrics -t /data/temp_tar > /home/amb-verif/logs/cron-import-`date +\%s`.out 2>&1
```

At some point in time (currently in a cron job) the transfer tar files will be moved to the actual import load directory on the import server. The load directory is specified on the import job with the "-l load dir" parameter. It should be noted that the ingest transfer directory and the import load directory may very well be on different servers.
At some point in time (currently in a cron job) the transfer tar files will be moved to the actual import load directory on the import server. The load directory is specified on the import job with the "-l load dir" parameter. It should be noted that the ingest transfer directory and the import load directory may very well be on different servers.

The import routine will unbundle any tar files found in the import load directory, one file at a time, and place the contents (the output files and the ingest log file) into a temporary directory tree the root of which is specified in the -t "temp_dir" parameter. The temporary directory is named after the PID of the import process.

Expand Down
4 changes: 1 addition & 3 deletions builder_common/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ def create_data_file_id(self, subset, file_type, origin_type, file_name):
"""
try:
base_name = os.path.basename(file_name)
an_id = "DF:{s}:{f}:{o}:{b}".format(
s=subset, f=file_type, o=origin_type, b=base_name
)
an_id = f"DF:{subset}:{file_type}:{origin_type}:{base_name}"
return an_id
except Exception as _e: # pylint: disable=broad-except
logging.exception("%s create_data_file_id", self.__class__.__name__)
Expand Down
21 changes: 9 additions & 12 deletions builder_common/ingest_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
import json
from pathlib import Path
from couchbase.exceptions import TimeoutException
from couchbase.cluster import Cluster, ClusterOptions, ClusterTimeoutOptions
from couchbase_core.cluster import PasswordAuthenticator


from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions, ClusterTimeoutOptions
class CommonVxIngestManager(Process): # pylint:disable=too-many-instance-attributes
"""
IngestManager is a Process Thread that manages an object pool of
Expand Down Expand Up @@ -73,7 +72,7 @@ def close_cb(self):
close couchbase connection
"""
if self.cluster:
self.cluster.disconnect()
self.cluster.close()
self.cluster = None
self.collection = None

Expand Down Expand Up @@ -246,15 +245,13 @@ def write_document_to_files(self, file_name, document_map):
num_documents,
complete_file_name,
)
_f = open(complete_file_name, "w")
_f = open(complete_file_name, "w", encoding="utf-8")
# we need to write out a list of the values of the _document_map for cbimport
_f.write(json.dumps(list(document_map.values())))
json_data = json.dumps(list(document_map.values()))
_f.write(json_data)
_f.close()
except Exception as _e1: # pylint:disable=broad-except
logging.exception("write_document_to_files - trying write: Got Exception")
logging.exception("write_document_to_files - trying write: Got Exception %s", str(_e1))
except Exception as _e: # pylint:disable=broad-except
logging.exception(
": *** %s Error writing to files: in process_element writing document***",
self.thread_name,
)
logging.exception(": *** {self.thread_name} Error writing to files: in process_element writing document*** %s", str(_e))
raise _e
12 changes: 6 additions & 6 deletions builder_common/load_backup_ingest_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
from pathlib import Path

import yaml
from couchbase.cluster import Cluster, ClusterOptions
from couchbase_core.cluster import PasswordAuthenticator

from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions

def parse_args(args):
"""parse command line arguments"""
Expand Down Expand Up @@ -62,7 +62,7 @@ def run(self, args):
+ credentials_file
+ " can not be found!"
)
_f = open(credentials_file)
_f = open(credentials_file, encoding="utf-8")
yaml_data = yaml.load(_f, yaml.SafeLoader)
self.cb_credentials["host"] = yaml_data["cb_host"]
self.cb_credentials["user"] = yaml_data["cb_user"]
Expand All @@ -71,7 +71,7 @@ def run(self, args):

f_name = args["file_name"]
# Opening JSON file
_f = open(f_name)
_f = open(f_name, encoding="utf-8")
# returns JSON object as
# a dictionary
list_data = json.load(_f)
Expand All @@ -92,7 +92,7 @@ def run(self, args):
def close_cb(self):
"""close the cluster"""
if self.cluster:
self.cluster.disconnect()
self.cluster.close()

def connect_cb(self):
"""Connect to database"""
Expand Down
2 changes: 1 addition & 1 deletion builder_common/test/get_file_list_grib2.n1ql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
SELECT url, mtime
FROM `vxdata`._default.METAR
WHERE
subset='metar'
subset='METAR'
AND type='DF'
AND fileType='grib2'
AND originType='model'
Expand Down
125 changes: 18 additions & 107 deletions builder_common/test/test_unit_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from pathlib import Path
from datetime import timedelta
import yaml
from couchbase.cluster import Cluster, ClusterOptions, ClusterTimeoutOptions, QueryOptions
from couchbase_core.cluster import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, QueryOptions, ClusterTimeoutOptions
from couchbase.auth import PasswordAuthenticator


def connect_cb():
Expand All @@ -14,13 +15,13 @@ def connect_cb():
# noinspection PyBroadException
try:
try:
cb_connection # is it defined
cb_connection # pylint: disable=used-before-assignment
except NameError:
credentials_file = os.environ["HOME"] + "/adb-cb1-credentials"
assert (
Path(credentials_file).is_file() is True
), f"*** credentials_file file {credentials_file} can not be found!"
_f = open(credentials_file)
_f = open(credentials_file, "r", encoding="utf-8")
_yaml_data = yaml.load(_f, yaml.SafeLoader)
cb_connection = {}
cb_connection["host"] = _yaml_data["cb_host"]
Expand All @@ -44,49 +45,18 @@ def connect_cb():
assert False, f"test_unit_queries Exception failure connecting: {_e}"


def test_map_station_query_no_let(request):
    """Time the map_station_query_no_let N1QL query.

    Asserts the query returns without errors and that its server-side
    elapsed time stays under the expected budget (seconds).

    :param request: pytest fixture giving access to the running test node.
    """
    # Bind the name before the try so the except-branch message can use it
    # even if an early statement raises.
    _name = request.node.name
    try:
        _expected_time = 12  # seconds allowed for the query to complete
        # Use a context manager so the statement file is closed promptly,
        # and read it with an explicit encoding.
        with open(
            "./builder_common/test/map_station_query_no_let.n1ql", encoding="utf-8"
        ) as _f:
            _statement = _f.read()
        result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
        elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
        print(f"{_name}: elapsed_time is {elapsed_time}")
        assert result is not None, f"{_name}: result is None"
        assert len(result.errors) == 0, f"{_name}: result has errors {result.errors}"
        assert (
            elapsed_time < _expected_time
        ), f"{_name}: elapsed_time greater than {_expected_time} {elapsed_time}"
    except Exception as _e:  # pylint:disable=broad-except
        assert False, f"{_name} Exception failure: {_e}"


def test_map_station_query(request):
    """Time the map_station_query N1QL query.

    Asserts the query returns without errors and that its server-side
    elapsed time stays under the expected budget (seconds).

    :param request: pytest fixture giving access to the running test node.
    """
    # Bind the name before the try so the except-branch message can use it
    # even if an early statement raises.
    _name = request.node.name
    try:
        _expected_time = 12  # seconds allowed for the query to complete
        # Use a context manager so the statement file is closed promptly,
        # and read it with an explicit encoding.
        with open(
            "./builder_common/test/map_station_query.n1ql", encoding="utf-8"
        ) as _f:
            _statement = _f.read()
        result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
        elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
        print(f"{_name}: elapsed_time is {elapsed_time}")
        assert result is not None, f"{_name}: result is None"
        assert len(result.errors) == 0, f"{_name}: result has errors {result.errors}"
        assert (
            elapsed_time < _expected_time
        ), f"{_name}: elapsed_time greater than {_expected_time} {elapsed_time}"
    except Exception as _e:  # pylint:disable=broad-except
        assert False, f"{_name} Exception failure: {_e}"


def test_stations_fcst_valid_epoch(request):
"""test"""
try:
_expected_time = 10
_name = request.node.name
_statement = open("./builder_common/test/stations_fcst_valid_epoch.n1ql").read()
_statement = open("./builder_common/test/stations_fcst_valid_epoch.n1ql", encoding="utf-8").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
# have to read the rows before we can get to the metadata as of couchbase 4.1
_rows = list(result.rows())
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"
Expand All @@ -97,12 +67,13 @@ def test_stations_get_file_list_grib2(request):
try:
_expected_time = 10
_name = request.node.name
_statement = open("./builder_common/test/get_file_list_grib2.n1ql").read()
_statement = open("./builder_common/test/get_file_list_grib2.n1ql", encoding="utf-8").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
# have to read the rows before we can get to the metadata as of couchbase 4.1
_rows = list(result.rows())
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"
Expand All @@ -112,89 +83,29 @@ def test_stations_get_file_list_netcdf(request):
try:
_expected_time = 5
_name = request.node.name
_statement = open("./builder_common/test/get_file_list_netcdf.n1ql").read()
_statement = open("./builder_common/test/get_file_list_netcdf.n1ql", encoding="utf-8").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
# have to read the rows before we can get to the metadata as of couchbase 4.1
_rows = list(result.rows())
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"

def test_METAR_count(request):
def test_metar_count(request):
"""test"""
try:
_expected_time = 0.05
_name = request.node.name
_statement = open("./builder_common/test/METAR_count.n1ql").read()
_statement = open("./builder_common/test/METAR_count.n1ql", encoding="utf-8").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
# have to read the rows before we can get to the metadata as of couchbase 4.1
_rows = list(result.rows())
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"

def test_final_DieOff(request):
    """Time the final_DieOff N1QL query.

    Asserts the query returns without errors and that its server-side
    elapsed time stays under the expected budget (seconds).

    :param request: pytest fixture giving access to the running test node.
    """
    # Bind the name before the try so the except-branch message can use it
    # even if an early statement raises.
    _name = request.node.name
    try:
        _expected_time = 120  # seconds allowed for the query to complete
        # Use a context manager so the statement file is closed promptly,
        # and read it with an explicit encoding.
        with open("./builder_common/test/final_DieOff.n1ql", encoding="utf-8") as _f:
            _statement = _f.read()
        result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
        elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
        print(f"{_name}: elapsed_time is {elapsed_time}")
        assert result is not None, f"{_name}: result is None"
        assert len(result.errors) == 0, f"{_name}: result has errors {result.errors}"
        assert (
            elapsed_time < _expected_time
        ), f"{_name}: elapsed_time greater than {_expected_time} {elapsed_time}"
    except Exception as _e:  # pylint:disable=broad-except
        assert False, f"{_name} Exception failure: {_e}"

def test_final_Map(request):
    """Time the final_Map N1QL query.

    Asserts the query returns without errors and that its server-side
    elapsed time stays under the expected budget (seconds).

    :param request: pytest fixture giving access to the running test node.
    """
    # Bind the name before the try so the except-branch message can use it
    # even if an early statement raises.
    _name = request.node.name
    try:
        _expected_time = 10  # seconds allowed for the query to complete
        # Use a context manager so the statement file is closed promptly,
        # and read it with an explicit encoding.
        with open("./builder_common/test/final_Map.n1ql", encoding="utf-8") as _f:
            _statement = _f.read()
        result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
        elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
        print(f"{_name}: elapsed_time is {elapsed_time}")
        assert result is not None, f"{_name}: result is None"
        assert len(result.errors) == 0, f"{_name}: result has errors {result.errors}"
        assert (
            elapsed_time < _expected_time
        ), f"{_name}: elapsed_time greater than {_expected_time} {elapsed_time}"
    except Exception as _e:  # pylint:disable=broad-except
        assert False, f"{_name} Exception failure: {_e}"


def test_final_TimeSeries(request):
    """Time the final_TimeSeries N1QL query.

    Asserts the query returns without errors and that its server-side
    elapsed time stays under the expected budget (seconds).

    :param request: pytest fixture giving access to the running test node.
    """
    # Bind the name before the try so the except-branch message can use it
    # even if an early statement raises.
    _name = request.node.name
    try:
        _expected_time = 10  # seconds allowed for the query to complete
        # Use a context manager so the statement file is closed promptly,
        # and read it with an explicit encoding.
        with open(
            "./builder_common/test/final_TimeSeries.n1ql", encoding="utf-8"
        ) as _f:
            _statement = _f.read()
        result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
        elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
        print(f"{_name}: elapsed_time is {elapsed_time}")
        assert result is not None, f"{_name}: result is None"
        assert len(result.errors) == 0, f"{_name}: result has errors {result.errors}"
        assert (
            elapsed_time < _expected_time
        ), f"{_name}: elapsed_time greater than {_expected_time} {elapsed_time}"
    except Exception as _e:  # pylint:disable=broad-except
        assert False, f"{_name} Exception failure: {_e}"

def test_final_ValidTime(request):
    """Time the final_ValidTime N1QL query.

    Asserts the query returns without errors and that its server-side
    elapsed time stays under the expected budget (seconds).

    :param request: pytest fixture giving access to the running test node.
    """
    # Bind the name before the try so the except-branch message can use it
    # even if an early statement raises.
    _name = request.node.name
    try:
        _expected_time = 10  # seconds allowed for the query to complete
        # Use a context manager so the statement file is closed promptly,
        # and read it with an explicit encoding.
        with open("./builder_common/test/final_ValidTime.n1ql", encoding="utf-8") as _f:
            _statement = _f.read()
        result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
        elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
        print(f"{_name}: elapsed_time is {elapsed_time}")
        assert result is not None, f"{_name}: result is None"
        assert len(result.errors) == 0, f"{_name}: result has errors {result.errors}"
        assert (
            elapsed_time < _expected_time
        ), f"{_name}: elapsed_time greater than {_expected_time} {elapsed_time}"
    except Exception as _e:  # pylint:disable=broad-except
        assert False, f"{_name} Exception failure: {_e}"

22 changes: 9 additions & 13 deletions builder_common/vx_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from pathlib import Path
from datetime import timedelta
import yaml
from couchbase.cluster import Cluster, ClusterOptions, ClusterTimeoutOptions
from couchbase_core.cluster import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions, ClusterTimeoutOptions

class CommonVxIngest: # pylint: disable=too-many-arguments disable=too-many-instance-attributes
"""
Expand Down Expand Up @@ -73,7 +74,7 @@ def write_load_job_to_files(self):
try:
file_name = self.load_job_id + ".json"
complete_file_name = os.path.join(self.output_dir, file_name)
_f = open(complete_file_name, "w")
_f = open(complete_file_name, "w", encoding="utf-8")
_f.write(json.dumps([self.load_spec["load_job_doc"]]))
_f.close()
except Exception as _e: # pylint: disable=broad-except
Expand All @@ -92,16 +93,11 @@ def build_load_job_doc(self, lineage):
git_hash = stream.read().strip()
_document_id = (
self.load_spec["ingest_document_ids"][0]
if "ingest_document_ids" in self.load_spec.keys()
if "ingest_document_ids" in self.load_spec
else None
)
subset = _document_id.split(":")[2]
self.load_job_id = "LJ:{s}:{m}:{c}:{t}".format(
s=subset,
m=self.__module__,
c=self.__class__.__name__,
t=str(int(time.time())),
)
self.load_job_id = f"LJ:{subset}:{self.__module__}:{self.__class__.__name__}:{str(int(time.time()))}"
lj_doc = {
"id": self.load_job_id,
"subset": subset,
Expand All @@ -119,7 +115,7 @@ def close_cb(self):
close couchbase connection
"""
if self.cluster:
self.cluster.disconnect()
self.cluster.close()

def connect_cb(self):
"""
Expand All @@ -144,7 +140,7 @@ def connect_cb(self):
self.load_spec["cb_credentials"] = self.cb_credentials
logging.info("%s: Couchbase connection success")
except Exception as _e: # pylint:disable=broad-except
logging.exception("*** builder_common.CommonVxIngest Error in connect_cb ***")
logging.exception("*** builder_common.CommonVxIngest Error in connect_cb *** %s", str(_e))
sys.exit("*** builder_common.CommonVxIngest Error when connecting to cb database: ")

def get_file_list(self, df_query, directory, file_pattern):
Expand Down Expand Up @@ -241,7 +237,7 @@ def get_credentials(self, load_spec):
+ self.credentials_file
+ " can not be found!"
)
_f = open(self.credentials_file)
_f = open(self.credentials_file, encoding="utf-8")
_yaml_data = yaml.load(_f, yaml.SafeLoader)
load_spec["cb_connection"] = {}
load_spec["cb_connection"]["host"] = _yaml_data["cb_host"]
Expand Down
2 changes: 1 addition & 1 deletion classic_sql_to_cb/run_sql_ingest_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def get_credentials(self, load_spec):
# check for existence of file
if not Path(self.credentials_file).is_file():
sys.exit("*** credentials_file file " + self.credentials_file + " can not be found!")
_f = open(self.credentials_file)
_f = open(self.credentials_file, encoding='utf-8')
_yaml_data = yaml.load(_f, yaml.SafeLoader)
load_spec['cb_connection']['host'] = _yaml_data['cb_host']
load_spec['cb_connection']['user'] = _yaml_data['cb_user']
Expand Down
Loading

0 comments on commit 9bcda74

Please sign in to comment.