"""Module containing the base classes for the measure estimators."""
from abc import ABC, abstractmethod
from io import UnsupportedOperation
from operator import gt
from typing import Callable, Generic, final, Sequence
from numpy import asarray, integer, issubdtype, log, log2, log10, nan, ndarray, std
from numpy import mean as np_mean
from numpy import sum as np_sum
from numpy.random import default_rng
from .. import Config
from ..utils.config import logger
from ..utils.types import EstimatorType, LogBaseType
from .utils.normalize import normalize_data_0_1
from .utils.te_slicing import cte_observations, te_observations
[docs]
class Estimator(Generic[EstimatorType], ABC):
"""Abstract base class for all measure estimators.
Find :ref:`Estimator Usage` on how to use the estimators and an overview of the
available measures (:ref:`Available approaches`).
Attributes
----------
res_global : float | None
The global value of the measure.
None if the measure is not calculated.
res_local : array-like | None
The local values of the measure.
None if the measure is not calculated or if not defined.
base : int | float | "e", optional
The logarithm base for the entropy calculation.
The default can be set
with :func:`set_logarithmic_unit() <infomeasure.utils.config.Config.set_logarithmic_unit>`.
See Also
--------
EntropyEstimator, MutualInformationEstimator, TransferEntropyEstimator
Notes
-----
The :meth:`_calculate` method needs to be implemented in the derived classes,
for the local values or the global value.
From local values, the global value is taken as the mean.
If is to more efficient to directly calculate the global value,
it is suggested to have :meth:`_calculate` just return the global value,
and have the separate :meth:`_extract_local_values` method for the local values,
which is lazily called by :meth:`local_val`, if needed.
If the measure has a p-value, the :meth:`p_value` method should be implemented
(use :class:`PValueMixin` for standard implementations).
"""
def __init__(self, base: LogBaseType = Config.get("base")):
"""Initialize the estimator."""
self.res_global = None
self.res_local = None
self.base = base
[docs]
@final
def calculate(self) -> None:
"""Calculate the measure.
Estimate the measure and store the results in the attributes.
"""
results = self._calculate()
if isinstance(results, ndarray):
if results.ndim != 1:
raise RuntimeError(
"Local values must be a 1D array. "
f"Received {results.ndim}D array with shape {results.shape}."
)
self.res_global, self.res_local = np_mean(results), results
logger.debug(
f"Global: {self.res_global:.4e}, "
# show the first max 5 local values
f"Local: {', '.join([f'{x:.2e}' for x in self.res_local[:5]])}"
f"{', ...' if len(self.res_local) > 5 else ''}"
)
elif isinstance(results, (int, float)):
self.res_global = results
logger.debug(f"Global: {self.res_global:.4e}")
else:
raise RuntimeError(
f"Invalid result type {type(results)} for {self.__class__.__name__}."
)
[docs]
@final
def result(self) -> float:
"""Return the global value of the measure.
Calculate the measure if not already calculated.
Returns
-------
results : float
The global value of the measure.
"""
return self.global_val()
[docs]
@final
def global_val(self) -> float:
"""Return the global value of the measure.
Calculate the measure if not already calculated.
Returns
-------
global : float
The global value of the measure.
"""
if self.res_global is None:
logger.debug(f"Using {self.__class__.__name__} to estimate the measure.")
self.calculate()
return self.res_global
[docs]
def local_vals(self):
"""Return the local values of the measure, if available.
Returns
-------
local : array-like
The local values of the measure.
Raises
------
io.UnsupportedOperation
If the local values are not available.
"""
if self.global_val() is not None and self.res_local is None:
try:
self.res_local = self._extract_local_values()
except NotImplementedError:
raise UnsupportedOperation(
f"Local values are not available for {self.__class__.__name__}."
)
# check absolute and relative difference
if (
abs(np_mean(self.res_local) - self.res_global) > 1e-10
and abs((np_mean(self.res_local) - self.res_global) / self.res_global)
> 1e-5
):
raise RuntimeError(
f"Mean of local values {np_mean(self.res_local)} "
f"does not match the global value {self.res_global}. "
f"Diff: {np_mean(self.res_local) - self.res_global:.2e}. "
+ (
f"As you are using {len(self.data)} random variables, "
f"this is likely a numerical error."
if (isinstance(self.data, tuple) and len(self.data) > 5)
else ""
)
)
return self.res_local
@abstractmethod
def _calculate(self) -> float | ndarray[float]:
"""Calculate the measure.
Returns
-------
result : float | array-like
The entropy as float, or an array of local values.
"""
pass
def _extract_local_values(self) -> ndarray[float]:
"""Extract the local values of the measure.
For estimators that only calculate the global value, this method can be
implemented to extract the local values from the data, e.g. histogram,
implementation-specific values, etc.
Returns
-------
array-like
The local values of the measure.
"""
raise NotImplementedError(
"Local values are not available for this estimator. "
"Implement the _extract_local_values method to extract them."
)
@final
def _log_base(self, x):
"""Calculate the logarithm of the data using the specified base.
Parameters
----------
x : array-like
The data to calculate the logarithm of.
Returns
-------
array-like
The logarithm of the data.
Raises
------
ValueError
If the logarithm base is negative.
Notes
-----
The logarithm base can be an integer, a float, or "e" for the natural logarithm.
"""
# Common logarithms
if self.base == 2:
return log2(x)
elif self.base == "e":
return log(x)
elif self.base == 10:
return log10(x)
# Edge case: log_1(x) = 0
elif self.base == 0:
return 0
# Negative base logarithm is undefined
elif self.base < 0:
raise ValueError(f"Logarithm base must be positive, not {self.base}.")
# General logarithm
else:
return log(x) / log(self.base)
[docs]
class EntropyEstimator(Estimator["EntropyEstimator"], ABC):
r"""Abstract base class for entropy estimators.
Estimates simple entropy of a data array or joint entropy of two data arrays.
Attributes
----------
*data : array-like, shape (n_samples,) or tuple of array-like
The data used to estimate the entropy.
When passing a tuple of arrays, the joint entropy is considered.
When passing two arrays, the cross-entropy is considered,
the second RV relative to the first RV.
base : int | float | "e", optional
The logarithm base for the entropy calculation.
The default can be set
with :func:`set_logarithmic_unit() <infomeasure.utils.config.Config.set_logarithmic_unit>`.
Raises
------
ValueError
If the data is not an array or arrays tuple/list.
See Also
--------
.entropy.discrete.DiscreteEntropyEstimator
.entropy.kernel.KernelEntropyEstimator
.entropy.kozachenko_leonenko.KozachenkoLeonenkoEntropyEstimator
.entropy.renyi.RenyiEntropyEstimator
.entropy.ordinal.OrdinalEntropyEstimator
.entropy.tsallis.TsallisEntropyEstimator
Notes
-----
- Entropy: When passing one array-like object.
- Joint Entropy: When passing one tuple of array-likes.
- Cross-Entropy: When passing two array-like objects.
Then the the second distribution :math:`q`
is considered relative to the first :math:`p`:
:math:`-\sum_{i=1}^{n} p_i \log_b q_i`
"""
def __init__(self, *data, base: LogBaseType = Config.get("base")):
"""Initialize the estimator with the data."""
# Check valid input data
if len(data) == 0:
raise ValueError("Data must be provided.")
if len(data) > 2:
raise ValueError(
"Only one or two array-like objects are allowed. \n"
"- One data array for normal entropy\n"
"- Two data arrays for cross-entropy\n"
"- When given tuples instead of arrays, "
"they are considered as one, joint RV."
)
if len(data) == 1 and not (
(
isinstance(data[0], (ndarray, Sequence))
and not isinstance(data[0], (str, tuple))
)
or (
isinstance(data[0], tuple)
and all(
(
isinstance(v, (ndarray, Sequence))
and not isinstance(v, (str, tuple))
)
for v in data[0]
)
)
):
raise ValueError(
"For normal entropy, data must be a single array-like object. "
"For joint entropy, data must be a tuple of array-like objects. "
"Pass two separate data for cross-entropy."
)
if len(data) == 2 and not all(
isinstance(var, (ndarray, Sequence)) and not isinstance(var, (str, tuple))
for var in data
):
raise ValueError(
"For cross-entropy, data must be two array-like objects. "
"Tuples for joint variables are not supported. "
"For (joint) entropy, just pass one argument."
)
# Convert to arrays if they are not already
self.data = tuple(
asarray(var)
if not isinstance(var, tuple)
else tuple(asarray(d) for d in var)
for var in data
)
# differing lengths are allowed for cross-entropy, but not inside joint RVs
for var in self.data:
if isinstance(var, tuple) and any(len(d) != len(var[0]) for d in var):
raise ValueError(
"All elements of a joint random variable must have the same length."
)
super().__init__(base=base)
[docs]
def local_vals(self):
"""Return the local values of the measure, if available.
For cross-entropy, local values cannot be calculated.
Returns
-------
local : array-like
The local values of the measure.
Raises
------
io.UnsupportedOperation
If the local values are not available.
"""
# Cross-entropy cannot be calculated locally:
# if _cross_entropy got overwritten, raise UnsupportedOperation
if len(self.data) > 1 and "_cross_entropy" in self.__class__.__dict__:
raise UnsupportedOperation(
"Local values can only be calculated for (joint) entropy, "
"not cross-entropy."
)
return super().local_vals()
def _calculate(self) -> float | ndarray[float]:
"""Calculate the entropy of the data.
Depending on the `data` type, choose simple or joint entropy calculation.
Returns
-------
float | array-like
The calculated entropy, or local values if available.
"""
if len(self.data) == 1:
if isinstance(self.data[0], tuple):
logger.debug("Calculating joint entropy.")
return self._joint_entropy()
logger.debug("Calculating simple entropy.")
return self._simple_entropy()
elif len(self.data) == 2:
logger.debug("Calculating cross-entropy.")
return self._cross_entropy()
else:
raise RuntimeError(
f"`self.data` has an invalid format (len {len(self.data)})."
)
@abstractmethod
def _simple_entropy(self) -> float | ndarray[float]:
"""Calculate the entropy of one random variable.
Returns
-------
float | array-like
The calculated entropy, or local values if available.
"""
pass
@abstractmethod
def _joint_entropy(self) -> float | ndarray[float]:
"""Calculate the joint entropy of two random variables.
Returns
-------
float | array-like
The calculated entropy, or local values if available.
"""
pass
def _cross_entropy(self) -> float:
r"""Calculate the cross-entropy between two distributions.
.. math::
H(p, q) = H_{q}(p) = -\sum_{x} p(x) \log q(x)
Consider self.data[0] as the distribution :math:`p` and self.data[1]
as the distribution :math:`q`.
Returns
-------
float
The calculated cross-entropy.
Notes
-----
As cross-entropy is not symmetric,
data[0] and data[1] are not exchangable.
Remember this when overriding this method.
"""
raise NotImplementedError(
f"Cross-entropy is not implemented for {self.__class__.__name__}."
)
[docs]
class RandomGeneratorMixin:
"""Mixin for random state generation.
Attributes
----------
rng : Generator
The random state generator.
"""
def __init__(self, *args, seed=None, **kwargs):
"""Initialize the random state generator."""
self.rng = default_rng(seed)
super().__init__(*args, **kwargs)
[docs]
class TransferEntropyEstimator(
RandomGeneratorMixin, Estimator["TransferEntropyEstimator"], ABC
):
"""Abstract base class for transfer entropy estimators.
Attributes
----------
source : array-like, shape (n_samples,)
The source data used to estimate the transfer entropy (X).
dest : array-like, shape (n_samples,)
The destination data used to estimate the transfer entropy (Y).
step_size : int
Step size between elements for the state space reconstruction.
src_hist_len, dest_hist_len : int
Number of past observations to consider for the source and destination data.
prop_time : int, optional
Number of positions to shift the data arrays relative to each other (multiple of
``step_size``).
Delay/lag/shift between the variables, representing propagation time.
Default is no shift.
Assumed time taken by info to transfer from source to destination.
base : int | float | "e", optional
The logarithm base for the entropy calculation.
The default can be set
with :func:`set_logarithmic_unit() <infomeasure.utils.config.Config.set_logarithmic_unit>`.
Raises
------
ValueError
If the data arrays have different lengths.
ValueError
If the propagation time is not an integer.
See Also
--------
.transfer_entropy.discrete.DiscreteTEEstimator
.transfer_entropy.kernel.KernelTEEstimator
.transfer_entropy.kraskov_stoegbauer_grassberger.KSGTEEstimator
.transfer_entropy.renyi.RenyiTEEstimator
.transfer_entropy.ordinal.OrdinalTEEstimator
.transfer_entropy.tsallis.TsallisTEEstimator
"""
def __init__(
self,
source,
dest,
*, # Enforce keyword-only arguments
prop_time: int = 0,
src_hist_len: int = 1,
dest_hist_len: int = 1,
step_size: int = 1,
offset: int = None,
base: LogBaseType = Config.get("base"),
):
"""Initialize the estimator with the data."""
if offset not in (None, 0):
if prop_time in (None, 0):
logger.warning(
"Using the `offset` parameter as `prop_time`. "
"Please use `prop_time` for the propagation time."
)
prop_time = offset
else:
raise ValueError(
"Both `offset` and `prop_time` are set. "
"Use only `prop_time` for the propagation time."
)
if len(source) != len(dest):
raise ValueError(
"Data arrays must be of the same length, "
f"not {len(source)} and {len(dest)}."
)
if not issubdtype(type(prop_time), integer):
raise ValueError(f"Propagation time must be an integer, not {prop_time}.")
self.source = asarray(source)
self.dest = asarray(dest)
# Apply the prop_time
self.prop_time = prop_time
if self.prop_time > 0:
self.source = self.source[: -self.prop_time * step_size or None]
self.dest = self.dest[self.prop_time * step_size :]
elif self.prop_time < 0:
self.source = self.source[-self.prop_time * step_size :]
self.dest = self.dest[: self.prop_time * step_size or None]
# Slicing parameters
self.src_hist_len, self.dest_hist_len = src_hist_len, dest_hist_len
self.step_size = step_size
# Permutation/Resample flags - used by the p-value method and te_obs. slicing
self.permute_src = False
self.resample_src = False
# Initialize Estimator ABC with the base
super().__init__(base=base)
def _generic_te_from_entropy(
self,
estimator: type(EntropyEstimator),
noise_level: float = 0,
kwargs: dict = None,
):
r"""Calculate the transfer entropy with the entropy estimator.
Given the joint processes:
- :math:`X_{t_n}^{(l)} = (X_{t_n}, X_{t_n-1}, \ldots, X_{t_n-k+1})`
- :math:`Y_{t_n}^{(k)} = (Y_{t_n}, Y_{t_n-1}, \ldots, Y_{t_n-l+1})`
The Transfer Entropy from :math:`X` to :math:`Y` can be computed using the
following formula, which is based on conditional mutual information (MI):
.. math::
I(Y_{t_{n+1}}; X_{t_n}^{(l)} | Y_{t_n}^{(k)}) = H(Y_{t_{n+1}} | Y_{t_n}^{(k)}) - H(Y_{t_{n+1}} | X_{t_n}^{(l)}, Y_{t_n}^{(k)})
Now, we will rewrite the above expression by implementing the chain rule, as:
.. math::
I(Y_{t_{n+1}} ; X_{t_n}^{(l)} | Y_{t_n}^{(k)}) = H(Y_{t_{n+1}}, Y_{t_n}^{(k)}) + H(X_{t_n}^{(l)}, Y_{t_n}^{(k)}) - H(Y_{t_{n+1}}, X_{t_n}^{(l)}, Y_{t_n}^{(k)}) - H(Y_{t_n}^{(k)})
Parameters
----------
estimator : EntropyEstimator
The entropy estimator to use.
noise_level : float, optional
The standard deviation of the Gaussian noise to add to the data to avoid
issues with zero distances.
kwargs : dict
Additional keyword arguments for the entropy estimator.
Returns
-------
float
The transfer entropy from source to destination.
Notes
-----
If possible, estimators should use a dedicated transfer entropy method.
This helper method is provided as a generic fallback.
"""
# Ensure source and dest are numpy arrays
source = self.source.copy()
dest = self.dest.copy()
# If Discrete Estimator and noise_level is set, raise an error
if estimator.__name__ == "DiscreteEntropyEstimator" and noise_level:
raise ValueError(
"Discrete entropy estimator does not support noise_level. "
"Please use a different estimator."
)
# Add Gaussian noise to the data if the flag is set
if isinstance(noise_level, (int, float)) and noise_level != 0:
source = source.astype(float)
dest = dest.astype(float)
source += self.rng.normal(0, noise_level, source.shape)
dest += self.rng.normal(0, noise_level, dest.shape)
(
joint_space_data,
dest_past_embedded,
marginal_1_space_data,
marginal_2_space_data,
) = te_observations(
source,
dest,
src_hist_len=self.src_hist_len,
dest_hist_len=self.dest_hist_len,
step_size=self.step_size,
permute_src=self.permute_src,
resample_src=self.resample_src,
)
est_y_history_y_future = estimator(marginal_2_space_data, **kwargs)
est_x_history_y_history = estimator(marginal_1_space_data, **kwargs)
est_x_history_y_history_y_future = estimator(joint_space_data, **kwargs)
est_y_history = estimator(dest_past_embedded, **kwargs)
# Compute Transfer Entropy
try:
return (
est_y_history_y_future.local_vals()
+ est_x_history_y_history.local_vals()
- est_x_history_y_history_y_future.local_vals()
- est_y_history.local_vals()
)
except UnsupportedOperation:
return (
est_y_history_y_future.global_val()
+ est_x_history_y_history.global_val()
- est_x_history_y_history_y_future.global_val()
- est_y_history.global_val()
)
[docs]
class ConditionalTransferEntropyEstimator(
RandomGeneratorMixin, Estimator["ConditionalTransferEntropyEstimator"], ABC
):
"""Abstract base class for conditional transfer entropy estimators.
Conditional Transfer Entropy (CTE) from source :math:`X` to destination :math:`Y`
given a condition :math:`Z` quantifies the amount of information obtained about
the destination variable through the source, conditioned on the condition.
Attributes
----------
source : array-like, shape (n_samples,)
The source data used to estimate the transfer entropy (X).
dest : array-like, shape (n_samples,)
The destination data used to estimate the transfer entropy (Y).
cond : array-like, shape (n_samples,)
The conditional data used to estimate the transfer entropy (Z).
step_size : int
Step size between elements for the state space reconstruction.
src_hist_len, dest_hist_len, cond_hist_len : int
Number of past observations to consider for the source, destination, and
conditional data.
prop_time : int, optional
Not compatible with the conditional transfer entropy.
base : int | float | "e", optional
The logarithm base for the entropy calculation.
The default can be set
with :func:`set_logarithmic_unit() <infomeasure.utils.config.Config.set_logarithmic_unit>`.
"""
def __init__(
self,
source,
dest,
*, # Enforce keyword-only arguments
cond=None,
src_hist_len: int = 1,
dest_hist_len: int = 1,
cond_hist_len: int = 1,
step_size: int = 1,
prop_time=None,
offset=None,
base: LogBaseType = Config.get("base"),
):
"""Initialize the estimator with the data."""
if cond is None:
raise ValueError("Conditional data must be provided for CTE estimation.")
if len(source) != len(dest) or len(source) != len(cond):
raise ValueError(
"Data arrays must be of the same length, "
f"not {len(source)}, {len(dest)}, and {len(cond)}."
)
if not issubdtype(type(prop_time), integer):
raise ValueError(f"Propagation time must be an integer, not {prop_time}.")
if prop_time not in (None, 0) or offset not in (None, 0):
raise ValueError(
"`prop_time`/`offset` are not compatible with the "
"conditional transfer entropy."
)
self.source = asarray(source)
self.dest = asarray(dest)
self.cond = asarray(cond)
# Slicing parameters
self.src_hist_len = src_hist_len
self.dest_hist_len = dest_hist_len
self.cond_hist_len = cond_hist_len
self.step_size = step_size
# Initialize Estimator ABC with the base
super().__init__(base=base)
def _generic_cte_from_entropy(
self,
estimator: type(EntropyEstimator),
noise_level: float = 0,
kwargs: dict = None,
):
r"""Calculate the conditional transfer entropy with the entropy estimator.
Parameters
----------
estimator : EntropyEstimator
The entropy estimator to use.
noise_level : float, optional
The standard deviation of the Gaussian noise to add to the data to avoid
issues with zero distances.
kwargs : dict
Additional keyword arguments for the entropy estimator.
Returns
-------
float
The conditional transfer entropy from source to destination
given the condition.
Notes
-----
If possible, estimators should use a dedicated
conditional transfer entropy method.
This helper method is provided as a generic fallback.
"""
# Ensure source, dest, and cond are numpy arrays
source = self.source.copy()
dest = self.dest.copy()
cond = self.cond.copy()
# Add Gaussian noise to the data if the flag is set
if isinstance(noise_level, (int, float)) and noise_level != 0:
source = source.astype(float)
dest = dest.astype(float)
cond = cond.astype(float)
source += self.rng.normal(0, noise_level, source.shape)
dest += self.rng.normal(0, noise_level, dest.shape)
cond += self.rng.normal(0, noise_level, cond.shape)
(
joint_space_data,
dest_past_embedded,
marginal_1_space_data,
marginal_2_space_data,
) = cte_observations(
source,
dest,
cond,
src_hist_len=self.src_hist_len,
dest_hist_len=self.dest_hist_len,
cond_hist_len=self.cond_hist_len,
step_size=self.step_size,
)
est_cond_y_history_y_future = estimator(marginal_2_space_data, **kwargs)
est_x_history_cond_y_history = estimator(marginal_1_space_data, **kwargs)
est_x_history_cond_y_history_y_future = estimator(joint_space_data, **kwargs)
est_y_history_cond = estimator(dest_past_embedded, **kwargs)
# Compute Conditional Transfer Entropy
try:
return (
est_cond_y_history_y_future.local_vals()
+ est_x_history_cond_y_history.local_vals()
- est_x_history_cond_y_history_y_future.local_vals()
- est_y_history_cond.local_vals()
)
except UnsupportedOperation:
return (
est_cond_y_history_y_future.global_val()
+ est_x_history_cond_y_history.global_val()
- est_x_history_cond_y_history_y_future.global_val()
- est_y_history_cond.global_val()
)
[docs]
class DistributionMixin:
"""Mixin for Estimators that offer introspection of the distribution.
Attributes
----------
dist_dict : dict
The distribution of the data.
Format: {symbol: probability}
"""
def __init__(self, *args, **kwargs):
"""Initialize the distribution attribute."""
self.dist_dict: dict | None = None
super().__init__(*args, **kwargs)
[docs]
def distribution(self) -> dict:
"""Get the distribution of the data.
Returns
-------
dict
The distribution of the data.
"""
if self.dist_dict is None:
self.dist_dict = self._distribution()
return self.dist_dict
def _distribution(self) -> dict:
"""Get the distribution of the data.
Child classes can implement this method to add a dedicated distribution method.
If not implemented, it's expected that the `calculate` method sets the
distribution.
Returns
-------
dict
The distribution of the data.
"""
if self.dist_dict is None:
self.calculate()
if self.dist_dict is None:
raise UnsupportedOperation(
"Distribution is not available for this estimator."
)
return self.dist_dict
[docs]
class PValueMixin(RandomGeneratorMixin):
"""Mixin for p-value calculation.
There are two methods to calculate the p-value:
- Permutation test: shuffle the data and calculate the measure.
- Bootstrap: resample the data and calculate the measure.
The :func:`p_value` can be used to determine a p-value for a measure,
and :func:`t_score` to get the corresponding t-score.
To be used as a mixin class with other :class:`Estimator` Estimator classes.
Inherit before the main class.
Notes
-----
The permutation test is a non-parametric statistical test to determine if the
observed effect is significant. The null hypothesis is that the measure is
not different from random, and the p-value is the proportion of permuted
measures greater than the observed measure.
Raises
------
NotImplementedError
If the p-value method is not implemented for the estimator.
"""
def __init__(self, *args, **kwargs):
"""Initialize the permutation test."""
self.original_data = None
self.data = None
self.p_val: float = None
self.t_scr: float = None
self.p_val_method: str = None
self.n_tests: int = None
super().__init__(*args, **kwargs)
if not isinstance(self, (MutualInformationEstimator, TransferEntropyEstimator)):
raise NotImplementedError(
"P-value method is not implemented for the estimator."
)
[docs]
def p_value(self, n_tests: int = None, method: str = None) -> float:
"""Calculate the p-value of the measure.
Method can be "permutation_test" or "bootstrap".
- Permutation test: shuffle the data and calculate the measure.
- Bootstrap: resample the data and calculate the measure.
Parameters
----------
n_tests : int, optional
Number of permutations or bootstrap samples.
Needs to be a positive integer.
Default is None, which means, if :py:meth:`t_score` was calculated before,
the same number of tests will be used.
method : str, optional
The method to calculate the p-value.
Default is None, which means, if :py:meth:`t_score` was calculated before,
the same method will be used.
If :py:meth:`t_score` was not calculated before,
it will be calculated using the default method set in the configuration.
Returns
-------
p_value : float
The p-value of the measure.
Raises
------
ValueError
If the chosen method is unknown.
"""
if (
self.p_val is not None
and (method == self.p_val_method or method is None)
and (n_tests == self.n_tests or n_tests is None)
):
return self.p_val
logger.debug(
"Calculating the p-value and t-score "
f"of the measure {self.__class__.__name__} "
f"using the {method} method with {n_tests} tests."
)
self.p_val_method = (
method if method is not None else Config.get("p_value_method")
)
self.n_tests = n_tests
if isinstance(self, MutualInformationEstimator):
if len(self.data) != 2:
raise UnsupportedOperation(
"Permutation test on mutual information is only supported "
"for two variables."
)
method = self.test_mi
elif isinstance(self, TransferEntropyEstimator):
method = self.test_te
else:
raise NotImplementedError(
"Permutation test is not implemented for this estimator."
)
self.p_val, self.t_scr = method(n_tests)
return self.p_val
[docs]
def t_score(self, n_tests: int = None, method: str = None) -> float:
"""Get the t-score of the measure.
Parameters
----------
n_tests : int, optional
Number of permutations or bootstrap samples.
Needs to be a positive integer.
Default is None, which means, if :py:meth:`p_value` was calculated before,
the same number of tests will be used.
method : str, optional
The method to calculate the t-score.
Default is None, which means, if :py:meth:`p_value` was calculated before,
the same method will be used.
If :py:meth:`p_value` was not calculated before,
it will be calculated using the default method set in the configuration.
Returns
-------
t_score : float
The t-score of the measure.
Notes
-----
The t-score is the difference between the observed value and the mean of the
permuted values, divided by the standard deviation of the permuted values
(if it is greater than 0).
One can get the z-score by converting the t-score using the cumulative
distribution function (CDF) of the t-distribution and the inverse CDF
(percent-point function) of the standard normal distribution.
Raises
------
ValueError
If the chosen method is unknown.
"""
if (
self.t_scr is not None
and (method == self.p_val_method or method is None)
and (n_tests == self.n_tests or n_tests is None)
):
return self.t_scr
self.p_value(n_tests=n_tests, method=method)
return self.t_scr
@staticmethod
def _p_value_t_score(
observed_value, test_values, comparison: Callable = gt
) -> tuple[float, float]:
"""
Calculate the p-value and t-score of the observed value.
Given a list of test values, the number of permutations, and the observed value,
calculate the p-value and t-score.
Parameters
----------
observed_value : float
The observed value.
test_values : array-like
The test values.
comparison : operator, optional
The comparison operator to use.
Pass `operator.lt` for less than, `operator.gt` for greater than.
Default is greater.
Returns
-------
float, float
The p-value and t-score of the measure.
Raises
------
ValueError
If the observed value is not a float.
ValueError
If the test values are not an array-like.
ValueError
If the comparison operator is not a function.
"""
if not isinstance(observed_value, float):
raise ValueError("Observed value must be a float.")
if not isinstance(test_values, (ndarray, list, tuple)):
raise ValueError("Test values must be an array-like.")
if not callable(comparison):
raise ValueError("Comparison operator must be a function.")
if len(test_values) < 2:
raise ValueError("Not enough test values for statistical test.")
test_values = asarray(test_values)
null_mean = np_mean(test_values)
null_std = std(test_values, ddof=1) # Unbiased estimator (dividing by N-1)
# Compute p-value: proportion of test values greater than the observed value
# (or different operator if specified)
p_value = np_sum(comparison(test_values, observed_value)) / len(test_values)
# Compute t-score:
t_score = (observed_value - null_mean) / null_std if null_std > 0 else nan
return p_value, t_score
def _calculate_mi_with_data_selection(self, method_resample_src: Callable):
"""Calculate the measure for the resampled data using specific method."""
if len(self.original_data) != 2:
raise ValueError(
"MI with data selection is only supported for two variables."
)
# Shuffle the data
self.data = (
method_resample_src(self.original_data[0]),
self.original_data[1],
)
# Calculate the measure
res_permuted = self._calculate()
return (
res_permuted if isinstance(res_permuted, float) else np_mean(res_permuted)
)
[docs]
def test_mi(self, n_tests: int) -> float:
"""Test the mutual information with a permutation test or bootstrap.
Parameters
----------
n_tests : int
The number of permutations or bootstrap samples.
Returns
-------
float, float
The p-value and t-score of the measure.
Raises
------
ValueError
If the number of permutations is not a positive integer.
"""
if not issubdtype(type(n_tests), integer) or n_tests < 1:
raise ValueError(
"Number of permutations must be a positive integer, "
f"not {n_tests} ({type(n_tests)})."
)
# Store unshuffled data
self.original_data = self.data
# Perform permutations
if self.p_val_method == "permutation_test":
method_resample_src = lambda data_src: self.rng.permutation(
data_src, axis=0
)
elif self.p_val_method == "bootstrap":
method_resample_src = lambda data_src: self.rng.choice(
data_src, size=data_src.shape[0], replace=True, axis=0
)
else:
raise ValueError(f"Invalid p-value method: {self.p_val_method}.")
permuted_values = [
self._calculate_mi_with_data_selection(method_resample_src)
for _ in range(n_tests)
]
# Restore the original data
self.data = self.original_data
return self._p_value_t_score(
observed_value=self.global_val(), test_values=permuted_values
)
[docs]
def test_te(self, n_tests: int) -> float:
"""Calculate the permutation test for transfer entropy.
Parameters
----------
n_tests : int
The number of permutations to perform.
Returns
-------
float
The p-value of the measure.
Raises
------
ValueError
If the number of permutations is not a positive integer.
"""
if not issubdtype(type(n_tests), integer) or n_tests < 1:
raise ValueError(
"Number of permutations must be a positive integer, "
f"not {n_tests} ({type(n_tests)})."
)
# Activate the permutation flag to permute the source data when slicing
if self.p_val_method == "permutation_test":
self.permute_src = self.rng
elif self.p_val_method == "bootstrap":
self.resample_src = self.rng
else:
raise ValueError(f"Invalid p-value method: {self.p_val_method}.")
permuted_values = [self._calculate() for _ in range(n_tests)]
if isinstance(permuted_values[0], ndarray):
permuted_values = [np_mean(x) for x in permuted_values]
# Deactivate the permutation/resample flag
self.permute_src, self.resample_src = False, False
return self._p_value_t_score(
observed_value=self.global_val(), test_values=permuted_values
)
[docs]
class EffectiveValueMixin:
"""Mixin for effective value calculation.
To be used as a mixin class with :class:`TransferEntropyEstimator` derived classes.
Inherit before the main class.
Attributes
----------
res_effective : float | None
The effective transfer entropy.
Notes
-----
The effective value is the difference between the original
value and the value calculated for the permuted data.
"""
def __init__(self, *args, **kwargs):
"""Initialize the estimator with the effective value."""
self.res_effective = None
super().__init__(*args, **kwargs)
[docs]
def effective_val(self):
"""Return the effective value.
Calculates the effective value if not already done,
otherwise returns the stored value.
Returns
-------
effective : float
The effective value.
"""
if self.res_effective is None:
self.res_effective = self._calculate_effective()
return self.res_effective
def _calculate_effective(self):
"""Calculate the effective value.
Returns
-------
effective : float
The effective value.
"""
# Activate the permutation flag to permute the source data when slicing
self.permute_src = self.rng
res_permuted = self._calculate()
if isinstance(res_permuted, ndarray):
res_permuted = np_mean(res_permuted)
# Deactivate the permutation flag
self.permute_src = False
# Return difference
return self.global_val() - res_permuted
[docs]
class WorkersMixin:
"""Mixin that adds an attribute for the numbers of workers to use.
Attributes
----------
n_workers : int, optional
The number of workers to use. Default is 1.
-1: Use as many workers as CPU cores available.
"""
def __init__(self, *args, workers=1, **kwargs):
if workers == -1:
from multiprocessing import cpu_count
workers = cpu_count()
super().__init__(*args, **kwargs)
self.n_workers = workers