Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a formatting check to CI #283

Merged
merged 3 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ env:
REGISTRY: ghcr.io/noaa-gsl/vxingest

jobs:
# CI job: fail the pipeline if any Python source is not formatted
# according to Ruff's formatter. Runs before the image-build jobs.
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install poetry
# pipx installs Poetry in an isolated environment, outside the project venv.
run: pipx install poetry
- uses: actions/setup-python@v4
with:
# Pin the interpreter so the formatter runs under the same version the project targets.
python-version: "3.11"
- name: Install dependencies
run: |
poetry env use 3.11
poetry install
- name: Format with Ruff
# --check only reports violations (non-zero exit on unformatted files); it rewrites nothing.
run: poetry run ruff format --check src tests
build-ingest:
name: Build Ingest image
runs-on: ubuntu-latest
Expand Down
12 changes: 3 additions & 9 deletions src/vxingest/builder_common/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,16 @@ def handle_data(self, **kwargs): # pylint: disable=missing-function-docstring
def derive_id(self, **kwargs): # pylint: disable=missing-function-docstring
pass

def load_data(
self, doc, key, element
): # pylint: disable=missing-function-docstring
def load_data(self, doc, key, element): # pylint: disable=missing-function-docstring
pass

def handle_document(self): # pylint: disable=missing-function-docstring
pass

def build_document(
self, queue_element
): # pylint: disable=missing-function-docstring
def build_document(self, queue_element): # pylint: disable=missing-function-docstring
pass

def build_datafile_doc(
self, file_name, data_file_id, origin_type
): # pylint: disable=missing-function-docstring
def build_datafile_doc(self, file_name, data_file_id, origin_type): # pylint: disable=missing-function-docstring
pass

def create_data_file_id(self, subset, file_type, origin_type, file_name):
Expand Down
2 changes: 1 addition & 1 deletion src/vxingest/builder_common/builder_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def truncate_round(_n, decimals=0):
Returns:
float: The number multiplied by n and then divided by n
"""
multiplier = 10 ** decimals
multiplier = 10**decimals
return int(_n * multiplier) / multiplier


Expand Down
47 changes: 29 additions & 18 deletions src/vxingest/builder_common/ingest_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ def __init__(
logger.exception(_re)
raise _re

def process_queue_element(
self, queue_element
): # pylint: disable=missing-function-docstring
def process_queue_element(self, queue_element): # pylint: disable=missing-function-docstring
pass

def close_cb(self):
Expand All @@ -108,23 +106,32 @@ def connect_cb(self):
# get a reference to our cluster
# noinspection PyBroadException
try:
timeout_options=ClusterTimeoutOptions(kv_timeout=timedelta(seconds=25), query_timeout=timedelta(seconds=120))
options=ClusterOptions(PasswordAuthenticator(self.cb_credentials["user"], self.cb_credentials["password"]), timeout_options=timeout_options)
timeout_options = ClusterTimeoutOptions(
kv_timeout=timedelta(seconds=25), query_timeout=timedelta(seconds=120)
)
options = ClusterOptions(
PasswordAuthenticator(
self.cb_credentials["user"], self.cb_credentials["password"]
),
timeout_options=timeout_options,
)
self.cluster = Cluster(
"couchbase://" + self.cb_credentials["host"], options
)
self.collection = (
self.cluster
.bucket(self.cb_credentials["bucket"])
.collection(self.cb_credentials["collection"])
)
self.collection = self.cluster.bucket(
self.cb_credentials["bucket"]
).collection(self.cb_credentials["collection"])
# stash the database connection for the builders to reuse
self.load_spec["cluster"] = self.cluster
self.load_spec["collection"] = self.collection
logger.info("Couchbase connection success")
except Exception as _e: # pylint:disable=broad-except
logger.exception("*** builder_common.CommonVxIngestManager in connect_cb ***")
sys.exit("*** builder_common.CommonVxIngestManager Error when connecting to cb database")
logger.exception(
"*** builder_common.CommonVxIngestManager in connect_cb ***"
)
sys.exit(
"*** builder_common.CommonVxIngestManager Error when connecting to cb database"
)

# entry point of the thread. Is invoked automatically when the thread is
# started.
Expand All @@ -143,7 +150,7 @@ def run(self):

# noinspection PyBroadException
try:
self.cb_credentials = self.load_spec['cb_connection']
self.cb_credentials = self.load_spec["cb_connection"]
# get a connection
self.connect_cb()
# infinite loop terminates when the file_name_queue is empty
Expand Down Expand Up @@ -179,9 +186,7 @@ def run(self):
)
break
except Exception as _e: # pylint:disable=broad-except
logger.exception(
"%s: *** Error in IngestManager run ***", self.thread_name
)
logger.exception("%s: *** Error in IngestManager run ***", self.thread_name)
raise _e
finally:
logger.info("%s: IngestManager finished", self.thread_name)
Expand Down Expand Up @@ -277,7 +282,13 @@ def write_document_to_files(self, file_name, document_map):
_f.write(json_data)
_f.close()
except Exception as _e1: # pylint:disable=broad-except
logger.exception("write_document_to_files - trying write: Got Exception %s", str(_e1))
logger.exception(
"write_document_to_files - trying write: Got Exception %s",
str(_e1),
)
except Exception as _e: # pylint:disable=broad-except
logger.exception(": *** {self.thread_name} Error writing to files: in process_element writing document*** %s", str(_e))
logger.exception(
": *** {self.thread_name} Error writing to files: in process_element writing document*** %s",
str(_e),
)
raise _e
29 changes: 19 additions & 10 deletions src/vxingest/builder_common/vx_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
# Get a logger with this module's name to help with debugging
logger = logging.getLogger(__name__)


class CommonVxIngest: # pylint: disable=too-many-arguments disable=too-many-instance-attributes
"""
Parent class for all VxIngest.
Expand Down Expand Up @@ -136,23 +137,31 @@ def connect_cb(self):
# get a reference to our cluster
# noinspection PyBroadException
try:

timeout_options=ClusterTimeoutOptions(kv_timeout=timedelta(seconds=25), query_timeout=timedelta(seconds=120))
options=ClusterOptions(PasswordAuthenticator(self.cb_credentials["user"], self.cb_credentials["password"]), timeout_options=timeout_options)
timeout_options = ClusterTimeoutOptions(
kv_timeout=timedelta(seconds=25), query_timeout=timedelta(seconds=120)
)
options = ClusterOptions(
PasswordAuthenticator(
self.cb_credentials["user"], self.cb_credentials["password"]
),
timeout_options=timeout_options,
)
self.cluster = Cluster(
"couchbase://" + self.cb_credentials["host"], options
)
self.collection = (
self.cluster
.bucket(self.cb_credentials["bucket"])
.collection(self.cb_credentials["collection"])
)
self.collection = self.cluster.bucket(
self.cb_credentials["bucket"]
).collection(self.cb_credentials["collection"])
# stash the credentials for the VxIngestManager - see NOTE at the top of this file.
self.load_spec["cb_credentials"] = self.cb_credentials
logger.info("%s: Couchbase connection success")
except Exception as _e: # pylint:disable=broad-except
logger.exception("*** builder_common.CommonVxIngest Error in connect_cb *** %s", str(_e))
sys.exit("*** builder_common.CommonVxIngest Error when connecting to cb database: ")
logger.exception(
"*** builder_common.CommonVxIngest Error in connect_cb *** %s", str(_e)
)
sys.exit(
"*** builder_common.CommonVxIngest Error when connecting to cb database: "
)

def get_file_list(self, df_query, directory, file_pattern):
"""This method accepts a file path (directory), a query statement (df_query),
Expand Down
22 changes: 11 additions & 11 deletions src/vxingest/ctc_to_cb/ctc_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,8 @@ def __init__(self, load_spec, ingest_document):
self.sub_doc_type = None
self.variable = None
self.model_fcst_valid_epochs = []
self.model_data = (
{}
) # used to stash each fcstValidEpoch model_data for the handlers
self.obs_data = (
{}
) # used to stash each fcstValidEpoch obs_data for the handlers
self.model_data = {} # used to stash each fcstValidEpoch model_data for the handlers
self.obs_data = {} # used to stash each fcstValidEpoch obs_data for the handlers
self.obs_station_names = [] # used to stash sorted obs names for the handlers
self.thresholds = None
self.not_found_stations = set()
Expand Down Expand Up @@ -491,10 +487,16 @@ def build_document(self, queue_element):
time.sleep(2) # don't hammer the server too hard
error_count = error_count + 1
# initial value for the max epoch
max_ctc_fcst_valid_epochs = self.load_spec["first_last_params"]["first_epoch"]
max_ctc_fcst_valid_epochs = self.load_spec["first_last_params"][
"first_epoch"
]
max_ctc_fcst_valid_epochs_result = list(result)
# if there are ctc's for this model and region then get the max epoch from the query
max_ctc_fcst_valid_epochs = max_ctc_fcst_valid_epochs_result[0] if max_ctc_fcst_valid_epochs_result[0] is not None else 0
max_ctc_fcst_valid_epochs = (
max_ctc_fcst_valid_epochs_result[0]
if max_ctc_fcst_valid_epochs_result[0] is not None
else 0
)

# Second get the intersection of the fcstValidEpochs that correspond for this
# model and the obs for all fcstValidEpochs greater than the first_epoch ctc
Expand Down Expand Up @@ -600,9 +602,7 @@ def build_document(self, queue_element):
)
return {}

def get_stations_for_region_by_geosearch(
self, region_name, valid_epoch
): # pylint: disable=unused-argument
def get_stations_for_region_by_geosearch(self, region_name, valid_epoch): # pylint: disable=unused-argument
# NOTE: this is currently broken because we have to modify this query to
# work woth the data model that has data elements as a MAP indexed by station name
"""Using a geosearh return all the stations within the defined region
Expand Down
13 changes: 8 additions & 5 deletions src/vxingest/ctc_to_cb/run_ingest_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
# Get a logger with this module's name to help with debugging
logger = logging.getLogger(__name__)


def parse_args(args):
"""
Parse command line arguments
Expand Down Expand Up @@ -229,21 +230,23 @@ def runit(self, args, log_queue: Queue, log_configurer: Callable[[Queue], None])
# thread that uses builders to process a file
# Make the Pool of data_type_managers
ingest_manager_list = []
logger.info(f"The ingest documents in the queue are: {self.load_spec['ingest_document_ids']}")
logger.info(
f"The ingest documents in the queue are: {self.load_spec['ingest_document_ids']}"
)
logger.info(f"Starting {self.thread_count} processes")
for thread_count in range(int(self.thread_count)):
# noinspection PyBroadException
try:
ingest_manager_thread = VxIngestManager(
f"VxIngestManager-{thread_count+1}", # Processes are 1 indexed in the logger
f"VxIngestManager-{thread_count+1}", # Processes are 1 indexed in the logger
self.load_spec,
_q,
self.output_dir,
log_queue, # Queue to pass logging messages back to the main process on
log_configurer, # Config function to set up the logger in the multiprocess Process
log_queue, # Queue to pass logging messages back to the main process on
log_configurer, # Config function to set up the logger in the multiprocess Process
)
ingest_manager_list.append(ingest_manager_thread)
ingest_manager_thread.start() # This calls a .run() method in the class
ingest_manager_thread.start() # This calls a .run() method in the class
logger.info(f"Started thread: VxIngestManager-{thread_count+1}")
except Exception as _e: # pylint:disable=broad-except
logger.error("*** Error in VXIngest %s***", str(_e))
Expand Down
4 changes: 1 addition & 3 deletions src/vxingest/ctc_to_cb/vx_ingest_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@
logger = logging.getLogger(__name__)


class VxIngestManager(
CommonVxIngestManager
): # pylint:disable=too-many-instance-attributes
class VxIngestManager(CommonVxIngestManager): # pylint:disable=too-many-instance-attributes
"""
IngestManager is a Process Thread that manages an object pool of
builders to ingest data from GSD couchbase documents, producing new documents
Expand Down
39 changes: 13 additions & 26 deletions src/vxingest/grib2_to_cb/grib_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ def load_data(self, doc, key, element):

# named functions
# pylint: disable=no-self-use
def handle_ceiling(
self, params_dict
): # pylint: disable=unused-argument, disable=too-many-branches
def handle_ceiling(self, params_dict): # pylint: disable=unused-argument, disable=too-many-branches
"""
returns the ceiling values for all the stations in a list
the dict_params aren't used here since the calculations are all done here
Expand Down Expand Up @@ -176,7 +174,9 @@ def handle_ceiling(
].values
ceil_msl_values = []
# print('fcst_valid_epoch',self.ds_translate_item_variables_map["fcst_valid_epoch"])
for station in self.domain_stations: # get the initial surface values and ceil_msl values for each station
for station in (
self.domain_stations
): # get the initial surface values and ceil_msl values for each station
geo_index = get_geo_index(
self.ds_translate_item_variables_map["fcst_valid_epoch"],
station["geo"],
Expand All @@ -193,7 +193,9 @@ def handle_ceiling(
ceil_msl_values.append(ceil_var_values[y_gridpoint, x_gridpoint])
ceil_agl = []
i = 0
for station in self.domain_stations: # determine the ceil_agl values for each station
for (
station
) in self.domain_stations: # determine the ceil_agl values for each station
if ceil_msl_values[i] == 60000:
ceil_agl.append(60000)
else:
Expand Down Expand Up @@ -230,9 +232,7 @@ def handle_surface_pressure(self, params_dict):
translate all the pressures (one per station location) to millibars
"""
pressures = []
for _v, v_intrp_pressure in list(params_dict.values())[
0
]: # pylint:disable=unused-variable
for _v, v_intrp_pressure in list(params_dict.values())[0]:
# Convert from pascals to millibars
pressures.append(float(v_intrp_pressure) / 100)
return pressures
Expand All @@ -248,12 +248,7 @@ def handle_visibility(self, params_dict):
"""
# convert all the values to a float
vis_values = []
for _v, v_intrp_ignore in list( # pylint: disable=unused-variable
params_dict.values()
)[ # pylint: disable=unused-variable
0
# convert to miles (float)
]: # pylint:disable=unused-variable
for _v, _v_intrp_ignore in list(params_dict.values())[0]:
vis_values.append(float(_v) / 1609.344 if _v is not None else None)
return vis_values

Expand All @@ -268,9 +263,7 @@ def handle_RH(self, params_dict): # pylint:disable=invalid-name
"""
# convert all the values to a float
rh_interpolated_values = []
for _v, v_intrp_pressure in list(params_dict.values())[
0
]: # pylint:disable=unused-variable
for _v, v_intrp_pressure in list(params_dict.values())[0]:
rh_interpolated_values.append(
float(v_intrp_pressure) if v_intrp_pressure is not None else None
)
Expand All @@ -283,9 +276,7 @@ def kelvin_to_farenheight(self, params_dict):
"""
# Convert each station value from Kelvin to Fahrenheit
tempf_values = []
for _v, v_intrp_tempf in list(params_dict.values())[
0
]: # pylint:disable=unused-variable
for _v, v_intrp_tempf in list(params_dict.values())[0]:
tempf_values.append(
((float(v_intrp_tempf) - 273.15) * 9) / 5 + 32
if v_intrp_tempf is not None
Expand Down Expand Up @@ -347,9 +338,7 @@ def handle_wind_speed(self, params_dict): # pylint:disable=unused-argument

# wind direction

def handle_wind_direction(
self, params_dict
): # pylint:disable=unused-argument, disable=too-many-locals
def handle_wind_direction(self, params_dict): # pylint:disable=unused-argument, disable=too-many-locals
"""The params_dict aren't used here since we need to
select two messages (self.grbs.select is expensive since it scans the whole grib file).
Each message is selected once and the station location data saved in an array,
Expand Down Expand Up @@ -503,9 +492,7 @@ def handle_vegetation_type(self, params_dict): # pylint:disable=unused-argument
)
return vegetation_type

def getName(
self, params_dict
): # pylint:disable=unused-argument,disable=invalid-name
def getName(self, params_dict): # pylint:disable=unused-argument,disable=invalid-name
"""translate the station name
Args:
params_dict (object): named function parameters - unused here
Expand Down
Loading