Merge pull request #684 from markotoplak/dask-hyper
[ENH] Hyperspectra: DaskTable support
markotoplak authored Oct 16, 2023
2 parents e47a8b2 + 5c6e7a1 commit 2178600
Showing 10 changed files with 251 additions and 77 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -29,3 +29,7 @@ htmlcov/*

# check_pylint_diff
.pylint_cache

# dask lock files
*.lock
*.dirlock
11 changes: 11 additions & 0 deletions orangecontrib/spectroscopy/__init__.py
@@ -22,3 +22,14 @@ def get_sample_datasets_dir():


Orange.data.table.dataset_dirs.append(get_sample_datasets_dir())


try:
import dask
import dask.distributed
dask_client = dask.distributed.Client(processes=False, n_workers=2,
set_as_default=False,
dashboard_address=None)
except ImportError:
dask = None
dask_client = None
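
A minimal sketch, assuming a toy array shape and chunking, of how an in-process dask.distributed client like the one above can be exercised; none of the names below are part of the commit.

import dask.array as da
from dask.distributed import Client

client = Client(processes=False, n_workers=2, set_as_default=False,
                dashboard_address=None)
X = da.random.random((1000, 500), chunks=(100, 500))      # toy "spectra"
mean_spectrum = client.compute(X.mean(axis=0)).result()   # blocks until done
client.close()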
6 changes: 3 additions & 3 deletions orangecontrib/spectroscopy/preprocess/__init__.py
@@ -440,7 +440,7 @@ def __call__(self, data):
return np.ones((len(data), len(self.points))) * np.nan
interpfn = self.interpfn
if interpfn is None:
if self.handle_nans and bottleneck.anynan(ys):
if self.handle_nans and np.any(np.isnan(ys)):
if self.kind == "linear":
interpfn = interp1d_with_unknowns_numpy
else:
@@ -472,7 +472,7 @@ def __call__(self, data):
self.handle_nans, interpfn=self.interpfn)
domain = Orange.data.Domain(atts, data.domain.class_vars,
data.domain.metas)
return data.from_table(domain, data)
return data.transform(domain)


class NotAllContinuousException(Exception):
@@ -505,7 +505,7 @@ def __call__(self, data):
domain = Orange.data.Domain(self.target.domain.attributes, data.domain.class_vars,
data.domain.metas)
data = data.transform(domain)
with data.unlocked(data.X):
with data.unlocked_reference(data.X):
data.X = X
return data

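A brief sketch of the Table.transform call used above: it maps existing data into a new domain through the domain's compute values rather than copying via from_table. The iris dataset below is only an illustrative assumption.

import Orange.data

data = Orange.data.Table("iris")
# keep only the first two attributes; values are filled in by the domain conversion
narrow = Orange.data.Domain(data.domain.attributes[:2],
                            data.domain.class_vars, data.domain.metas)
subset = data.transform(narrow)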
9 changes: 7 additions & 2 deletions orangecontrib/spectroscopy/preprocess/integrate.py
@@ -37,6 +37,11 @@ def draw_info(self, data, common=None):
if common is None:
common = self.compute_shared(data)
x_s, y_s = self.extract_data(data, common)
# draw_info is rarely called. The following ensures that
# compute_draw_info only needs to work with numpy arrays,
# which in turn are the input pyqtgraph expects.
x_s = np.asarray(x_s)
y_s = np.asarray(y_s)
return self.compute_draw_info(x_s, y_s)

def extract_data(self, data, common):
@@ -85,10 +90,10 @@ def compute_baseline(self, x, y):
return edge_baseline(x, y)

def compute_integral(self, x, y_s):
y_s = y_s - self.compute_baseline(x, y_s)
if np.any(np.isnan(y_s)):
# interpolate unknowns as trapz can not handle them
y_s, _ = nan_extend_edges_and_interpolate(x, y_s)
y_s = y_s - self.compute_baseline(x, y_s)
return np.trapz(y_s, x, axis=1)

def compute_draw_info(self, x, ys):
@@ -206,7 +211,7 @@ def compute_integral(self, x_s, y_s):
return np.zeros((y_s.shape[0],)) * np.nan
# avoid whole nan rows
whole_nan_rows = np.isnan(y_s).all(axis=1)
y_s[whole_nan_rows] = 0
y_s[whole_nan_rows, :] = 0
# select positions
pos = x_s[bottleneck.nanargmax(y_s, axis=1)]
# set unknown results
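A standalone illustration of why compute_integral above fills unknowns before calling np.trapz: np.trapz propagates NaN, so each row is interpolated (and edge-extended) first. The helper below is an editor's sketch assuming a sorted, increasing x, not the project's API.

import numpy as np

def nan_safe_trapz(x, y_s):
    """Trapezoidal integral per row, with NaNs filled by interpolation."""
    y_s = np.array(y_s, dtype=float)        # work on a copy
    for row in y_s:
        nan = np.isnan(row)
        if nan.any() and (~nan).any():
            # np.interp also extends edge values, similar to edge filling
            row[nan] = np.interp(x[nan], x[~nan], row[~nan])
    return np.trapz(y_s, x, axis=1)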
54 changes: 39 additions & 15 deletions orangecontrib/spectroscopy/preprocess/utils.py
@@ -1,4 +1,3 @@
import bottleneck
import numpy as np
from Orange.data import Table, Domain
from Orange.data.util import SharedComputeValue
@@ -7,10 +6,26 @@
from orangecontrib.spectroscopy.data import getx


try:
import dask
import dask.array
except ImportError:
dask = False


def is_increasing(a):
return np.all(np.diff(a) >= 0)


def full_like_type(orig, shape, val):
if isinstance(orig, np.ndarray):
return np.full(shape, val)
elif dask and isinstance(orig, dask.array.Array):
return dask.array.full(shape, val)
else:
raise RuntimeError("Unknown matrix type")
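
A hedged usage sketch of full_like_type: it preserves the container type of its input, so downstream code can stay agnostic about the backend.

import numpy as np

a = full_like_type(np.ones((3, 4)), (2, 2), np.nan)   # a plain numpy array
# with dask installed, the result stays a lazy dask array:
# import dask.array
# d = full_like_type(dask.array.ones((3, 4), chunks=2), (2, 2), np.nan)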


class PreprocessException(Exception):

def message(self):
@@ -145,12 +160,10 @@ def nan_extend_edges_and_interpolate(xs, X):
so that they do not propagate.
"""
nans = None
if bottleneck.anynan(X):
if np.any(np.isnan(X)):
nans = np.isnan(X)
X = X.copy()
xs, xsind, mon, X = transform_to_sorted_wavenumbers(xs, X)
fill_edges(X)
X = interp1d_with_unknowns_numpy(xs[xsind], X, xs[xsind])
X = interp1d_with_unknowns_numpy(xs[xsind], X, xs[xsind], sides=None)
X = transform_back_to_features(xsind, mon, X)
return X, nans

@@ -174,46 +187,57 @@ def transform_back_to_features(xsind, mon, X):
def fill_edges_1d(l):
"""Replace (inplace!) NaN at sides with the closest value"""
loc = np.where(~np.isnan(l))[0]
if len(loc):
fi, li = loc[[0, -1]]
try:
fi, li = np.array(loc[[0, -1]])
except IndexError:
# nothing to do, no valid value
return l
else:
l[:fi] = l[fi]
l[li + 1:] = l[li]
return l
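
A quick illustration of fill_edges_1d on a single row (the values are made up):

import numpy as np

row = np.array([np.nan, 1.0, 2.0, np.nan])
fill_edges_1d(row)       # edge NaNs take the nearest valid value, in place
print(row)               # [1. 1. 2. 2.]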


def fill_edges(mat):
"""Replace (inplace!) NaN at sides with the closest value"""
for l in mat:
fill_edges_1d(l)
for i, l in enumerate(mat):
if dask and isinstance(mat, dask.array.Array):
l = fill_edges_1d(l)
mat[i] = l
else:
fill_edges_1d(l)


def remove_whole_nan_ys(x, ys):
"""Remove whole NaN columns of ys with corresponding x coordinates."""
whole_nan_columns = bottleneck.allnan(ys, axis=0)
whole_nan_columns = np.isnan(ys).all(axis=0)
if np.any(whole_nan_columns):
x = x[~whole_nan_columns]
ys = ys[:, ~whole_nan_columns]
return x, ys


def interp1d_with_unknowns_numpy(x, ys, points, kind="linear"):
def interp1d_with_unknowns_numpy(x, ys, points, kind="linear", sides=np.nan):
if kind != "linear":
raise NotImplementedError
out = np.zeros((len(ys), len(points)))*np.nan
out = full_like_type(ys, (len(ys), len(points)), np.nan)
sorti = np.argsort(x)
x = x[sorti]
for i, y in enumerate(ys):
y = y[sorti]
# the next line ensures numpy arrays
# for Dask, it would be much more efficient to work with larger sections
y = np.array(y[sorti])
nan = np.isnan(y)
xt = x[~nan]
yt = y[~nan]
# do not interpolate unknowns at the edges
if len(xt): # check if all values are removed
out[i] = np.interp(points, xt, yt, left=np.nan, right=np.nan)
out[i] = np.interp(points, xt, yt, left=sides, right=sides)
return out
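
A hedged example of the new sides argument (toy values): with the default sides=np.nan, points outside the known range stay unknown; with sides=None, np.interp falls back to extending the edge values, which the nan_extend_edges_and_interpolate change above relies on.

import numpy as np

x = np.array([1.0, 2.0, 3.0])
ys = np.array([[10.0, np.nan, 30.0]])
points = np.array([0.5, 1.5, 3.5])
interp1d_with_unknowns_numpy(x, ys, points)               # -> [[nan, 15., nan]]
interp1d_with_unknowns_numpy(x, ys, points, sides=None)   # -> [[10., 15., 30.]]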


def interp1d_with_unknowns_scipy(x, ys, points, kind="linear"):
out = np.zeros((len(ys), len(points)))*np.nan
out = full_like_type(ys, (len(ys), len(points)), np.nan)
sorti = np.argsort(x)
x = x[sorti]
for i, y in enumerate(ys):