Source code for replay_trajectory_classification.clusterless_simulation

"""Simulate clusterless spikes and associated spike waveform features."""

from __future__ import annotations

import numpy as np

from replay_trajectory_classification.simulate import (
    get_trajectory_direction,
    simulate_multiunit_with_place_fields,
    simulate_position,
    simulate_time,
)

SAMPLING_FREQUENCY = 1000
TRACK_HEIGHT = 175
RUNNING_SPEED = 15
PLACE_FIELD_VARIANCE = 6.0**2
PLACE_FIELD_MEANS = np.arange(0, 200, 10)
N_RUNS = 15
REPLAY_SPEEDUP = 120.0
N_TETRODES = 5
N_FEATURES = 4
MARK_SPACING = 5


[docs] def make_simulated_run_data( sampling_frequency: int = SAMPLING_FREQUENCY, track_height: float = TRACK_HEIGHT, running_speed: float = RUNNING_SPEED, n_runs: int = N_RUNS, place_field_variance: float = PLACE_FIELD_VARIANCE, place_field_means: np.ndarray = PLACE_FIELD_MEANS, n_tetrodes: int = N_TETRODES, make_inbound_outbound_neurons: bool = False, ): """Make simulated data of a rat running back and forth on a linear maze with unclustered spikes. Parameters ---------- sampling_frequency : int, optional track_height : float, optional Height of the simulated track running_speed : float, optional Speed of the simulated animal n_runs : int, optional Number of runs across the track the simulated animal will perform place_field_variance : float, optional Spatial extent of place fields place_field_means : np.ndarray, shape (n_neurons,), optional Location of the center of the Gaussian place fields. n_tetrodes : int, optional Total number of tetrodes to simulate make_inbound_outbound_neurons : bool, optional Create neurons with directional place fields Returns ------- time : np.ndarray, shape (n_time,) position : np.ndarray, shape (n_time,) sampling_frequency : float multiunits : np.ndarray, shape (n_time, n_features, n_electrodes) multiunits_spikes : np.ndarray (n_time, n_electrodes) place_field_means : np.ndarray (n_tetrodes, n_place_fields) """ n_samples = int(n_runs * sampling_frequency * 2 * track_height / running_speed) time = simulate_time(n_samples, sampling_frequency) position = simulate_position(time, track_height, running_speed) multiunits = [] if not make_inbound_outbound_neurons: for place_means in place_field_means.reshape(((n_tetrodes, -1))): multiunits.append( simulate_multiunit_with_place_fields( place_means, position, mark_spacing=10, n_mark_dims=4, sampling_frequency=sampling_frequency, ) ) else: trajectory_direction = get_trajectory_direction(position) for direction in np.unique(trajectory_direction): is_condition = trajectory_direction == direction for place_means in place_field_means.reshape(((n_tetrodes, -1))): multiunits.append( simulate_multiunit_with_place_fields( place_means, position, mark_spacing=10, n_mark_dims=4, sampling_frequency=sampling_frequency, is_condition=is_condition, ) ) multiunits = np.stack(multiunits, axis=-1) multiunits_spikes = np.any(~np.isnan(multiunits), axis=1) return (time, position, sampling_frequency, multiunits, multiunits_spikes)
[docs] def make_continuous_replay( sampling_frequency: int = SAMPLING_FREQUENCY, track_height: float = TRACK_HEIGHT, running_speed: float = RUNNING_SPEED, place_field_means: np.ndarray = PLACE_FIELD_MEANS, replay_speedup: int = REPLAY_SPEEDUP, n_tetrodes: int = N_TETRODES, n_features: int = N_FEATURES, mark_spacing: float = MARK_SPACING, ) -> tuple[np.ndarray, np.ndarray]: """Creates a simulated continuous replay. Parameters ---------- sampling_frequency : int, optional Samples per second track_height : float, optional Height of the simulated track running_speed : float, optional Simualted speed of the animal place_field_means : np.ndarray, optional Location of the center of the Gaussian place fields. replay_speedup : int, optional Number of times faster the replay event is faster than the running speed n_tetrodes : int, optional Number of simulated tetrodes n_features : int, optional Number of simulated features mark_spacing : float, optional Spacing between Gaussian mark features Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_multiunits : np.ndarray, shape (n_time, n_features, n_tetrodes) Binned clusterless spike times and features. NaN indicates no spike. Non-Nan indicates spike. """ replay_speed = running_speed * replay_speedup n_samples = int(0.5 * sampling_frequency * 2 * track_height / replay_speed) replay_time = simulate_time(n_samples, sampling_frequency) true_replay_position = simulate_position(replay_time, track_height, replay_speed) place_field_means = place_field_means.reshape(((n_tetrodes, -1))) min_times_ind = np.argmin( np.abs(true_replay_position[:, np.newaxis] - place_field_means.ravel()), axis=0 ) tetrode_ind = ( np.ones_like(place_field_means) * np.arange(5)[:, np.newaxis] ).ravel() test_multiunits = np.full((replay_time.size, n_features, n_tetrodes), np.nan) n_neurons = place_field_means.shape[1] mark_centers = np.arange(0, n_neurons * mark_spacing, mark_spacing) mark_ind = (np.ones_like(place_field_means) * np.arange(4)).ravel() for i in range(n_features): test_multiunits[(min_times_ind, i, tetrode_ind)] = mark_centers[mark_ind] return replay_time, test_multiunits
[docs] def make_hover_replay( hover_neuron_ind: int = None, place_field_means: np.ndarray = PLACE_FIELD_MEANS, sampling_frequency: int = SAMPLING_FREQUENCY, n_tetrodes: int = N_TETRODES, n_features: int = N_FEATURES, mark_spacing: float = MARK_SPACING, ) -> tuple[np.ndarray, np.ndarray]: """Creates a simulated stationary replay. Parameters ---------- hover_neuron_ind : int, optional Index of which neuron is the stationary neuron. place_field_means : np.ndarray, optional Location of the center of the Gaussian place fields. sampling_frequency : int, optional Samples per second n_tetrodes : int, optional Number of simulated tetrodes n_features : int, optional Number of simulated features mark_spacing : float, optional Spacing between Gaussian mark features Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_multiunits : np.ndarray, shape (n_time, n_features, n_tetrodes) Binned clusterless spike times and features. NaN indicates no spike. Non-Nan indicates spike. """ place_field_means = place_field_means.reshape(((n_tetrodes, -1))) if hover_neuron_ind is None: hover_neuron_ind = place_field_means.size // 2 tetrode_ind, neuron_ind = np.unravel_index( hover_neuron_ind, place_field_means.shape ) N_TIME = 50 replay_time = np.arange(N_TIME) / sampling_frequency spike_time_ind = np.arange(0, N_TIME, 2) test_multiunits = np.full((replay_time.size, n_features, n_tetrodes), np.nan) n_neurons = place_field_means.shape[1] mark_centers = np.arange(0, n_neurons * mark_spacing, mark_spacing) test_multiunits[spike_time_ind, :, tetrode_ind] = mark_centers[neuron_ind] return replay_time, test_multiunits
[docs] def make_fragmented_replay( place_field_means: np.ndarray = PLACE_FIELD_MEANS, sampling_frequency: int = SAMPLING_FREQUENCY, n_tetrodes: int = N_TETRODES, n_features: int = N_FEATURES, mark_spacing: float = MARK_SPACING, ) -> tuple[np.ndarray, np.ndarray]: """Creates a simulated fragmented replay. Parameters ---------- place_field_means : np.ndarray, optional Location of the center of the Gaussian place fields. sampling_frequency : int, optional Samples per second n_tetrodes : int, optional Number of simulated tetrodes n_features : int, optional Number of simulated features mark_spacing : float, optional Spacing between Gaussian mark features Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_multiunits : np.ndarray, shape (n_time, n_features, n_tetrodes) Binned clusterless spike times and features. NaN indicates no spike. Non-Nan indicates spike. """ N_TIME = 10 place_field_means = place_field_means.reshape(((n_tetrodes, -1))) replay_time = np.arange(N_TIME) / sampling_frequency n_total_neurons = place_field_means.size neuron_inds = [1, n_total_neurons - 1, 10, n_total_neurons - 5, 8] neuron_inds = np.unravel_index(neuron_inds, place_field_means.shape) spike_time_ind = [1, 3, 5, 7, 9] test_multiunits = np.full((replay_time.size, n_features, n_tetrodes), np.nan) n_neurons = place_field_means.shape[1] mark_centers = np.arange(0, n_neurons * mark_spacing, mark_spacing) for t_ind, tetrode_ind, neuron_ind in zip(spike_time_ind, *neuron_inds): test_multiunits[t_ind, :, tetrode_ind] = mark_centers[neuron_ind] return replay_time, test_multiunits
[docs] def make_hover_continuous_hover_replay( sampling_frequency: int = SAMPLING_FREQUENCY, place_field_means: np.ndarray = PLACE_FIELD_MEANS, ) -> tuple[np.ndarray, np.ndarray]: """Make a simulated replay that first is stationary, then is continuous, then is stationary again. Parameters ---------- sampling_frequency : int, optional Samples per second place_field_means : np.ndarray, optional Location of the center of the Gaussian place fields. Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_multiunits : np.ndarray, shape (n_time, n_features, n_tetrodes) Binned clusterless spike times and features. NaN indicates no spike. Non-Nan indicates spike. """ _, test_multiunits1 = make_hover_replay(hover_neuron_ind=0) _, test_multiunits2 = make_continuous_replay() n_total_neurons = place_field_means.size _, test_multiunits3 = make_hover_replay(hover_neuron_ind=n_total_neurons - 1) test_multiunits = np.concatenate( (test_multiunits1, test_multiunits2, test_multiunits3) ) replay_time = np.arange(test_multiunits.shape[0]) / sampling_frequency return replay_time, test_multiunits
[docs] def make_fragmented_hover_fragmented_replay( sampling_frequency: int = SAMPLING_FREQUENCY, ) -> tuple[np.ndarray, np.ndarray]: """Makes a simulated replay that first is fragmented, then is stationary, then is fragmented again. Parameters ---------- sampling_frequency : int, optional Samples per second Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_multiunits : np.ndarray, shape (n_time, n_features, n_tetrodes) Binned clusterless spike times and features. NaN indicates no spike. Non-Nan indicates spike. """ _, test_multiunits1 = make_fragmented_replay() _, test_multiunits2 = make_hover_replay(hover_neuron_ind=6) _, test_multiunits3 = make_fragmented_replay() test_multiunits = np.concatenate( (test_multiunits1, test_multiunits2, test_multiunits3) ) replay_time = np.arange(test_multiunits.shape[0]) / sampling_frequency return replay_time, test_multiunits
[docs] def make_fragmented_continuous_fragmented_replay( sampling_frequency: int = SAMPLING_FREQUENCY, ) -> tuple[np.ndarray, np.ndarray]: """Makes a simulated replay that first is fragmented, then is continuous, then is fragmented again. Parameters ---------- sampling_frequency : int, optional Samples per second Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_multiunits : np.ndarray, shape (n_time, n_features, n_tetrodes) Binned clusterless spike times and features. NaN indicates no spike. Non-Nan indicates spike. """ _, test_multiunits1 = make_fragmented_replay() _, test_multiunits2 = make_continuous_replay() _, test_multiunits3 = make_fragmented_replay() test_multiunits = np.concatenate( (test_multiunits1, test_multiunits2, test_multiunits3) ) replay_time = np.arange(test_multiunits.shape[0]) / sampling_frequency return replay_time, test_multiunits