Source code for replay_trajectory_classification.standard_decoder

"""Functions for common Bayesian decoding algorithms."""

from __future__ import annotations

import numpy as np
from scipy.signal import convolve
from scipy.special import cotdg
from scipy.stats import rv_histogram
from skimage.transform import radon
from sklearn.isotonic import IsotonicRegression
from sklearn.linear_model import LinearRegression

from replay_trajectory_classification.core import scaled_likelihood
from replay_trajectory_classification.likelihoods.multiunit_likelihood import (
    estimate_intensity,
)


[docs] def poisson_mark_log_likelihood( joint_mark_intensity, ground_process_intensity, time_bin_size=1 ): """Probability of parameters given spiking indicator at a particular time and associated marks. Parameters ---------- joint_mark_intensity : ndarray, shape (n_time, n_bins) ground_process_intensity : ndarray, shape (1, n_bins) time_bin_size : int, optional Returns ------- poisson_mark_log_likelihood : ndarray, shape (n_time, n_bins) """ return np.log(joint_mark_intensity + np.spacing(1)) - ( (ground_process_intensity + np.spacing(1)) * time_bin_size )
[docs] def predict_mark_likelihood( start_time, end_time, place_bin_centers, occupancy, joint_pdf_models, multiunit_dfs, ground_process_intensities, mean_rates, is_track_interior, dt=0.020, ): n_time_bins = np.ceil((end_time - start_time) / dt).astype(int) time_bin_edges = start_time + np.arange(n_time_bins + 1) * dt n_place_bins = len(place_bin_centers) log_likelihood = np.zeros((n_time_bins, n_place_bins)) interior_bin_inds = np.nonzero(is_track_interior)[0] for joint_model, multiunit_df, gpi, mean_rate in zip( joint_pdf_models, multiunit_dfs, ground_process_intensities, mean_rates ): time_index = np.searchsorted(time_bin_edges, multiunit_df.index.total_seconds()) in_time_bins = np.nonzero(~np.isin(time_index, [0, len(time_bin_edges)]))[0] time_index = time_index[in_time_bins] - 1 multiunit_df = multiunit_df.iloc[in_time_bins, :4] multiunit_df["time_bin_ind"] = time_index n_spikes = multiunit_df.shape[0] joint_mark_intensity = np.ones((n_spikes, n_place_bins)) if n_spikes > 0: zipped = zip( interior_bin_inds, place_bin_centers[interior_bin_inds], occupancy[interior_bin_inds], ) for bin_ind, bin, bin_occupancy in zipped: marks_pos = np.asarray(multiunit_df.iloc[:, :4]) marks_pos = np.concatenate( (marks_pos, bin * np.ones((n_spikes, 1))), axis=1 ) joint_mark_intensity[:, bin_ind] = estimate_intensity( np.exp(joint_model.score_samples(marks_pos)), bin_occupancy, mean_rate, ) tetrode_likelihood = poisson_mark_log_likelihood( joint_mark_intensity, np.atleast_2d(gpi) ) for time_bin_ind in np.unique(time_index): log_likelihood[time_bin_ind] += np.sum( tetrode_likelihood[time_index == time_bin_ind], axis=0 ) mask = np.ones_like(is_track_interior, dtype=float) mask[~is_track_interior] = np.nan log_likelihood = log_likelihood * mask time = np.arange(n_time_bins) * dt return scaled_likelihood(log_likelihood), time
[docs] def predict_poisson_likelihood( start_time, end_time, spike_times, place_fields, is_track_interior, dt=0.020 ): place_fields = np.asarray(place_fields) n_time_bins = np.ceil((end_time - start_time) / dt).astype(int) time_bin_edges = start_time + np.arange(n_time_bins + 1) * dt time_bin_centers = time_bin_edges[:-1] + np.diff(time_bin_edges) / 2 spike_time_ind, neuron_ind = [], [] for ind, times in enumerate(spike_times): is_valid_time = (times >= start_time) & (times <= end_time) inds = np.digitize(times[is_valid_time], time_bin_edges[1:-1]) spike_time_ind.append(inds) neuron_ind.append(np.ones_like(inds) * ind) neuron_ind = np.concatenate(neuron_ind) spike_time_ind = np.concatenate(spike_time_ind) log_likelihood = np.stack( [ np.sum( np.log( place_fields[:, neuron_ind[spike_time_ind == time_bin]] + np.spacing(1) ), axis=1, ) for time_bin in np.arange(n_time_bins) ] ) log_likelihood -= dt * np.sum(place_fields, axis=1) mask = np.ones_like(is_track_interior, dtype=float) mask[~is_track_interior] = np.nan return scaled_likelihood(log_likelihood) * mask, time_bin_centers
[docs] def normalize_to_posterior(likelihood, prior=None): if prior is None: n_position_bins = likelihood.shape[1] prior = np.ones_like(likelihood) / n_position_bins posterior = likelihood * prior return posterior / np.nansum(posterior, axis=1, keepdims=True)
[docs] def convert_polar_to_slope_intercept( n_pixels_from_center, projection_angle, center_pixel ): slope = -cotdg(-projection_angle) intercept = ( n_pixels_from_center / np.sin(-np.deg2rad(projection_angle)) - slope * center_pixel[0] + center_pixel[1] ) return intercept, slope
[docs] def detect_line_with_radon( posterior, dt, # s dp, # cm projection_angles=np.arange(-90, 90, 0.5), # degrees filter_invalid_positions=True, incorporate_nearby_positions=True, nearby_positions_max=15, # cm ): if incorporate_nearby_positions: n_nearby_bins = int(nearby_positions_max / 2 // dp) filt = np.ones(2 * n_nearby_bins + 1) posterior = np.apply_along_axis( lambda time_bin: convolve(time_bin, filt, mode="same"), axis=1, arr=posterior, ) else: n_nearby_bins = 1 # Sinogram is shape (pixels_from_center, projection_angles) sinogram = radon( posterior.T, theta=projection_angles, circle=False, preserve_range=False ) n_time, n_position_bins = posterior.shape center_pixel = np.asarray((n_time // 2, n_position_bins // 2)) pixels_from_center = np.arange( -sinogram.shape[0] // 2 + 1, sinogram.shape[0] // 2 + 1 ) if filter_invalid_positions: start_positions, velocities = convert_polar_to_slope_intercept( pixels_from_center[:, np.newaxis], projection_angles[np.newaxis, :], center_pixel, ) end_positions = start_positions + velocities * (n_time - 1) sinogram[(start_positions < 0) | (start_positions > n_position_bins - 1)] = 0.0 sinogram[(end_positions < 0) | (end_positions > n_position_bins - 1)] = 0.0 sinogram[:, np.isinf(velocities.squeeze())] = 0.0 # Find the maximum of the sinogram n_pixels_from_center_ind, projection_angle_ind = np.unravel_index( indices=np.argmax(sinogram), shape=sinogram.shape ) projection_angle = projection_angles[projection_angle_ind] n_pixels_from_center = pixels_from_center[n_pixels_from_center_ind] # Normalized score based on the integrated projection score = np.max(sinogram) / (n_time * n_nearby_bins) # Convert from polar form to slope-intercept form start_position, velocity = convert_polar_to_slope_intercept( n_pixels_from_center, projection_angle, center_pixel ) # Convert from pixels to position units start_position *= dp velocity *= dp / dt # Estimate position for the posterior time = np.arange(n_time) * dt radon_position = start_position + velocity * time return start_position, velocity, radon_position, score
[docs] def map_estimate(posterior, place_bin_centers): posterior[np.isnan(posterior)] = 0.0 return place_bin_centers[posterior.argmax(axis=1)].squeeze()
def _m(x, w): """Weighted Mean""" return np.sum(x * w) / np.sum(w) def _cov(x, y, w): """Weighted Covariance""" return np.sum(w * (x - _m(x, w)) * (y - _m(y, w))) / np.sum(w) def _corr(x, y, w): """Weighted Correlation""" return _cov(x, y, w) / np.sqrt(_cov(x, x, w) * _cov(y, y, w))
[docs] def weighted_correlation(posterior, time, place_bin_centers): place_bin_centers = place_bin_centers.squeeze() posterior[np.isnan(posterior)] = 0.0 return _corr(time[:, np.newaxis], place_bin_centers[np.newaxis, :], posterior)
[docs] def isotonic_regression(posterior, time, place_bin_centers): place_bin_centers = place_bin_centers.squeeze() posterior[np.isnan(posterior)] = 0.0 map = map_estimate(posterior, place_bin_centers) map_probabilities = np.max(posterior, axis=1) regression = IsotonicRegression(increasing="auto").fit( X=time, y=map, sample_weight=map_probabilities, ) score = regression.score( X=time, y=map, sample_weight=map_probabilities, ) prediction = regression.predict(time) return prediction, score
def _sample_posterior(posterior, place_bin_edges, n_samples=1000): """Samples the posterior positions. Parameters ---------- posterior : np.array, shape (n_time, n_position_bins) Returns ------- posterior_samples : numpy.ndarray, shape (n_time, n_samples) """ place_bin_edges = place_bin_edges.squeeze() n_time = posterior.shape[0] posterior_samples = [ rv_histogram((posterior[time_ind], place_bin_edges)).rvs(size=n_samples) for time_ind in range(n_time) ] return np.asarray(posterior_samples)
[docs] def linear_regression(posterior, place_bin_edges, time, n_samples=1000): posterior[np.isnan(posterior)] = 0.0 samples = _sample_posterior(posterior, place_bin_edges, n_samples=n_samples) design_matrix = np.tile(time, n_samples)[:, np.newaxis] response = samples.ravel(order="F") regression = LinearRegression().fit(X=design_matrix, y=response) r2 = regression.score(X=design_matrix, y=response) slope = regression.coef_[0] intercept = regression.intercept_ prediction = regression.predict(time[:, np.newaxis]) return intercept, slope, r2, prediction