Source code for replay_trajectory_classification.sorted_spikes_simulation

"""Functions for generating clustered spikes data."""
from __future__ import annotations
import numpy as np
from replay_trajectory_classification.simulate import (
    get_trajectory_direction,
    simulate_position,
    simulate_neuron_with_place_field,
    simulate_place_field_firing_rate,
    simulate_time,
)

SAMPLING_FREQUENCY = 1000
TRACK_HEIGHT = 180
RUNNING_SPEED = 15
PLACE_FIELD_VARIANCE = 6.0**2
PLACE_FIELD_MEANS = np.arange(0, TRACK_HEIGHT + 10, 10)
N_RUNS = 15
REPLAY_SPEEDUP = 120.0

# Figure Parameters
MM_TO_INCHES = 1.0 / 25.4
ONE_COLUMN = 89.0 * MM_TO_INCHES
ONE_AND_HALF_COLUMN = 140.0 * MM_TO_INCHES
TWO_COLUMN = 178.0 * MM_TO_INCHES
PAGE_HEIGHT = 247.0 * MM_TO_INCHES
GOLDEN_RATIO = (np.sqrt(5) - 1.0) / 2.0


[docs] def make_simulated_run_data( sampling_frequency: int = SAMPLING_FREQUENCY, track_height: float = TRACK_HEIGHT, running_speed: float = RUNNING_SPEED, n_runs: int = N_RUNS, place_field_variance: float = PLACE_FIELD_VARIANCE, place_field_means: np.ndarray = PLACE_FIELD_MEANS, make_inbound_outbound_neurons: bool = False, ) -> tuple[np.ndarray, np.ndarray, float, np.ndarray, np.ndarray]: """Make simulated data of a rat running back and forth on a linear maze with sorted spikes. Parameters ---------- sampling_frequency : float, optional Samples per second track_height : float, optional Height of the simulated track running_speed : float, optional Speed of the simulated animal n_runs : int, optional Number of runs across the track the simulated animal will perform place_field_variance : float, optional Spatial variance of the place field place_field_means : np.ndarray, shape (n_neurons,), optional Location of the center of the Gaussian place fields. make_inbound_outbound_neurons : bool Makes neurons direction selective. Returns ------- time : np.ndarray, shape (n_time,) position : np.ndarray, shape (n_time,) Position of the simualted animal sampling_frequency : float Samples per second spikes : np.ndarray, shape (n_time, n_neurons) Binned spike indicator. 1 means spike occured. 0 means no spike occured. place_fields : np.ndarray, shape (n_time, n_neurons) """ n_samples = int(n_runs * sampling_frequency * 2 * track_height / running_speed) time = simulate_time(n_samples, sampling_frequency) position = simulate_position(time, track_height, running_speed) if not make_inbound_outbound_neurons: place_fields = np.stack( [ simulate_place_field_firing_rate( place_field_mean, position, variance=place_field_variance ) for place_field_mean in place_field_means ], axis=1, ) spikes = np.stack( [ simulate_neuron_with_place_field( place_field_mean, position, max_rate=15, variance=place_field_variance, sampling_frequency=sampling_frequency, ) for place_field_mean in place_field_means.T ], axis=1, ) else: trajectory_direction = get_trajectory_direction(position) place_fields = [] spikes = [] for direction in np.unique(trajectory_direction): is_condition = trajectory_direction == direction for place_field_mean in place_field_means: place_fields.append( simulate_place_field_firing_rate( place_field_mean, position, variance=place_field_variance, is_condition=is_condition, ) ) spikes.append( simulate_neuron_with_place_field( place_field_mean, position, max_rate=15, variance=place_field_variance, sampling_frequency=sampling_frequency, is_condition=is_condition, ) ) place_fields = np.stack(place_fields, axis=1) spikes = np.stack(spikes, axis=1) return time, position, sampling_frequency, spikes, place_fields
[docs] def make_continuous_replay( sampling_frequency: int = SAMPLING_FREQUENCY, track_height: float = TRACK_HEIGHT, running_speed: float = RUNNING_SPEED, place_field_means: np.ndarray = PLACE_FIELD_MEANS, replay_speedup: int = REPLAY_SPEEDUP, is_outbound: bool = True, ) -> tuple[np.ndarray, np.ndarray]: """Make a simulated continuous replay. Parameters ---------- sampling_frequency : int, optional Samples per second track_height : float, optional Height of the simulated track running_speed : float, optional Speed of the simulated animal place_field_means : np.ndarray, optional Location of the center of the Gaussian place fields. replay_speedup : int, optional _description_, by default REPLAY_SPEEDUP is_outbound : bool, optional _description_, by default True Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_spikes : np.ndarray, shape (n_time, n_neurons) Binned spike indicator. 1 means spike occured. 0 means no spike occured. """ replay_speed = running_speed * replay_speedup n_samples = int(np.ceil(2 * sampling_frequency * track_height / replay_speed)) replay_time = simulate_time(n_samples, sampling_frequency) true_replay_position = simulate_position(replay_time, track_height, replay_speed) # Make inbound or outbound replay_time = replay_time[: n_samples // 2] if is_outbound: true_replay_position = true_replay_position[: n_samples // 2] else: true_replay_position = true_replay_position[n_samples // 2 :] min_times_ind = np.argmin( np.abs(true_replay_position[:, np.newaxis] - place_field_means), axis=0 ) n_neurons = place_field_means.shape[0] test_spikes = np.zeros((replay_time.size, n_neurons)) test_spikes[ ( min_times_ind, np.arange(n_neurons), ) ] = 1.0 return replay_time, test_spikes
[docs] def make_hover_replay( hover_neuron_ind: int = None, place_field_means: np.ndarray = PLACE_FIELD_MEANS, sampling_frequency: int = SAMPLING_FREQUENCY, ) -> tuple[np.ndarray, np.ndarray]: """Make a simulated stationary replay. Parameters ---------- hover_neuron_ind : int, optional _description_, by default None place_field_means : np.ndarray, optional _description_, by default PLACE_FIELD_MEANS sampling_frequency : int, optional Samples per second Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_spikes : np.ndarray, shape (n_time, n_neurons) Binned spike indicator. 1 means spike occured. 0 means no spike occured. """ n_neurons = place_field_means.shape[0] if hover_neuron_ind is None: hover_neuron_ind = n_neurons // 2 N_TIME = 50 replay_time = np.arange(N_TIME) / sampling_frequency spike_time_ind = np.arange(0, N_TIME, 2) test_spikes = np.zeros((N_TIME, n_neurons)) neuron_ind = np.ones((N_TIME // 2,), dtype=int) * hover_neuron_ind test_spikes[(spike_time_ind, neuron_ind)] = 1.0 return replay_time, test_spikes
[docs] def make_fragmented_replay( place_field_means: np.ndarray = PLACE_FIELD_MEANS, sampling_frequency: int = SAMPLING_FREQUENCY, ) -> tuple[np.ndarray, np.ndarray]: """Make a simulated fragmented replay. Parameters ---------- place_field_means : np.ndarray, optional _description_, by default PLACE_FIELD_MEANS sampling_frequency : int, optional Samples per second Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_spikes : np.ndarray, shape (n_time, n_neurons) Binned spike indicator. 1 means spike occured. 0 means no spike occured. """ N_TIME = 10 replay_time = np.arange(N_TIME) / sampling_frequency ind = ([1, 3, 5, 7, 9], [1, -1, 10, -5, 8]) n_neurons = place_field_means.shape[0] test_spikes = np.zeros((N_TIME, n_neurons)) test_spikes[ind] = 1.0 return replay_time, test_spikes
[docs] def make_hover_continuous_hover_replay( sampling_frequency: int = SAMPLING_FREQUENCY, ) -> tuple[np.ndarray, np.ndarray]: """Make a replay that starts stationary, then is continuous, then is stationary again. Parameters ---------- sampling_frequency : int, optional Samples per second Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_spikes : np.ndarray, shape (n_time, n_neurons) Binned spike indicator. 1 means spike occured. 0 means no spike occured. """ _, test_spikes1 = make_hover_replay(hover_neuron_ind=0) _, test_spikes2 = make_continuous_replay() _, test_spikes3 = make_hover_replay(hover_neuron_ind=-1) test_spikes = np.concatenate((test_spikes1, test_spikes2, test_spikes3)) replay_time = np.arange(test_spikes.shape[0]) / sampling_frequency return replay_time, test_spikes
[docs] def make_fragmented_hover_fragmented_replay( sampling_frequency: int = SAMPLING_FREQUENCY, ) -> tuple[np.ndarray, np.ndarray]: """Make a replay that starts fragmented, then is stationary, and then is fragmented again. Parameters ---------- sampling_frequency : int, optional Samples per second Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_spikes : np.ndarray, shape (n_time, n_neurons) Binned spike indicator. 1 means spike occured. 0 means no spike occured. """ _, test_spikes1 = make_fragmented_replay() _, test_spikes2 = make_hover_replay(hover_neuron_ind=6) _, test_spikes3 = make_fragmented_replay() test_spikes = np.concatenate((test_spikes1, test_spikes2, test_spikes3)) replay_time = np.arange(test_spikes.shape[0]) / sampling_frequency return replay_time, test_spikes
[docs] def make_fragmented_continuous_fragmented_replay( sampling_frequency: int = SAMPLING_FREQUENCY, ) -> tuple[np.ndarray, np.ndarray]: """Make a replay that is fragmented, then is continuous, then is fragmented again. Parameters ---------- sampling_frequency : int, optional Samples per second Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_spikes : np.ndarray, shape (n_time, n_neurons) Binned spike indicator. 1 means spike occured. 0 means no spike occured. """ _, test_spikes1 = make_fragmented_replay() _, test_spikes2 = make_continuous_replay() _, test_spikes3 = make_fragmented_replay() test_spikes = np.concatenate((test_spikes1, test_spikes2, test_spikes3)) replay_time = np.arange(test_spikes.shape[0]) / sampling_frequency return replay_time, test_spikes
[docs] def make_theta_sweep( sampling_frequency: int = SAMPLING_FREQUENCY, n_sweeps: int = 5 ) -> tuple[np.ndarray, np.ndarray]: """Simulate theta sweeping Parameters ---------- sampling_frequency : int, optional Samples per second n_sweeps : int, optional Number of sweeps to simulate, by default 5 Returns ------- replay_time : np.ndarray, shape (n_time,) Time in seconds. test_spikes : np.ndarray, shape (n_time, n_neurons) Binned spike indicator. 1 means spike occured. 0 means no spike occured. """ _, test_spikes1 = make_continuous_replay(is_outbound=False, replay_speedup=145) _, test_spikes2 = make_continuous_replay(is_outbound=True, replay_speedup=145) test_spikes = np.concatenate( [test_spikes1[test_spikes1.shape[0] // 2 :], test_spikes2] * n_sweeps ) replay_time = np.arange(test_spikes.shape[0]) / sampling_frequency return replay_time, test_spikes