Source code for infomeasure.estimators.transfer_entropy.kernel

"""Module for the kernel-based transfer entropy estimator."""

from abc import ABC

from numpy import isinf, isnan

from ... import Config
from ...utils.config import logger
from ...utils.types import LogBaseType
from ..base import (
    ConditionalTransferEntropyEstimator,
    TransferEntropyEstimator,
)
from ..mixins import WorkersMixin
from ..utils.kde import kde_probability_density_function
from ..utils.te_slicing import cte_observations, te_observations


class BaseKernelTEEstimator(WorkersMixin, ABC):
    """Common foundation of the KDE-based (conditional) transfer entropy estimators.

    The class only collects the configuration: it forwards the generic
    transfer-entropy settings to the next initializer in the MRO and keeps the
    kernel-specific settings (``bandwidth`` and ``kernel``) on the instance.

    Attributes
    ----------
    source, dest : array-like
        The source (X) and destination (Y) data used to estimate the transfer
        entropy.
    cond : array-like, optional
        The conditional data used to estimate the conditional transfer entropy.
    bandwidth : float | int
        The bandwidth for the kernel.
    kernel : str
        Type of kernel to use, compatible with the KDE implementation
        :func:`kde_probability_density_function() <infomeasure.estimators.utils.kde.kde_probability_density_function>`.
    prop_time : int, optional
        Number of positions to shift the data arrays relative to each other
        (multiple of ``step_size``).
        Delay/lag/shift between the variables, representing propagation time.
        Assumed time taken by info to transfer from source to destination.
        Not compatible with the ``cond`` parameter / conditional TE.
        Alternatively called `offset`.
    step_size : int, optional
        Step size between elements for the state space reconstruction.
    src_hist_len, dest_hist_len : int, optional
        Number of past observations to consider for the source and destination
        data.
    cond_hist_len : int, optional
        Number of past observations to consider for the conditional data.
        Only used for conditional transfer entropy.
    workers : int, optional
        Number of workers to use for parallel processing.
        Default is 1, meaning no parallel processing.
        If set to -1, all available CPU cores will be used.
    """

    def __init__(
        self,
        source,
        dest,
        *,  # Enforce keyword-only arguments
        cond=None,
        bandwidth: float | int = None,
        kernel: str = None,
        prop_time: int = 0,
        step_size: int = 1,
        src_hist_len: int = 1,
        dest_hist_len: int = 1,
        cond_hist_len: int = 1,
        workers: int = 1,
        offset: int = None,
        base: LogBaseType = Config.get("base"),
        **kwargs,
    ):
        """Set up the shared state of a kernel transfer entropy estimator.

        Parameters
        ----------
        source, dest : array-like
            The source (X) and destination (Y) data used to estimate the
            transfer entropy.
        cond : array-like, optional
            The conditional data used to estimate the conditional transfer
            entropy. When ``None`` (the default), neither ``cond`` nor
            ``cond_hist_len`` is handed to the parent initializer.
        bandwidth : float | int
            The bandwidth for the kernel. Stored as ``self.bandwidth``.
        kernel : str
            Type of kernel to use. Stored as ``self.kernel``.
        prop_time : int, optional
            Number of positions to shift the data arrays relative to each other
            (multiple of ``step_size``).
            Delay/lag/shift between the variables, representing propagation
            time. Assumed time taken by info to transfer from source to
            destination.
            Not compatible with the ``cond`` parameter / conditional TE.
            Alternatively called `offset`.
        step_size : int, optional
            Step size between elements for the state space reconstruction.
        src_hist_len, dest_hist_len : int, optional
            Number of past observations to consider for the source and
            destination data.
        cond_hist_len : int, optional
            Number of past observations to consider for the conditional data.
            Only used for conditional transfer entropy.
        workers : int, optional
            Number of workers to use for parallel processing.
        offset : int, optional
            Forwarded unchanged to the parent initializer (alternative name
            for ``prop_time``, see above).
        base : LogBaseType, optional
            Forwarded unchanged to the parent initializer. The default is the
            value of ``Config.get("base")``.
        **kwargs
            Any further keyword arguments, forwarded unchanged to the parent
            initializer.
        """
        # Settings every variant passes on to the parent initializer.
        parent_options = dict(
            prop_time=prop_time,
            step_size=step_size,
            src_hist_len=src_hist_len,
            dest_hist_len=dest_hist_len,
            workers=workers,
            offset=offset,
            base=base,
        )
        # The conditional settings are only handed over when conditional data
        # was actually supplied.
        if cond is not None:
            parent_options.update(cond=cond, cond_hist_len=cond_hist_len)

        super().__init__(source, dest, **parent_options, **kwargs)

        # Kernel-specific configuration, read later by ``_calculate``.
        self.bandwidth = bandwidth
        self.kernel = kernel
class KernelTEEstimator(BaseKernelTEEstimator, TransferEntropyEstimator):
    """Transfer entropy estimator built on Kernel Density Estimation (KDE).

    Attributes
    ----------
    source, dest : array-like
        The source (X) and destination (Y) data used to estimate the transfer
        entropy.
    bandwidth : float | int
        The bandwidth for the kernel.
    kernel : str
        Type of kernel to use, compatible with the KDE implementation
        :func:`kde_probability_density_function() <infomeasure.estimators.utils.kde.kde_probability_density_function>`.
    prop_time : int, optional
        Number of positions to shift the data arrays relative to each other
        (multiple of ``step_size``).
        Delay/lag/shift between the variables, representing propagation time.
        Assumed time taken by info to transfer from source to destination.
        Alternatively called `offset`.
    step_size : int
        Step size between elements for the state space reconstruction.
    src_hist_len, dest_hist_len : int
        Number of past observations to consider for the source and destination
        data.

    Notes
    -----
    A small ``bandwidth`` can lead to under-sampling,
    while a large ``bandwidth`` may over-smooth the data, obscuring details.
    """

    def _calculate(self):
        """Compute the local transfer entropy values of the data.

        Four densities are estimated with the KDE helper, combined into the
        ratio ``(joint * dest_past) / (dest_past_future * source_dest_past)``
        and passed through ``self._log_base``. Entries that come out as
        ``inf`` or ``nan`` are replaced by ``0.0``.

        Returns
        -------
        local_te_values : array
            Local transfer entropy values.
        """

        def estimate_density(samples):
            # Every density shares the estimator's bandwidth, kernel and
            # worker configuration; only the sample space differs.
            return kde_probability_density_function(
                samples,
                self.bandwidth,
                kernel=self.kernel,
                workers=self.n_workers,
            )

        # Embedded sample spaces, in the order returned by the slicing helper.
        joint_space, dest_past_space, src_dest_past_space, dest_past_future_space = (
            te_observations(
                self.source,
                self.dest,
                src_hist_len=self.src_hist_len,
                dest_hist_len=self.dest_hist_len,
                step_size=self.step_size,
                permute_src=self.permute_src,
                resample_src=self.resample_src,
            )
        )

        # g(x_i^{(l)}, y_i^{(k)}, y_{i+1})
        logger.debug(
            "Calculating densities for...\n1/4 g(x_i^{(l)}, y_i^{(k)}, y_{i+1})"
        )
        density_joint = estimate_density(joint_space)

        # g(y_i^{(k)})
        logger.debug("2/4 g(y_i^{(k)})")
        density_dest_past = estimate_density(dest_past_space)

        # g(x_i^{(l)}, y_i^{(k)})
        logger.debug("3/4 g(x_i^{(l)}, y_i^{(k)})")
        density_src_dest_past = estimate_density(src_dest_past_space)

        # g(y_i^{(k)}, y_{i+1})
        logger.debug("4/4 g(y_i^{(k)}, y_{i+1})")
        density_dest_past_future = estimate_density(dest_past_future_space)

        numerator = density_joint * density_dest_past
        denominator = density_dest_past_future * density_src_dest_past
        local_te_values = self._log_base(numerator / denominator)

        # Non-finite results (inf or nan) are zeroed out.
        not_finite = isinf(local_te_values) | isnan(local_te_values)
        local_te_values[not_finite] = 0.0
        return local_te_values
class KernelCTEEstimator(BaseKernelTEEstimator, ConditionalTransferEntropyEstimator):
    """Conditional transfer entropy estimator built on Kernel Density Estimation (KDE).

    Attributes
    ----------
    source, dest, cond : array-like
        The source (X), destination (Y), and conditional (Z) data used to
        estimate the conditional transfer entropy.
    bandwidth : float | int
        The bandwidth for the kernel.
    kernel : str
        Type of kernel to use, compatible with the KDE implementation
        :func:`kde_probability_density_function() <infomeasure.estimators.utils.kde.kde_probability_density_function>`.
    step_size : int
        Step size between elements for the state space reconstruction.
    src_hist_len, dest_hist_len, cond_hist_len : int, optional
        Number of past observations to consider for the source, destination,
        and conditional data.
    prop_time : int, optional
        Not compatible with the ``cond`` parameter / conditional TE.

    Notes
    -----
    A small ``bandwidth`` can lead to under-sampling,
    while a large ``bandwidth`` may over-smooth the data, obscuring details.
    """

    def _calculate(self):
        """Compute the local conditional transfer entropy values of the data.

        Four densities are estimated with the KDE helper, combined into the
        ratio ``(joint * dest_cond_past) / (src_cond_dest_past *
        cond_dest_past_future)`` and passed through ``self._log_base``.
        Entries that come out as ``inf`` or ``nan`` are replaced by ``0.0``.

        Returns
        -------
        local_cte_values : array
            Local conditional transfer entropy values.
        """

        def estimate_density(samples):
            # Every density shares the estimator's bandwidth, kernel and
            # worker configuration; only the sample space differs.
            return kde_probability_density_function(
                samples,
                self.bandwidth,
                kernel=self.kernel,
                workers=self.n_workers,
            )

        # Embedded sample spaces, in the order returned by the slicing helper.
        (
            joint_space,
            dest_cond_past_space,
            src_cond_dest_past_space,
            cond_dest_past_future_space,
        ) = cte_observations(
            self.source,
            self.dest,
            self.cond,
            src_hist_len=self.src_hist_len,
            dest_hist_len=self.dest_hist_len,
            cond_hist_len=self.cond_hist_len,
            step_size=self.step_size,
        )

        # g(x_i^{(l)}, z_i^{(m)}, y_i^{(k)}, y_{i+1})
        logger.debug(
            "Calculating densities for...\n"
            "1/4 g(x_i^{(l)}, z_i^{(m)}, y_i^{(k)}, y_{i+1})"
        )
        density_joint = estimate_density(joint_space)

        # g(y_i^{(k)}, z_i^{(m)})
        logger.debug("2/4 g(y_i^{(k)}, z_i^{(m)})")
        density_dest_cond_past = estimate_density(dest_cond_past_space)

        # g(x_i^{(l)}, z_i^{(m)}, y_i^{(k)})
        logger.debug("3/4 g(x_i^{(l)}, z_i^{(m)}, y_i^{(k)})")
        density_src_cond_dest_past = estimate_density(src_cond_dest_past_space)

        # g(z_i^{(m)}, y_i^{(k)}, y_{i+1})
        logger.debug("4/4 g(z_i^{(m)}, y_i^{(k)}, y_{i+1})")
        density_cond_dest_past_future = estimate_density(cond_dest_past_future_space)

        numerator = density_joint * density_dest_cond_past
        denominator = density_src_cond_dest_past * density_cond_dest_past_future
        local_cte_values = self._log_base(numerator / denominator)

        # Non-finite results (inf or nan) are zeroed out.
        not_finite = isinf(local_cte_values) | isnan(local_cte_values)
        local_cte_values[not_finite] = 0.0
        return local_cte_values