Source code for infomeasure.estimators.entropy.discrete
"""Module for the discrete entropy estimator."""
from numpy import sum as np_sum, ndarray, asarray
from ..base import EntropyEstimator, DistributionMixin
from ..utils.ordinal import reduce_joint_space
from ..utils.unique import unique_vals
from ... import Config
from ...utils.config import logger
from ...utils.types import LogBaseType
[docs]
class DiscreteEntropyEstimator(DistributionMixin, EntropyEstimator):
    """Estimator for discrete entropy (Shannon entropy).

    Probabilities are taken from the distribution returned by
    ``unique_vals`` for the stored data; no binning is performed here, so
    the data is expected to be symbolized or discretized beforehand.

    Attributes
    ----------
    *data : array-like
        The data used to estimate the entropy.
    """

    def __init__(self, *data, base: LogBaseType = Config.get("base")):
        """Initialize the DiscreteEntropyEstimator.

        Parameters
        ----------
        *data : array-like
            The data used to estimate the entropy. Forwarded unchanged to
            the parent initializer, which is expected to populate
            ``self.data``.
        base : LogBaseType
            The logarithm base, forwarded to the parent initializer.
            Defaults to the configured ``"base"`` value.
        """
        super().__init__(*data, base=base)

        # Warn if the data looks like a float array: float input usually
        # means the caller forgot to symbolize/discretize it.
        for var in self.data:
            if isinstance(var, ndarray) and var.dtype.kind == "f":
                logger.warning(
                    "The data looks like a float array ("
                    f"{var.dtype}). "
                    "Make sure it is properly symbolized or discretized "
                    "for the entropy estimation."
                )
            elif isinstance(var, tuple) and any(
                isinstance(marginal, ndarray) and marginal.dtype.kind == "f"
                for marginal in var
            ):
                logger.warning(
                    "Some of the data looks like a float array. "
                    "Make sure it is properly symbolized or discretized "
                    "for the entropy estimation."
                )

        # Reduce any joint space if applicable: multi-dimensional arrays and
        # tuples of marginals are flagged for reduction.
        needs_reduction = tuple(
            isinstance(var, tuple) or (isinstance(var, ndarray) and var.ndim > 1)
            for var in self.data
        )
        if any(needs_reduction):
            # As the discrete shannon entropy disregards the order of the data,
            # we can reduce the values to unique integers.
            # In case of having multiple random variables (tuple or list),
            # this enumerates the unique co-occurrences.
            self.data = tuple(
                reduce_joint_space(var) if flagged else var
                for var, flagged in zip(self.data, needs_reduction)
            )

    def _simple_entropy(self):
        """Calculate the entropy of the data.

        Stores the distribution of ``self.data[0]`` in ``self.dist_dict``
        as a side effect; ``_extract_local_values`` reads it afterwards.

        Returns
        -------
        float
            The calculated entropy.
        """
        # Only the third return value (used here as a value -> probability
        # mapping) is needed; the first two are discarded.
        _, _, self.dist_dict = unique_vals(self.data[0])
        probabilities = asarray(list(self.dist_dict.values()))
        # Calculate the entropy
        return -np_sum(probabilities * self._log_base(probabilities))

    def _joint_entropy(self):
        """Calculate the joint entropy of the data.

        Returns
        -------
        float
            The calculated joint entropy.
        """
        # The data has already been reduced to unique values of co-occurrences
        return self._simple_entropy()

    def _extract_local_values(self):
        """Separately, calculate the local values.

        Reads ``self.dist_dict`` as populated by ``_simple_entropy`` in this
        class, so the entropy has to be computed before this is called.

        Returns
        -------
        ndarray[float]
            The calculated local values of entropy.
        """
        # Convert to an array before taking the logarithm so the negation
        # below does not depend on ``_log_base`` turning a list into an array.
        p_local = asarray([self.dist_dict[val] for val in self.data[0]])
        return -self._log_base(p_local)

    def _cross_entropy(self) -> float:
        """Calculate the cross-entropy between two distributions.

        Only values occurring in both ``self.data[0]`` (P) and
        ``self.data[1]`` (Q) contribute to the sum. If there is no such
        value, a warning is logged and ``0.0`` is returned.

        Returns
        -------
        float
            The calculated cross-entropy.
        """
        # NOTE(review): if the variables went through ``reduce_joint_space``
        # in ``__init__``, each one was reduced independently -- confirm that
        # the resulting codes are comparable between P and Q.
        # Calculate distribution of both data sets
        uniq_p, _, dist_p = unique_vals(self.data[0])
        uniq_q, _, dist_q = unique_vals(self.data[1])
        # Only consider the values where both RV have the same support
        common = set(uniq_p).intersection(uniq_q)  # P ∩ Q
        if not common:
            logger.warning("No common support between the two distributions.")
            return 0.0
        return -np_sum(
            [dist_p[val] * self._log_base(dist_q[val]) for val in common]
        )