"""Estimates a marked point process likelihood where the marks are
features of the spike waveform. Features are float32."""
from __future__ import annotations
from typing import Optional, Union
import numpy as np
from tqdm.autonotebook import tqdm
from replay_trajectory_classification.core import atleast_2d
from replay_trajectory_classification.likelihoods.diffusion import (
diffuse_each_bin,
estimate_diffusion_position_density,
estimate_diffusion_position_distance,
)
[docs]
def gaussian_pdf(x: np.ndarray, mean: np.ndarray, sigma: np.ndarray) -> np.ndarray:
"""Compute the value of a Gaussian probability density function at x with
given mean and sigma."""
return np.exp(-0.5 * ((x - mean) / sigma) ** 2) / (sigma * np.sqrt(2.0 * np.pi))
[docs]
def estimate_position_distance(
place_bin_centers: np.ndarray, positions: np.ndarray, position_std: np.ndarray
) -> np.ndarray:
"""Estimates the Euclidean distance between positions and position bins.
Parameters
----------
place_bin_centers : np.ndarray, shape (n_position_bins, n_position_dims)
positions : np.ndarray, shape (n_time, n_position_dims)
position_std : array_like, shape (n_position_dims,)
Returns
-------
position_distance : np.ndarray, shape (n_time, n_position_bins)
"""
n_time, n_position_dims = positions.shape
n_position_bins = place_bin_centers.shape[0]
if isinstance(position_std, (int, float)):
position_std = [position_std] * n_position_dims
position_distance = np.ones((n_time, n_position_bins), dtype=np.float32)
for position_ind, std in enumerate(position_std):
position_distance *= gaussian_pdf(
np.expand_dims(place_bin_centers[:, position_ind], axis=0),
np.expand_dims(positions[:, position_ind], axis=1),
std,
)
return position_distance
[docs]
def estimate_position_density(
place_bin_centers: np.ndarray,
positions: np.ndarray,
position_std: np.ndarray,
block_size: int = 100,
) -> np.ndarray:
"""Estimates a kernel density estimate over position bins using
Euclidean distances.
Parameters
----------
place_bin_centers : np.ndarray, shape (n_position_bins, n_position_dims)
positions : np.ndarray, shape (n_time, n_position_dims)
position_std : float or array_like, shape (n_position_dims,)
Returns
-------
position_density : np.ndarray, shape (n_position_bins,)
"""
n_time = positions.shape[0]
n_position_bins = place_bin_centers.shape[0]
if block_size is None:
block_size = n_time
position_density = np.empty((n_position_bins,))
for start_ind in range(0, n_position_bins, block_size):
block_inds = slice(start_ind, start_ind + block_size)
position_density[block_inds] = np.mean(
estimate_position_distance(
place_bin_centers[block_inds], positions, position_std
),
axis=0,
)
return position_density
[docs]
def estimate_log_intensity(
density: np.ndarray, occupancy: np.ndarray, mean_rate: float
) -> np.ndarray:
"""Calculates intensity in log space."""
return np.log(mean_rate) + np.log(density) - np.log(occupancy)
[docs]
def estimate_intensity(
density: np.ndarray, occupancy: np.ndarray, mean_rate: float
) -> np.ndarray:
"""Calculates intensity.
Parameters
----------
density : np.ndarray, shape (n_bins,)
occupancy : np.ndarray, shape (n_bins,)
mean_rate : float
Returns
-------
intensity : np.ndarray, shape (n_bins,)
"""
return np.exp(estimate_log_intensity(density, occupancy, mean_rate))
[docs]
def estimate_log_joint_mark_intensity(
decoding_marks: np.ndarray,
encoding_marks: np.ndarray,
mark_std: np.ndarray,
occupancy: np.ndarray,
mean_rate: float,
place_bin_centers: Optional[np.ndarray] = None,
encoding_positions: Optional[np.ndarray] = None,
position_std: Union[np.ndarray, float, None] = None,
max_mark_diff: int = 6000,
set_diag_zero: bool = False,
position_distance: Optional[np.ndarray] = None,
):
"""Finds the joint intensity of the marks and positions in log space.
Parameters
----------
decoding_marks : np.ndarray, shape (n_decoding_spikes, n_features)
encoding_marks : np.ndarray, shape (n_encoding_spikes, n_features)
mark_std : float or np.ndarray, shape (n_features,)
occupancy : np.ndarray, shape (n_position_bins,)
mean_rate : float
place_bin_centers : None or np.ndarray, shape (n_position_bins, n_position_dims)
If None, position distance must be not None
encoding_positions : None or np.ndarray, shape (n_decoding_spikes, n_position_dims)
If None, position distance must be not None
position_std : None or float or array_like, shape (n_position_dims,)
If None, position distance must be not None
max_mark_diff : int
Maximum distance between integer marks.
set_diag_zero : bool
position_distance : np.ndarray, shape (n_encoding_spikes, n_position_bins)
Precalculated distance between position and position bins.
Returns
-------
log_joint_mark_intensity : np.ndarray, shape (n_decoding_spikes, n_position_bins)
"""
n_encoding_spikes, n_marks = encoding_marks.shape
n_decoding_spikes = decoding_marks.shape[0]
mark_distance = np.ones((n_decoding_spikes, n_encoding_spikes), dtype=np.float32)
for mark_ind in range(n_marks):
mark_distance *= gaussian_pdf(
np.expand_dims(decoding_marks[:, mark_ind], axis=1),
np.expand_dims(encoding_marks[:, mark_ind], axis=0),
mark_std[mark_ind],
)
if set_diag_zero:
diag_ind = (np.arange(n_decoding_spikes), np.arange(n_decoding_spikes))
mark_distance[diag_ind] = 0.0
if position_distance is None:
position_distance = estimate_position_distance(
place_bin_centers, encoding_positions, position_std
).astype(np.float32)
return estimate_log_intensity(
mark_distance @ position_distance / n_encoding_spikes, occupancy, mean_rate
)
[docs]
def fit_multiunit_likelihood(
position: np.ndarray,
multiunits: np.ndarray,
place_bin_centers: np.ndarray,
mark_std: Union[np.ndarray, float],
position_std: Union[np.ndarray, float],
is_track_boundary: Optional[np.ndarray] = None,
is_track_interior: Optional[np.ndarray] = None,
edges: Optional[list[np.ndarray]] = None,
block_size: int = 100,
use_diffusion: bool = False,
**kwargs,
) -> dict:
"""Fits the clusterless place field model.
Parameters
----------
position : np.ndarray, shape (n_time, n_position_dims)
multiunits : np.ndarray, shape (n_time, n_marks, n_electrodes)
place_bin_centers : np.ndarray, shape (n_bins, n_position_dims)
mark_std : float or array_like, shape (n_marks,)
Amount of smoothing for the mark features. Standard deviation of kernel.
position_std : float or array_like, shape (n_position_dims,)
Amount of smoothing for position. Standard deviation of kernel.
is_track_boundary : None or np.ndarray, shape (n_bins,)
is_track_interior : None or np.ndarray, shape (n_bins,)
edges : None or list of np.ndarray
block_size : int
Size of data to process in chunks
use_diffusion : bool
Use diffusion to respect the track geometry.
Returns
-------
encoding_model : dict
"""
if is_track_interior is None:
is_track_interior = np.ones((place_bin_centers.shape[0],), dtype=bool)
position = atleast_2d(position)
place_bin_centers = atleast_2d(place_bin_centers)
interior_place_bin_centers = np.asarray(
place_bin_centers[is_track_interior.ravel(order="F")], dtype=np.float32
)
gpu_is_track_interior = np.asarray(is_track_interior.ravel(order="F"))
not_nan_position = np.all(~np.isnan(position), axis=1)
if use_diffusion & (position.shape[1] > 1):
n_total_bins = np.prod(is_track_interior.shape)
bin_diffusion_distances = diffuse_each_bin(
is_track_interior,
is_track_boundary,
dx=edges[0][1] - edges[0][0],
dy=edges[1][1] - edges[1][0],
std=position_std,
).reshape((n_total_bins, -1), order="F")
else:
bin_diffusion_distances = None
if use_diffusion & (position.shape[1] > 1):
occupancy = np.asarray(
estimate_diffusion_position_density(
position[not_nan_position],
edges,
bin_distances=bin_diffusion_distances,
),
dtype=np.float32,
)
else:
occupancy = np.zeros((place_bin_centers.shape[0],), dtype=np.float32)
occupancy[gpu_is_track_interior] = estimate_position_density(
interior_place_bin_centers,
np.asarray(position[not_nan_position], dtype=np.float32),
position_std,
block_size=block_size,
)
mean_rates = []
summed_ground_process_intensity = np.zeros(
(place_bin_centers.shape[0],), dtype=np.float32
)
encoding_marks = []
encoding_positions = []
n_marks = multiunits.shape[1]
if isinstance(mark_std, (int, float)):
mark_std = np.asarray([mark_std] * n_marks)
else:
mark_std = np.asarray(mark_std)
for multiunit in np.moveaxis(multiunits, -1, 0):
# ground process intensity
is_spike = np.any(~np.isnan(multiunit), axis=1)
mean_rates.append(is_spike.mean())
if is_spike.sum() > 0:
if use_diffusion & (position.shape[1] > 1):
marginal_density = np.asarray(
estimate_diffusion_position_density(
position[is_spike & not_nan_position],
edges,
bin_distances=bin_diffusion_distances,
),
dtype=np.float32,
)
else:
marginal_density = np.zeros(
(place_bin_centers.shape[0],), dtype=np.float32
)
marginal_density[gpu_is_track_interior] = estimate_position_density(
interior_place_bin_centers,
np.asarray(position[is_spike & not_nan_position], dtype=np.float32),
position_std,
block_size=block_size,
)
summed_ground_process_intensity += estimate_intensity(
marginal_density, occupancy, mean_rates[-1]
)
is_mark_features = np.any(~np.isnan(multiunit), axis=0)
encoding_marks.append(
np.asarray(
multiunit[np.ix_(is_spike & not_nan_position, is_mark_features)],
dtype=np.float32,
)
)
encoding_positions.append(position[is_spike & not_nan_position])
summed_ground_process_intensity = summed_ground_process_intensity + np.spacing(1)
return {
"encoding_marks": encoding_marks,
"encoding_positions": encoding_positions,
"summed_ground_process_intensity": summed_ground_process_intensity,
"occupancy": occupancy,
"mean_rates": mean_rates,
"mark_std": mark_std,
"position_std": position_std,
"block_size": block_size,
"bin_diffusion_distances": bin_diffusion_distances,
"use_diffusion": use_diffusion,
"edges": edges,
**kwargs,
}
[docs]
def estimate_multiunit_likelihood(
multiunits: np.ndarray,
encoding_marks: np.ndarray,
mark_std: np.ndarray,
place_bin_centers: np.ndarray,
encoding_positions: np.ndarray,
position_std: Union[float, np.ndarray],
occupancy: np.ndarray,
mean_rates: np.ndarray,
summed_ground_process_intensity: np.ndarray,
bin_diffusion_distances: np.ndarray,
edges: list[np.ndarray],
max_mark_diff: int = 6000,
set_diag_zero: bool = False,
is_track_interior: Optional[np.ndarray] = None,
time_bin_size: int = 1,
block_size: int = 100,
ignore_no_spike: bool = False,
disable_progress_bar: bool = False,
use_diffusion: bool = False,
) -> np.ndarray:
"""Estimates the likelihood of position bins given multiunit marks.
Parameters
----------
multiunits : np.ndarray, shape (n_decoding_time, n_marks, n_electrodes)
encoding_marks : np.ndarray, shape (n_encoding_spikes, n_marks, n_electrodes)
mark_std : list, shape (n_marks,)
Amount of smoothing for mark features
place_bin_centers : np.ndarray, shape (n_bins, n_position_dims)
encoding_positions : np.ndarray, shape (n_encoding_spikes, n_position_dims)
position_std : float or array_like, shape (n_position_dims,)
Amount of smoothing for position
occupancy : np.ndarray, (n_bins,)
mean_rates : list, len (n_electrodes,)
summed_ground_process_intensity : np.ndarray, shape (n_bins,)
bin_diffusion_distances : np.ndarray, shape (n_bins, n_bins)
edges : list of np.ndarray
max_mark_diff : int
Maximum difference between mark features
set_diag_zero : bool
Remove influence of the same mark in encoding and decoding.
is_track_interior : None or np.ndarray, shape (n_bins_x, n_bins_y)
time_bin_size : float
Size of time steps
block_size : int
Size of data to process in chunks
ignore_no_spike : bool
Set contribution of no spikes to zero
disable_progress_bar : bool
If False, a progress bar will be displayed.
use_diffusion : bool
Respect track geometry by using diffusion distances
Returns
-------
log_likelihood : (n_time, n_bins)
"""
if is_track_interior is None:
is_track_interior = np.ones((place_bin_centers.shape[0],), dtype=bool)
else:
is_track_interior = is_track_interior.ravel(order="F")
n_time = multiunits.shape[0]
if ignore_no_spike:
log_likelihood = (
-time_bin_size
* summed_ground_process_intensity
* np.zeros((n_time, 1), dtype=np.float32)
)
else:
log_likelihood = (
-time_bin_size
* summed_ground_process_intensity
* np.ones((n_time, 1), dtype=np.float32)
)
multiunits = np.moveaxis(multiunits, -1, 0)
n_position_bins = is_track_interior.sum()
interior_place_bin_centers = np.asarray(
place_bin_centers[is_track_interior], dtype=np.float32
)
gpu_is_track_interior = np.asarray(is_track_interior)
interior_occupancy = occupancy[gpu_is_track_interior]
for multiunit, enc_marks, enc_pos, mean_rate in zip(
tqdm(multiunits, desc="n_electrodes", disable=disable_progress_bar),
encoding_marks,
encoding_positions,
mean_rates,
):
is_spike = np.any(~np.isnan(multiunit), axis=1)
is_mark_features = np.any(~np.isnan(multiunit), axis=0)
decoding_marks = np.asarray(
multiunit[np.ix_(is_spike, is_mark_features)], dtype=np.float32
)
n_decoding_marks = decoding_marks.shape[0]
log_joint_mark_intensity = np.zeros(
(n_decoding_marks, n_position_bins), dtype=np.float32
)
if block_size is None:
block_size = n_decoding_marks
if use_diffusion & (place_bin_centers.shape[1] > 1):
position_distance = np.asarray(
estimate_diffusion_position_distance(
enc_pos,
edges,
bin_distances=bin_diffusion_distances,
)[:, is_track_interior],
dtype=np.float32,
)
else:
position_distance = estimate_position_distance(
interior_place_bin_centers,
np.asarray(enc_pos, dtype=np.float32),
position_std,
).astype(np.float32)
for start_ind in range(0, n_decoding_marks, block_size):
block_inds = slice(start_ind, start_ind + block_size)
log_joint_mark_intensity[block_inds] = estimate_log_joint_mark_intensity(
decoding_marks[block_inds],
enc_marks,
mark_std[is_mark_features],
interior_occupancy,
mean_rate,
max_mark_diff=max_mark_diff,
set_diag_zero=set_diag_zero,
position_distance=position_distance,
)
log_likelihood[np.ix_(is_spike, is_track_interior)] += np.nan_to_num(
log_joint_mark_intensity
)
log_likelihood[:, ~is_track_interior] = np.nan
return log_likelihood