Source code for replay_trajectory_classification.likelihoods.multiunit_likelihood_integer

"""Estimates a marked point process likelihood where the marks are
 features of the spike waveform. Mark features are int16.

 Marks are converted to integers and KDE uses hash tables to compute Gaussian
 kernels."""

from __future__ import annotations

from typing import Optional, Union

import numpy as np
from tqdm.autonotebook import tqdm

from replay_trajectory_classification.core import atleast_2d
from replay_trajectory_classification.likelihoods.diffusion import (
    diffuse_each_bin,
    estimate_diffusion_position_density,
    estimate_diffusion_position_distance,
)


[docs] def gaussian_pdf(x: np.ndarray, mean: np.ndarray, sigma: np.ndarray) -> np.ndarray: """Compute the value of a Gaussian probability density function at x with given mean and sigma.""" return np.exp(-0.5 * ((x - mean) / sigma) ** 2) / (sigma * np.sqrt(2.0 * np.pi))
[docs] def estimate_position_distance( place_bin_centers: np.ndarray, positions: np.ndarray, position_std: np.ndarray ) -> np.ndarray: """Estimates the Euclidean distance between positions and position bins. Parameters ---------- place_bin_centers : np.ndarray, shape (n_position_bins, n_position_dims) positions : np.ndarray, shape (n_time, n_position_dims) position_std : array_like, shape (n_position_dims,) Returns ------- position_distance : np.ndarray, shape (n_time, n_position_bins) """ n_time, n_position_dims = positions.shape n_position_bins = place_bin_centers.shape[0] if isinstance(position_std, (int, float)): position_std = [position_std] * n_position_dims position_distance = np.ones((n_time, n_position_bins), dtype=np.float32) for position_ind, std in enumerate(position_std): position_distance *= gaussian_pdf( np.expand_dims(place_bin_centers[:, position_ind], axis=0), np.expand_dims(positions[:, position_ind], axis=1), std, ) return position_distance
[docs] def estimate_position_density( place_bin_centers: np.ndarray, positions: np.ndarray, position_std: Union[np.ndarray, float], block_size: int = 100, ) -> np.ndarray: """Estimates a kernel density estimate over position bins using Euclidean distances. Parameters ---------- place_bin_centers : np.ndarray, shape (n_position_bins, n_position_dims) positions : np.ndarray, shape (n_time, n_position_dims) position_std : float or array_like, shape (n_position_dims,) Returns ------- position_density : np.ndarray, shape (n_position_bins,) """ n_time = positions.shape[0] n_position_bins = place_bin_centers.shape[0] if block_size is None: block_size = n_time position_density = np.empty((n_position_bins,)) for start_ind in range(0, n_position_bins, block_size): block_inds = slice(start_ind, start_ind + block_size) position_density[block_inds] = np.mean( estimate_position_distance( place_bin_centers[block_inds], positions, position_std ), axis=0, ) return position_density
[docs] def estimate_log_intensity( density: np.ndarray, occupancy: np.ndarray, mean_rate: float ) -> np.ndarray: """Calculates intensity in log space.""" return np.log(mean_rate) + np.log(density) - np.log(occupancy)
[docs] def estimate_intensity(density, occupancy, mean_rate): """Calculates intensity. Parameters ---------- density : np.ndarray, shape (n_bins,) occupancy : np.ndarray, shape (n_bins,) mean_rate : float Returns ------- intensity : np.ndarray, shape (n_bins,) """ return np.exp(estimate_log_intensity(density, occupancy, mean_rate))
[docs] def normal_pdf_integer_lookup( x: int, mean: int, std: float = 20.0, max_value: int = 6000 ) -> int: """Fast density evaluation for integers by precomputing a hash table of values. Parameters ---------- x : int mean : int std : float max_value : int Returns ------- probability_density : int """ normal_density = gaussian_pdf(np.arange(-max_value, max_value), 0, std).astype( np.float32 ) return normal_density[(x - mean) + max_value]
[docs] def estimate_log_joint_mark_intensity( decoding_marks: np.ndarray, encoding_marks: np.ndarray, mark_std: Union[np.ndarray, float], occupancy: np.ndarray, mean_rate: float, place_bin_centers: Optional[np.ndarray] = None, encoding_positions: Optional[np.ndarray] = None, position_std: Optional[np.ndarray] = None, max_mark_diff: int = 6000, set_diag_zero: bool = False, position_distance: Optional[np.ndarray] = None, ) -> np.ndarray: """Finds the joint intensity of the marks and positions in log space. Parameters ---------- decoding_marks : np.ndarray, shape (n_decoding_spikes, n_features) encoding_marks : np.ndarray, shape (n_encoding_spikes, n_features) mark_std : float or np.ndarray, shape (n_features,) occupancy : np.ndarray, shape (n_position_bins,) mean_rate : float place_bin_centers : None or np.ndarray, shape (n_position_bins, n_position_dims) If None, position distance must be not None encoding_positions : None or np.ndarray, shape (n_decoding_spikes, n_position_dims) If None, position distance must be not None position_std : None or float or array_like, shape (n_position_dims,) If None, position distance must be not None max_mark_diff : int Maximum distance between integer marks. set_diag_zero : bool position_distance : np.ndarray, shape (n_encoding_spikes, n_position_bins) Precalculated distance between position and position bins. Returns ------- log_joint_mark_intensity : np.ndarray, shape (n_decoding_spikes, n_position_bins) """ n_encoding_spikes, n_marks = encoding_marks.shape n_decoding_spikes = decoding_marks.shape[0] mark_distance = np.ones((n_decoding_spikes, n_encoding_spikes), dtype=np.float32) for mark_ind in range(n_marks): mark_distance *= normal_pdf_integer_lookup( np.expand_dims(decoding_marks[:, mark_ind], axis=1), np.expand_dims(encoding_marks[:, mark_ind], axis=0), std=mark_std, max_value=max_mark_diff, ) if set_diag_zero: diag_ind = (np.arange(n_decoding_spikes), np.arange(n_decoding_spikes)) mark_distance[diag_ind] = 0.0 if position_distance is None: position_distance = estimate_position_distance( place_bin_centers, encoding_positions, position_std ).astype(np.float32) return estimate_log_intensity( mark_distance @ position_distance / n_encoding_spikes, occupancy, mean_rate )
[docs] def fit_multiunit_likelihood_integer( position: np.ndarray, multiunits: np.ndarray, place_bin_centers: np.ndarray, mark_std: np.ndarray, position_std: Union[np.ndarray, float], is_track_boundary: Optional[np.ndarray] = None, is_track_interior: Optional[np.ndarray] = None, edges: Optional[list[np.ndarray]] = None, block_size: int = 100, use_diffusion: bool = False, **kwargs, ) -> dict: """Fits the clusterless place field model. Parameters ---------- position : np.ndarray, shape (n_time, n_position_dims) multiunits : np.ndarray, shape (n_time, n_marks, n_electrodes) place_bin_centers : np.ndarray, shape (n_bins, n_position_dims) mark_std : float Amount of smoothing for the mark features. Standard deviation of kernel. position_std : float or array_like, shape (n_position_dims,) Amount of smoothing for position. Standard deviation of kernel. is_track_boundary : None or np.ndarray, shape (n_bins,) is_track_interior : None or np.ndarray, shape (n_bins,) edges : None or list of np.ndarray block_size : int Size of data to process in chunks use_diffusion : bool Use diffusion to respect the track geometry. Returns ------- encoding_model : dict """ if is_track_interior is None: is_track_interior = np.ones((place_bin_centers.shape[0],), dtype=bool) position = atleast_2d(position) place_bin_centers = atleast_2d(place_bin_centers) interior_place_bin_centers = np.asarray( place_bin_centers[is_track_interior.ravel(order="F")], dtype=np.float32 ) gpu_is_track_interior = np.asarray(is_track_interior.ravel(order="F")) not_nan_position = np.all(~np.isnan(position), axis=1) if use_diffusion & (position.shape[1] > 1): n_total_bins = np.prod(is_track_interior.shape) bin_diffusion_distances = diffuse_each_bin( is_track_interior, is_track_boundary, dx=edges[0][1] - edges[0][0], dy=edges[1][1] - edges[1][0], std=position_std, ).reshape((n_total_bins, -1), order="F") else: bin_diffusion_distances = None if use_diffusion & (position.shape[1] > 1): occupancy = np.asarray( estimate_diffusion_position_density( position[not_nan_position], edges, bin_distances=bin_diffusion_distances, ), dtype=np.float32, ) else: occupancy = np.zeros((place_bin_centers.shape[0],), dtype=np.float32) occupancy[gpu_is_track_interior] = estimate_position_density( interior_place_bin_centers, np.asarray(position[not_nan_position], dtype=np.float32), position_std, block_size=block_size, ) mean_rates = [] summed_ground_process_intensity = np.zeros( (place_bin_centers.shape[0],), dtype=np.float32 ) encoding_marks = [] encoding_positions = [] for multiunit in np.moveaxis(multiunits, -1, 0): # ground process intensity is_spike = np.any(~np.isnan(multiunit), axis=1) mean_rates.append(is_spike.mean()) if is_spike.sum() > 0: if use_diffusion & (position.shape[1] > 1): marginal_density = np.asarray( estimate_diffusion_position_density( position[is_spike & not_nan_position], edges, bin_distances=bin_diffusion_distances, ), dtype=np.float32, ) else: marginal_density = np.zeros( (place_bin_centers.shape[0],), dtype=np.float32 ) marginal_density[gpu_is_track_interior] = estimate_position_density( interior_place_bin_centers, np.asarray(position[is_spike & not_nan_position], dtype=np.float32), position_std, block_size=block_size, ) summed_ground_process_intensity += estimate_intensity( marginal_density, occupancy, mean_rates[-1] ) is_mark_features = np.any(~np.isnan(multiunit), axis=0) encoding_marks.append( np.asarray( multiunit[np.ix_(is_spike & not_nan_position, is_mark_features)], dtype=np.int16, ) ) encoding_positions.append(position[is_spike & not_nan_position]) summed_ground_process_intensity = summed_ground_process_intensity + np.spacing(1) return { "encoding_marks": encoding_marks, "encoding_positions": encoding_positions, "summed_ground_process_intensity": summed_ground_process_intensity, "occupancy": occupancy, "mean_rates": mean_rates, "mark_std": mark_std, "position_std": position_std, "block_size": block_size, "bin_diffusion_distances": bin_diffusion_distances, "use_diffusion": use_diffusion, "edges": edges, **kwargs, }
[docs] def estimate_multiunit_likelihood_integer( multiunits: np.ndarray, encoding_marks: np.ndarray, mark_std: np.ndarray, place_bin_centers: np.ndarray, encoding_positions: np.ndarray, position_std: np.ndarray, occupancy: np.ndarray, mean_rates: np.ndarray, summed_ground_process_intensity: np.ndarray, bin_diffusion_distances: np.ndarray, edges: list[np.ndarray], max_mark_diff: int = 6000, set_diag_zero: bool = False, is_track_interior: Optional[np.ndarray] = None, time_bin_size: int = 1, block_size: int = 100, ignore_no_spike: bool = False, disable_progress_bar: bool = False, use_diffusion: bool = False, ) -> np.ndarray: """Estimates the likelihood of position bins given multiunit marks. Parameters ---------- multiunits : np.ndarray, shape (n_decoding_time, n_marks, n_electrodes) encoding_marks : np.ndarray, shape (n_encoding_spikes, n_marks, n_electrodes) mark_std : float Amount of smoothing for mark features. Standard deviation of kernel. place_bin_centers : np.ndarray, shape (n_bins, n_position_dims) encoding_positions : np.ndarray, shape (n_encoding_spikes, n_position_dims) position_std : float or array_like, shape (n_position_dims,) Amount of smoothing for position. Standard deviation of kernel. occupancy : np.ndarray, (n_bins,) mean_rates : list, len (n_electrodes,) summed_ground_process_intensity : np.ndarray, shape (n_bins,) bin_diffusion_distances : np.ndarray, shape (n_bins, n_bins) edges : list of np.ndarray max_mark_diff : int Maximum difference between mark features set_diag_zero : bool Remove influence of the same mark in encoding and decoding. is_track_interior : None or np.ndarray, shape (n_bins_x, n_bins_y) time_bin_size : float Size of time steps block_size : int Size of data to process in chunks ignore_no_spike : bool Set contribution of no spikes to zero disable_progress_bar : bool If False, a progress bar will be displayed. use_diffusion : bool Respect track geometry by using diffusion distances Returns ------- log_likelihood : np.ndarray, shape (n_time, n_bins) """ if is_track_interior is None: is_track_interior = np.ones((place_bin_centers.shape[0],), dtype=bool) else: is_track_interior = is_track_interior.ravel(order="F") n_time = multiunits.shape[0] if ignore_no_spike: log_likelihood = ( -time_bin_size * summed_ground_process_intensity * np.zeros((n_time, 1), dtype=np.float32) ) else: log_likelihood = ( -time_bin_size * summed_ground_process_intensity * np.ones((n_time, 1), dtype=np.float32) ) multiunits = np.moveaxis(multiunits, -1, 0) n_position_bins = is_track_interior.sum() interior_place_bin_centers = np.asarray( place_bin_centers[is_track_interior], dtype=np.float32 ) gpu_is_track_interior = np.asarray(is_track_interior) interior_occupancy = occupancy[gpu_is_track_interior] for multiunit, enc_marks, enc_pos, mean_rate in zip( tqdm(multiunits, desc="n_electrodes", disable=disable_progress_bar), encoding_marks, encoding_positions, mean_rates, ): is_spike = np.any(~np.isnan(multiunit), axis=1) is_mark_features = np.any(~np.isnan(multiunit), axis=0) decoding_marks = np.asarray( multiunit[np.ix_(is_spike, is_mark_features)], dtype=np.int16 ) n_decoding_marks = decoding_marks.shape[0] log_joint_mark_intensity = np.zeros( (n_decoding_marks, n_position_bins), dtype=np.float32 ) if block_size is None: block_size = n_decoding_marks if use_diffusion & (place_bin_centers.shape[1] > 1): position_distance = np.asarray( estimate_diffusion_position_distance( enc_pos, edges, bin_distances=bin_diffusion_distances, )[:, is_track_interior], dtype=np.float32, ) else: position_distance = estimate_position_distance( interior_place_bin_centers, np.asarray(enc_pos, dtype=np.float32), position_std, ).astype(np.float32) for start_ind in range(0, n_decoding_marks, block_size): block_inds = slice(start_ind, start_ind + block_size) log_joint_mark_intensity[block_inds] = estimate_log_joint_mark_intensity( decoding_marks[block_inds], enc_marks, mark_std, interior_occupancy, mean_rate, max_mark_diff=max_mark_diff, set_diag_zero=set_diag_zero, position_distance=position_distance, ) log_likelihood[np.ix_(is_spike, is_track_interior)] += np.nan_to_num( log_joint_mark_intensity ) log_likelihood[:, ~is_track_interior] = np.nan return log_likelihood