"""Estimates a Poisson likelihood using place fields estimated with a kernel density estimate."""
from __future__ import annotations
from typing import Optional, Union
import numpy as np
import pandas as pd
import xarray as xr
from scipy.special import xlogy
from tqdm.auto import tqdm
from replay_trajectory_classification.core import atleast_2d
from replay_trajectory_classification.likelihoods.diffusion import (
diffuse_each_bin,
estimate_diffusion_position_density,
)
[docs]
def gaussian_pdf(x: np.ndarray, mean: np.ndarray, sigma: np.ndarray) -> np.ndarray:
"""Compute the value of a Gaussian probability density function at x with
given mean and sigma."""
return np.exp(-0.5 * ((x - mean) / sigma) ** 2) / (sigma * np.sqrt(2.0 * np.pi))
[docs]
def estimate_position_distance(
place_bin_centers: np.ndarray, positions: np.ndarray, position_std: np.ndarray
) -> np.ndarray:
"""Estimates the Euclidean distance between positions and position bins.
Parameters
----------
place_bin_centers : np.ndarray, shape (n_position_bins, n_position_dims)
positions : np.ndarray, shape (n_time, n_position_dims)
position_std : array_like, shape (n_position_dims,)
Returns
-------
position_distance : np.ndarray, shape (n_time, n_position_bins)
"""
n_time, n_position_dims = positions.shape
n_position_bins = place_bin_centers.shape[0]
position_distance = np.ones((n_time, n_position_bins), dtype=np.float32)
if isinstance(position_std, (int, float)):
position_std = [position_std] * n_position_dims
for position_ind, std in enumerate(position_std):
position_distance *= gaussian_pdf(
np.expand_dims(place_bin_centers[:, position_ind], axis=0),
np.expand_dims(positions[:, position_ind], axis=1),
std,
)
return position_distance
[docs]
def estimate_position_density(
place_bin_centers: np.ndarray,
positions: np.ndarray,
position_std: Union[float, np.ndarray],
block_size: int = 100,
) -> np.ndarray:
"""Estimates a kernel density estimate over position bins using
Euclidean distances.
Parameters
----------
place_bin_centers : np.ndarray, shape (n_position_bins, n_position_dims)
positions : np.ndarray, shape (n_time, n_position_dims)
position_std : float or array_like, shape (n_position_dims,)
block_size : int
Returns
-------
position_density : np.ndarray, shape (n_position_bins,)
"""
n_time = positions.shape[0]
n_position_bins = place_bin_centers.shape[0]
if block_size is None:
block_size = n_time
position_density = np.empty((n_position_bins,))
for start_ind in range(0, n_position_bins, block_size):
block_inds = slice(start_ind, start_ind + block_size)
position_density[block_inds] = np.mean(
estimate_position_distance(
place_bin_centers[block_inds], positions, position_std
),
axis=0,
)
return position_density
[docs]
def get_firing_rate(
is_spike: np.ndarray,
position: np.ndarray,
place_bin_centers: np.ndarray,
is_track_interior: np.ndarray,
not_nan_position: np.ndarray,
occupancy: np.ndarray,
position_std: np.ndarray,
block_size: Optional[int] = None,
) -> np.ndarray:
if is_spike.sum() > 0:
mean_rate = is_spike.mean()
marginal_density = np.zeros((place_bin_centers.shape[0],), dtype=np.float32)
marginal_density[is_track_interior] = estimate_position_density(
place_bin_centers[is_track_interior],
np.asarray(position[is_spike & not_nan_position], dtype=np.float32),
position_std,
block_size=block_size,
)
return np.exp(np.log(mean_rate) + np.log(marginal_density) - np.log(occupancy))
else:
return np.zeros_like(occupancy)
[docs]
def get_diffusion_firing_rate(
is_spike: np.ndarray,
position: np.ndarray,
edges: list[np.ndarray],
bin_diffusion_distances: np.ndarray,
occupancy: np.ndarray,
not_nan_position: np.ndarray,
) -> np.ndarray:
if is_spike.sum() > 0:
mean_rate = is_spike.mean()
marginal_density = estimate_diffusion_position_density(
position[is_spike & not_nan_position],
edges,
bin_distances=bin_diffusion_distances,
).astype(np.float32)
return np.exp(np.log(mean_rate) + np.log(marginal_density) - np.log(occupancy))
else:
return np.zeros_like(occupancy)
[docs]
def estimate_place_fields_kde(
position: np.ndarray,
spikes: np.ndarray,
place_bin_centers: np.ndarray,
position_std: np.ndarray,
is_track_boundary: Optional[np.ndarray] = None,
is_track_interior: Optional[np.ndarray] = None,
edges: Optional[list[np.ndarray]] = None,
place_bin_edges: Optional[np.ndarray] = None,
use_diffusion: bool = False,
block_size: Optional[int] = None,
) -> xr.DataArray:
"""Gives the conditional intensity of the neurons' spiking with respect to
position.
Parameters
----------
position : np.ndarray, shape (n_time, n_position_dims)
spikes : np.ndarray, shape (n_time,)
Indicator of spike or no spike at current time.
place_bin_centers : np.ndarray, shape (n_bins, n_position_dims)
position_std : float or array_like, shape (n_position_dims,)
Amount of smoothing for position. Standard deviation of kernel.
is_track_boundary : None or np.ndarray, shape (n_bins,)
is_track_interior : None or np.ndarray, shape (n_bins,)
edges : None or list of np.ndarray
place_bin_edges : np.ndarray, shape (n_bins + 1, n_position_dims)
use_diffusion : bool
Respect track geometry by using diffusion distances
block_size : int
Size of data to process in chunks
Returns
-------
conditional_intensity : np.ndarray, shape (n_bins, n_neurons)
"""
position = atleast_2d(position).astype(np.float32)
place_bin_centers = atleast_2d(place_bin_centers).astype(np.float32)
not_nan_position = np.all(~np.isnan(position), axis=1)
if use_diffusion & (position.shape[1] > 1):
n_total_bins = np.prod(is_track_interior.shape)
bin_diffusion_distances = diffuse_each_bin(
is_track_interior,
is_track_boundary,
dx=edges[0][1] - edges[0][0],
dy=edges[1][1] - edges[1][0],
std=position_std,
).reshape((n_total_bins, -1), order="F")
occupancy = estimate_diffusion_position_density(
position[not_nan_position],
edges,
bin_distances=bin_diffusion_distances,
).astype(np.float32)
place_fields = np.stack(
[
get_diffusion_firing_rate(
is_spike,
position,
edges,
bin_diffusion_distances,
occupancy,
not_nan_position,
)
for is_spike in np.asarray(spikes, dtype=bool).T
],
axis=1,
)
else:
occupancy = np.zeros((place_bin_centers.shape[0],), dtype=np.float32)
occupancy[is_track_interior.ravel(order="F")] = estimate_position_density(
place_bin_centers[is_track_interior.ravel(order="F")],
position[not_nan_position],
position_std,
block_size=block_size,
)
place_fields = np.stack(
[
get_firing_rate(
is_spike,
position,
place_bin_centers,
is_track_interior.ravel(order="F"),
not_nan_position,
occupancy,
position_std,
)
for is_spike in np.asarray(spikes, dtype=bool).T
],
axis=1,
)
DIMS = ["position", "neuron"]
if position.shape[1] == 1:
names = ["position"]
coords = {"position": place_bin_centers.squeeze()}
elif position.shape[1] == 2:
names = ["x_position", "y_position"]
coords = {
"position": pd.MultiIndex.from_arrays(
place_bin_centers.T.tolist(), names=names
)
}
return xr.DataArray(data=place_fields, coords=coords, dims=DIMS)
[docs]
def poisson_log_likelihood(
spikes: np.ndarray, conditional_intensity: np.ndarray
) -> np.ndarray:
"""Probability of parameters given spiking at a particular time.
Parameters
----------
spikes : np.ndarray, shape (n_time,)
Indicator of spike or no spike at current time.
conditional_intensity : np.ndarray, shape (n_place_bins,)
Instantaneous probability of observing a spike
Returns
-------
poisson_log_likelihood : array_like, shape (n_time, n_place_bins)
"""
return (
xlogy(spikes[:, np.newaxis], conditional_intensity[np.newaxis, :])
- conditional_intensity[np.newaxis, :]
)
[docs]
def combined_likelihood(
spikes: np.ndarray, conditional_intensity: np.ndarray
) -> np.ndarray:
"""
Parameters
----------
spikes : np.ndarray, shape (n_time, n_neurons)
conditional_intensity : np.ndarray, shape (n_bins, n_neurons)
"""
n_time = spikes.shape[0]
n_bins = conditional_intensity.shape[0]
log_likelihood = np.zeros((n_time, n_bins))
conditional_intensity = np.clip(conditional_intensity, a_min=1e-15, a_max=None)
for is_spike, ci in zip(tqdm(spikes.T), conditional_intensity.T):
log_likelihood += poisson_log_likelihood(is_spike, ci)
return log_likelihood
[docs]
def estimate_spiking_likelihood_kde(
spikes: np.ndarray,
conditional_intensity: np.ndarray,
is_track_interior: Optional[np.ndarray] = None,
) -> np.ndarray:
"""
Parameters
----------
spikes : np.ndarray, shape (n_time, n_neurons)
conditional_intensity : np.ndarray, shape (n_bins, n_neurons)
is_track_interior : None or np.ndarray, optional, shape (n_x_position_bins,
n_y_position_bins)
Returns
-------
likelihood : np.ndarray, shape (n_time, n_bins)
"""
spikes = np.asarray(spikes, dtype=np.float32)
if is_track_interior is not None:
is_track_interior = is_track_interior.ravel(order="F")
else:
n_bins = conditional_intensity.shape[0]
is_track_interior = np.ones((n_bins,), dtype=bool)
log_likelihood = combined_likelihood(spikes, conditional_intensity)
mask = np.ones_like(is_track_interior, dtype=float)
mask[~is_track_interior] = np.nan
return log_likelihood * mask