Source code for gridcells.analysis.spikes

'''
===============================================================
:mod:`gridcells.analysis.spikes` - spike train analysis
===============================================================

Classes
-------
.. inheritance-diagram:: gridcells.analysis.spikes
                         gridcells.analysis.bumps.SingleBumpPopulation
    :parts: 2

.. autosummary::

    PopulationSpikes
    TorusPopulationSpikes
    TwistedTorusSpikes
'''
from __future__ import absolute_import, division, print_function

import os

import numpy as np
import scipy
import collections

# Do not import when in RDT environment
on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
if not on_rtd:
    from . import _spikes

__all__ = [
    'PopulationSpikes',
    'TorusPopulationSpikes',
    'TwistedTorusSpikes'
]


def sliding_firing_rate_tuple(spikes, n, tstart, tend, dt, win_len):
    '''
    Compute a firing rate with a sliding window from a tuple of spike data:
    spikes is a tuple(n_id, times), in which n_id is a list/array of neuron id
    and times is a list/array of spike times

    Parameters
    ----------
    spikes : np.ndarray
        A pair (n_id, spikes)
    n : int
        Total number of neurons
    tstart : float
        When the firing rate will start (ms)
    tend : float
        End time of firing rate (ms)
    dt : float
        Sliding window dt - not related to simulation time (ms)
    win_len : float
        Length of the sliding window (ms). Must be >= dt.

    Returns
    -------
    output : np.ndarray
        An array of shape (n, int((tend-tstart)/dt)+1
    '''
    rate = _spikes.sliding_firing_rate_base(spikes[0], spikes[1], n, tstart,
                                            tend, dt, win_len)
    rate_t = _spikes.sliding_times(tstart, tend, dt)
    return (rate, rate_t)


[docs]class PopulationSpikes(collections.Sequence): '''Abstraction of a population of spikes.''' def __init__(self, n, senders, times): ''' Parameters ---------- n : int Number of neurons in the population senders : 1D array Neuron numbers corresponding to the spikes times : 1D array Spike times. The shape of this array must be the same as for `senders`. ''' self._N = n if n < 0: msg = "Number of neurons in the spike train must be " +\ "non-negative! Got {0}." raise ValueError(msg.format(n)) # We are expecting senders and times as numpy arrays, if they are not, # convert them. Moreover, senders.dtype must be int, for indexing. self._senders = np.ascontiguousarray(senders, dtype=np.int) self._times = np.ascontiguousarray(times) self._unpacked = [None] * self._N # unpacked version of spikes @property def n(self): ''' Number of neurons in the population ''' return self._N
[docs] def avg_firing_rate(self, tstart, tend): ''' Compute and average firing rate for all the neurons between 'tstart' and 'tend'. Return an array of firing rates, one item for each neuron in the population. Parameters ---------- tstart : float (ms) Start time. tend : float (ms) End time. Returns ------- output : numpy array Firing rate in Hz for each neuron in the population. ''' if tend < tstart: raise ValueError('tstart must be <= tend.') return _spikes.avg_fr(self._senders, self._times, self._N, tstart, tend) * 1e3
[docs] def sliding_firing_rate(self, tstart, tend, dt, win_len): ''' Compute a sliding firing rate over the population of spikes, by taking a rectangular window of specified length. Parameters ---------- tstart : float Start time of the firing rate analysis. tend : float End time of the analysis dt : float Firing rate window time step win_len : float Lengths of the windowing function (rectangle) Returns ------- output : a tuple A pair (F, t), specifying the vector of firing rates and corresponding times. F is a 2D array of the shape (n, Ntimes), in which n is the number of neurons and Ntimes is the number of time steps. 't' is a vector of times corresponding to the time windows taken. ''' spikes = (self._senders, self._times) return sliding_firing_rate_tuple(spikes, self._N, tstart, tend, dt, win_len)
[docs] def windowed(self, tlimits): ''' Return population spikes restricted to tlimits. Parameters ---------- tlimits : a pair A tuple (tstart, tend). The spikes in the population must satisfy tstart >= t <= tend. Returns ------- output : PopulationSpikes instance A copy of self with only a subset of spikes, limited by the time window. ''' tstart = tlimits[0] tend = tlimits[1] tidx = np.logical_and(self._times >= tstart, self._times <= tend) return PopulationSpikes(self._N, self._senders[tidx], self._times[tidx])
[docs] def raster_data(self, neuron_list=None): ''' Extract the senders and corresponding spike times for a raster plot. .. todo:: implement neuron_list Parameters ---------- neuron_list : list, optional Extract only neurons given in this list Returns ------- output : a tuple A pair containing (senders, times). ''' if neuron_list is not None: raise NotImplementedError() return self._senders, self._times
[docs] def spike_train_difference(self, idx1, idx2=None, full=True, reduce_fun=None): ''' Compute time differences between pairs of spikes of two neurons or a list of neurons. Parameters ---------- idx1 : int, or a sequence of ints Index of the first neuron or a list of neurons for which to compute the correlation histogram. idx2 : int, or a sequence of ints, or None Index of the second neuron or a list of indexes for the second set of spike trains. full : bool, optional Not fully implemented yet. Must be set to True. reduce_fun : callable, optional Any callable object that computes a function over an array of each spike train difference. The function must take one input argument, which will be the array of spike time differences for a pair of neurons. The output of this function will be stored instead of the default output. Returns ------- output : A 2D or 1D array Spike train autocorrelation histograms for all the pairs of neurons. The computation takes the following steps: * If ``idx1`` or ``idx2`` are integers, they will be converted to a list of size 1. * If ``idx2`` is None, then the result will be a list of lists of pairs of cross-correlations between the neurons. Even if there is only one neuron. If ``full == True``, the output will be an upper triangular matrix of all the pairs, i.e. it will exclude the duplicated. Otherwise there will be cross correlation histograms between all the pairs. * if ``idx2`` is not None, then ``idx1`` and ``idx2`` must be arrays of the same length, specifying the pairs to compute autocorrelation for ''' if not full: raise NotImplementedError() if reduce_fun is None: reduce_fun = lambda x: x if not isinstance(idx1, collections.Iterable): idx1 = [idx1] if idx2 is None: idx2 = idx1 res = [[] for x in idx1] for n1 in idx1: for n2 in idx2: # print n1, n2, len(self[n1]), len(self[n2]) res[n1].append( reduce_fun( _spikes.spike_time_diff(self[n1], self[n2]))) return res elif not isinstance(idx2, collections.Iterable): idx2 = [idx2] # Two arrays of pairs if len(idx1) != len(idx2): raise TypeError('Length of neuron indexes do not match!') res = [None] * len(idx1) for n in range(len(idx1)): res[n] = reduce_fun(_spikes.spike_time_diff(self[idx1[n]], self[idx2[n]])) return res
[docs] class CallableHistogram(object): '''Callable class that computes a histogram.''' def __init__(self, **kw): self.kw = kw def __call__(self, x): ''' Perform the histogram on x and return the result of numpy.histogram, without bin_edges ''' res, _ = np.histogram(x, **self.kw) return res
[docs] def get_bin_edges(self): '''Calculate bin edges.''' _, bin_edges = np.histogram([], **self.kw) return bin_edges
[docs] def spike_train_xcorr(self, idx1, idx2, lag_range, bins=50, **kw): ''' Compute the spike train crosscorrelation function for all pairs of spike trains in the population. For explanation of how ``idx1`` and ``idx2`` are treated, see :meth:`~PopulationSpikes.spike_train_difference`. Parameters ---------- idx1 : int, or a sequence of ints Index of the first neuron or a list of neurons for which to compute the correlation histogram. idx2 : int, or a sequence of ints, or None Index of the second neuron or a list of indexes for the second set of spike trains. lag_range : (lag_start, lag_end) Limits of the cross-correlation function. The bins will always be **centered** on the values. bins : int, optional Number of bins kw : dict Keyword arguments passed on to the numpy.histogram function Returns ------- output : a 2D or 1D list See :meth:`~PopulationSpikes.spike_train_difference`. ''' lag_start, lag_end = lag_range bin_width = (lag_end - lag_start) / (bins - 1) bin_edges = np.linspace(lag_start - bin_width / 2.0, lag_end + bin_width / 2.0, bins + 1) h = self.CallableHistogram(bins=bin_edges, **kw) xc = self.spike_train_difference(idx1, idx2, full=True, reduce_fun=h) bin_edges = h.get_bin_edges() bin_centers = (bin_edges[0:-1] + bin_edges[1:]) / 2.0 return xc, bin_centers, bin_edges
[docs] def isi_neuron(self, n): ''' Compute all interspike intervals of one neuron with ID ``n``. If the number of spikes is less than 2, returns an empty array. .. todo:: Works on sorted spike trains only! .. note:: If you get negative interspike intervals, you will need to sort your spike times (per each neuron). ''' spikes = self[n] if len(spikes) < 2: return np.array([]) return spikes[1:] - spikes[0:-1]
[docs] def isi(self, n=None, reduce_fun=None): ''' Return interspike interval of one or more neurons. Parameters ---------- n : None, int, or sequence Neuron numbers. If ``n`` is None, then compute ISI stats for all neurons in the population. If ``n`` is an int, compute ISIs for just neuron indexed by ``n``. Otherwise ``n`` is expected to be a sequence of neuron indices. reduce_fun : callable or None A reduction function (callable object) that performs an operation on all the ISIs of the population. If ``None``, nothing is done. The callable has to take one input parameter, which is the sequence of ISIs. This allows to cascade data processing without the need for duplicating spike timing data. Returns ------- output: list A list of outputs (depending on parameters) for each neuron, even if ``n`` is an int. ''' if reduce_fun is None: reduce_fun = lambda x: x res = [] if n is None: for n_id in range(len(self)): res.append(reduce_fun(self.isi_neuron(n_id))) elif isinstance(n, int): res.append(reduce_fun(self.isi_neuron(n))) else: for n_id in n: res.append(reduce_fun(self.isi_neuron(n_id))) return res
[docs] def isi_cv(self, n=None, win_len=None): ''' Coefficients of variation of inter-spike intervals of one or more neurons in the population. For the description of parameters and outputs and their semantics see also :meth:`~PopulationSpikes.ISI`. Parameters ---------- win_len : float, list of floats, or ``None`` Specify the maximal ISI value, i.e. use windowed coefficient of variation. If ``None``, use the whole range. ''' cvfunc = scipy.stats.variation if win_len is None: f = scipy.stats.variation elif (isinstance(win_len, collections.Sequence) or isinstance(win_len, np.ndarray)): f = lambda x: np.asarray([cvfunc(x[x <= wl]) for wl in win_len]) else: f = lambda x: cvfunc(x[x <= win_len]) return self.isi(n, f) ####################################################################### # Methods implementing collections.Sequence
def __getitem__(self, key): '''Retrieve a spike train for one neuron.''' if self._unpacked[key] is not None: return self._unpacked[key] ret = self._times[self._senders == key] self._unpacked[key] = ret return ret def __len__(self): return self._N
[docs]class TorusPopulationSpikes(PopulationSpikes): ''' Spikes of a population of neurons on a twisted torus. ''' def __init__(self, senders, times, sheet_size): self._sheetSize = sheet_size n = sheet_size[0] * sheet_size[1] PopulationSpikes.__init__(self, n, senders, times)
[docs] def get_x_size(self): '''Horizontal size of the torus.''' return self._sheetSize[0]
[docs] def get_y_size(self): '''Vertical size of the torus.''' return self._sheetSize[1]
[docs] def get_dimensions(self): '''Size of the torus.''' return self._sheetSize
nx = property(fget=get_x_size, doc='Horizontal size of the torus') ny = property(fget=get_y_size, doc='Vertical size of the torus') dimensions = property(fget=get_dimensions, doc='Dimensions of the torus (X, Y)') def avg_firing_rate(self, tstart, tend): rate = super(TorusPopulationSpikes, self).avg_firing_rate(tstart, tend) return np.reshape(rate, (self.ny, self.nx))
[docs] def population_vector(self, tstart, tend, dt, win_len): ''' Compute the population vector on a torus, from the spikes present. Note that this method will have a limited functionality on a twisted torus, but can be used if the population activity translates in the X dimension only. Parameters ---------- tstart : float Start time of analysis tend : float End time of analysis dt : float Time step of the (rectangular) windowing function win_len : float Length of the windowing function Returns ------- output : tuple A pair (r, t) in which r is a 2D vector of shape (int((tend-tstart)/dt)+1), 2), corresponding to the population vector for each time step of the windowing function, and t is a vector of times, of length the first dimension of r. ''' sheet_size_x = self.get_x_size() sheet_size_y = self.get_y_size() # n = sheet_size_x * sheet_size_y F, tsteps = PopulationSpikes.sliding_firing_rate(self, tstart, tend, dt, win_len) p = np.ndarray((len(tsteps), 2), dtype=complex) x, y = np.meshgrid(np.arange(sheet_size_x), np.arange(sheet_size_y)) x = np.exp(1j * (x - sheet_size_x / 2) / sheet_size_x * 2 * np.pi).ravel() y = np.exp(1j * (y - sheet_size_y / 2) / sheet_size_y * 2 * np.pi).ravel() for t_it in range(len(tsteps)): p[t_it, 0] = np.dot(F[:, t_it], x) p[t_it, 1] = np.dot(F[:, t_it], y) return (np.angle(p) / 2 / np.pi * self._sheetSize, tsteps)
[docs] def sliding_firing_rate(self, tstart, tend, dt, win_len): ''' Compute a sliding firing rate over the population of spikes, by taking a rectangular window of specified length. However, unlike the ancestor method (PopulationSpikes.sliding_firing_rate), return a 3D array, a succession of 2D population firing rates in time. Parameters ---------- tstart : float Start time of the firing rate analysis. tend : float End time of the analysis dt : float Firing rate window time step win_len : float Lengths of the windowing function (rectangle) Returns ------- output : a tuple A pair (F, t), specifying the vector of firing rates and corresponding times. F is a 3D array of the shape (nx, ny, Ntimes), in which nx/ny are the number of neurons in X and Y dimensions, respectively, and Ntimes is the number of time steps. 't' is a vector of times corresponding to the time windows taken. ''' spikes = (self._senders, self._times) F, Ft = sliding_firing_rate_tuple(spikes, self._N, tstart, tend, dt, win_len) nx = self.get_x_size() ny = self.get_y_size() return np.reshape(F, (ny, nx, len(Ft))), Ft
[docs]class TwistedTorusSpikes(TorusPopulationSpikes): ''' Spikes arranged on twisted torus. The torus is twisted in the X direction. ''' def __init__(self, senders, times, sheet_size): super(TwistedTorusSpikes, self).__init__(senders, times, sheet_size) def population_vector(self, tstart, tend, dt, win_len): msg = ('population_vector() has not been implemented yet for {}. ' + 'Note that this method is different for the regular torus ' + '(TorusPopulationSpikes).') raise NotImplementedError(msg.format(self.__class__.__name__))