# Source code for infomeasure.estimators.entropy.zhang

"""Module for the Zhang entropy estimator."""

from numpy import log, array, arange, sum as np_sum, where

from infomeasure.estimators.base import DiscreteHEstimator


def _compute_vectorized_bias_correction_terms(N, max_k, valid_counts):
    """Evaluate the inner sum of the Zhang entropy formula for many counts at once.

    For every count ``n`` in ``valid_counts`` this returns

        sum_{k=1}^{N - n} (1 / k) * prod_{m=1}^{k} (1 - (n - 1) / (N - m))

    All counts are handled together on a ``(len(valid_counts), max_k)`` grid;
    grid cells with ``k > N - n`` are masked out so they contribute nothing.

    Parameters
    ----------
    N : int
        Total number of observations.
    max_k : int
        Largest ``k`` placed on the grid. Keep it below ``N`` so that the
        denominator ``N - k`` never reaches zero.
    valid_counts : ndarray
        Array of valid count values (counts > 0 and counts < N).

    Returns
    -------
    ndarray
        One inner-sum value per entry of ``valid_counts``.
        Shape: (len(valid_counts),)
    """
    # Row vector of k = 1..max_k and column vector of counts; broadcasting
    # these two produces the full (count, k) grid.
    k_row = arange(1, max_k + 1)[None, :]
    count_col = valid_counts[:, None]

    # A grid cell takes part in the sum only while k <= N - n.
    in_range = k_row <= (N - count_col)

    # Per-cell product factor; out-of-range cells become 1.0 so they do not
    # disturb the running product along the k axis.
    raw_factors = 1.0 - (count_col - 1.0) / (N - k_row)
    running_product = where(in_range, raw_factors, 1.0).cumprod(axis=1)

    # Zero the out-of-range cells, weight each remaining one by 1/k, and
    # collapse the k axis.
    weighted = where(in_range, running_product, 0.0) * (1.0 / k_row)
    return np_sum(weighted, axis=1)


class ZhangEntropyEstimator(DiscreteHEstimator):
    r"""Zhang entropy estimator for discrete data.

    The Zhang estimator computes the Shannon entropy using the recommended
    definition from :cite:p:`grabchakAuthorshipAttributionUsing2013`:

    .. math::

        \hat{H}_Z = \sum_{i=1}^K \hat{p}_i \sum_{v=1}^{N - n_i} \frac{1}{v}
        \prod_{j=0}^{v-1} \left( 1 + \frac{1 - n_i}{N - 1 - j} \right)

    where :math:`\hat{p}_i` are the empirical probabilities, :math:`n_i` are the
    counts for each unique value, :math:`K` is the number of unique values,
    and :math:`N` is the total number of observations.

    The actual algorithm implementation follows the fast calculation approach
    from :cite:p:`lozanoFastCalculationEntropy2017`.

    Attributes
    ----------
    *data : array-like
        The data used to estimate the entropy.
    """

    def _zhang_inner_sums(self):
        """Compute the inner Zhang sum for every symbol with a usable count.

        Shared by the global and the local estimate so that both apply the
        same filtering and the same bias-correction terms.

        Returns
        -------
        valid_mask : ndarray[bool]
            Marks the entries of ``counts`` satisfying ``0 < count < N``.
        valid_counts : ndarray
            The counts selected by ``valid_mask``.
        inner_sums : ndarray[float]
            One inner-sum value per valid count, before any base conversion;
            empty when no count is valid.
        """
        counts = self.data[0].counts
        N = self.data[0].N

        # Counts of 0 or >= N are excluded; both callers give them 0.0.
        valid_mask = (counts > 0) & (counts < N)
        valid_counts = counts[valid_mask]
        if valid_counts.size == 0:
            return valid_mask, valid_counts, array([], dtype=float)

        # The inner-sum length depends on the count, so the grid is sized for
        # the smallest count. Every valid count is < N, hence max_k >= 1 and
        # no extra guard on max_k is required.
        max_k = N - valid_counts.min()
        inner_sums = _compute_vectorized_bias_correction_terms(
            N, max_k, valid_counts
        )
        return valid_mask, valid_counts, inner_sums

    def _simple_entropy(self):
        """Calculate the Zhang entropy of the data.

        Returns
        -------
        float
            The calculated Zhang entropy, expressed in ``self.base``.
            ``0.0`` when no symbol has a count strictly between 0 and N.
        """
        _, valid_counts, inner_sums = self._zhang_inner_sums()
        if valid_counts.size == 0:
            return 0.0

        # Weight each inner sum by the empirical probability of its symbol.
        N = self.data[0].N
        ent = np_sum(inner_sums * (valid_counts / N))

        # Convert to the desired base if needed
        if self.base != "e":
            ent /= log(self.base)
        return ent

    def _extract_local_values(self):
        """Calculate local Zhang entropy values for each data point.

        Each data point receives the inner Zhang sum of the symbol it carries;
        symbols whose count is 0 or >= N receive ``0.0``.

        Returns
        -------
        ndarray[float]
            The calculated local values of Zhang entropy, expressed in
            ``self.base``.
        """
        uniq_vals = self.data[0].uniq
        valid_mask, _, inner_sums = self._zhang_inner_sums()

        # Symbols with an unusable count contribute nothing ...
        zhang_contributions = {
            uniq_val: 0.0
            for uniq_val, is_valid in zip(uniq_vals, valid_mask)
            if not is_valid
        }
        # ... all remaining symbols get their bias-corrected inner sum.
        zhang_contributions.update(zip(uniq_vals[valid_mask], inner_sums))

        # Map each data point to its local Zhang entropy value
        local_values = array(
            [zhang_contributions[val] for val in self.data[0].data]
        )

        # Convert to the desired base if needed
        if self.base != "e":
            local_values /= log(self.base)
        return local_values

    def _cross_entropy(self):
        """Reject cross-entropy requests for the Zhang estimator.

        Raises
        ------
        TheoreticalInconsistencyError
            Always; cross-entropy is not implemented for this estimator.
        """
        from ...utils.exceptions import TheoreticalInconsistencyError

        raise TheoreticalInconsistencyError(
            "Cross-entropy is not implemented for Zhang estimator due to "
            "theoretical inconsistencies in applying bias corrections from "
            "different distributions."
        )