Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

221-investigate-replacing-pygrib-with-cfgribprobably-with-xarray-for-the-ingest-processes #230

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions builder_common/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ def create_data_file_id(self, subset, file_type, origin_type, file_name):
"""
try:
base_name = os.path.basename(file_name)
an_id = "DF:{s}:{f}:{o}:{b}".format(
s=subset, f=file_type, o=origin_type, b=base_name
)
an_id = f"DF:{subset}:{file_type}:{origin_type}:{base_name}"
return an_id
except Exception as _e: # pylint: disable=broad-except
logging.exception("%s create_data_file_id", self.__class__.__name__)
Expand Down
21 changes: 9 additions & 12 deletions builder_common/ingest_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
import json
from pathlib import Path
from couchbase.exceptions import TimeoutException
from couchbase.cluster import Cluster, ClusterOptions, ClusterTimeoutOptions
from couchbase_core.cluster import PasswordAuthenticator


from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions, ClusterTimeoutOptions
class CommonVxIngestManager(Process): # pylint:disable=too-many-instance-attributes
"""
IngestManager is a Process Thread that manages an object pool of
Expand Down Expand Up @@ -73,7 +72,7 @@ def close_cb(self):
close couchbase connection
"""
if self.cluster:
self.cluster.disconnect()
self.cluster.close()
self.cluster = None
self.collection = None

Expand Down Expand Up @@ -246,15 +245,13 @@ def write_document_to_files(self, file_name, document_map):
num_documents,
complete_file_name,
)
_f = open(complete_file_name, "w")
_f = open(complete_file_name, "w", encoding="utf-8")
# we need to write out a list of the values of the _document_map for cbimport
_f.write(json.dumps(list(document_map.values())))
json_data = json.dumps(list(document_map.values()))
_f.write(json_data)
_f.close()
except Exception as _e1: # pylint:disable=broad-except
logging.exception("write_document_to_files - trying write: Got Exception")
logging.exception("write_document_to_files - trying write: Got Exception %s", str(_e1))
except Exception as _e: # pylint:disable=broad-except
logging.exception(
": *** %s Error writing to files: in process_element writing document***",
self.thread_name,
)
logging.exception(": *** {self.thread_name} Error writing to files: in process_element writing document*** %s", str(_e))
raise _e
12 changes: 6 additions & 6 deletions builder_common/load_backup_ingest_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
from pathlib import Path

import yaml
from couchbase.cluster import Cluster, ClusterOptions
from couchbase_core.cluster import PasswordAuthenticator

from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions

def parse_args(args):
"""parse command line arguments"""
Expand Down Expand Up @@ -62,7 +62,7 @@ def run(self, args):
+ credentials_file
+ " can not be found!"
)
_f = open(credentials_file)
_f = open(credentials_file, encoding="utf-8")
yaml_data = yaml.load(_f, yaml.SafeLoader)
self.cb_credentials["host"] = yaml_data["cb_host"]
self.cb_credentials["user"] = yaml_data["cb_user"]
Expand All @@ -71,7 +71,7 @@ def run(self, args):

f_name = args["file_name"]
# Opening JSON file
_f = open(f_name)
_f = open(f_name, encoding="utf-8")
# returns JSON object as
# a dictionary
list_data = json.load(_f)
Expand All @@ -92,7 +92,7 @@ def run(self, args):
def close_cb(self):
"""close the cluster"""
if self.cluster:
self.cluster.disconnect()
self.cluster.close()

def connect_cb(self):
"""Connect to database"""
Expand Down
2 changes: 1 addition & 1 deletion builder_common/test/get_file_list_grib2.n1ql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
SELECT url, mtime
FROM `vxdata`._default.METAR
WHERE
subset='metar'
subset='METAR'
AND type='DF'
AND fileType='grib2'
AND originType='model'
Expand Down
125 changes: 18 additions & 107 deletions builder_common/test/test_unit_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from pathlib import Path
from datetime import timedelta
import yaml
from couchbase.cluster import Cluster, ClusterOptions, ClusterTimeoutOptions, QueryOptions
from couchbase_core.cluster import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, QueryOptions, ClusterTimeoutOptions
from couchbase.auth import PasswordAuthenticator


def connect_cb():
Expand All @@ -14,13 +15,13 @@ def connect_cb():
# noinspection PyBroadException
try:
try:
randytpierce marked this conversation as resolved.
Show resolved Hide resolved
cb_connection # is it defined
cb_connection # pylint: disable=used-before-assignment
except NameError:
credentials_file = os.environ["HOME"] + "/adb-cb1-credentials"
assert (
Path(credentials_file).is_file() is True
), f"*** credentials_file file {credentials_file} can not be found!"
_f = open(credentials_file)
_f = open(credentials_file, "r", encoding="utf-8")
_yaml_data = yaml.load(_f, yaml.SafeLoader)
cb_connection = {}
cb_connection["host"] = _yaml_data["cb_host"]
Expand All @@ -44,49 +45,18 @@ def connect_cb():
assert False, f"test_unit_queries Exception failure connecting: {_e}"


def test_map_station_query_no_let(request):
"""test"""
try:
_expected_time = 12
_name = request.node.name
_statement = open("./builder_common/test/map_station_query_no_let.n1ql").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"


def test_map_station_query(request):
"""test"""
try:
_expected_time = 12
_name = request.node.name
_statement = open("./builder_common/test/map_station_query.n1ql").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"


def test_stations_fcst_valid_epoch(request):
"""test"""
try:
_expected_time = 10
_name = request.node.name
_statement = open("./builder_common/test/stations_fcst_valid_epoch.n1ql").read()
_statement = open("./builder_common/test/stations_fcst_valid_epoch.n1ql", encoding="utf-8").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
# have to read the rows before we can get to the metadata as of couchbase 4.1
_rows = list(result.rows())
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"
Expand All @@ -97,12 +67,13 @@ def test_stations_get_file_list_grib2(request):
try:
_expected_time = 10
_name = request.node.name
_statement = open("./builder_common/test/get_file_list_grib2.n1ql").read()
_statement = open("./builder_common/test/get_file_list_grib2.n1ql", encoding="utf-8").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
# have to read the rows before we can get to the metadata as of couchbase 4.1
_rows = list(result.rows())
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"
Expand All @@ -112,89 +83,29 @@ def test_stations_get_file_list_netcdf(request):
try:
_expected_time = 5
_name = request.node.name
_statement = open("./builder_common/test/get_file_list_netcdf.n1ql").read()
_statement = open("./builder_common/test/get_file_list_netcdf.n1ql", encoding="utf-8").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
# have to read the rows before we can get to the metadata as of couchbase 4.1
_rows = list(result.rows())
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"

def test_METAR_count(request):
def test_metar_count(request):
"""test"""
try:
_expected_time = 0.05
_name = request.node.name
_statement = open("./builder_common/test/METAR_count.n1ql").read()
_statement = open("./builder_common/test/METAR_count.n1ql", encoding="utf-8").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
# have to read the rows before we can get to the metadata as of couchbase 4.1
_rows = list(result.rows())
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"

def test_final_DieOff(request):
"""test"""
try:
_expected_time = 120
_name = request.node.name
_statement = open("./builder_common/test/final_DieOff.n1ql").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None, f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"

def test_final_Map(request):
"""test"""
try:
_expected_time = 10
_name = request.node.name
_statement = open("./builder_common/test/final_Map.n1ql").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None, f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"


def test_final_TimeSeries(request):
"""test"""
try:
_expected_time = 10
_name = request.node.name
_statement = open("./builder_common/test/final_TimeSeries.n1ql").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"

def test_final_ValidTime(request):
"""test"""
try:
_expected_time = 10
_name = request.node.name
_statement = open("./builder_common/test/final_ValidTime.n1ql").read()
result = connect_cb()["cluster"].query(_statement, QueryOptions(metrics=True))
elapsed_time = result.metadata().metrics().elapsed_time().total_seconds()
print(f"{_name}: elapsed_time is {elapsed_time}")
assert result is not None,f"{_name}: result is None"
assert len(result.errors) == 0, f"{_name}: result has errors{result.errors}"
assert elapsed_time < _expected_time, f"{_name}: elasped_time greater than {_expected_time} {elapsed_time}"
except Exception as _e: # pylint:disable=broad-except
assert False, f"{_name} Exception failure: {_e}"

22 changes: 9 additions & 13 deletions builder_common/vx_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from pathlib import Path
from datetime import timedelta
import yaml
from couchbase.cluster import Cluster, ClusterOptions, ClusterTimeoutOptions
from couchbase_core.cluster import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions, ClusterTimeoutOptions

class CommonVxIngest: # pylint: disable=too-many-arguments disable=too-many-instance-attributes
"""
Expand Down Expand Up @@ -73,7 +74,7 @@ def write_load_job_to_files(self):
try:
file_name = self.load_job_id + ".json"
complete_file_name = os.path.join(self.output_dir, file_name)
_f = open(complete_file_name, "w")
_f = open(complete_file_name, "w", encoding="utf-8")
_f.write(json.dumps([self.load_spec["load_job_doc"]]))
_f.close()
except Exception as _e: # pylint: disable=broad-except
Expand All @@ -92,16 +93,11 @@ def build_load_job_doc(self, lineage):
git_hash = stream.read().strip()
_document_id = (
self.load_spec["ingest_document_ids"][0]
if "ingest_document_ids" in self.load_spec.keys()
if "ingest_document_ids" in self.load_spec
else None
)
subset = _document_id.split(":")[2]
self.load_job_id = "LJ:{s}:{m}:{c}:{t}".format(
s=subset,
m=self.__module__,
c=self.__class__.__name__,
t=str(int(time.time())),
)
self.load_job_id = f"LJ:{subset}:{self.__module__}:{self.__class__.__name__}:{str(int(time.time()))}"
lj_doc = {
"id": self.load_job_id,
"subset": subset,
Expand All @@ -119,7 +115,7 @@ def close_cb(self):
close couchbase connection
"""
if self.cluster:
self.cluster.disconnect()
self.cluster.close()

def connect_cb(self):
"""
Expand All @@ -144,7 +140,7 @@ def connect_cb(self):
self.load_spec["cb_credentials"] = self.cb_credentials
logging.info("%s: Couchbase connection success")
except Exception as _e: # pylint:disable=broad-except
logging.exception("*** builder_common.CommonVxIngest Error in connect_cb ***")
logging.exception("*** builder_common.CommonVxIngest Error in connect_cb *** %s", str(_e))
sys.exit("*** builder_common.CommonVxIngest Error when connecting to cb database: ")

def get_file_list(self, df_query, directory, file_pattern):
Expand Down Expand Up @@ -241,7 +237,7 @@ def get_credentials(self, load_spec):
+ self.credentials_file
+ " can not be found!"
)
_f = open(self.credentials_file)
_f = open(self.credentials_file, encoding="utf-8")
_yaml_data = yaml.load(_f, yaml.SafeLoader)
load_spec["cb_connection"] = {}
load_spec["cb_connection"]["host"] = _yaml_data["cb_host"]
Expand Down
2 changes: 1 addition & 1 deletion classic_sql_to_cb/run_sql_ingest_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def get_credentials(self, load_spec):
# check for existence of file
if not Path(self.credentials_file).is_file():
sys.exit("*** credentials_file file " + self.credentials_file + " can not be found!")
_f = open(self.credentials_file)
_f = open(self.credentials_file, encoding='utf-8')
_yaml_data = yaml.load(_f, yaml.SafeLoader)
load_spec['cb_connection']['host'] = _yaml_data['cb_host']
load_spec['cb_connection']['user'] = _yaml_data['cb_user']
Expand Down
9 changes: 4 additions & 5 deletions classic_sql_to_cb/sql_ingest_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@
import queue
import sys
import time
from itertools import islice
from multiprocessing import Process
from couchbase.cluster import Cluster, ClusterOptions
from couchbase.exceptions import DocumentNotFoundException, TimeoutException
from couchbase_core.cluster import PasswordAuthenticator
from classic_sql_to_cb import sql_builder as sql_builder
from itertools import islice

from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.auth import PasswordAuthenticator

def document_map_chunks(data, chunk_size=1000):
"""
Expand Down
Loading