From 9c71dd4f10c4bcc7d7e0faa67e9d283841ebdd43 Mon Sep 17 00:00:00 2001 From: klounis Date: Tue, 19 Dec 2023 17:32:34 +0100 Subject: [PATCH] ENH : the code is now able to deal with CSV with headers. It cleans the dataset by droping NaN values. I added my functions in tools.py. --- rocketpy/mathutils/function.py | 120 ++------------------ rocketpy/simulation/flight_data_importer.py | 2 +- rocketpy/tools.py | 106 +++++++++++++++++ 3 files changed, 116 insertions(+), 112 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 5d89e8840..9cfa32641 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -12,6 +12,7 @@ import numpy as np import csv from scipy import integrate, linalg, optimize +from ..tools import data_preprocessing try: from functools import cached_property @@ -199,7 +200,7 @@ def set_source(self, source): self.__outputs__, self.__interpolation__, self.__extrapolation__, - ) + ) # If the source is a Function if isinstance(source, Function): source = source.get_source() @@ -207,10 +208,10 @@ def set_source(self, source): if isinstance(source, (str, Path)): with open(source, "r") as file: try: - source = np.loadtxt(file, delimiter=",", dtype=float) + source = np.loadtxt(file, delimiter=",") except ValueError: # If an error occurs, headers are present - source = self.data_preprocessing(source) + source = np.loadtxt(data_preprocessing(source), delimiter=",", dtype=np.float64) except Exception as e: raise ValueError( "The source file is not a valid csv or txt file." @@ -1104,6 +1105,7 @@ def low_pass_filter(self, alpha): Function The function with the incoming source filtered """ + print(self.source) filtered_signal = np.zeros_like(self.source) filtered_signal[0] = self.source[0] @@ -1117,47 +1119,9 @@ def low_pass_filter(self, alpha): source=filtered_signal, interpolation=self.__interpolation__, extrapolation=self.__extrapolation__, - ) - - def data_preprocessing(self, source): - """Clear data (in particular NaN objects) and returns a CSV file without header and its name. - - Parameters - ---------- - source : string - The file path to the CSV file. - - Returns - ------- - Function - The function with the incoming cleared CSV - """ - output_path = 'cleared_data.csv' - - with open(source, 'r') as file: - reader = csv.reader(file) - header = next(reader) # Read the header - - data = [row for row in reader] - - # Create a new list without the headers - data_no_headers = [] - - for row in data[1:]: - # Check if the row is not empty and if all values in the row can be converted to float - if row and all(self.is_float(value) for value in row): - data_no_headers.append(row) - - # Save the processed data to a new CSV file - with open(output_path, 'w', newline='') as output_file: - writer = csv.writer(output_file, delimiter=',') - writer.writerows(data_no_headers) - - return output_path + ) - - # Define all presentation methods def __call__(self, *args): """Plot the Function if no argument is given. If an @@ -2951,10 +2915,10 @@ def _check_user_input( if isinstance(source, (str, Path)): # Convert to numpy array try: - source = np.loadtxt(source, delimiter=",", dtype=float) + source = np.loadtxt(source, delimiter=",", dtype=np.float64) except ValueError: - # Skip header - source = np.loadtxt(source, delimiter=",", dtype=float, skiprows=1) + # If an error occurs, there is a header + source = np.loadtxt(data_preprocessing(source), delimiter=",", dtype=np.float64) except Exception as e: raise ValueError( "The source file is not a valid csv or txt file." @@ -3028,73 +2992,7 @@ def _check_user_input( ) return inputs, outputs, interpolation, extrapolation - def is_float(self, element): - """ - Returns a boolean indicating us if an element is convertible to a float or not. - True : the element is convertible to a float - False : the element is not convertible to a float - - Parameters - ---------- - element : any - This is the element to test. - - Returns - ------- - result : boolean - The element is convertible or not. - """ - if element is None: - return False - try: - float(element) - return True - except ValueError: - return False - - def return_first_data(self, source): - """ - Returns the first data of a CSV file. - - Parameters - ---------- - source : string - This is the file path to the csv. - - Returns - ------- - result : any - The first data of the CSV file. - """ - native_data = open(source) - for row in native_data : - for value in row : - return value - - - def if_header(self, source): - """ - Returns if a CSV file has a header or not. - True : The CSV file has a header - False : The CSV file has no header - - Parameters - ---------- - source : string - This is the file path to the csv. - - Returns - ------- - result : boolean - The result of the CSV file containing a header or not. - """ - return not self.is_float(self.return_first_data(source)) - - - - - class PiecewiseFunction(Function): """Class for creating piecewise functions. These kind of functions are defined by a dictionary of functions, where the keys are tuples that diff --git a/rocketpy/simulation/flight_data_importer.py b/rocketpy/simulation/flight_data_importer.py index 3b6d3f473..e493b1072 100644 --- a/rocketpy/simulation/flight_data_importer.py +++ b/rocketpy/simulation/flight_data_importer.py @@ -8,7 +8,7 @@ import numpy as np from rocketpy.mathutils import Function -from rocketpy.units import UNITS_CONVERSION_DICT +#from rocketpy.units import UNITS_CONVERSION_DICT FLIGHT_LABEL_MAP = { # "name of Flight Attribute": "Label to be displayed" diff --git a/rocketpy/tools.py b/rocketpy/tools.py index 6e765ca7a..07d2fa5e4 100644 --- a/rocketpy/tools.py +++ b/rocketpy/tools.py @@ -1,6 +1,7 @@ import importlib import importlib.metadata import re +import csv from bisect import bisect_left import pytz @@ -381,6 +382,108 @@ def check_requirement_version(module_name, version): return True + +def is_float(element): + """ + Returns a boolean indicating us if an element is convertible to a float or not. + True : the element is convertible to a float + False : the element is not convertible to a float + + Parameters + ---------- + element : any + This is the element to test. + + Returns + ------- + result : boolean + The element is convertible or not. + """ + if element is None: + return False + try: + float(element) + return True + except ValueError: + return False + +def return_first_data(source): + """ + Returns the first data of a CSV file. + + Parameters + ---------- + source : string + This is the file path to the csv. + + Returns + ------- + result : any + The first data of the CSV file. + """ + native_data = open(source) + for row in native_data : + for value in row : + return value + + +def if_header(source): + """ + Returns if a CSV file has a header or not. + True : The CSV file has a header + False : The CSV file has no header + + Parameters + ---------- + source : string + This is the file path to the csv. + + Returns + ------- + result : boolean + The result of the CSV file containing a header or not. + """ + return not is_float(return_first_data(source)) + + +def data_preprocessing(source): + """Clear data (in particular NaN objects) and returns a CSV file without header and its name. + + Parameters + ---------- + source : string + The file path to the CSV file. + + Returns + ------- + Function + The function with the incoming cleared CSV + """ + output_path = 'cleaned_data.csv' + + with open(source, 'r') as file: + reader = csv.reader(file) + header = next(reader) # Read the header + + data = [row for row in reader] + + # Create a new list without the headers + data_no_headers = [] + + for row in data[1:]: + # Check if the row is not empty and if all values in the row can be converted to float + if row and all(is_float(value) for value in row): + data_no_headers.append(row) + + # Save the processed data to a new CSV file + with open(output_path, 'w', encoding='utf-8') as output_file: + writer = csv.writer(output_file, delimiter=',') + writer.writerows(data_no_headers) + + return output_path + + + if __name__ == "__main__": import doctest @@ -389,3 +492,6 @@ def check_requirement_version(module_name, version): print(f"All the {results.attempted} tests passed!") else: print(f"{results.failed} out of {results.attempted} tests failed.") + + +