"""Estimates a marked point process likelihood where the marks are
features of the spike waveform. Mark features are int16.
Marks are converted to integers and KDE uses hash tables to compute Gaussian
kernels."""
from __future__ import annotations
from typing import Optional, Union
import numpy as np
from tqdm.autonotebook import tqdm
from replay_trajectory_classification.core import atleast_2d
from replay_trajectory_classification.likelihoods.diffusion import (
diffuse_each_bin,
estimate_diffusion_position_density,
estimate_diffusion_position_distance,
)
[docs]
def gaussian_pdf(x: np.ndarray, mean: np.ndarray, sigma: np.ndarray) -> np.ndarray:
"""Compute the value of a Gaussian probability density function at x with
given mean and sigma."""
return np.exp(-0.5 * ((x - mean) / sigma) ** 2) / (sigma * np.sqrt(2.0 * np.pi))
[docs]
def estimate_position_distance(
place_bin_centers: np.ndarray, positions: np.ndarray, position_std: np.ndarray
) -> np.ndarray:
"""Estimates the Euclidean distance between positions and position bins.
Parameters
----------
place_bin_centers : np.ndarray, shape (n_position_bins, n_position_dims)
positions : np.ndarray, shape (n_time, n_position_dims)
position_std : array_like, shape (n_position_dims,)
Returns
-------
position_distance : np.ndarray, shape (n_time, n_position_bins)
"""
n_time, n_position_dims = positions.shape
n_position_bins = place_bin_centers.shape[0]
if isinstance(position_std, (int, float)):
position_std = [position_std] * n_position_dims
position_distance = np.ones((n_time, n_position_bins), dtype=np.float32)
for position_ind, std in enumerate(position_std):
position_distance *= gaussian_pdf(
np.expand_dims(place_bin_centers[:, position_ind], axis=0),
np.expand_dims(positions[:, position_ind], axis=1),
std,
)
return position_distance
[docs]
def estimate_position_density(
place_bin_centers: np.ndarray,
positions: np.ndarray,
position_std: Union[np.ndarray, float],
block_size: int = 100,
) -> np.ndarray:
"""Estimates a kernel density estimate over position bins using
Euclidean distances.
Parameters
----------
place_bin_centers : np.ndarray, shape (n_position_bins, n_position_dims)
positions : np.ndarray, shape (n_time, n_position_dims)
position_std : float or array_like, shape (n_position_dims,)
Returns
-------
position_density : np.ndarray, shape (n_position_bins,)
"""
n_time = positions.shape[0]
n_position_bins = place_bin_centers.shape[0]
if block_size is None:
block_size = n_time
position_density = np.empty((n_position_bins,))
for start_ind in range(0, n_position_bins, block_size):
block_inds = slice(start_ind, start_ind + block_size)
position_density[block_inds] = np.mean(
estimate_position_distance(
place_bin_centers[block_inds], positions, position_std
),
axis=0,
)
return position_density
[docs]
def estimate_log_intensity(
density: np.ndarray, occupancy: np.ndarray, mean_rate: float
) -> np.ndarray:
"""Calculates intensity in log space."""
return np.log(mean_rate) + np.log(density) - np.log(occupancy)
[docs]
def estimate_intensity(density, occupancy, mean_rate):
"""Calculates intensity.
Parameters
----------
density : np.ndarray, shape (n_bins,)
occupancy : np.ndarray, shape (n_bins,)
mean_rate : float
Returns
-------
intensity : np.ndarray, shape (n_bins,)
"""
return np.exp(estimate_log_intensity(density, occupancy, mean_rate))
[docs]
def normal_pdf_integer_lookup(
x: int, mean: int, std: float = 20.0, max_value: int = 6000
) -> int:
"""Fast density evaluation for integers by precomputing a hash table of
values.
Parameters
----------
x : int
mean : int
std : float
max_value : int
Returns
-------
probability_density : int
"""
normal_density = gaussian_pdf(np.arange(-max_value, max_value), 0, std).astype(
np.float32
)
return normal_density[(x - mean) + max_value]
[docs]
def estimate_log_joint_mark_intensity(
decoding_marks: np.ndarray,
encoding_marks: np.ndarray,
mark_std: Union[np.ndarray, float],
occupancy: np.ndarray,
mean_rate: float,
place_bin_centers: Optional[np.ndarray] = None,
encoding_positions: Optional[np.ndarray] = None,
position_std: Optional[np.ndarray] = None,
max_mark_diff: int = 6000,
set_diag_zero: bool = False,
position_distance: Optional[np.ndarray] = None,
) -> np.ndarray:
"""Finds the joint intensity of the marks and positions in log space.
Parameters
----------
decoding_marks : np.ndarray, shape (n_decoding_spikes, n_features)
encoding_marks : np.ndarray, shape (n_encoding_spikes, n_features)
mark_std : float or np.ndarray, shape (n_features,)
occupancy : np.ndarray, shape (n_position_bins,)
mean_rate : float
place_bin_centers : None or np.ndarray, shape (n_position_bins, n_position_dims)
If None, position distance must be not None
encoding_positions : None or np.ndarray, shape (n_decoding_spikes, n_position_dims)
If None, position distance must be not None
position_std : None or float or array_like, shape (n_position_dims,)
If None, position distance must be not None
max_mark_diff : int
Maximum distance between integer marks.
set_diag_zero : bool
position_distance : np.ndarray, shape (n_encoding_spikes, n_position_bins)
Precalculated distance between position and position bins.
Returns
-------
log_joint_mark_intensity : np.ndarray, shape (n_decoding_spikes, n_position_bins)
"""
n_encoding_spikes, n_marks = encoding_marks.shape
n_decoding_spikes = decoding_marks.shape[0]
mark_distance = np.ones((n_decoding_spikes, n_encoding_spikes), dtype=np.float32)
for mark_ind in range(n_marks):
mark_distance *= normal_pdf_integer_lookup(
np.expand_dims(decoding_marks[:, mark_ind], axis=1),
np.expand_dims(encoding_marks[:, mark_ind], axis=0),
std=mark_std,
max_value=max_mark_diff,
)
if set_diag_zero:
diag_ind = (np.arange(n_decoding_spikes), np.arange(n_decoding_spikes))
mark_distance[diag_ind] = 0.0
if position_distance is None:
position_distance = estimate_position_distance(
place_bin_centers, encoding_positions, position_std
).astype(np.float32)
return estimate_log_intensity(
mark_distance @ position_distance / n_encoding_spikes, occupancy, mean_rate
)
[docs]
def fit_multiunit_likelihood_integer(
position: np.ndarray,
multiunits: np.ndarray,
place_bin_centers: np.ndarray,
mark_std: np.ndarray,
position_std: Union[np.ndarray, float],
is_track_boundary: Optional[np.ndarray] = None,
is_track_interior: Optional[np.ndarray] = None,
edges: Optional[list[np.ndarray]] = None,
block_size: int = 100,
use_diffusion: bool = False,
**kwargs,
) -> dict:
"""Fits the clusterless place field model.
Parameters
----------
position : np.ndarray, shape (n_time, n_position_dims)
multiunits : np.ndarray, shape (n_time, n_marks, n_electrodes)
place_bin_centers : np.ndarray, shape (n_bins, n_position_dims)
mark_std : float
Amount of smoothing for the mark features. Standard deviation of kernel.
position_std : float or array_like, shape (n_position_dims,)
Amount of smoothing for position. Standard deviation of kernel.
is_track_boundary : None or np.ndarray, shape (n_bins,)
is_track_interior : None or np.ndarray, shape (n_bins,)
edges : None or list of np.ndarray
block_size : int
Size of data to process in chunks
use_diffusion : bool
Use diffusion to respect the track geometry.
Returns
-------
encoding_model : dict
"""
if is_track_interior is None:
is_track_interior = np.ones((place_bin_centers.shape[0],), dtype=bool)
position = atleast_2d(position)
place_bin_centers = atleast_2d(place_bin_centers)
interior_place_bin_centers = np.asarray(
place_bin_centers[is_track_interior.ravel(order="F")], dtype=np.float32
)
gpu_is_track_interior = np.asarray(is_track_interior.ravel(order="F"))
not_nan_position = np.all(~np.isnan(position), axis=1)
if use_diffusion & (position.shape[1] > 1):
n_total_bins = np.prod(is_track_interior.shape)
bin_diffusion_distances = diffuse_each_bin(
is_track_interior,
is_track_boundary,
dx=edges[0][1] - edges[0][0],
dy=edges[1][1] - edges[1][0],
std=position_std,
).reshape((n_total_bins, -1), order="F")
else:
bin_diffusion_distances = None
if use_diffusion & (position.shape[1] > 1):
occupancy = np.asarray(
estimate_diffusion_position_density(
position[not_nan_position],
edges,
bin_distances=bin_diffusion_distances,
),
dtype=np.float32,
)
else:
occupancy = np.zeros((place_bin_centers.shape[0],), dtype=np.float32)
occupancy[gpu_is_track_interior] = estimate_position_density(
interior_place_bin_centers,
np.asarray(position[not_nan_position], dtype=np.float32),
position_std,
block_size=block_size,
)
mean_rates = []
summed_ground_process_intensity = np.zeros(
(place_bin_centers.shape[0],), dtype=np.float32
)
encoding_marks = []
encoding_positions = []
for multiunit in np.moveaxis(multiunits, -1, 0):
# ground process intensity
is_spike = np.any(~np.isnan(multiunit), axis=1)
mean_rates.append(is_spike.mean())
if is_spike.sum() > 0:
if use_diffusion & (position.shape[1] > 1):
marginal_density = np.asarray(
estimate_diffusion_position_density(
position[is_spike & not_nan_position],
edges,
bin_distances=bin_diffusion_distances,
),
dtype=np.float32,
)
else:
marginal_density = np.zeros(
(place_bin_centers.shape[0],), dtype=np.float32
)
marginal_density[gpu_is_track_interior] = estimate_position_density(
interior_place_bin_centers,
np.asarray(position[is_spike & not_nan_position], dtype=np.float32),
position_std,
block_size=block_size,
)
summed_ground_process_intensity += estimate_intensity(
marginal_density, occupancy, mean_rates[-1]
)
is_mark_features = np.any(~np.isnan(multiunit), axis=0)
encoding_marks.append(
np.asarray(
multiunit[np.ix_(is_spike & not_nan_position, is_mark_features)],
dtype=np.int16,
)
)
encoding_positions.append(position[is_spike & not_nan_position])
summed_ground_process_intensity = summed_ground_process_intensity + np.spacing(1)
return {
"encoding_marks": encoding_marks,
"encoding_positions": encoding_positions,
"summed_ground_process_intensity": summed_ground_process_intensity,
"occupancy": occupancy,
"mean_rates": mean_rates,
"mark_std": mark_std,
"position_std": position_std,
"block_size": block_size,
"bin_diffusion_distances": bin_diffusion_distances,
"use_diffusion": use_diffusion,
"edges": edges,
**kwargs,
}
[docs]
def estimate_multiunit_likelihood_integer(
multiunits: np.ndarray,
encoding_marks: np.ndarray,
mark_std: np.ndarray,
place_bin_centers: np.ndarray,
encoding_positions: np.ndarray,
position_std: np.ndarray,
occupancy: np.ndarray,
mean_rates: np.ndarray,
summed_ground_process_intensity: np.ndarray,
bin_diffusion_distances: np.ndarray,
edges: list[np.ndarray],
max_mark_diff: int = 6000,
set_diag_zero: bool = False,
is_track_interior: Optional[np.ndarray] = None,
time_bin_size: int = 1,
block_size: int = 100,
ignore_no_spike: bool = False,
disable_progress_bar: bool = False,
use_diffusion: bool = False,
) -> np.ndarray:
"""Estimates the likelihood of position bins given multiunit marks.
Parameters
----------
multiunits : np.ndarray, shape (n_decoding_time, n_marks, n_electrodes)
encoding_marks : np.ndarray, shape (n_encoding_spikes, n_marks, n_electrodes)
mark_std : float
Amount of smoothing for mark features. Standard deviation of kernel.
place_bin_centers : np.ndarray, shape (n_bins, n_position_dims)
encoding_positions : np.ndarray, shape (n_encoding_spikes, n_position_dims)
position_std : float or array_like, shape (n_position_dims,)
Amount of smoothing for position. Standard deviation of kernel.
occupancy : np.ndarray, (n_bins,)
mean_rates : list, len (n_electrodes,)
summed_ground_process_intensity : np.ndarray, shape (n_bins,)
bin_diffusion_distances : np.ndarray, shape (n_bins, n_bins)
edges : list of np.ndarray
max_mark_diff : int
Maximum difference between mark features
set_diag_zero : bool
Remove influence of the same mark in encoding and decoding.
is_track_interior : None or np.ndarray, shape (n_bins_x, n_bins_y)
time_bin_size : float
Size of time steps
block_size : int
Size of data to process in chunks
ignore_no_spike : bool
Set contribution of no spikes to zero
disable_progress_bar : bool
If False, a progress bar will be displayed.
use_diffusion : bool
Respect track geometry by using diffusion distances
Returns
-------
log_likelihood : np.ndarray, shape (n_time, n_bins)
"""
if is_track_interior is None:
is_track_interior = np.ones((place_bin_centers.shape[0],), dtype=bool)
else:
is_track_interior = is_track_interior.ravel(order="F")
n_time = multiunits.shape[0]
if ignore_no_spike:
log_likelihood = (
-time_bin_size
* summed_ground_process_intensity
* np.zeros((n_time, 1), dtype=np.float32)
)
else:
log_likelihood = (
-time_bin_size
* summed_ground_process_intensity
* np.ones((n_time, 1), dtype=np.float32)
)
multiunits = np.moveaxis(multiunits, -1, 0)
n_position_bins = is_track_interior.sum()
interior_place_bin_centers = np.asarray(
place_bin_centers[is_track_interior], dtype=np.float32
)
gpu_is_track_interior = np.asarray(is_track_interior)
interior_occupancy = occupancy[gpu_is_track_interior]
for multiunit, enc_marks, enc_pos, mean_rate in zip(
tqdm(multiunits, desc="n_electrodes", disable=disable_progress_bar),
encoding_marks,
encoding_positions,
mean_rates,
):
is_spike = np.any(~np.isnan(multiunit), axis=1)
is_mark_features = np.any(~np.isnan(multiunit), axis=0)
decoding_marks = np.asarray(
multiunit[np.ix_(is_spike, is_mark_features)], dtype=np.int16
)
n_decoding_marks = decoding_marks.shape[0]
log_joint_mark_intensity = np.zeros(
(n_decoding_marks, n_position_bins), dtype=np.float32
)
if block_size is None:
block_size = n_decoding_marks
if use_diffusion & (place_bin_centers.shape[1] > 1):
position_distance = np.asarray(
estimate_diffusion_position_distance(
enc_pos,
edges,
bin_distances=bin_diffusion_distances,
)[:, is_track_interior],
dtype=np.float32,
)
else:
position_distance = estimate_position_distance(
interior_place_bin_centers,
np.asarray(enc_pos, dtype=np.float32),
position_std,
).astype(np.float32)
for start_ind in range(0, n_decoding_marks, block_size):
block_inds = slice(start_ind, start_ind + block_size)
log_joint_mark_intensity[block_inds] = estimate_log_joint_mark_intensity(
decoding_marks[block_inds],
enc_marks,
mark_std,
interior_occupancy,
mean_rate,
max_mark_diff=max_mark_diff,
set_diag_zero=set_diag_zero,
position_distance=position_distance,
)
log_likelihood[np.ix_(is_spike, is_track_interior)] += np.nan_to_num(
log_joint_mark_intensity
)
log_likelihood[:, ~is_track_interior] = np.nan
return log_likelihood