Source code for biomechzoo.processing.partition_data
from typing import Dict, Any
from biomechzoo.utils.findfield import findfield
import warnings
import copy
import numpy as np
[docs]
def partition_data(
data: Dict[str, Any],
evt_start: str,
evt_end: str
) -> Dict[str, Any]:
"""
Partition data for all channels between two specified events.
This function extracts a subset of data between a start event and an end event,
trimming all channel data to this time range. Event indices within channels are
adjusted relative to the new partitioned start position.
Parameters
----------
data : dict of str to Any
Biomechanical data dictionary containing channels and events.
evt_start : str
Name of the starting event for partitioning.
evt_end : str
Name of the ending event for partitioning.
Returns
-------
dict of str to Any
Deep copy of input data with channels partitioned between the two events.
Raises
------
ValueError
If either start or end event is not found in the data.
Notes
-----
Event indices are automatically adjusted relative to the new partition start.
Events marked with index 999 (outlier markers) are preserved unchanged.
Channels that cause IndexError or ValueError during partitioning will be
skipped with a warning message.
"""
# extract event values
e1, _ = findfield(data, evt_start)
e2, _ = findfield(data, evt_end)
if e1 is None or e2 is None or len(e1) == 0 or len(e2) == 0:
raise ValueError(f"Event not found: evt_start='{evt_start}' returned {e1}, evt_end='{evt_end}' returned {e2}")
# convert to int and get first value
e1 = int(e1[0])
e2 = int(e2[0])
data_new = copy.deepcopy(data)
for ch_name, ch_data in sorted(data_new.items()):
if ch_name != 'zoosystem':
r = ch_data['line']
try:
if r.ndim == 1:
data_new[ch_name]['line'] = r[e1:e2]
else:
data_new[ch_name]['line'] = r[e1:e2, :]
except (IndexError, ValueError) as e:
# IndexError: if e1[0]:e2[0] goes beyond the available indices
# ValueError: less likely, but may arise with shape mismatches
warnings.warn(f"Skipping {ch_name} due to error: {e}")
# partition events
events = ch_data['event']
if len(events)>0:
for event_name, value in events.items():
original_frame = int(value[0])
if original_frame == 999:
continue # do not change outlier markers
else:
arr = np.array(data_new[ch_name]['event'][event_name], dtype=np.int32)
arr[0] = original_frame - e1
data_new[ch_name]['event'][event_name] = arr
return data_new