Source code for pyotelem.dsp

[docs]def normalized(a, axis=-1, order=2): """Return normalized vector for arbitrary axis Args ---- a: ndarray (n,3) Tri-axial vector data axis: int Axis index to overwhich to normalize order: int Order of nomalization to calculate Notes ----- This function was adapted from the following StackOverflow answer: http://stackoverflow.com/a/21032099/943773 """ import numpy l2 = numpy.atleast_1d(numpy.linalg.norm(a, order, axis)) l2[l2 == 0] = 1 return a / numpy.expand_dims(l2, axis)
[docs]def findzc(x, thresh, t_max=None): """ Find cues to each zero-crossing in vector x. To be accepted as a zero-crossing, the signal must pass from below -thresh to above thresh, or vice versa, in no more than t_max samples. Args ---- thresh: (float) magnitude threshold for detecting a zero-crossing. t_max: (int) maximum duration in samples between threshold crossings. Returns ------- zc: ndarray Array containing the start **zc_s**, finish **zc_f** and direction **S** of zero crossings where: * zc_s: the cue of the first threshold-crossing in samples * zc_f: the cue of the second threshold-crossing in samples * S: the sign of each zero-crossing (1 = positive-going, -1 = negative-going). Notes ----- This routine is a reimplementation of Mark Johnson's Dtag toolbox method and tested against the Matlab version to be sure it has the same result. """ import numpy # positive threshold: p (over) n (under) pt_p = x > thresh pt_n = ~pt_p # negative threshold: p (over) n (under) nt_n = x < -thresh nt_p = ~nt_n # Over positive threshold +thresh # neg to pos pt_np = (pt_p[:-1] & pt_n[1:]).nonzero()[0] # pos to neg pt_pn = (pt_n[:-1] & pt_p[1:]).nonzero()[0] + 1 # Over positive threshold +thresh # neg to pos nt_np = (nt_p[:-1] & nt_n[1:]).nonzero()[0] + 1 # pos to neg nt_pn = (nt_n[:-1] & nt_p[1:]).nonzero()[0] # Concat indices, order sequentially ind_all = numpy.hstack((pt_np, nt_np, pt_pn, nt_pn)) ind_all.sort() # Omit rows where just touching but not crossing crossing_mask = ~(numpy.diff(numpy.sign(x[ind_all])) == 0) # Append a False to make the same length as ind_all crossing_mask = numpy.hstack((crossing_mask, False)) # Get 1st and 2nd crossings ind_1stx = ind_all[crossing_mask] ind_2ndx = ind_all[numpy.where(crossing_mask)[0] + 1] # TODO odd option to replace with NaNs rather than delete? # Delete indices that do not have a second crossing del_ind = numpy.where(ind_2ndx > len(x) - 1)[0] for i in del_ind: ind_1stx = numpy.delete(ind_1stx, i) ind_2ndx = numpy.delete(ind_1stx, i) # Get direction/sign of crossing signs = numpy.sign(x[ind_1stx]) * -1 # Add column of direction and transpose zc = numpy.vstack((ind_1stx, ind_2ndx, signs)).T # TODO not mentioned in docstring, remove? # x_norm? = ((x[:, 1] * zc[:, 0]) - (x[:, 0] * zc[:, 1])) / x[:, 1] - x[:, 0] if t_max: zc = zc[zc[:, 1] - zc[:, 0] <= t_max, :] return zc.astype(int)
[docs]def butter_filter(cutoff, fs, order=5, btype="low"): """Create a digital butter fileter with cutoff frequency in Hz Args ---- cutoff: float Cutoff frequency where filter should separate signals fs: float sampling frequency btype: str Type of filter type to create. 'low' creates a low-frequency filter and 'high' creates a high-frequency filter (Default 'low). Returns ------- b: ndarray Numerator polynomials of the IIR butter filter a: ndarray Denominator polynomials of the IIR butter filter Notes ----- This function was adapted from the following StackOverflow answer: http://stackoverflow.com/a/25192640/943773 """ import scipy.signal nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = scipy.signal.butter(order, normal_cutoff, btype=btype, analog=False) return b, a
[docs]def butter_apply(b, a, data): """Apply filter with filtfilt to allign filtereted data with input The filter is applied once forward and once backward to give it linear phase, using Gustafsson's method to give the same length as the original signal. Args ---- b: ndarray Numerator polynomials of the IIR butter filter a: ndarray Denominator polynomials of the IIR butter filter Returns ------- x: ndarray Filtered data with linear phase Notes ----- This function was adapted from the following StackOverflow answer: http://stackoverflow.com/a/25192640/943773 """ import scipy.signal return scipy.signal.filtfilt(b, a, data, method="gust")
[docs]def calc_PSD_welch(x, fs, nperseg): """Caclulate power spectral density with Welch's method Args ---- x: ndarray sample array fs: float sampling frequency (1/dt) Returns ------- f_welch: ndarray Discrete frequencies S_xx_welch: ndarray Estimated PSD at discrete frequencies `f_welch` P_welch: ndarray Signal power (integrated PSD) df_welch: ndarray Delta between discreet frequencies `f_welch` """ import numpy import scipy.signal # Estimate PSD `S_xx_welch` at discrete frequencies `f_welch` f_welch, S_xx_welch = scipy.signal.welch(x, fs=fs, nperseg=nperseg) # Integrate PSD over spectral bandwidth # to obtain signal power `P_welch` df_welch = f_welch[1] - f_welch[0] P_welch = numpy.sum(S_xx_welch) * df_welch return f_welch, S_xx_welch, P_welch, df_welch
[docs]def simple_peakfinder(x, y, delta): """Detect local maxima and minima in a vector A point is considered a maximum peak if it has the maximal value, and was preceded (to the left) by a value lower by `delta`. Args ---- y: ndarray array of values to find local maxima and minima in delta: float minimum change in `y` since previous peak to be considered new peak. It should be positive and it's absolute value taken to ensure this. x: ndarray corresponding x-axis positions to y array Returns ------- max_ind: ndarray Indices of local maxima min_ind: ndarray Indices of local minima Example ------- max_ind, min_ind = simple_peakfinder(x, y, delta) # get values of `y` at local maxima local_max = y[max_ind] Notes ----- Matlab Author: Eli Billauer http://billauer.co.il/peakdet.html Python translation: Chris Muktar https://gist.github.com/endolith/250860 Python cleanup: Ryan J. Dillon """ import numpy y = numpy.asarray(y) max_ind = list() min_ind = list() local_min = numpy.inf local_max = -numpy.inf local_min_pos = numpy.nan local_max_pos = numpy.nan lookformax = True for i in range(len(y)): if y[i] > local_max: local_max = y[i] local_max_pos = x[i] if y[i] < local_min: local_min = y[i] local_min_pos = x[i] if lookformax: if y[i] < local_max - abs(delta): max_ind.append(local_max_pos) local_min = y[i] local_min_pos = x[i] lookformax = False else: if y[i] > local_min + abs(delta): min_ind.append(local_min_pos) local_max = y[i] local_max_pos = x[i] lookformax = True return numpy.array(max_ind), numpy.array(min_ind)