Commit

download, extract, and load signals
letiziaia committed Aug 15, 2024
1 parent e487aca commit 270f0ee
Showing 1 changed file with 154 additions and 41 deletions.
tsgm/utils/datasets.py
@@ -6,6 +6,7 @@
import logging

import yfinance as yf
import wfdb

import sklearn
import sklearn.datasets
@@ -14,14 +15,16 @@
import pandas as pd
import scipy.io.arff

from pathlib import Path

from tensorflow import keras
from tensorflow.python.types.core import TensorLike

from tsgm.utils import covid19_data_utils
from tsgm.utils import file_utils


logger = logging.getLogger('utils')
logger = logging.getLogger("utils")
logger.setLevel(logging.DEBUG)


@@ -48,12 +51,19 @@ def gen_sine_dataset(N: int, T: int, D: int, max_value: int = 10) -> npt.NDArray
shift = np.random.random() * max_value + 1
ts = np.arange(0, T, 1)
for d in range(1, D + 1):
result[-1].append((a * np.sin((d + 3) * ts / 25. + shift)).T)
result[-1].append((a * np.sin((d + 3) * ts / 25.0 + shift)).T)

return np.transpose(np.array(result), [0, 2, 1])
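
For orientation, a minimal usage sketch for the generator above (not part of the commit; the import path follows from this file's location, and the shapes from the code itself):

from tsgm.utils.datasets import gen_sine_dataset

# result is assembled as (N, D, T) and transposed to (N, T, D) on return
X = gen_sine_dataset(N=16, T=100, D=3)
print(X.shape)  # (16, 100, 3)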


def gen_sine_const_switch_dataset(N: int, T: int, D: int, max_value: int = 10, const: int = 0, frequency_switch: float = 0.1) -> T.Tuple[TensorLike, TensorLike]:
def gen_sine_const_switch_dataset(
N: int,
T: int,
D: int,
max_value: int = 10,
const: int = 0,
frequency_switch: float = 0.1,
) -> T.Tuple[TensorLike, TensorLike]:
"""
Generates a dataset with alternating constant and sinusoidal sequences.
@@ -93,7 +103,9 @@ def gen_sine_const_switch_dataset(N: int, T: int, D: int, max_value: int = 10, c
return np.array(result_X), np.array(result_y)
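
A hedged usage sketch for the switching generator (not part of the commit; the labeling logic sits in the elided body, so only the call and return arity are shown):

# X holds sequences that alternate constant and sinusoidal segments,
# y the matching labels, per the docstring above
X, y = gen_sine_const_switch_dataset(N=8, T=50, D=2, frequency_switch=0.1)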


def gen_sine_vs_const_dataset(N: int, T: int, D: int, max_value: int = 10, const: int = 0) -> T.Tuple[TensorLike, TensorLike]:
def gen_sine_vs_const_dataset(
N: int, T: int, D: int, max_value: int = 10, const: int = 0
) -> T.Tuple[TensorLike, TensorLike]:
"""
Generates a dataset with alternating sinusoidal and constant sequences.
@@ -141,10 +153,13 @@ class UCRDataManager:
note = {\\url{https://www.cs.ucr.edu/~eamonn/time_series_data_2018/}}
}
"""

mirrors = ["https://www.cs.ucr.edu/~eamonn/time_series_data_2018/"]
resources = [("UCRArchive_2018.zip", 0)]
key = "someone"
default_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../data")
default_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "../../data"
)

def __init__(self, path: str = default_path, ds: str = "gunpoint") -> None:
"""
@@ -155,7 +170,9 @@ def __init__(self, path: str = default_path, ds: str = "gunpoint") -> None:
:raises ValueError: When there is no stored UCR archive, or the name of the dataset is incorrect.
"""
file_utils.download_all_resources(self.mirrors[0], path, self.resources, pwd=bytes(self.key, 'utf-8'))
file_utils.download_all_resources(
self.mirrors[0], path, self.resources, pwd=bytes(self.key, "utf-8")
)
path = os.path.join(path, "UCRArchive_2018")

self.ds = ds.strip().lower()
@@ -166,14 +183,20 @@ def __init__(self, path: str = default_path, ds: str = "gunpoint") -> None:
if len(train_files) == 0:
raise ValueError("ds should be listed at UCR website")
self.train_df = pd.read_csv(
glob.glob(os.path.join(path, "*TRAIN.tsv"))[0],
sep='\t', header=None)
glob.glob(os.path.join(path, "*TRAIN.tsv"))[0], sep="\t", header=None
)
self.test_df = pd.read_csv(
glob.glob(os.path.join(path, "*TEST.tsv"))[0],
sep='\t', header=None)

self.X_train, self.y_train = self.train_df[self.train_df.columns[1:]].to_numpy(), self.train_df[self.train_df.columns[0]].to_numpy()
self.X_test, self.y_test = self.test_df[self.test_df.columns[1:]].to_numpy(), self.test_df[self.test_df.columns[0]].to_numpy()
glob.glob(os.path.join(path, "*TEST.tsv"))[0], sep="\t", header=None
)

self.X_train, self.y_train = (
self.train_df[self.train_df.columns[1:]].to_numpy(),
self.train_df[self.train_df.columns[0]].to_numpy(),
)
self.X_test, self.y_test = (
self.test_df[self.test_df.columns[1:]].to_numpy(),
self.test_df[self.test_df.columns[0]].to_numpy(),
)
self.y_all = np.concatenate((self.y_train, self.y_test), axis=0)

def get(self) -> T.Tuple[TensorLike, TensorLike, TensorLike, TensorLike]:
@@ -193,7 +216,10 @@ def get_classes_distribution(self) -> T.Dict:
:rtype: dict[Any, float]
"""
if self.y_all is not None:
return {k: v / len(self.y_all) for k, v in collections.Counter(self.y_all).items()}
return {
k: v / len(self.y_all)
for k, v in collections.Counter(self.y_all).items()
}
else:
logger.warning("y_all is None, cannot get classes distribution")
return {}
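
Putting the class together, a usage sketch (not part of the commit): get() presumably returns the four arrays defined in __init__, in train-then-test order, and the archive download is password-protected with the key above.

from tsgm.utils.datasets import UCRDataManager

dm = UCRDataManager(ds="gunpoint")  # downloads and extracts UCRArchive_2018 on first use
X_train, y_train, X_test, y_test = dm.get()
print(dm.get_classes_distribution())  # class -> fraction over train and test combined
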
@@ -229,7 +255,9 @@ def get_mauna_loa() -> T.Tuple[TensorLike, TensorLike]:
return X, y


def split_dataset_into_objects(X: TensorLike, y: TensorLike, step: int = 10) -> T.Tuple[TensorLike, TensorLike]:
def split_dataset_into_objects(
X: TensorLike, y: TensorLike, step: int = 10
) -> T.Tuple[TensorLike, TensorLike]:
"""
Splits the dataset into objects of fixed length.
@@ -250,7 +278,7 @@ def split_dataset_into_objects(X: TensorLike, y: TensorLike, step: int = 10) ->

Xs, ys = [], []
for start in range(0, X.shape[0], step):
cur_x, cur_y = X[start:start + step], y[start:start + step]
cur_x, cur_y = X[start : start + step], y[start : start + step]
Xs.append(np.pad(cur_x, [(0, step - cur_x.shape[0]), (0, 0)]))
ys.append(np.pad(cur_y, [(0, step - cur_y.shape[0])]))

@@ -306,12 +334,16 @@ def get_synchronized_brainwave_dataset() -> T.Tuple[pd.DataFrame, pd.DataFrame]:
:return: A tuple containing the input features (X) and target labels (y).
:rtype: tuple[pd.DataFrame, pd.DataFrame]
"""
url = ("https://www.dropbox.com/scl/fi/uqah9rthwrt5i2q6evtws/eeg-data.csv.zip?rlkey=z7sautwq74jow2xt9o6q7lcij&st"
"=hvpvvfez&dl=1")
url = (
"https://www.dropbox.com/scl/fi/uqah9rthwrt5i2q6evtws/eeg-data.csv.zip?rlkey=z7sautwq74jow2xt9o6q7lcij&st"
"=hvpvvfez&dl=1"
)
cur_path = os.path.dirname(__file__)
path_to_folder = os.path.join(cur_path, "../../data/")
path_to_resource = os.path.join(path_to_folder, 'eeg-data.csv.zip')
path_to_renamed_csv = os.path.join(path_to_folder, "synchronized_brainwave_dataset.csv")
path_to_resource = os.path.join(path_to_folder, "eeg-data.csv.zip")
path_to_renamed_csv = os.path.join(
path_to_folder, "synchronized_brainwave_dataset.csv"
)
os.makedirs(path_to_folder, exist_ok=True)
if not os.path.exists(path_to_renamed_csv):
file_utils.download(url, path_to_folder)
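
Downstream, loading reduces to one call (a sketch, not part of the commit); per the signature, both returned objects are pandas DataFrames backed by the renamed CSV:

X, y = get_synchronized_brainwave_dataset()
print(X.shape, y.shape)
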
Expand Down Expand Up @@ -343,14 +375,22 @@ def get_power_consumption() -> npt.NDArray:
:rtype: numpy.ndarray
"""
cur_path = os.path.dirname(__file__)
path = os.path.join(cur_path, '../../data/')
path = os.path.join(cur_path, "../../data/")

url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00235/"
file_utils.download_all_resources(url, path, resources=[("household_power_consumption.zip", None)])
file_utils.download_all_resources(
url, path, resources=[("household_power_consumption.zip", None)]
)

df = pd.read_csv(
os.path.join(path, "household_power_consumption.txt"), sep=';', parse_dates={'dt' : ['Date', 'Time']}, infer_datetime_format=True,
low_memory=False, na_values=['nan', '?'], index_col='dt')
os.path.join(path, "household_power_consumption.txt"),
sep=";",
parse_dates={"dt": ["Date", "Time"]},
infer_datetime_format=True,
low_memory=False,
na_values=["nan", "?"],
index_col="dt",
)
return df.to_numpy()
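
One caveat on the pd.read_csv call above: infer_datetime_format is deprecated since pandas 2.0, and combining columns through a dict-valued parse_dates is deprecated as well. A hedged equivalent for newer pandas, assuming the dataset's dd/mm/yyyy timestamps:

df = pd.read_csv(
    os.path.join(path, "household_power_consumption.txt"),
    sep=";",
    low_memory=False,
    na_values=["nan", "?"],
)
# combine Date and Time manually instead of parse_dates={"dt": ["Date", "Time"]}
df["dt"] = pd.to_datetime(df["Date"] + " " + df["Time"], format="%d/%m/%Y %H:%M:%S")
df = df.drop(columns=["Date", "Time"]).set_index("dt")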


@@ -409,7 +449,9 @@ def get_mnist_data() -> T.Tuple[TensorLike, TensorLike, TensorLike, TensorLike]:
path_to_folder = os.path.join(cur_path, "../../data/")
path_to_resource = os.path.join(path_to_folder, "mnist.npz")

(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data(path_to_resource)
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data(
path_to_resource
)
x_train = x_train.reshape(-1, 28 * 28, 1)
x_test = x_test.reshape(-1, 28 * 28, 1)

@@ -433,8 +475,8 @@ def _exponential_quadratic(x: npt.NDArray, y: npt.NDArray) -> float:


def get_gp_samples_data(
num_samples: int, max_time: int,
covar_func: T.Callable = _exponential_quadratic) -> npt.NDArray:
num_samples: int, max_time: int, covar_func: T.Callable = _exponential_quadratic
) -> npt.NDArray:
"""
Generates samples from a Gaussian process.
@@ -457,10 +499,13 @@ def get_gp_samples_data(
sigma = covar_func(times, times)

return np.random.multivariate_normal(
mean=np.zeros(max_time), cov=sigma, size=num_samples)[:, None, :]
mean=np.zeros(max_time), cov=sigma, size=num_samples
)[:, None, :]
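
Shape-wise, the trailing [:, None, :] inserts a singleton feature axis, so callers get (num_samples, 1, max_time); a minimal check (not part of the commit):

samples = get_gp_samples_data(num_samples=5, max_time=100)
print(samples.shape)  # (5, 1, 100)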


def get_physionet2012() -> T.Tuple[TensorLike, TensorLike, TensorLike, TensorLike, TensorLike, TensorLike]:
def get_physionet2012() -> (
T.Tuple[TensorLike, TensorLike, TensorLike, TensorLike, TensorLike, TensorLike]
):
"""
Retrieves the Physionet 2012 dataset.
@@ -487,7 +532,11 @@ def download_physionet2012() -> None:
"""
base_url = "https://physionet.org/files/challenge-2012/1.0.0/"
destination_folder = "physionet2012"
if os.path.exists(destination_folder) and not os.path.isfile(destination_folder) and len(os.listdir(destination_folder)):
if (
os.path.exists(destination_folder)
and not os.path.isfile(destination_folder)
and len(os.listdir(destination_folder))
):
logger.info(f"Using downloaded dataset from {destination_folder}")
return
X_a = "set-a.tar.gz"
@@ -505,7 +554,9 @@ def download_physionet2012() -> None:
file_utils.download(base_url + y, destination_folder)

for X, y in all_files:
file_utils.extract_archive(os.path.join(destination_folder, X), destination_folder)
file_utils.extract_archive(
os.path.join(destination_folder, X), destination_folder
)
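
The guard above makes the download idempotent: an existing, non-empty physionet2012 directory skips all network calls. Downstream loading is then a single call (a sketch; the six return names are an assumption based on the set-a/b/c split, not something the diff states):

X_train, y_train, X_val, y_val, X_test, y_test = get_physionet2012()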


def _get_physionet_X_dataframe(dataset_path: str) -> pd.DataFrame:
Expand All @@ -521,14 +572,14 @@ def _get_physionet_X_dataframe(dataset_path: str) -> pd.DataFrame:
"""
txt_all = list()
for f in os.listdir(dataset_path):
with open(os.path.join(dataset_path, f), 'r') as fp:
with open(os.path.join(dataset_path, f), "r") as fp:
txt = fp.readlines()

# add recordid as a column
recordid = txt[1].rstrip('\n').split(',')[-1]
txt = [t.rstrip('\n').split(',') + [int(recordid)] for t in txt]
recordid = txt[1].rstrip("\n").split(",")[-1]
txt = [t.rstrip("\n").split(",") + [int(recordid)] for t in txt]
txt_all.extend(txt[1:])
df = pd.DataFrame(txt_all, columns=['time', 'parameter', 'value', 'recordid'])
df = pd.DataFrame(txt_all, columns=["time", "parameter", "value", "recordid"])
return df
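
The frame comes back in long format, one (time, parameter, value) row per measurement, with string-typed values. A hypothetical downstream pivot to a wide per-record layout (not part of the commit; the set-a path is assumed from the extraction step above):

df = _get_physionet_X_dataframe("physionet2012/set-a")
df["value"] = pd.to_numeric(df["value"], errors="coerce")
wide = df.pivot_table(
    index=["recordid", "time"], columns="parameter", values="value", aggfunc="last"
)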


@@ -544,8 +595,8 @@ def _get_physionet_y_dataframe(file_path: str) -> pd.DataFrame:
pd.DataFrame: The target (y) dataframe.
"""
y = pd.read_csv(file_path)
y.set_index('RecordID', inplace=True)
y.index.name = 'recordid'
y.set_index("RecordID", inplace=True)
y.index.name = "recordid"
y.reset_index(inplace=True)
return y

@@ -567,7 +618,9 @@ def get_covid_19() -> T.Tuple[TensorLike, T.Tuple, T.List]:
The second element is the graph tuple (nodes, edges).
The third element is the order of states.
"""
base_url = "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv"
base_url = (
"https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv"
)
destination_folder = "covid19"
file_utils.download(base_url, destination_folder)
result, graph = covid19_data_utils.covid_dataset(
@@ -580,7 +633,67 @@ def get_covid_19() -> T.Tuple[TensorLike, T.Tuple, T.List]:
for state in covid19_data_utils.LIST_OF_STATES:
cur_data = result[timestamp][state]
processed_dataset[-1].append(
[cur_data["deaths"], cur_data["cases"],
cur_data["deaths_normalized"], cur_data["cases_normalized"]]
[
cur_data["deaths"],
cur_data["cases"],
cur_data["deaths_normalized"],
cur_data["cases_normalized"],
]
)
return np.transpose(np.array(processed_dataset), (1, 0, 2)), graph, covid19_data_utils.LIST_OF_STATES
return (
np.transpose(np.array(processed_dataset), (1, 0, 2)),
graph,
covid19_data_utils.LIST_OF_STATES,
)
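
After the final transpose the axis order is (state, timestamp, feature), with the four features listed above; a usage sketch (not part of the commit):

data, (nodes, edges), states = get_covid_19()
print(data.shape)  # (len(states), num_timestamps, 4)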


def get_arrythmia() -> T.Tuple[TensorLike, TensorLike]:
"""
Downloads and loads the Arrhythmia dataset from
https://physionet.org/static/published-projects/mitdb/mit-bih-arrhythmia-database-1.0.0.zip
and returns the input features (X) and target labels (y).
The Arrhythmia dataset contains ECG recordings of patients with arrhythmia.
:return: A tuple containing the input features (X) and target labels (y).
X has shape (N, M, D) where
N is the number of samples,
M is the signal length (650000),
D is the number of dimensions (2).
:rtype: tuple[TensorLike, TensorLike]
"""
cur_path = os.path.dirname(__file__)
path_to_folder = os.path.join(cur_path, "../../data/")
path_to_resource = os.path.join(path_to_folder, "arrhythmia.data")
dataset = "mit-bih-arrhythmia-database-1.0.0"
url = f"https://physionet.org/static/published-projects/mitdb/{dataset}.zip"
if not os.path.exists(path_to_resource):
file_utils.download(url, path_to_folder)
file_utils.extract_archive(
os.path.join(path_to_folder, f"{dataset}.zip"), path_to_folder
)

# load the dataset
X = []
y = []
for i in range(100, 235):
try:
record_path = os.path.join(path_to_folder, f"{dataset}/{i}")
record = wfdb.rdrecord(record_path)
# equivalent to:
# wfdb.rdsamp(record_path, sampto=3000)

# next line does not work
# annotation = wfdb.rdann(record_path, 'atr',)

# The signal is an (MxN) 2d numpy array, where M is the signal length.
X.append(record.p_signal)
# comments are in record.comments

# annotation data (e.g., sample numbers and symbols)
# y.append((annotation.sample, annotation.symbol))
except Exception as e:
logger.error(f"Failed to parse {record_path}: {e}")
pass

return np.array(X), np.array(y)
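
Two things are worth knowing here (a sketch, not part of the commit): MIT-BIH record ids between 100 and 234 are non-contiguous, which is why the loop swallows per-record failures, and because the annotation block is commented out, y comes back empty in this version. Reading one record directly with wfdb looks like this (path assumed relative to the data folder used above):

import wfdb

X, y = get_arrythmia()
print(X.shape)  # (num_records, 650000, 2), per the docstring
print(y.shape)  # (0,): annotations are not collected here

record = wfdb.rdrecord("data/mit-bih-arrhythmia-database-1.0.0/100", sampto=3000)
print(record.fs, record.p_signal.shape)  # 360, (3000, 2)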
