Source code for biomechzoo.processing.addevent_data

import numpy as np
import copy
import warnings
from typing import Dict, List, Union, Optional, Any
from scipy.signal import find_peaks
from biomechzoo.utils.peak_sign import peak_sign
from biomechzoo.biomech_ops.movement_onset import movement_onset, movement_offset
from biomechzoo.imu.step_detection import imu_mcgrath

[docs] def addevent_data( data: Dict[str, Any], channels: Union[str, List[str]], ename: str, etype: str, fsamp: Optional[float] = None, constant: Optional[float] = None ) -> Dict[str, Any]: """ Add events to specified channels in a biomechanical data structure. This function creates events based on various event types (e.g., max, min, peaks, movement onset/offset, force plate thresholds) and adds them to the specified channels in the data dictionary. Parameters ---------- data : dict of str to Any Biomechanical data dictionary containing channels and zoosystem metadata. channels : str or list of str Channel name(s) to add events to. Pass ``'all'`` to apply to all channels. ename : str Name of the event to add. Use empty string to clear all events. etype : str Event type, one of: 'max', 'min', 'absmax', 'first', 'last', 'rom', 'first peak', 'movement_onset', 'movement_offset', 'mcgrath_fs', 'mcgrath_fo', 'fs_fp', 'fo_fp'. fsamp : float, optional Sampling frequency in Hz. If None, extracted from zoosystem metadata. constant : float, optional Threshold or parameter value for certain event types (e.g., peak height threshold, force plate threshold). Returns ------- dict of str to Any Deep copy of input data with events added to specified channels. Raises ------ KeyError If specified channel does not exist in data. ValueError If event type is unknown or if required peaks are not found. Warning If video and analog sampling rates differ for force plate events. Notes ----- Event format in data structure: [index, value, 0] where index is the frame number, value is the signal value at that frame, and 0 is a placeholder. For event types that find multiple events (mcgrath_fs, mcgrath_fo), events are numbered sequentially (e.g., 'ename_1', 'ename_2', etc.). """ data_new = copy.deepcopy(data) if isinstance(channels, str): channels = [channels] if len(channels) == 1 and channels[0].lower() == 'all': channels = [key for key in data if key != 'zoosystem'] for channel in channels: if ename == '': data[channel]['event'] = {} continue if channel not in data: raise KeyError('Channel {} does not exist'.format(channel)) # todo extract sampling frequency (needed for some events) # if channel in data['zoosystem']['Video']['Channels']: # fsamp = data['zoosystem']['Video']['Freq'] # elif channel in data['zoosystem']['Analog']['Channels']: # fsamp = data['zoosystem']['Analog']['Freq'] # else: # raise ValueError('Cannot extract sampling frequency associated with data') if fsamp is None: fsamp = data['zoosystem']['Video']['Freq'] yd = data_new[channel]['line'] # 1D array etype = etype.lower() if etype == 'absmax': exd = int(np.argmax(np.abs(yd))) eyd = float(yd[exd]) elif etype == 'first': exd = 0 eyd = float(yd[exd]) elif etype == 'last': exd = len(yd) - 1 eyd = float(yd[exd]) elif etype == 'max': exd = int(np.argmax(yd)) eyd = float(yd[exd]) elif etype == 'min': exd = int(np.argmin(yd)) eyd = float(yd[exd]) elif etype == 'rom': eyd = float(np.max(yd) - np.min(yd)) exd = 0 # dummy index (like MATLAB version) elif etype == 'first peak': # special event for gait and running exd = find_first_peak(yd, constant) eyd = float(yd[exd]) elif etype == 'movement_onset': exd = movement_onset(yd, fsamp, constant) eyd = yd[exd] elif etype == 'movement_offset': exd = movement_offset(yd, fsamp, constant) eyd = yd[exd] elif etype == 'mcgrath_fs': if constant is None: constant = 95 IC, _ = imu_mcgrath(yd, fsamp, min_stance_t=constant) exd = [int(ic) for ic in IC] ey = [] for i, ex in enumerate(exd): ey.append(yd[ex]) eyd = [float(y) for y in ey] elif etype == 'mcgrath_fo': if constant is None: constant = 95 _, TC = imu_mcgrath(yd, fsamp, min_stance_t=constant) exd = [int(tc) for tc in TC] ey = [] for i, ex in enumerate(exd): ey.append(yd[ex]) eyd = [float(y) for y in ey] elif etype in ['fs_fp', 'fo_fp']: # --- Handle constant --- if constant is None: print('Warning: Force plate threshold not set, defaulting to 0.') constant = 0.0 # --- Check sampling rates --- AVR = data['zoosystem']['AVR'] if AVR != 1: warnings.warn('Video and Analog channels must be at the same sampling rate or events will be incorrect.') # --- Handle units --- units = data['zoosystem']['Units']['Forces'] if units == 'N/kg': m = data['zoosystem']['Anthro']['Bodymass'] else: m = 1.0 # --- Extract force signal --- if '_' not in channel: yd = data_new[channel]['line'][:, 2] # looking for GRF Z else: yd = data_new[channel]['line'] # --- Determine peak sign --- peak = peak_sign(yd) # user-defined function # --- Find threshold crossing --- threshold_signal = peak * yd * m if 'fs' in etype: exd_array = np.where(threshold_signal > constant)[0] exd = exd_array[0] - 1 # MATLAB indexing correction eyd = yd[exd] else: # 'FO' type exd_array = np.where(threshold_signal > constant)[0] exd = exd_array[-1] + 1 eyd = yd[exd] else: raise ValueError(f'Unknown event type: {etype}') # Add event to the channel's event dict if isinstance(exd, list): for i, ex in enumerate(exd): name = ename + '_' + str(i + 1) data_new[channel]['event'][name] = [int(ex), eyd[i], 0] else: data_new[channel]['event'][ename] = [exd, eyd, 0] return data_new
[docs] def find_first_peak(yd: np.ndarray, constant: Optional[float]) -> int: """ Extract the index of the first peak from a multi-peak signal. Parameters ---------- yd : ndarray 1-D signal data array to search for peaks. constant : float or None Minimum height threshold for peak detection. Returns ------- int Index of the first peak in the signal. Raises ------ ValueError If fewer than 2 peaks are found. """ # Find peaks above threshold peaks, _ = find_peaks(yd, height=constant) if len(peaks) == 0: raise ValueError('No peaks found') elif len(peaks) == 1: raise ValueError('Only 1 peak found') else: # Take the first valid peak exd = peaks[0] return exd