Merge pull request #1117 from VincentRouvreau/persistence_lengths_representation

PersistenceLengths and its unit tests
VincentRouvreau authored Sep 23, 2024
2 parents 4d8779b + e984496 commit 1670055
Showing 2 changed files with 104 additions and 23 deletions.
100 changes: 83 additions & 17 deletions src/python/gudhi/representations/vector_methods.py
@@ -73,7 +73,7 @@ def transform(self, X):
Parameters:
X (list of n x 2 numpy arrays): input persistence diagrams.
Returns:
numpy array with shape (number of diagrams) x (number of pixels = **resolution[0]** x **resolution[1]**): output persistence images.
"""
@@ -196,7 +196,7 @@ def transform(self, X):
Parameters:
X (list of n x 2 numpy arrays): input persistence diagrams.
Returns:
numpy array with shape (number of diagrams) x (number of samples = **num_landscapes** x **resolution**): output persistence landscapes.
"""
@@ -271,7 +271,7 @@ def transform(self, X):
Parameters:
X (list of n x 2 numpy arrays): input persistence diagrams.
Returns:
numpy array with shape (number of diagrams) x (**resolution**): output persistence silhouettes.
"""
@@ -363,7 +363,7 @@ def fit(self, X, y = None):

if self.predefined_grid is None:
if self.resolution is None: # Flexible/exact version
self.grid_ = np.unique(np.concatenate([pd.ravel() for pd in X] + [[-np.inf]], axis=0))
else:
_grid_from_sample_range(self, X)
else:
@@ -391,7 +391,7 @@ def transform(self, X):

print("Empty list: output has shape [0, len(grid)]")
return np.zeros((N, len(self.grid_)))

else:

events = np.concatenate([pd.ravel(order="F") for pd in X], axis=0)
@@ -413,7 +413,7 @@ def transform(self, X):
i += 1
for k in range(0, N):
bettis[k].append(bettis[k][-1])

return np.array(bettis, dtype=int)[:, 0:-1]

def fit_transform(self, X, y = None):
@@ -485,7 +485,7 @@ def __init__(self, mode="scalar", normalized=True, resolution=100, sample_range=
Parameters:
mode (string): what entropy to compute: either "scalar" for computing the entropy statistics, or "vector" for computing the entropy summary functions (default "scalar").
normalized (bool): whether to normalize the entropy summary function (default True). Used only if **mode** = "vector".
resolution (int): number of sample for the entropy summary function (default 100). Used only if **mode** = "vector".
sample_range ([double, double]): minimum and maximum of the entropy summary function domain, of the form [x_min, x_max] (default [numpy.nan, numpy.nan]). It is the interval on which samples will be drawn evenly. If one of the values is numpy.nan, it can be computed from the persistence diagrams with the fit() method. Used only if **mode** = "vector".
keep_endpoints (bool): when computing `sample_range`, use the exact extremities. This is mostly useful for plotting, the default is to use a slightly smaller range.
@@ -515,16 +515,16 @@ def transform(self, X):
Parameters:
X (list of n x 2 numpy arrays): input persistence diagrams.
Returns:
numpy array with shape (number of diagrams) x (1 if **mode** = "scalar" else **resolution**): output entropy.
"""
num_diag, Xfit = len(X), []
new_X = BirthPersistenceTransform().fit_transform(X)

for i in range(num_diag):
orig_diagram, new_diagram, num_pts_in_diag = X[i], new_X[i], X[i].shape[0]

p = new_diagram[:,1]
p = p/np.sum(p)
if self.mode == "scalar":
@@ -566,7 +566,7 @@ def __init__(self, threshold=10):
Constructor for the TopologicalVector class.
Parameters:
threshold (int): number of distances to keep (default 10). This is the dimension of the topological vector. If -1, this threshold is computed from the list of persistence diagrams by considering the one with the largest number of points and using the dimension of its corresponding topological vector as threshold.
"""
self.threshold = threshold

@@ -586,7 +586,7 @@ def transform(self, X):
Parameters:
X (list of n x 2 numpy arrays): input persistence diagrams.
Returns:
numpy array with shape (number of diagrams) x (**threshold**): output topological vectors.
"""
@@ -638,7 +638,7 @@ def __init__(self, polynomial_type="R", threshold=10):
Parameters:
polynomial_type (char): either "R", "S" or "T" (default "R"). Type of complex polynomial that is going to be computed (explained in https://link.springer.com/chapter/10.1007%2F978-3-319-23231-7_27).
threshold (int): number of coefficients (default 10). This is the dimension of the complex vector of coefficients, i.e. the number of coefficients corresponding to the largest degree terms of the polynomial. If -1, this threshold is computed from the list of persistence diagrams by considering the one with the largest number of points and using the dimension of its corresponding complex vector of coefficients as threshold.
"""
self.threshold, self.polynomial_type = threshold, polynomial_type

@@ -658,7 +658,7 @@ def transform(self, X):
Parameters:
X (list of n x 2 numpy arrays): input persistence diagrams.
Returns:
numpy array with shape (number of diagrams) x (**threshold**): output complex vectors of coefficients.
"""
@@ -681,9 +681,9 @@ def transform(self, X):
roots = np.multiply( (D[:,1]-D[:,0])/2, np.cos(alpha) - np.sin(alpha) + 1j * (np.cos(alpha) + np.sin(alpha)) )
coeff = [0] * (N+1)
coeff[N] = 1
for i in range(1, N+1):
for j in range(N-i-1, N):
coeff[j] += ((-1) * roots[i-1] * coeff[j+1])
coeff = np.array(coeff[::-1])[1:]
Xfit[d, :min(thresh, coeff.shape[0])] = coeff[:min(thresh, coeff.shape[0])]
return Xfit
@@ -885,3 +885,69 @@ def transform(self, X, sample_weight=None):

def get_feature_names_out(self):
return self._running_transform_names

class PersistenceLengths(BaseEstimator, TransformerMixin):
    """
    This is a class that returns the N longest persistence lengths, sorted in descending order. If the input does
    not contain enough values, the output is padded with zeros.
    """

    def __init__(self, num_lengths=5):
        """
        Constructor for the PersistenceLengths class.

        Parameters:
            num_lengths (int): number of persistence lengths to return (default 5).

        :raises ValueError: If num_lengths is less than or equal to 0.
        """
        if num_lengths <= 0:
            raise ValueError("num_lengths must be greater than 0.")
        self.num_lengths = num_lengths

    def fit(self, X, y=None):
        """
        Fit the PersistenceLengths class on a list of persistence diagrams (this function actually does nothing but
        is useful when PersistenceLengths is included in a scikit-learn Pipeline).

        Parameters:
            X (list of n x 2 numpy arrays): input persistence diagrams.
            y (None): Ignored.
        """
        return self

    def transform(self, X):
        """
        Compute the persistence lengths for each persistence diagram individually.

        Parameters:
            X (list of n x 2 numpy arrays): input persistence diagrams.

        Returns:
            numpy array with shape (number of diagrams) x (num_lengths): output persistence lengths.
        """
        pers_length_array = np.zeros((len(X), self.num_lengths))
        for idx, pd in enumerate(X):
            pl = pd[:, 1] - pd[:, 0]
            if len(pl) >= self.num_lengths:
                # Keep only the num_lengths largest persistence bar lengths
                pl = np.partition(pl, -self.num_lengths)[-self.num_lengths:]
            # Sort persistence lengths (length = death - birth) in descending order
            pl = np.flip(np.sort(pl))
            # Remaining entries stay zero when the diagram has fewer than num_lengths points
            pers_length_array[idx][:len(pl)] = pl

        return pers_length_array

    def __call__(self, diag):
        """
        Apply PersistenceLengths on a single persistence diagram and output the result.

        Parameters:
            diag (n x 2 numpy array): input persistence diagram.

        Returns:
            numpy 1d array of length num_lengths: output persistence lengths.
        """
        return self.transform([diag])[0]
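
For reference, a minimal usage sketch of the new class (assuming a GUDHI build that includes this change; the diagram values are made up for illustration):

import numpy as np
from gudhi.representations import PersistenceLengths

diag = np.array([[0., 4.], [1., 2.], [3., 8.]])  # persistence lengths: 4., 1., 5.
print(PersistenceLengths(num_lengths=4)(diag))  # [5. 4. 1. 0.] -- sorted descending, padded with one zero
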
27 changes: 21 additions & 6 deletions src/python/test/test_representations.py
@@ -9,7 +9,7 @@

# Vectorization
 from gudhi.representations import (Landscape, Silhouette, BettiCurve, ComplexPolynomial,\
-                                   TopologicalVector, PersistenceImage, Entropy)
+                                   TopologicalVector, PersistenceImage, Entropy, PersistenceLengths)

# Preprocessing
from gudhi.representations import (BirthPersistenceTransform, Clamping, DiagramScaler, Padding, ProminentPoints, \
@@ -128,8 +128,8 @@ def test_atol_doc():

# Check the center of the first_cluster and second_cluster are in Atol centers
centers = atol_vectoriser.fit(X=[a, b, c]).centers
np.isclose(centers, first_cluster.mean(axis=0)).all(1).any()
np.isclose(centers, second_cluster.mean(axis=0)).all(1).any()

vectorization = atol_vectoriser.transform(X=[a, b, c])
assert np.allclose(vectorization[0], atol_vectoriser(a))
@@ -203,7 +203,7 @@ def test_vectorization_empty_diagrams():
scv = Entropy(mode="vector", normalized=False, resolution=random_resolution)(empty_diag)
assert not np.any(scv)
assert scv.shape[0] == random_resolution

def test_entropy_miscalculation():
diag_ex = np.array([[0.0,1.0], [0.0,1.0], [0.0,2.0]])
def pe(pd):
@@ -215,14 +215,14 @@ def pe(pd):
sce = Entropy(mode="vector", resolution=4, normalized=False, keep_endpoints=True)
pef = [-1/4*np.log(1/4)-1/4*np.log(1/4)-1/2*np.log(1/2),
-1/4*np.log(1/4)-1/4*np.log(1/4)-1/2*np.log(1/2),
-1/2*np.log(1/2),
0.0]
assert all(([pef] == sce.fit_transform([diag_ex]))[0])
sce = Entropy(mode="vector", resolution=4, normalized=True)
pefN = (sce.fit_transform([diag_ex]))[0]
area = np.linalg.norm(pefN, ord=1)
assert area==pytest.approx(1)

def test_kernel_empty_diagrams():
empty_diag = np.empty(shape = [0, 2])
assert SlicedWassersteinDistance(num_directions=100)(empty_diag, empty_diag) == 0.
@@ -318,3 +318,18 @@ def test_endpoints():
def test_get_params():
for vec in [ Landscape(), Silhouette(), BettiCurve(), Entropy(mode="vector") ]:
vec.get_params()

def test_persistence_lengths():
    diag = np.array([[3., 5.], [0, np.inf], [4., 4.], [2., 6.]])
    for nl in range(1, 6):
        pl = PersistenceLengths(num_lengths=nl)(diag)
        # test the result is sorted in descending order
        assert np.all(sorted(pl, reverse=True) == pl)
        if len(pl) > 0:
            assert np.isinf(pl[0])
        for idx in range(len(pl)):
            if idx >= len(diag):
                # test it is padded with zeros beyond the number of input points
                assert pl[idx] == 0.
    with pytest.raises(ValueError):
        pl = PersistenceLengths(num_lengths=0)(diag)
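
As a sanity check on the expected values (hand-computed, not part of the committed test): the diagram above has lengths death - birth = [2., inf, 0., 4.], so the five longest lengths in descending order are [inf, 4., 2., 0., 0.]:

print(PersistenceLengths(num_lengths=5)(diag))  # [inf  4.  2.  0.  0.]
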
