Skip to content

Commit

Permalink
pythongh-115532 Add kde_random() to the statistic module (python#118210)
Browse files Browse the repository at this point in the history
  • Loading branch information
rhettinger authored May 4, 2024
1 parent 1b7e5e6 commit 42dc5b4
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 63 deletions.
84 changes: 25 additions & 59 deletions Doc/library/statistics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ or sample.
:func:`geometric_mean` Geometric mean of data.
:func:`harmonic_mean` Harmonic mean of data.
:func:`kde` Estimate the probability density distribution of the data.
:func:`kde_random` Random sampling from the PDF generated by kde().
:func:`median` Median (middle value) of data.
:func:`median_low` Low median of data.
:func:`median_high` High median of data.
Expand Down Expand Up @@ -311,6 +312,30 @@ However, for reading convenience, most of the examples show sorted sequences.
.. versionadded:: 3.13


.. function:: kde_random(data, h, kernel='normal', *, seed=None)

Return a function that makes a random selection from the estimated
probability density function produced by ``kde(data, h, kernel)``.

Providing a *seed* allows reproducible selections. In the future, the
values may change slightly as more accurate kernel inverse CDF estimates
are implemented. The seed may be an integer, float, str, or bytes.

A :exc:`StatisticsError` will be raised if the *data* sequence is empty.

Continuing the example for :func:`kde`, we can use
:func:`kde_random` to generate new random selections from an
estimated probability density function:

>>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
>>> rand = kde_random(data, h=1.5, seed=8675309)
>>> new_selections = [rand() for i in range(10)]
>>> [round(x, 1) for x in new_selections]
[0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]

.. versionadded:: 3.13


.. function:: median(data)

Return the median (middle value) of numeric data, using the common "mean of
Expand Down Expand Up @@ -1148,65 +1173,6 @@ The final prediction goes to the largest posterior. This is known as the
'female'


Sampling from kernel density estimation
***************************************

The :func:`kde()` function creates a continuous probability density
function from discrete samples. Some applications need a way to make
random selections from that distribution.

The technique is to pick a sample from a bandwidth-scaled kernel
function and to recenter the result around a randomly chosen point from
the input data. This can be done with any kernel that has a known or
accurately approximated inverse cumulative distribution function.

.. testcode::

from random import choice, random, seed
from math import sqrt, log, pi, tan, asin, cos, acos
from statistics import NormalDist

kernel_invcdfs = {
'normal': NormalDist().inv_cdf,
'logistic': lambda p: log(p / (1 - p)),
'sigmoid': lambda p: log(tan(p * pi/2)),
'rectangular': lambda p: 2*p - 1,
'triangular': lambda p: sqrt(2*p) - 1 if p < 0.5 else 1 - sqrt(2 - 2*p),
'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
'cosine': lambda p: 2*asin(2*p - 1)/pi,
}

def kde_random(data, h, kernel='normal'):
'Return a function that samples from kde() smoothed data.'
kernel_invcdf = kernel_invcdfs[kernel]
def rand():
return h * kernel_invcdf(random()) + choice(data)
return rand

For example:

.. doctest::

>>> discrete_samples = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
>>> rand = kde_random(discrete_samples, h=1.5)
>>> seed(8675309)
>>> selections = [rand() for i in range(10)]
>>> [round(x, 1) for x in selections]
[4.7, 7.4, 1.2, 7.8, 6.9, -1.3, 5.8, 0.2, -1.4, 5.7]

.. testcode::
:hide:

from statistics import kde
from math import isclose

# Verify that cdf / invcdf will round trip
xarr = [i/100 for i in range(-100, 101)]
for kernel, invcdf in kernel_invcdfs.items():
cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
for x in xarr:
assert isclose(invcdf(cdf(x)), x, abs_tol=1E-9)

..
# This modeline must appear within the last ten lines of the file.
kate: indent-width 3; remove-trailing-space on; replace-tabs on; encoding utf-8;
3 changes: 2 additions & 1 deletion Doc/whatsnew/3.13.rst
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,8 @@ statistics

* Add :func:`statistics.kde` for kernel density estimation.
This makes it possible to estimate a continuous probability density function
from a fixed number of discrete samples.
from a fixed number of discrete samples. Also added :func:`statistics.kde_random`
for sampling from the estimated probability density function.
(Contributed by Raymond Hettinger in :gh:`115863`.)

.. _whatsnew313-subprocess:
Expand Down
103 changes: 100 additions & 3 deletions Lib/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
'geometric_mean',
'harmonic_mean',
'kde',
'kde_random',
'linear_regression',
'mean',
'median',
Expand All @@ -138,12 +139,13 @@
from itertools import count, groupby, repeat
from bisect import bisect_left, bisect_right
from math import hypot, sqrt, fabs, exp, erf, tau, log, fsum, sumprod
from math import isfinite, isinf, pi, cos, sin, cosh, atan
from math import isfinite, isinf, pi, cos, sin, tan, cosh, asin, atan, acos
from functools import reduce
from operator import itemgetter
from collections import Counter, namedtuple, defaultdict

_SQRT2 = sqrt(2.0)
_random = random

# === Exceptions ===

Expand Down Expand Up @@ -978,11 +980,9 @@ def pdf(x):
return sum(K((x - x_i) / h) for x_i in data) / (n * h)

def cdf(x):

n = len(data)
return sum(W((x - x_i) / h) for x_i in data) / n


else:

sample = sorted(data)
Expand Down Expand Up @@ -1078,6 +1078,7 @@ def quantiles(data, *, n=4, method='exclusive'):
if ld == 1:
return data * (n - 1)
raise StatisticsError('must have at least one data point')

if method == 'inclusive':
m = ld - 1
result = []
Expand All @@ -1086,6 +1087,7 @@ def quantiles(data, *, n=4, method='exclusive'):
interpolated = (data[j] * (n - delta) + data[j + 1] * delta) / n
result.append(interpolated)
return result

if method == 'exclusive':
m = ld + 1
result = []
Expand All @@ -1096,6 +1098,7 @@ def quantiles(data, *, n=4, method='exclusive'):
interpolated = (data[j - 1] * (n - delta) + data[j] * delta) / n
result.append(interpolated)
return result

raise ValueError(f'Unknown method: {method!r}')


Expand Down Expand Up @@ -1709,3 +1712,97 @@ def __getstate__(self):

def __setstate__(self, state):
    # Pickle/copy support: restore the internal (mu, sigma) pair saved
    # by the matching __getstate__ method.
    self._mu, self._sigma = state


## kde_random() ##############################################################

def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12):
def f_inv(y):
"Return x such that f(x) ≈ y within the specified tolerance."
x = f_inv_estimate(y)
while abs(diff := f(x) - y) > tolerance:
x -= diff / f_prime(x)
return x
return f_inv

def _quartic_invcdf_estimate(p):
sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
x = (2.0 * p) ** 0.4258865685331 - 1.0
if p >= 0.004 < 0.499:
x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953)
return x * sign

# Accurate (to within the Newton-Raphson tolerance) inverse CDF for the
# quartic (biweight) kernel.  Here f is the kernel's CDF: the integral
# of 15/16 * (1 - t*t)**2 over [-1, t], and f_prime is the kernel itself.
_quartic_invcdf = _newton_raphson(
    f_inv_estimate = _quartic_invcdf_estimate,
    f = lambda t: 3/16 * t**5 - 5/8 * t**3 + 15/16 * t + 1/2,
    f_prime = lambda t: 15/16 * (1.0 - t * t) ** 2)

def _triweight_invcdf_estimate(p):
sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
x = (2.0 * p) ** 0.3400218741872791 - 1.0
return x * sign

# Accurate (to within the Newton-Raphson tolerance) inverse CDF for the
# triweight kernel.  Here f is the kernel's CDF: the integral of
# 35/32 * (1 - t*t)**3 over [-1, t], and f_prime is the kernel itself.
_triweight_invcdf = _newton_raphson(
    f_inv_estimate = _triweight_invcdf_estimate,
    f = lambda t: 35/32 * (-1/7*t**7 + 3/5*t**5 - t**3 + t) + 1/2,
    f_prime = lambda t: 35/32 * (1.0 - t * t) ** 3)

# Map each kernel name to its inverse CDF (quantile function).  These
# transform a uniform random selection on [0, 1] into a random selection
# from the kernel's own distribution.
_kernel_invcdfs = {
    'normal': NormalDist().inv_cdf,
    'logistic': lambda p: log(p / (1 - p)),
    'sigmoid': lambda p: log(tan(p * pi/2)),
    'rectangular': lambda p: 2*p - 1,
    'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
    'quartic': _quartic_invcdf,
    'triweight': _triweight_invcdf,
    'triangular': lambda p: sqrt(2*p) - 1 if p < 1/2 else 1 - sqrt(2 - 2*p),
    'cosine': lambda p: 2 * asin(2*p - 1) / pi,
}
# Register the accepted alternative spellings for several kernels.
_kernel_invcdfs['gauss'] = _kernel_invcdfs['normal']
_kernel_invcdfs['uniform'] = _kernel_invcdfs['rectangular']
_kernel_invcdfs['epanechnikov'] = _kernel_invcdfs['parabolic']
_kernel_invcdfs['biweight'] = _kernel_invcdfs['quartic']

def kde_random(data, h, kernel='normal', *, seed=None):
    """Return a function that makes a random selection from the estimated
    probability density function created by kde(data, h, kernel).

    Providing a *seed* allows reproducible selections within a single
    thread.  The seed may be an integer, float, str, or bytes.

    A StatisticsError will be raised if the *data* sequence is empty.

    Example:

    >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
    >>> rand = kde_random(data, h=1.5, seed=8675309)
    >>> new_selections = [rand() for i in range(10)]
    >>> [round(x, 1) for x in new_selections]
    [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]

    """
    # Validate the inputs before building the sampler.
    if not len(data):
        raise StatisticsError('Empty data sequence')
    if not isinstance(data[0], (int, float)):
        raise TypeError('Data sequence must contain ints or floats')
    if h <= 0.0:
        raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
    try:
        kernel_invcdf = _kernel_invcdfs[kernel]
    except KeyError:
        raise StatisticsError(f'Unknown kernel name: {kernel!r}')

    # A private PRNG instance keeps selections reproducible and
    # independent of other users of the random module.
    prng = _random.Random(seed)
    uniform = prng.random
    pick = prng.choice

    def rand():
        # Recenter a bandwidth-scaled kernel offset around a randomly
        # chosen data point.
        return pick(data) + h * kernel_invcdf(uniform())

    rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}'
    return rand
80 changes: 80 additions & 0 deletions Lib/test/test_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2426,6 +2426,86 @@ def integrate(func, low, high, steps=10_000):
self.assertEqual(f_hat(-1.0), 1/2)
self.assertEqual(f_hat(1.0), 1/2)

def test_kde_kernel_invcdfs(self):
    # Verify that cdf / invcdf round trip for every registered kernel.
    kde = statistics.kde
    points = [i / 100 for i in range(-100, 101)]
    for kernel, invcdf in statistics._kernel_invcdfs.items():
        with self.subTest(kernel=kernel):
            cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
            for x in points:
                self.assertAlmostEqual(invcdf(cdf(x)), x, places=5)

def test_kde_random(self):
    kde_random = statistics.kde_random
    StatisticsError = statistics.StatisticsError
    kernels = ['normal', 'gauss', 'logistic', 'sigmoid', 'rectangular',
               'uniform', 'triangular', 'parabolic', 'epanechnikov',
               'quartic', 'biweight', 'triweight', 'cosine']
    sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]

    # Smoke test

    for kernel in kernels:
        with self.subTest(kernel=kernel):
            rand = kde_random(sample, h=1.5, kernel=kernel)
            selections = [rand() for i in range(10)]

    # Check error cases

    with self.assertRaises(StatisticsError):
        kde_random([], h=1.0)                         # Empty dataset
    with self.assertRaises(TypeError):
        kde_random(['abc', 'def'], 1.5)               # Non-numeric data
    with self.assertRaises(TypeError):
        kde_random(iter(sample), 1.5)                 # Data is not a sequence
    with self.assertRaises(StatisticsError):
        kde_random(sample, h=0.0)                     # Zero bandwidth
    # Bug fix: this case previously repeated h=0.0, so a negative
    # bandwidth was never actually exercised.
    with self.assertRaises(StatisticsError):
        kde_random(sample, h=-1.0)                    # Negative bandwidth
    with self.assertRaises(TypeError):
        kde_random(sample, h='str')                   # Wrong bandwidth type
    with self.assertRaises(StatisticsError):
        kde_random(sample, h=1.0, kernel='bogus')     # Invalid kernel

    # Test name and docstring of the generated function

    h = 1.5
    kernel = 'cosine'
    prng = kde_random(sample, h, kernel)
    self.assertEqual(prng.__name__, 'rand')
    self.assertIn(kernel, prng.__doc__)
    self.assertIn(repr(h), prng.__doc__)

    # Approximate distribution test: Compare a random sample to the expected distribution

    data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2, 7.8, 14.3, 15.1, 15.3, 15.8, 17.0]
    n = 1_000_000
    h = 1.75
    dx = 0.1

    def p_expected(x):
        # P(x-dx <= X < x+dx) under the fitted cumulative KDE
        return F_hat(x + dx) - F_hat(x - dx)

    def p_observed(x):
        # Empirical P(x-dx <= X < x+dx)  (comment previously claimed a
        # division by 2*dx that the code never performed)
        i = bisect.bisect_left(big_sample, x - dx)
        j = bisect.bisect_right(big_sample, x + dx)
        return (j - i) / len(big_sample)

    for kernel in kernels:
        with self.subTest(kernel=kernel):

            F_hat = statistics.kde(data, h, kernel, cumulative=True)
            rand = kde_random(data, h, kernel, seed=8675309**2)
            big_sample = sorted([rand() for i in range(n)])

            for x in range(-40, 190):
                x /= 10
                self.assertTrue(math.isclose(p_observed(x), p_expected(x), abs_tol=0.001))


class TestQuantiles(unittest.TestCase):

Expand Down

0 comments on commit 42dc5b4

Please sign in to comment.