Source code for pyotelem.glides

[docs]def get_stroke_freq(Ax, Az, fs_a, nperseg, peak_thresh, stroke_ratio=None): """Determine stroke frequency to use as a cutoff for filtering Args ---- Ax: numpy.ndarray, shape (n,) x-axis accelermeter data (longitudinal) Ay: numpy.ndarray, shape (n,) x-axis accelermeter data (lateral) Az: numpy.ndarray, shape (n,) z-axis accelermeter data (dorso-ventral) fs_a: float sampling frequency (i.e. number of samples per second) nperseg: int length of each segment (i.e. number of samples per frq. band in PSD calculation. Default to 512 (scipy.signal.welch() default is 256) peak_thresh: float PSD power level threshold. Only peaks over this threshold are returned. Returns ------- cutoff_frq: float cutoff frequency of signal (Hz) to be used for low/high-pass filtering stroke_frq: float frequency of dominant wavelength in signal stroke_ratio: float Notes ----- During all descents and ascents phases where mainly steady swimming occurs. When calculated for the whole dive it may be difficult to differentiate the peak at which stroking rate occurs as there are movements than only steady swimming Here the power spectra is calculated of the longitudinal and dorso-ventral accelerometer signals during descents and ascents to determine the dominant stroke frequency for each animal in each phase This numpy samples per f segment 512 and a sampling rate of fs_a. Output: S is the amount of power in each particular frequency (f) """ import numpy from . import dsp from . import utils from .plots import plotdsp # Axes to be used for determining `stroke_frq` stroke_axes = [(0, "x", "dorsa-ventral", Ax), (2, "z", "lateral", Az)] # Lists for appending values from each axis cutoff_frqs = list() stroke_frqs = list() stroke_ratios = list() # Iterate over axes in `stroke_axes` list appending output to above lists for i, i_alph, name, data in stroke_axes: frqs, S, _, _ = dsp.calc_PSD_welch(data, fs_a, nperseg) # Find index positions of local maxima and minima in PSD delta = S.max() / 1000 max_ind, min_ind = dsp.simple_peakfinder(range(len(S)), S, delta) max0 = max_ind[0] # TODO hack fix, improve later try: min_ind[0] except Exception: stroke_ratio = 0.4 stroke_frq = frqs[max0] # Prompt user for `cutoff_frq` value after inspecting PSD plot title = "PSD - {} axis (n={}), {}".format(i_alph, i, name) # Plot power spectrum against frequency distribution plotdsp.plot_welch_peaks(frqs, S, max_ind, title=title) # Get user input of cutoff frequency identified off plots cutoff_frq = utils.recursive_input("cutoff frequency", float) # Append values for axis to list cutoff_frqs.append(cutoff_frq) stroke_frqs.append(stroke_frq) stroke_ratios.append(stroke_ratio) # Average values for all axes cutoff_frq = float(numpy.mean(cutoff_frqs)) stroke_frq = float(numpy.mean(stroke_frqs)) # Handle exception of manual selection when `stroke_ratio == None` # TODO with fix try: stroke_ratio = float(numpy.mean(stroke_ratios)) except Exception: stroke_ratio = None return cutoff_frq, stroke_frq, stroke_ratio
[docs]def get_stroke_glide_indices(A_g_hf, fs_a, J, t_max): """Get stroke and glide indices from high-pass accelerometer data Args ---- A_g_hf: 1-D ndarray Animal frame triaxial accelerometer matrix at sampling rate fs_a. fs_a: int Number of accelerometer samples per second J: float Frequency threshold for detecting a fluke stroke in m/s^2. If J is not given, fluke strokes will not be located but the rotations signal (pry) will be computed. t_max: int Maximum duration allowable for a fluke stroke in seconds. A fluke stroke is counted whenever there is a cyclic variation in the pitch deviation with peak-to-peak magnitude greater than +/-J and consistent with a fluke stroke duration of less than t_max seconds, e.g., for Mesoplodon choose t_max=4. Returns ------- GL: 1-D ndarray Matrix containing the start time (first column) and end time (2nd column) of any glides (i.e., no zero crossings in t_max or more seconds). Times are in seconds. Note ---- If no J or t_max is given, J=[], or t_max=[], GL returned as None """ import numpy from . import dsp # Check if input array is 1-D if A_g_hf.ndim > 1: raise IndexError( "A_g_hf multidimensional: Glide index determination " "requires 1-D acceleration array as input" ) # Convert t_max to number of samples n_max = t_max * fs_a # Find zero-crossing start/stops in pry(:,n), rotations around n axis. zc = dsp.findzc(A_g_hf, J, n_max / 2) # find glides - any interval between zeros crossings greater than `t_max` ind = numpy.where(zc[1:, 0] - zc[0:-1, 1] > n_max)[0] gl_ind = numpy.vstack([zc[ind, 0] - 1, zc[ind + 1, 1] + 1]).T # Compute mean index position of glide, Only include sections with jerk < J gl_mean_idx = numpy.round(numpy.mean(gl_ind, 1)).astype(int) gl_ind = numpy.round(gl_ind).astype(int) for i in range(len(gl_mean_idx)): col = range(gl_mean_idx[i], gl_ind[i, 0], -1) test = numpy.where(numpy.isnan(A_g_hf[col]))[0] if test.size != 0: gl_mean_idx[i] = numpy.nan gl_ind[i, 0] = numpy.nan gl_ind[i, 1] = numpy.nan else: over_J1 = numpy.where(abs(A_g_hf[col]) >= J)[0][0] gl_ind[i, 0] = gl_mean_idx[i] - over_J1 + 1 col = range(gl_mean_idx[i], gl_ind[i, 1]) over_J2 = numpy.where(abs(A_g_hf[col]) >= J)[0][0] gl_ind[i, 1] = gl_mean_idx[i] + over_J2 - 1 GL = gl_ind GL = GL[numpy.where(GL[:, 1] - GL[:, 0] > n_max / 2)[0], :] return GL
[docs]def split_glides(n_samples, dur, fs_a, GL, min_dur=None): """Get start/stop indices of each `dur` length sub-glide for glides in GL Args ---- dur: int Desired duration of glides GL: ndarray, (n, 2) Matrix containing the start time (first column) and end time (2nd column) of any glides.Times are in seconds. min_dur: int, default (bool False) Minimum number of seconds for sub-glide. Default value is `False`, which makes `min_dur` equal to `dur`, ignoring sub-glides smaller than `dur`. Attributes ---------- gl_ind_diff: ndarray, (n,3) GL, with additional column of difference between the first two columns Returns ------- SGL: ndarray, (n, 2) Matrix containing the start time (first column) and end time (2nd column) of the generated sub-glides. All glides must have duration equal to the given dur value.Times are in seconds. """ import numpy # Convert `dur` in seconds to duration in number of samples `ndur` ndur = dur * fs_a # If minimum duration not passed, set to `min_dur` to skip slices < `dur` if not min_dur: min_ndur = dur * fs_a else: min_ndur = min_dur * fs_a # `GL` plus column for total duration of glide, seconds gl_ind_diff = numpy.vstack((GL.T, GL[:, 1] - GL[:, 0])).T # Split all glides in `GL` SGL_started = False for i in range(len(GL)): gl_ndur = gl_ind_diff[i, 2] # Split into sub glides if longer than duration if abs(gl_ndur) > ndur: # Make list of index lengths to start of each sub-glide n_sgl = int(gl_ndur // ndur) sgl_ndur = numpy.ones(n_sgl) * ndur sgl_start = numpy.arange(n_sgl) * (ndur + 1) # Add remainder as a sub-glide, skips if `min_ndur` not passed if gl_ndur % ndur > min_ndur: last_ndur = numpy.floor(gl_ndur % ndur) sgl_ndur = numpy.hstack([sgl_ndur, last_ndur]) last_start = (len(sgl_start) * ndur) + ndur sgl_start = numpy.hstack([sgl_start, last_start]) # Get start and end index positions for each sub-glide for k in range(len(sgl_start)): # starting at original glide start... # sgl_start_ind: add index increments of ndur+1 for next start idx next_start_ind = (gl_ind_diff[i, 0] + sgl_start[k]).astype(int) # end_glide: add `ndur` to that to get ending idx next_end_ind = (next_start_ind + sgl_ndur[k]).astype(int) # If first iteration, set equal to first set of indices if SGL_started is False: sgl_start_ind = next_start_ind sgl_end_ind = next_end_ind SGL_started = True else: # Concatenate 1D arrays together, shape (n,) sgl_start_ind = numpy.hstack((sgl_start_ind, next_start_ind)) sgl_end_ind = numpy.hstack((sgl_end_ind, next_end_ind)) # Stack and transpose indices into shape (n, 2) SGL = numpy.vstack((sgl_start_ind, sgl_end_ind)).T # Filter out sub-glides that fall outside of sensor data indices SGL = SGL[(SGL[:, 0] >= 0) & (SGL[:, 1] < n_samples)] # check that all sub-glides have a duration of `ndur` seconds sgl_ndur = SGL[:, 1] - SGL[:, 0] # If sub-glide `min_ndur` set, make sure all above `min_ndur`, below `ndur` if min_dur: assert numpy.all((sgl_ndur <= ndur) & (sgl_ndur >= min_ndur)) # Else make sure all sample number durations equal to `ndur` else: assert numpy.all(sgl_ndur == ndur) # Create `data_sgl_mask` data_sgl_mask = numpy.zeros(n_samples, dtype=bool) for start, stop in SGL.astype(int): data_sgl_mask[start:stop] = True return SGL, data_sgl_mask
[docs]def calc_glide_des_asc( depths, pitch_lf, roll_lf, heading_lf, swim_speed, dives, SGL, pitch_lf_deg, temperature, Dsw, ): """Calculate glide ascent and descent summary data Args ---- SGL: numpy.ndarray, shape (n,2) start and end index positions for sub-glides Returns ------- sgls: pandas.DataFrame Sub-glide summary information defined by `SGL` start/stop indices *Columns*: * dive_phase * dive_id * dive_min_depth * dive_max_depth * dive_duration * start_idx * stop_idx * duration * mean_depth * total_depth_change * abs_depth_change * mean_speed * total_speed_change * mean_pitch * mean_sin_pitch * SD_pitch * mean_temp * mean_swdensity * mean_a * R2_speed_vs_time * SE_speed_vs_time * mean_pitch_circ * pitch_concentration * mean_roll_circ * roll_concentration * mean_heading_circ * heading_concentration """ import scipy.stats import numpy import pandas keys = [ "dive_phase", "dive_id", "dive_min_depth", "dive_max_depth", "dive_duration", "start_idx", "stop_idx", "duration", "mean_depth", "total_depth_change", "abs_depth_change", "mean_speed", "total_speed_change", "mean_pitch", "mean_sin_pitch", "SD_pitch", "mean_temp", "mean_swdensity", "mean_a", "R2_speed_vs_time", "SE_speed_vs_time", "mean_pitch_circ", "pitch_concentration", "mean_roll_circ", "roll_concentration", "mean_heading_circ", "heading_concentration", ] # Create empty pandas dataframe for summary values sgls = pandas.DataFrame(index=range(len(SGL)), columns=keys) for i in range(len(SGL)): start_idx = SGL[i, 0] stop_idx = SGL[i, 1] # sub-glide start point in seconds sgls["start_idx"][i] = start_idx # sub-glide end point in seconds sgls["stop_idx"][i] = stop_idx # sub-glide duration sgls["duration"][i] = SGL[i, 1] - SGL[i, 0] # mean depth(m)during sub-glide sgls["mean_depth"][i] = numpy.mean(depths[start_idx:stop_idx]) # total depth(m)change during sub-glide sgl_depth_diffs = numpy.diff(depths[start_idx:stop_idx]) sgls["total_depth_change"][i] = numpy.sum(abs(sgl_depth_diffs)) # depth(m)change from start to end of sub-glide sgls["abs_depth_change"][i] = abs(depths[start_idx] - depths[stop_idx]) # mean swim speed during the sub-glide, only given if pitch>30 degrees sgls["mean_speed"][i] = numpy.nanmean(swim_speed[start_idx:stop_idx]) # total speed change during sub-glide sgl_speed_diffs = numpy.diff(swim_speed[start_idx:stop_idx]) sgls["total_speed_change"][i] = numpy.nansum(abs(sgl_speed_diffs)) # mean pitch during the sub-glide sgls["mean_pitch"][i] = numpy.mean(pitch_lf_deg[start_idx:stop_idx]) # mean sin pitch during the sub-glide sin_pitch = numpy.sin(numpy.mean(pitch_lf_deg[start_idx:stop_idx])) sgls["mean_sin_pitch"][i] = sin_pitch # SD of pitch during the sub-glide # TODO just use original radian array, not deg # make sure "original" is radians ;) sd_pitch = numpy.std(pitch_lf_deg[start_idx:stop_idx]) * 180 / numpy.pi sgls["SD_pitch"][i] = sd_pitch # mean temperature during the sub-glide sgls["mean_temp"][i] = numpy.mean(temperature[start_idx:stop_idx]) # mean seawater density (kg/m^3) during the sub-glide sgls["mean_swdensity"][i] = numpy.mean(Dsw[start_idx:stop_idx]) try: # Perform linear regression on sub-glide data subset xpoly = numpy.arange(start_idx, stop_idx).astype(int) ypoly = swim_speed[start_idx:stop_idx] # slope, intercept, r-value, p-value, standard error m, c, r, p, std_err = scipy.stats.linregress(xpoly, ypoly) # mean acceleration during the sub-glide sgls["mean_a"][i] = m # R2-value for the regression swim speed vs. time during the # sub-glide sgls["R2_speed_vs_time"][i] = r ** 2 # SE of the gradient for the regression swim speed vs. time during # the sub-glide sgls["SE_speed_vs_time"][i] = std_err except Exception: # mean acceleration during the sub-glide sgls["mean_a"][i] = numpy.nan # R2-value for the regression swim speed vs. time during the # sub-glide sgls["R2_speed_vs_time"][i] = numpy.nan # SE of the gradient for the regression swim speed vs. time during # the sub-glide sgls["SE_speed_vs_time"][i] = numpy.nan # TODO this does not take into account the positive depths, flipping # descents to ascents sgl_signs = numpy.sign(sgl_depth_diffs) if all(sgl_signs == -1): sgls["dive_phase"][i] = "descent" elif all(sgl_signs == 1): sgls["dive_phase"][i] = "ascent" elif any(sgl_signs == -1) & any(sgl_signs == 1): sgls["dive_phase"][i] = "mixed" # Get indices of dive in which glide occurred dive_ind = numpy.where( (dives["start_idx"] < start_idx) & (dives["stop_idx"] > stop_idx) )[0] # TODO Hack way of pulling the information from first column of dives if dive_ind.size == 0: dive_inf = pandas.DataFrame.copy(dives.iloc[0]) dive_inf[:] = numpy.nan else: dive_inf = dives.iloc[dive_ind].iloc[0] # Dive number in which the sub-glide recorded sgls["dive_id"][i] = dive_inf["dive_id"] # Minimum dive depth (m) of the dive sgls["dive_min_depth"][i] = dive_inf["depth_min"] # Maximum dive depth (m) of the dive sgls["dive_max_depth"][i] = dive_inf["depth_max"] # Dive duration (s) of the dive sgls["dive_duration"][i] = dive_inf["dive_dur"] # Mean pitch(deg) calculated using circular statistics pitch_circmean = scipy.stats.circmean(pitch_lf[start_idx:stop_idx]) sgls["mean_pitch_circ"][i] = pitch_circmean # Measure of concentration (r) of pitch during the sub-glide (i.e. 0 # for random direction, 1 for unidirectional) pitch_circvar = scipy.stats.circvar(pitch_lf[start_idx:stop_idx]) sgls["pitch_concentration"][i] = 1 - pitch_circvar # Mean roll (deg) calculated using circular statistics roll_circmean = scipy.stats.circmean(roll_lf[start_idx:stop_idx]) sgls["mean_roll_circ"][i] = roll_circmean # Measure of concentration (r) of roll during the sub-glide roll_circvar = scipy.stats.circvar(roll_lf[start_idx:stop_idx]) sgls["roll_concentration"][i] = 1 - roll_circvar # Mean heading (deg) calculated using circular statistics heading_circmean = scipy.stats.circmean(heading_lf[start_idx:stop_idx]) sgls["mean_heading_circ"][i] = heading_circmean # Measure of concentration (r) of heading during the sub-glide heading_circvar = scipy.stats.circvar(heading_lf[start_idx:stop_idx]) sgls["heading_concentration"][i] = 1 - heading_circvar return sgls
[docs]def calc_glide_ratios(dives, des, asc, glide_mask, depths, pitch_lf): """Calculate summary information on glides during dive ascent/descents Args ---- dives: (n,10) Numpy record array with summary information of dives in sensor data des: ndarray Boolean mask of descents over sensor data asc: ndarray Boolean mask of descents over sensor data glid_mask: ndarray Boolean mask of glides over sensor data depths: ndarray Depth values at each sensor sampling pitch_lf: ndarray Pitch in radians over the low frequency signals of acceleration Returns ------- gl_ratio: pandas.DataFrame Dataframe of summary information of glides during dive descent/ascents *Columns*: * des_duration * des_gl_duration * des_gl_ratio * des_mean_pitch * des_rate * asc_duration * asc_gl_duration * asc_gl_ratio * asc_mean_pitch * asc_rate """ import numpy import pandas # Create empty `pandas.DataFrame` for storing data, init with `nan`s cols = [ "des_duration", "des_gl_duration", "des_gl_ratio", "des_mean_pitch", "des_rate", "asc_duration", "asc_gl_duration", "asc_gl_ratio", "asc_mean_pitch", "asc_rate", ] gl_ratio = pandas.DataFrame(index=range(len(dives)), columns=cols) # For each dive with start/stop indices in `dives` for i in range(len(dives)): # Get indices for descent and ascent phase of dive `i` start_idx = dives["start_idx"][i] stop_idx = dives["stop_idx"][i] des_ind = numpy.where(des[start_idx:stop_idx])[0] asc_ind = numpy.where(asc[start_idx:stop_idx])[0] # DESCENT # total duration of the descent phase (s) gl_ratio["des_duration"][i] = len(des_ind) # total glide duration during the descent phase (s) des_glides = numpy.where(glide_mask[des_ind])[0] gl_ratio["des_gl_duration"][i] = len(des_glides) if len(des_ind) == 0: gl_ratio["des_gl_ratio"][i] = 0 gl_ratio["des_rate"][i] = 0 else: # glide ratio during the descent phase des_gl_ratio = gl_ratio["des_gl_duration"][i] / len(des_ind) gl_ratio["des_gl_ratio"][i] = des_gl_ratio # descent rate (m/sample) max_depth_des = depths[des_ind].max() gl_ratio["des_rate"][i] = max_depth_des / len(des_ind) # mean pitch during the descent phase(degrees) des_mean_pitch = numpy.mean(numpy.rad2deg(pitch_lf[des_ind])) gl_ratio["des_mean_pitch"][i] = des_mean_pitch # ASCENT # total duration of the ascent phase (s) gl_ratio["asc_duration"][i] = len(asc_ind) # total glide duration during the ascent phase (s) asc_glides = numpy.where(glide_mask[asc_ind])[0] gl_ratio["asc_gl_duration"][i] = len(asc_glides) if len(asc_ind) == 0: gl_ratio["asc_gl_ratio"][i] = 0 gl_ratio["asc_rate"][i] = 0 else: # glide ratio during the ascent phase asc_gl_ratio = gl_ratio["asc_gl_duration"][i] / len(asc_ind) gl_ratio["asc_gl_ratio"][i] = asc_gl_ratio # ascent rate (m/sample) max_depth_asc = depths[asc_ind].max() gl_ratio["asc_rate"][i] = max_depth_asc / len(asc_ind) # mean pitch during the ascent phase(degrees) asc_mean_pitch = numpy.mean(numpy.rad2deg(pitch_lf[asc_ind])) gl_ratio["asc_mean_pitch"][i] = asc_mean_pitch return gl_ratio