Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correction to M-test with resampling, and addition of new MLL test #268

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
272 changes: 270 additions & 2 deletions csep/core/catalog_evaluations.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# Third-Party Imports
# Python imports
import time
from typing import Optional, TYPE_CHECKING

# Third-Party Imports
import numpy
import scipy.stats

# PyCSEP imports
from csep.core.exceptions import CSEPEvaluationException
from csep.core.catalogs import CSEPCatalog
from csep.models import (
CatalogNumberTestResult,
CatalogSpatialTestResult,
Expand All @@ -14,7 +17,10 @@
CalibrationTestResult
)
from csep.utils.calc import _compute_likelihood
from csep.utils.stats import get_quantiles, cumulative_square_diff
from csep.utils.stats import get_quantiles, cumulative_square_diff, MLL_score

if TYPE_CHECKING:
from csep.core.forecasts import CatalogForecast


def number_test(forecast, observed_catalog, verbose=True):
Expand Down Expand Up @@ -55,6 +61,7 @@ def number_test(forecast, observed_catalog, verbose=True):
obs_name=observed_catalog.name)
return result


def spatial_test(forecast, observed_catalog, verbose=True):
""" Performs spatial test for catalog-based forecasts.

Expand Down Expand Up @@ -140,6 +147,7 @@ def spatial_test(forecast, observed_catalog, verbose=True):

return result


def magnitude_test(forecast, observed_catalog, verbose=True):
""" Performs magnitude test for catalog-based forecasts """
test_distribution = []
Expand Down Expand Up @@ -215,6 +223,7 @@ def magnitude_test(forecast, observed_catalog, verbose=True):

return result


def pseudolikelihood_test(forecast, observed_catalog, verbose=True):
""" Performs the spatial pseudolikelihood test for catalog forecasts.

Expand Down Expand Up @@ -310,6 +319,7 @@ def pseudolikelihood_test(forecast, observed_catalog, verbose=True):

return result


def calibration_test(evaluation_results, delta_1=False):
""" Perform the calibration test by computing a Kilmogorov-Smirnov test of the observed quantiles against a uniform
distribution.
Expand Down Expand Up @@ -345,3 +355,261 @@ def calibration_test(evaluation_results, delta_1=False):
return result


def resampled_magnitude_test(forecast: "CatalogForecast",
                             observed_catalog: CSEPCatalog,
                             verbose: bool = False,
                             seed: Optional[int] = None) -> CatalogMagnitudeTestResult:
    """
    Performs the resampled magnitude test for catalog-based forecasts (Serafini et al., 2024),
    which corrects the bias from the original M-test implementation to the total N of events.
    Calculates the (pseudo log-likelihood) test statistic distribution from the forecast's
    synthetic catalogs Λ_j as:

        D_j = Σ_k [log(Λ_u(k) * N / N_u + 1) - log(Λ̃_j(k) + 1)] ^ 2

    where k are the magnitude bins, Λ_u the union of all synthetic catalogs, N_u the total
    number of events in Λ_u, and Λ̃_j the resampled catalog containing exactly N events.

    The pseudo log-likelihood statistic from the observations is calculated as:

        D_o = Σ_k [log(Λ_U(k) * N / N_u + 1) - log(Ω(k) + 1)]^2

    where Ω is the observed catalog.

    Args:
        forecast (CatalogForecast): The forecast to be evaluated
        observed_catalog (CSEPCatalog): The observation/testing catalog.
        verbose (bool): Flag to display debug messages
        seed (int): Random number generator seed

    Returns:
        A CatalogMagnitudeTestResult object containing the statistic distribution and the
        observed statistic.
    """
    # Seed the RNG for reproducible resampling. Compare against None (not truthiness)
    # so that seed=0 is honored rather than silently ignored.
    if seed is not None:
        numpy.random.seed(seed)

    test_distribution = []

    if forecast.region.magnitudes is None:
        raise CSEPEvaluationException(
            "Forecast must have region.magnitudes member to perform magnitude test.")

    # short-circuit if zero events: statistic is undefined, return a 'not-valid' result
    if observed_catalog.event_count == 0:
        print("Cannot perform magnitude test when observed event count is zero.")
        # prepare result
        result = CatalogMagnitudeTestResult(test_distribution=test_distribution,
                                            name='M-Test',
                                            observed_statistic=None,
                                            quantile=(None, None),
                                            status='not-valid',
                                            min_mw=forecast.min_magnitude,
                                            obs_catalog_repr=str(observed_catalog),
                                            obs_name=observed_catalog.name,
                                            sim_name=forecast.name)

        return result

    # compute expected rates for forecast if needed
    if forecast.expected_rates is None:
        forecast.get_expected_rates(verbose=verbose)

    # build the union histogram Λ_u by accumulating the magnitude counts of every
    # synthetic catalog in the forecast
    union_histogram = numpy.zeros(len(forecast.magnitudes))
    for j, cat in enumerate(forecast):
        union_histogram += cat.magnitude_counts()

    # half a bin width, used to sample bin-center magnitudes below.
    # NOTE(review): assumes observed_catalog.region.magnitudes matches
    # forecast.magnitudes bin edges — confirm upstream.
    mag_half_bin = numpy.diff(observed_catalog.region.magnitudes)[0] / 2.
    n_union_events = numpy.sum(union_histogram)
    obs_histogram = observed_catalog.magnitude_counts()
    n_obs = numpy.sum(obs_histogram)
    # scale the union histogram to the observed event count N (Λ_u * N / N_u)
    union_scale = n_obs / n_union_events
    scaled_union_histogram = union_histogram * union_scale

    # per-bin resampling probabilities derived from the union histogram
    probs = union_histogram / n_union_events

    # compute the test statistic for each catalog
    t0 = time.time()
    for i, catalog in enumerate(forecast):
        # `catalog` itself is only used to count iterations; the resampled catalog Λ̃_j
        # is drawn from the union forecast histogram with exactly n_obs events
        mag_values = numpy.random.choice(forecast.magnitudes + mag_half_bin, p=probs,
                                         size=int(n_obs))
        # right-most bin edge is extended so the last magnitude bin captures all samples
        extended_mag_max = max(forecast.magnitudes) + 10
        mag_counts, tmp = numpy.histogram(mag_values, bins=numpy.append(forecast.magnitudes,
                                                                        extended_mag_max))
        n_events = numpy.sum(mag_counts)
        if n_events == 0:
            # print("Skipping to next because catalog contained zero events.")
            continue
        scale = n_obs / n_events
        catalog_histogram = mag_counts * scale
        # compute magnitude test statistic for the catalog
        test_distribution.append(
            cumulative_square_diff(numpy.log10(catalog_histogram + 1),
                                   numpy.log10(scaled_union_histogram + 1))
        )
        # output status every power-of-ten catalogs
        if verbose:
            tens_exp = numpy.floor(numpy.log10(i + 1))
            if (i + 1) % 10 ** tens_exp == 0:
                t1 = time.time()
                print(f'Processed {i + 1} catalogs in {t1 - t0} seconds', flush=True)

    # compute observed statistic D_o against the scaled union histogram
    obs_d_statistic = cumulative_square_diff(numpy.log10(obs_histogram + 1),
                                             numpy.log10(scaled_union_histogram + 1))

    # score evaluation
    delta_1, delta_2 = get_quantiles(test_distribution, obs_d_statistic)

    # prepare result
    result = CatalogMagnitudeTestResult(test_distribution=test_distribution,
                                        name='M-Test',
                                        observed_statistic=obs_d_statistic,
                                        quantile=(delta_1, delta_2),
                                        status='normal',
                                        min_mw=forecast.min_magnitude,
                                        obs_catalog_repr=str(observed_catalog),
                                        obs_name=observed_catalog.name,
                                        sim_name=forecast.name)

    return result


def MLL_magnitude_test(forecast: "CatalogForecast",
                       observed_catalog: CSEPCatalog,
                       full_calculation: bool = False,
                       verbose: bool = False,
                       seed: Optional[int] = None) -> CatalogMagnitudeTestResult:
    """
    Implements the modified Multinomial log-likelihood ratio (MLL) magnitude test (Serafini et
    al., 2024). Calculates the test statistic distribution as:

        D̃_j = -2 * log( L(Λ_u + N_u / N_j + Λ̃_j + 1) /
                        [L(Λ_u + N_u / N_j) * L(Λ̃_j + 1)]
                      )

    where L is the multinomial likelihood function, Λ_u the union of all the forecasts'
    synthetic catalogs, N_u the total number of events in Λ_u, Λ̃_j the resampled catalog
    containing exactly N observed events. The observed statistic is defined as:

        D_o = -2 * log( L(Λ_u + N_u / N + Ω + 1) /
                        [L(Λ_u + N_u / N) * L(Ω + 1)]
                      )

    where Ω is the observed catalog.

    Args:
        forecast (CatalogForecast): The forecast to be evaluated
        observed_catalog (CSEPCatalog): The observation/testing catalog.
        full_calculation (bool): Whether to sample from the entire stochastic catalogs or from
            its already processed magnitude histogram.
        verbose (bool): Flag to display debug messages
        seed (int): Random number generator seed

    Returns:
        A CatalogMagnitudeTestResult object containing the statistic distribution and the
        observed statistic.
    """
    # Seed the RNG for reproducible resampling. Compare against None (not truthiness)
    # so that seed=0 is honored rather than silently ignored.
    if seed is not None:
        numpy.random.seed(seed)

    test_distribution = []

    if forecast.region.magnitudes is None:
        raise CSEPEvaluationException(
            "Forecast must have region.magnitudes member to perform magnitude test.")

    # short-circuit if zero events: statistic is undefined, return a 'not-valid' result
    if observed_catalog.event_count == 0:
        print("Cannot perform magnitude test when observed event count is zero.")
        # prepare result
        result = CatalogMagnitudeTestResult(test_distribution=test_distribution,
                                            name='M-Test',
                                            observed_statistic=None,
                                            quantile=(None, None),
                                            status='not-valid',
                                            min_mw=forecast.min_magnitude,
                                            obs_catalog_repr=str(observed_catalog),
                                            obs_name=observed_catalog.name,
                                            sim_name=forecast.name)

        return result

    # compute expected rates for forecast if needed
    if forecast.expected_rates is None:
        forecast.get_expected_rates(verbose=verbose)

    # calculate histograms of union forecast and total number of events
    Lambda_u_histogram = numpy.zeros(len(forecast.magnitudes))

    if full_calculation:
        # keep every individual magnitude so resampling draws from the raw events
        Lambda_u = []
    else:
        # half a bin width, used to sample bin-center magnitudes from the histogram.
        # NOTE(review): assumes observed_catalog.region.magnitudes matches
        # forecast.magnitudes bin edges — confirm upstream.
        mag_half_bin = numpy.diff(observed_catalog.region.magnitudes)[0] / 2.

    for j, cat in enumerate(forecast):
        if full_calculation:
            Lambda_u = numpy.append(Lambda_u, cat.get_magnitudes())
        Lambda_u_histogram += cat.magnitude_counts()

    # calculate histograms of observations and observed number of events
    Omega_histogram = observed_catalog.magnitude_counts()
    n_obs = numpy.sum(Omega_histogram)

    # compute observed statistic
    obs_d_statistic = MLL_score(union_catalog_counts=Lambda_u_histogram,
                                catalog_counts=Omega_histogram)

    # per-bin resampling probabilities derived from the union histogram
    probs = Lambda_u_histogram / numpy.sum(Lambda_u_histogram)

    # compute the test statistic for each catalog
    t0 = time.time()
    for i, catalog in enumerate(forecast):
        # resampled catalog Λ̃_j: drawn from raw union magnitudes or from the
        # union histogram, with exactly n_obs events either way
        if full_calculation:
            mag_values = numpy.random.choice(Lambda_u, size=int(n_obs))
        else:
            mag_values = numpy.random.choice(forecast.magnitudes + mag_half_bin, p=probs,
                                             size=int(n_obs))
        # right-most bin edge is extended so the last magnitude bin captures all samples
        extended_mag_max = max(forecast.magnitudes) + 10
        Lambda_j_histogram, tmp = numpy.histogram(mag_values,
                                                  bins=numpy.append(forecast.magnitudes,
                                                                    extended_mag_max))

        # compute magnitude test statistic for the catalog
        test_distribution.append(
            MLL_score(union_catalog_counts=Lambda_u_histogram,
                      catalog_counts=Lambda_j_histogram)
        )
        # output status every power-of-ten catalogs
        if verbose:
            tens_exp = numpy.floor(numpy.log10(i + 1))
            if (i + 1) % 10 ** tens_exp == 0:
                t1 = time.time()
                print(f'Processed {i + 1} catalogs in {t1 - t0} seconds', flush=True)

    # score evaluation
    delta_1, delta_2 = get_quantiles(test_distribution, obs_d_statistic)

    # prepare result
    result = CatalogMagnitudeTestResult(test_distribution=test_distribution,
                                        name='M-Test',
                                        observed_statistic=obs_d_statistic,
                                        quantile=(delta_1, delta_2),
                                        status='normal',
                                        min_mw=forecast.min_magnitude,
                                        obs_catalog_repr=str(observed_catalog),
                                        obs_name=observed_catalog.name,
                                        sim_name=forecast.name)

    return result
63 changes: 60 additions & 3 deletions csep/utils/stats.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import numpy
import scipy.stats
import scipy.special
# PyCSEP imports
from csep.core import regions
import scipy.stats


def sup_dist(cdf1, cdf2):
"""
Expand Down Expand Up @@ -269,3 +268,61 @@ def get_Kagan_I1_score(forecasts, catalog):
I_1[j] = numpy.dot(counts[non_zero_idx], numpy.log2(rate_den[non_zero_idx] / uniform_forecast)) / n_event

return I_1


def log_d_multinomial(x: numpy.ndarray, size: int, prob: numpy.ndarray):
    """
    Computes the multinomial log-likelihood (log pmf) of the counts ``x`` given ``size``
    trials and per-category probabilities ``prob``:

        log( Γ(size + 1) ) + Σ_i [ x_i * log(prob_i) - log( Γ(x_i + 1) ) ]

    Args:
        x (numpy.ndarray): Observed count in each category.
        size (int): Total number of trials; for a proper pmf this equals ``sum(x)``.
        prob (numpy.ndarray): Probability of each category. Entries must be strictly
            positive, since ``log(prob)`` is taken element-wise.

    Returns:
        The multinomial log-likelihood as a scalar.
    """
    return scipy.special.loggamma(size + 1) + numpy.sum(
        x * numpy.log(prob) - scipy.special.loggamma(x + 1))


def MLL_score(union_catalog_counts: numpy.ndarray, catalog_counts: numpy.ndarray):
    """
    Calculates the modified Multinomial log-likelihood (MLL) score, defined by Serafini et
    al., (2024). It is built from a collection of catalogs Λ_u and a single catalog Ω:

        MLL_score = -2 * log( L(Λ_u + N_u / N_o + Ω + 1) /
                              [L(Λ_u + N_u / N_o) * L(Ω + 1)]
                            )

    where L is the multinomial likelihood function and N_u, N_o are the total number of
    events in Λ_u and Ω, respectively.

    Args:
        union_catalog_counts (numpy.ndarray): Histogram of events per magnitude bin for
            the union of the forecast's synthetic catalogs (Λ_u).
        catalog_counts (numpy.ndarray): Histogram of events per magnitude bin for the
            single catalog being scored (Ω or a resampled Λ̃_j).

    Returns:
        The MLL score comparing the single catalog to the collection of catalogs.
    """
    N_u = numpy.sum(union_catalog_counts)
    N_j = numpy.sum(catalog_counts)
    events_ratio = N_u / N_j

    # regularize both histograms so no bin is empty (keeps log(prob) finite)
    union_catalog_counts_mod = union_catalog_counts + events_ratio
    catalog_counts_mod = catalog_counts + 1
    merged_catalog_j = union_catalog_counts_mod + catalog_counts_mod

    # empirical cell probabilities for each of the three multinomial likelihoods
    pr_merged_cat = merged_catalog_j / numpy.sum(merged_catalog_j)
    pr_union_cat = union_catalog_counts_mod / numpy.sum(union_catalog_counts_mod)
    pr_cat_j = catalog_counts_mod / numpy.sum(catalog_counts_mod)

    log_lik_merged = log_d_multinomial(x=merged_catalog_j,
                                       size=numpy.sum(merged_catalog_j),
                                       prob=pr_merged_cat)
    log_lik_union = log_d_multinomial(x=union_catalog_counts_mod,
                                      size=numpy.sum(union_catalog_counts_mod),
                                      prob=pr_union_cat)
    log_like_cat_j = log_d_multinomial(x=catalog_counts_mod,
                                       size=numpy.sum(catalog_counts_mod),
                                       prob=pr_cat_j)

    # -2 per the manuscript definition of the statistic (confirmed in review: the sign
    # only sets the orientation of the score, not its distributional properties)
    return -2 * (log_lik_merged - log_lik_union - log_like_cat_j)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a negative -2?? Or did we flip the score? I'm always confused when flipping.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it should according to the definition in the manuscript. I'll change that. The minus sign does not change any property of the MLL statistic and it just make it positively or negatively oriented, so no issue changing the definition.

Loading