'''
===============================================================
:mod:`gridcells.analysis.signal` - signal analysis
===============================================================
The can be e.g. filtering, slicing, correlation analysis, up/down-sampling, etc.
.. autosummary::
acorr
corr
ExtremumTypes
local_extrema
local_maxima
local_minima
LocalExtrema
'''
from __future__ import absolute_import, print_function, division
import os
from enum import IntEnum
import numpy as np
__all__ = [
'acorr',
'corr',
'local_extrema',
'local_minima',
'local_maxima',
'ExtremumTypes',
'LocalExtrema',
]
# Do not import when in RDT environment
on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
if not on_rtd:
from . import _signal
[docs]class ExtremumTypes(IntEnum):
'''Specifies types of extrema.'''
UNDEFINED = 0
'''Undefined.'''
MIN = 1
'''Local minimum.'''
MAX = 2
'''Local maximum.'''
[docs]def corr(a, b, mode='onesided', lag_start=None, lag_end=None):
'''
An enhanced correlation function of two real signals, based on a custom C++
code.
This function uses dot product instead of FFT to compute a correlation
function with range restricted lags.
Thus, for a long-range of lags and big arrays it can be slower than the
numpy.correlate (which uses fft-based convolution). However, for arrays in
which the number of lags << max(a.size, b.size) the computation time might
be much shorter than using convolution to calculate the full correlation
function and taking a slice of it.
Parameters:
a, b : ndarray
One dimensional numpy arrays (in the current implementation, they will
be converted to dtype=double if not already of that type.
mode : str, optional
A string indicating the size of the output:
``onesided`` : range of lags is [0, b.size - 1]
``twosided`` : range of lags is [-(a.size - 1), b.size - 1]
``range`` : range of lags is [-lag_start, lag_end]
lag_start, lag_end : int, optional
Initial and final lag value. Only used when mode == 'range'
output : numpy.ndarray with shape (1, ) and dtype.float
A 1D array of size depending on mode
.. note::
This function always returns a numpy array with dtype=float.
.. seealso:: :py:func:`acorr`
'''
a = np.require(a, np.float, 'F')
b = np.require(b, np.float, 'F')
sz1 = a.size
sz2 = b.size
if sz1 == 0 or sz2 == 0:
raise TypeError("Both input arrays must have non-zero size!")
if mode == 'onesided':
return _signal.correlation_function(a, b, 0, sz2 - 1)
elif mode == 'twosided':
return _signal.correlation_function(a, b, -(sz1 - 1), sz2 - 1)
elif mode == 'range':
lag_start = int(lag_start)
lag_end = int(lag_end)
if lag_start <= -sz1 or lag_end >= sz2:
raise ValueError("Lag range must be in the range "
"[{0}, {1}]".format(-(sz1 - 1), sz2 - 1))
return _signal.correlation_function(a, b, lag_start, lag_end)
else:
raise ValueError("mode must be one of 'onesided', 'twosided', or "
"'range'")
[docs]def acorr(sig, max_lag=None, norm=False, mode='onesided'):
'''
Compute an autocorrelation function of a real signal.
Parameters
----------
sig : numpy.ndarray
The signal, 1D vector, to compute an autocorrelation of.
max_lag : int, optional
Maximal number of lags. If mode == 'onesided', the range of lags will be
[0, max_lag], i.e. the size of the output will be (max_lag+1). If
mode == 'twosided', the lags will be in the range [-max_lag, max_lag],
and so the size of the output will be 2*max_lag + 1.
If max_lag is None, then max_lag will be set to len(sig)-1
norm : bool, optional
Whether to normalize the auto correlation result, so that res(0) = 1
mode : string, optional
``onesided`` or ``twosided``. See description of max_lag
output : numpy.ndarray
A 1D array, size depends on ``max_lag`` and ``mode`` parameters.
Notes
-----
If the normalisation constant is zero (i.e. the input array is zero), this
function will return a zero array.
'''
if max_lag is None:
max_lag = len(sig) - 1
if mode == 'onesided':
c = corr(sig, sig, mode='range', lag_start=0, lag_end=max_lag)
elif mode == 'twosided':
c = corr(sig, sig, mode='range', lag_start=-max_lag, lag_end=max_lag)
else:
raise ValueError("mode can be either 'onesided' or 'twosided'!")
if norm:
maximum = np.max(np.abs(c))
if maximum != 0.:
c /= maximum
return c
[docs]def local_extrema(sig):
'''Find all local extrema using the derivative approach.
Parameters
----------
sig : numpy.ndarray
A 1D numpy array
Returns
-------
extrema : :class:`~LocalExtrema`
An object containing the local extrema of the signal ``sig``.
See also
--------
local_minima : Finds local minima.
local_maxima : Finds local maxima.
Notes
-----
This method is not suitable to find local extrema of functions where the
extremum is flat, i.e. as in square pulses.
'''
sz = len(sig)
szDiff = sz - 1
der = np.diff(sig)
der0 = (der[0:szDiff - 1] * der[1:szDiff]) < 0.
ext_idx = np.nonzero(der0)[0]
dder = np.diff(der)[ext_idx]
ext_idx += 1 # Correction for a peak position
ext_t = np.ndarray((dder.size, ), dtype=int)
ext_t[dder < 0] = ExtremumTypes.MAX
ext_t[dder > 0] = ExtremumTypes.MIN
return LocalExtrema(ext_idx, ext_t)
[docs]def local_maxima(sig):
'''Find all local maxima using the derivative approach
Parameters
----------
sig : numpy.ndarray
A 1D numpy array
Returns
-------
maxima : np.ndarray
An array of indices into ``sig`` of the local maxima.
See also
--------
local_extrema : Finds local extrema.
local_minima: Finds local minima.
'''
return local_extrema(sig).get_type(ExtremumTypes.MAX)
[docs]def local_minima(sig):
'''Find all local minima using the derivative approach
Parameters
----------
sig : numpy.ndarray
A 1D numpy array
Returns
-------
maxima : np.ndarray
An array of indices into ``sig`` of the local minima.
See also
--------
local_extrema : Finds local extrema.
local_maxima: Finds local maxima.
'''
return local_extrema(sig).get_type(ExtremumTypes.MIN)
[docs]class LocalExtrema(object):
'''A class representing local extrema for a particular object.
This is only a helper class. Users should not instantiate this class
directly
.. note::
For now only 1D extrema are supported.
Parameters
----------
extrema : 1D numpy array
Positions of the extrema in a signal. The user must track the source.
types : 1D numpy array
Types of the extrema held in this object. Contains values from
:class:`~ExtremumTypes`.
'''
def __init__(self, extrema, types):
self._extrema = np.asanyarray(extrema)
self._types = np.asanyarray(types)
if len(extrema) != len(types):
raise IndexError('Number of extrema and number of their types '
'must match.')
def __len__(self):
return len(self._types)
[docs] def get_type(self, extremum_type):
'''Get all the local extrema that are of type ``extremum_type``.
Parameters
----------
extremum_type : :class:`~ExtremumTypes`
The type of the extremum to retrieve.
Returns
-------
extrema : iterable
An iterable that will contain the extrema with the specified type.
If no extremum of the requested type is present, returns an empty
array.
'''
return self._extrema[self._types == extremum_type]