Source code for infomeasure.estimators.utils.kde

"""Kernel Density Estimation (KDE) utilities."""

from multiprocessing import Pool, cpu_count

from numpy import (
    argsort,
    array_split,
    concatenate,
    cov,
    dot,
    inf,
    issubdtype,
    number,
)
from numpy import sum as np_sum
from numpy.linalg import eig
from scipy.spatial import KDTree
from scipy.stats import gaussian_kde

from ...utils.config import logger


[docs] def kde_probability_density_function( data, bandwidth, at=None, kernel="box", workers=-1 ): """ Estimate the probability density function for a given data set using Kernel Density Estimation (KDE). Parameters ---------- data : array A numpy array of data points, where each column represents a dimension. bandwidth : float The bandwidth for the kernel. at : array, optional A numpy array of points at which to evaluate the KDE. If None, the KDE is evaluated at the data points. kernel : str Type of kernel to use ('gaussian' or 'box'). workers : int Number of parallel processes to use. -1: Use all available CPU cores. Default is 1. Returns ------- ndarray[float] KDE at the given point(s). Raises ------ ValueError If the kernel type is not supported ValueError If the bandwidth is not a positive number. """ logger.debug( f"Called kde_probability_density_function with " f"kernel: {kernel}, workers: {workers}" ) if not issubdtype(type(bandwidth), number) or bandwidth <= 0: raise ValueError("The bandwidth must be a positive number.") if at is None: at = data elif data.shape[1] != at.shape[1]: # make sure ``data`` and ``at`` share the same dimensionality raise ValueError("The data and at must have the same number of dimensions.") if kernel == "gaussian": if workers == -1: workers = cpu_count() return gaussian_kernel_densities(data.T, bandwidth, at=at.T, workers=workers) elif kernel == "box": # Get the number of data points (N) and the number of dimensions (d) N, d = data.shape # Calculate the volume of the box kernel volume = bandwidth**d logger.debug("Creating KDTree from data points for box KDE... ") tree = KDTree(data) logger.debug( f"KDTree created with {N} data points and {d} dimensions. " f"Querying KDTree for ball points with {workers} workers." ) counts = tree.query_ball_point( at, bandwidth / 2, p=inf, return_length=True, workers=workers, ) densities = counts / (N * volume) # Squeeze the densities array to remove any single-dimensional entries return densities.squeeze() # Another approach to box kernel density estimation else: # TODO: Add more kernel types as needed raise ValueError(f"Unsupported kernel type: {kernel}. Use 'gaussian' or 'box'.")
[docs] def gaussian_kernel_densities( data, bandwidth, at=None, workers=1, eigen_threshold: float = 1e-10 ): """Calculate kde for gaussian kernel. In case of multivariate data, checks rank of data and reduces dimensions if eigenvalues are below threshold. If already full rank, does no reprojection. Parameters ---------- data : ndarray, shape (d, N) Data points to estimate density for. bandwidth : float Bandwidth parameter for kernel density estimation. at : array, optional A numpy array of points at which to evaluate the KDE. If None, the KDE is evaluated at the data points. workers : int, optional Number of workers to use for parallel processing. Default is 1. eigen_threshold : float, optional Threshold for eigenvalues to determine rank of data. Default is 1e-10. Returns ------- densities : ndarray, shape (n,) Estimated density values at data points. """ logger.debug(f"Calculating Gaussian KDE with bandwidth {bandwidth}...") if data.shape[0] > 1: # Multivariate case # Calculate covariance matrix covariance_matrix = cov(data) # Get eigenvalues and eigenvectors values, vectors = eig(covariance_matrix) sorted_indices = argsort(values)[::-1] values_sorted = values[sorted_indices] vectors_sorted = vectors[:, sorted_indices] # Get the number of eigenvalues greater than the threshold num_non_zero_eigenvalues = np_sum(values_sorted > eigen_threshold) # Check projection necessary if num_non_zero_eigenvalues < data.shape[0]: logger.debug( f"Reducing dimensionality from {data.shape[1]} to " f"{num_non_zero_eigenvalues} dimensions." ) # Project the data onto the reduced space pca_components = vectors_sorted[:, :num_non_zero_eigenvalues] data_projected = dot(data.T, pca_components).T logger.debug("Reprojected data, evaluate kde...") # each worker gets a chunk of data and evaluates KDE on it return parallel_kde_evaluate( data_projected, data_projected if at is None else dot(at.T, pca_components).T, bandwidth, workers, ) at = data if at is None else at return parallel_kde_evaluate(data, at, bandwidth, workers)
[docs] def query_chunk(params): """Evaluate KDE on a chunk of data.""" full_data, query_data, bandwidth = params kde = gaussian_kde(full_data, bw_method=bandwidth) return kde.evaluate(query_data).squeeze()
[docs] def parallel_kde_evaluate(data, at, bandwidth, workers): """Evaluate KDE on a set of data in parallel. Parameters ---------- data : array-like The data to evaluate the KDE on. at : array-like The points at which to evaluate the KDE. bandwidth : float or str The bandwidth to use for the KDE. workers : int The number of worker processes to use for evaluation. Notes ----- If the data is < 100000 samples or the number of workers is 1, evaluate the KDE on a single worker. """ # parallelization just gets really effective, if chunk size is not too small # if data is not too large, use less workers than possible if workers == 1 or data.shape[1] < 20000: logger.debug(f"Evaluating kde on a single worker with query size {at.shape}.") kde = gaussian_kde(data, bw_method=bandwidth) return kde.evaluate(at).squeeze() workers = min(workers, max(1, at.shape[1] // 8000)) query_chunks = array_split(at, workers, axis=1) logger.debug( f"Evaluating kde on {workers} workers with data size {at.shape} and " f"query chunks shape: {query_chunks[0].shape}." ) pool = Pool(processes=workers) results = pool.map( query_chunk, zip([data] * len(query_chunks), query_chunks, [bandwidth] * len(query_chunks)), ) pool.close() pool.join() return concatenate(results, axis=0)