From 270f0eef245e5c598df3673ce092b614f23d835b Mon Sep 17 00:00:00 2001
From: letizia iannucci
Date: Thu, 15 Aug 2024 19:29:17 +0300
Subject: [PATCH] download, extract, and load signals

---
 tsgm/utils/datasets.py | 195 ++++++++++++++++++++++++++++++++---------
 1 file changed, 154 insertions(+), 41 deletions(-)

diff --git a/tsgm/utils/datasets.py b/tsgm/utils/datasets.py
index 3f229ac..08b0a84 100644
--- a/tsgm/utils/datasets.py
+++ b/tsgm/utils/datasets.py
@@ -6,6 +6,7 @@
 import logging

 import yfinance as yf
+import wfdb

 import sklearn
 import sklearn.datasets
@@ -21,7 +24,7 @@
 from tsgm.utils import file_utils

-logger = logging.getLogger('utils')
+logger = logging.getLogger("utils")
 logger.setLevel(logging.DEBUG)
@@ -48,12 +51,19 @@ def gen_sine_dataset(N: int, T: int, D: int, max_value: int = 10) -> npt.NDArray
         shift = np.random.random() * max_value + 1
         ts = np.arange(0, T, 1)
         for d in range(1, D + 1):
-            result[-1].append((a * np.sin((d + 3) * ts / 25. + shift)).T)
+            result[-1].append((a * np.sin((d + 3) * ts / 25.0 + shift)).T)
     return np.transpose(np.array(result), [0, 2, 1])


-def gen_sine_const_switch_dataset(N: int, T: int, D: int, max_value: int = 10, const: int = 0, frequency_switch: float = 0.1) -> T.Tuple[TensorLike, TensorLike]:
+def gen_sine_const_switch_dataset(
+    N: int,
+    T: int,
+    D: int,
+    max_value: int = 10,
+    const: int = 0,
+    frequency_switch: float = 0.1,
+) -> T.Tuple[TensorLike, TensorLike]:
     """
     Generates a dataset with alternating constant and sinusoidal sequences.
@@ -93,7 +103,9 @@ def gen_sine_const_switch_dataset(N: int, T: int, D: int, max_value: int = 10, c
     return np.array(result_X), np.array(result_y)


-def gen_sine_vs_const_dataset(N: int, T: int, D: int, max_value: int = 10, const: int = 0) -> T.Tuple[TensorLike, TensorLike]:
+def gen_sine_vs_const_dataset(
+    N: int, T: int, D: int, max_value: int = 10, const: int = 0
+) -> T.Tuple[TensorLike, TensorLike]:
     """
     Generates a dataset with alternating sinusoidal and constant sequences.
@@ -141,10 +153,13 @@ class UCRDataManager:
         note = {\\url{https://www.cs.ucr.edu/~eamonn/time_series_data_2018/}}
     }
     """
+
     mirrors = ["https://www.cs.ucr.edu/~eamonn/time_series_data_2018/"]
     resources = [("UCRArchive_2018.zip", 0)]
     key = "someone"
-    default_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../data")
+    default_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), "../../data"
+    )

     def __init__(self, path: str = default_path, ds: str = "gunpoint") -> None:
         """
         :raises ValueError: When there is no stored UCR archive, or the name of the dataset is incorrect.
         """
-        file_utils.download_all_resources(self.mirrors[0], path, self.resources, pwd=bytes(self.key, 'utf-8'))
+        file_utils.download_all_resources(
+            self.mirrors[0], path, self.resources, pwd=bytes(self.key, "utf-8")
+        )
         path = os.path.join(path, "UCRArchive_2018")

         self.ds = ds.strip().lower()
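
# Usage sketch for UCRDataManager (reviewer illustration, not a patch hunk;
# "gunpoint" is one valid UCR dataset name, and the archive is fetched on
# first use):
#
#     manager = UCRDataManager(ds="gunpoint")
#     X_train, y_train, X_test, y_test = manager.get()
#     print(manager.get_classes_distribution())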
""" - file_utils.download_all_resources(self.mirrors[0], path, self.resources, pwd=bytes(self.key, 'utf-8')) + file_utils.download_all_resources( + self.mirrors[0], path, self.resources, pwd=bytes(self.key, "utf-8") + ) path = os.path.join(path, "UCRArchive_2018") self.ds = ds.strip().lower() @@ -166,14 +183,20 @@ def __init__(self, path: str = default_path, ds: str = "gunpoint") -> None: if len(train_files) == 0: raise ValueError("ds should be listed at UCR website") self.train_df = pd.read_csv( - glob.glob(os.path.join(path, "*TRAIN.tsv"))[0], - sep='\t', header=None) + glob.glob(os.path.join(path, "*TRAIN.tsv"))[0], sep="\t", header=None + ) self.test_df = pd.read_csv( - glob.glob(os.path.join(path, "*TEST.tsv"))[0], - sep='\t', header=None) - - self.X_train, self.y_train = self.train_df[self.train_df.columns[1:]].to_numpy(), self.train_df[self.train_df.columns[0]].to_numpy() - self.X_test, self.y_test = self.test_df[self.test_df.columns[1:]].to_numpy(), self.test_df[self.test_df.columns[0]].to_numpy() + glob.glob(os.path.join(path, "*TEST.tsv"))[0], sep="\t", header=None + ) + + self.X_train, self.y_train = ( + self.train_df[self.train_df.columns[1:]].to_numpy(), + self.train_df[self.train_df.columns[0]].to_numpy(), + ) + self.X_test, self.y_test = ( + self.test_df[self.test_df.columns[1:]].to_numpy(), + self.test_df[self.test_df.columns[0]].to_numpy(), + ) self.y_all = np.concatenate((self.y_train, self.y_test), axis=0) def get(self) -> T.Tuple[TensorLike, TensorLike, TensorLike, TensorLike]: @@ -193,7 +216,10 @@ def get_classes_distribution(self) -> T.Dict: :rtype: dict[Any, float] """ if self.y_all is not None: - return {k: v / len(self.y_all) for k, v in collections.Counter(self.y_all).items()} + return { + k: v / len(self.y_all) + for k, v in collections.Counter(self.y_all).items() + } else: logger.warning("y_all is None, cannot get classes distribution") return {} @@ -229,7 +255,9 @@ def get_mauna_loa() -> T.Tuple[TensorLike, TensorLike]: return X, y -def split_dataset_into_objects(X: TensorLike, y: TensorLike, step: int = 10) -> T.Tuple[TensorLike, TensorLike]: +def split_dataset_into_objects( + X: TensorLike, y: TensorLike, step: int = 10 +) -> T.Tuple[TensorLike, TensorLike]: """ Splits the dataset into objects of fixed length. @@ -250,7 +278,7 @@ def split_dataset_into_objects(X: TensorLike, y: TensorLike, step: int = 10) -> Xs, ys = [], [] for start in range(0, X.shape[0], step): - cur_x, cur_y = X[start:start + step], y[start:start + step] + cur_x, cur_y = X[start : start + step], y[start : start + step] Xs.append(np.pad(cur_x, [(0, step - cur_x.shape[0]), (0, 0)])) ys.append(np.pad(cur_y, [(0, step - cur_y.shape[0])])) @@ -306,12 +334,16 @@ def get_synchronized_brainwave_dataset() -> T.Tuple[pd.DataFrame, pd.DataFrame]: :return: A tuple containing the input features (X) and target labels (y). 
@@ -306,12 +334,16 @@ def get_synchronized_brainwave_dataset() -> T.Tuple[pd.DataFrame, pd.DataFrame]:
     :return: A tuple containing the input features (X) and target labels (y).
     :rtype: tuple[pd.DataFrame, pd.DataFrame]
     """
-    url = ("https://www.dropbox.com/scl/fi/uqah9rthwrt5i2q6evtws/eeg-data.csv.zip?rlkey=z7sautwq74jow2xt9o6q7lcij&st"
-           "=hvpvvfez&dl=1")
+    url = (
+        "https://www.dropbox.com/scl/fi/uqah9rthwrt5i2q6evtws/eeg-data.csv.zip?rlkey=z7sautwq74jow2xt9o6q7lcij&st"
+        "=hvpvvfez&dl=1"
+    )
     cur_path = os.path.dirname(__file__)
     path_to_folder = os.path.join(cur_path, "../../data/")
-    path_to_resource = os.path.join(path_to_folder, 'eeg-data.csv.zip')
-    path_to_renamed_csv = os.path.join(path_to_folder, "synchronized_brainwave_dataset.csv")
+    path_to_resource = os.path.join(path_to_folder, "eeg-data.csv.zip")
+    path_to_renamed_csv = os.path.join(
+        path_to_folder, "synchronized_brainwave_dataset.csv"
+    )
     os.makedirs(path_to_folder, exist_ok=True)
     if not os.path.exists(path_to_renamed_csv):
         file_utils.download(url, path_to_folder)
@@ -343,14 +375,22 @@ def get_power_consumption() -> npt.NDArray:
     :rtype: numpy.ndarray
     """
     cur_path = os.path.dirname(__file__)
-    path = os.path.join(cur_path, '../../data/')
+    path = os.path.join(cur_path, "../../data/")

     url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00235/"
-    file_utils.download_all_resources(url, path, resources=[("household_power_consumption.zip", None)])
+    file_utils.download_all_resources(
+        url, path, resources=[("household_power_consumption.zip", None)]
+    )
     df = pd.read_csv(
-        os.path.join(path, "household_power_consumption.txt"), sep=';', parse_dates={'dt' : ['Date', 'Time']}, infer_datetime_format=True,
-        low_memory=False, na_values=['nan', '?'], index_col='dt')
+        os.path.join(path, "household_power_consumption.txt"),
+        sep=";",
+        parse_dates={"dt": ["Date", "Time"]},
+        infer_datetime_format=True,
+        low_memory=False,
+        na_values=["nan", "?"],
+        index_col="dt",
+    )
     return df.to_numpy()
@@ -409,7 +449,9 @@ def get_mnist_data() -> T.Tuple[TensorLike, TensorLike, TensorLike, TensorLike]:
     path_to_folder = os.path.join(cur_path, "../../data/")
     path_to_resource = os.path.join(path_to_folder, "mnist.npz")

-    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data(path_to_resource)
+    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data(
+        path_to_resource
+    )
     x_train = x_train.reshape(-1, 28 * 28, 1)
     x_test = x_test.reshape(-1, 28 * 28, 1)
@@ -433,8 +475,8 @@ def _exponential_quadratic(x: npt.NDArray, y: npt.NDArray) -> float:

 def get_gp_samples_data(
-        num_samples: int, max_time: int,
-        covar_func: T.Callable = _exponential_quadratic) -> npt.NDArray:
+    num_samples: int, max_time: int, covar_func: T.Callable = _exponential_quadratic
+) -> npt.NDArray:
     """
     Generates samples from a Gaussian process.
@@ -457,10 +499,13 @@ def get_gp_samples_data(
     sigma = covar_func(times, times)

     return np.random.multivariate_normal(
-        mean=np.zeros(max_time), cov=sigma, size=num_samples)[:, None, :]
+        mean=np.zeros(max_time), cov=sigma, size=num_samples
+    )[:, None, :]
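
# Shape sketch for get_gp_samples_data (reviewer illustration): each row is one
# draw from a zero-mean Gaussian process with the exponential-quadratic kernel
# as the default covariance function.
#
#     samples = get_gp_samples_data(num_samples=16, max_time=100)
#     # samples.shape == (16, 1, 100)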


-def get_physionet2012() -> T.Tuple[TensorLike, TensorLike, TensorLike, TensorLike, TensorLike, TensorLike]:
+def get_physionet2012() -> (
+    T.Tuple[TensorLike, TensorLike, TensorLike, TensorLike, TensorLike, TensorLike]
+):
     """
     Retrieves the Physionet 2012 dataset.
@@ -487,7 +532,11 @@ def download_physionet2012() -> None:
     """
     base_url = "https://physionet.org/files/challenge-2012/1.0.0/"
     destination_folder = "physionet2012"
-    if os.path.exists(destination_folder) and not os.path.isfile(destination_folder) and len(os.listdir(destination_folder)):
+    if (
+        os.path.exists(destination_folder)
+        and not os.path.isfile(destination_folder)
+        and len(os.listdir(destination_folder))
+    ):
         logger.info(f"Using downloaded dataset from {destination_folder}")
         return
     X_a = "set-a.tar.gz"
@@ -505,7 +554,9 @@ def download_physionet2012() -> None:
         file_utils.download(base_url + y, destination_folder)

     for X, y in all_files:
-        file_utils.extract_archive(os.path.join(destination_folder, X), destination_folder)
+        file_utils.extract_archive(
+            os.path.join(destination_folder, X), destination_folder
+        )
@@ -521,14 +572,14 @@ def _get_physionet_X_dataframe(dataset_path: str) -> pd.DataFrame:
     """
     txt_all = list()
     for f in os.listdir(dataset_path):
-        with open(os.path.join(dataset_path, f), 'r') as fp:
+        with open(os.path.join(dataset_path, f), "r") as fp:
             txt = fp.readlines()

         # add recordid as a column
-        recordid = txt[1].rstrip('\n').split(',')[-1]
-        txt = [t.rstrip('\n').split(',') + [int(recordid)] for t in txt]
+        recordid = txt[1].rstrip("\n").split(",")[-1]
+        txt = [t.rstrip("\n").split(",") + [int(recordid)] for t in txt]
         txt_all.extend(txt[1:])

-    df = pd.DataFrame(txt_all, columns=['time', 'parameter', 'value', 'recordid'])
+    df = pd.DataFrame(txt_all, columns=["time", "parameter", "value", "recordid"])
     return df
@@ -544,8 +595,8 @@ def _get_physionet_y_dataframe(file_path: str) -> pd.DataFrame:
         pd.DataFrame: The target (y) dataframe.
     """
     y = pd.read_csv(file_path)
-    y.set_index('RecordID', inplace=True)
-    y.index.name = 'recordid'
+    y.set_index("RecordID", inplace=True)
+    y.index.name = "recordid"
     y.reset_index(inplace=True)
     return y
@@ -567,7 +618,9 @@ def get_covid_19() -> T.Tuple[TensorLike, T.Tuple, T.List]:
         The second element is the graph tuple (nodes, edges).
         The third element is the order of states.
     """
-    base_url = "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv"
+    base_url = (
+        "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv"
+    )
     destination_folder = "covid19"
     file_utils.download(base_url, destination_folder)
     result, graph = covid19_data_utils.covid_dataset(
@@ -580,7 +633,67 @@ def get_covid_19() -> T.Tuple[TensorLike, T.Tuple, T.List]:
     for state in covid19_data_utils.LIST_OF_STATES:
         cur_data = result[timestamp][state]
         processed_dataset[-1].append(
-            [cur_data["deaths"], cur_data["cases"],
-             cur_data["deaths_normalized"], cur_data["cases_normalized"]]
+            [
+                cur_data["deaths"],
+                cur_data["cases"],
+                cur_data["deaths_normalized"],
+                cur_data["cases_normalized"],
+            ]
         )
-    return np.transpose(np.array(processed_dataset), (1, 0, 2)), graph, covid19_data_utils.LIST_OF_STATES
+    return (
+        np.transpose(np.array(processed_dataset), (1, 0, 2)),
+        graph,
+        covid19_data_utils.LIST_OF_STATES,
+    )
+
+
+def get_arrythmia() -> T.Tuple[TensorLike, TensorLike]:
+    """
+    Downloads and loads the Arrhythmia dataset from
+    https://physionet.org/static/published-projects/mitdb/mit-bih-arrhythmia-database-1.0.0.zip
+    and returns the input features (X) and target labels (y).
+
+    The Arrhythmia dataset contains ECG recordings of patients with arrhythmia.
+
+    :return: A tuple containing the input features (X) and target labels (y).
+        X has shape (N, M, D) where
+        N is the number of samples,
+        M is the signal length (650000),
+        D is the number of dimensions (2).
+        y is currently always empty, because beat annotations are not parsed yet.
+    :rtype: tuple[TensorLike, TensorLike]
+    """
+    cur_path = os.path.dirname(__file__)
+    path_to_folder = os.path.join(cur_path, "../../data/")
+    dataset = "mit-bih-arrhythmia-database-1.0.0"
+    url = f"https://physionet.org/static/published-projects/mitdb/{dataset}.zip"
+    # download and extract only if the dataset folder is not already present
+    path_to_resource = os.path.join(path_to_folder, dataset)
+    if not os.path.exists(path_to_resource):
+        file_utils.download(url, path_to_folder)
+        file_utils.extract_archive(
+            os.path.join(path_to_folder, f"{dataset}.zip"), path_to_folder
+        )
+
+    # load the dataset; record ids in 100-234 are not contiguous, so missing
+    # ids are skipped via the try/except below
+    X = []
+    y = []
+    for i in range(100, 235):
+        record_path = os.path.join(path_to_folder, f"{dataset}/{i}")
+        try:
+            record = wfdb.rdrecord(record_path)
+            # record.p_signal is an (M x D) 2d numpy array, where M is the
+            # signal length and D is the number of channels; header metadata
+            # is kept in record.comments
+            X.append(record.p_signal)
+            # beat annotations (sample numbers and symbols) are not loaded yet;
+            # they could be read with annotation = wfdb.rdann(record_path, "atr")
+            # and collected via y.append((annotation.sample, annotation.symbol))
+        except Exception as e:
+            logger.error(f"Failed to read record {record_path}: {e}")
+
+    return np.array(X), np.array(y)
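
A usage sketch for the new loader (assuming the PhysioNet mirror is reachable
and all records download cleanly; wfdb stores each record's waveform in
record.p_signal, which get_arrythmia stacks into one array):

    from tsgm.utils.datasets import get_arrythmia

    X, y = get_arrythmia()
    print(X.shape)  # expected (N, 650000, 2): N records, 650000 samples, 2 leads
    # y stays empty until annotation parsing (wfdb.rdann) is wired up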