import os
import time
import inspect
import pandas as pd
from collections import defaultdict
from biomechzoo.utils.engine import engine
from biomechzoo.utils.zload import zload
from biomechzoo.utils.zsave import zsave
from biomechzoo.utils.batchdisp import batchdisp
[docs]
def combine_quats_to_csv():
raise NotImplementedError("Use combine_imu_to_csv() instead")
[docs]
def combine_imu_to_csv_data(csv_files: list[str], prefixes: list[str], skip_rows:int = None ,out_folder: str = None, out_filename: str = None,
verbose: int = 1) -> str:
"""
Merge multiple Xsens DOT CSV files into a single combined CSV file.
Each sensor file is identified by a prefix. Quaternion, gyroscope, and
accelerometer columns from every file are renamed with the corresponding
prefix and concatenated side-by-side.
Parameters
----------
csv_files : list of str
Ordered list of paths to the individual sensor CSV files.
prefixes : list of str
Ordered list of sensor prefixes (e.g. ``['LF', 'RF', 'Trunk']``).
Must be the same length as ``csv_files``.
skip_rows : int or None, optional
Number of header rows to skip when reading each CSV. Default is ``None``.
out_folder : str or None, optional
Directory to write the combined CSV. Defaults to ``'combined_csvs'``
inside the current working directory.
out_filename : str or None, optional
Name of the output CSV file. Defaults to ``'combined_sensors.csv'``.
verbose : int, optional
If non-zero, prints the path of the saved file. Default is ``1``.
Returns
-------
str
Absolute path to the saved combined CSV file.
"""
if out_folder is None:
out_folder = "combined_csvs"
if out_filename is None:
out_filename = "combined_sensors.csv"
save_folder = os.path.join(os.getcwd(), out_folder)
os.makedirs(save_folder, exist_ok=True)
sensor_columns = {
"Quat": ["Quat_W", "Quat_X", "Quat_Y", "Quat_Z"],
"Gyr": ["Gyr_X", "Gyr_Y", "Gyr_Z"],
"Acc": ["Acc_X", "Acc_Y", "Acc_Z"],
}
first_df = pd.read_csv(csv_files[0], skiprows= skip_rows) # Taking the time column from the first file, assuming they all have = lengths
combined_dfs = [
first_df[["PacketCounter"]].rename(columns={"PacketCounter": "time"})
]
for csv_path, prefix in zip(csv_files, prefixes):
df = pd.read_csv(csv_path, skiprows=skip_rows)
for cols in sensor_columns.values():
renamed = {
c: f"{prefix}_{c}" for c in cols
}
combined_dfs.append(df[cols].rename(columns=renamed))
combined_df = pd.concat(combined_dfs, axis=1)
out_file = os.path.join(save_folder, out_filename)
combined_df.to_csv(out_file, index=False)
if verbose:
print(f"Saved combined CSV to: {out_file}")
return out_file
[docs]
def combine_imu_to_csv(prefixes: list[str],in_folder,out_folder=None,inplace=False,name_contains=None,subfolders=None,
verbose=1):
"""
Batch-combine Xsens DOT CSV files for multiple subjects.
Searches ``in_folder`` recursively for CSV files, groups them by subject
(parent folder name), matches files to ``prefixes`` by filename suffix,
then calls :func:`combine_imu_to_csv_data` for each subject.
Parameters
----------
prefixes : list of str
Sensor prefixes to match and order (e.g. ``['LF', 'RF', 'Trunk']``).
Files whose basename ends with a prefix will be selected.
in_folder : str
Root folder to search for CSV files.
out_folder : str or None, optional
Root output folder. Each subject gets its own subfolder. Defaults to
the subject's source folder.
inplace : bool, optional
Not currently used. Reserved for future in-place saving. Default is ``False``.
name_contains : str or list of str or None, optional
Only include files whose path contains this substring or all these
substrings. Default is ``None`` (no filter).
subfolders : str or list of str or None, optional
Only include files located inside these subfolder names. Default is
``None`` (no filter).
verbose : int, optional
Verbosity level. ``0`` = silent, ``1`` = summary, ``2`` = per-subject.
Default is ``1``.
Returns
-------
None
Results are saved to disk; nothing is returned.
"""
start_time = time.time()
files = list(engine(
in_folder,
extension='.csv',
name_contains=name_contains,
subfolders=subfolders
))
subjects = defaultdict(list)
for f in files:
subjects[os.path.basename(os.path.dirname(f))].append(f)
for subject, csv_files in subjects.items():
batchdisp(
f'combine_imu_to_csv for {subject}',
level=2,
verbose=verbose
)
prefix_map = {
os.path.splitext(os.path.basename(f))[0].split('_')[-1]: f
for f in csv_files
}
csv_sorted = []
prefix_sorted = []
for p in prefixes:
if p in prefix_map:
csv_sorted.append(prefix_map[p])
prefix_sorted.append(p)
if csv_sorted:
subject_out_folder = os.path.join(out_folder, subject) if out_folder else subject
combine_imu_to_csv_data(
csv_files=csv_sorted,
prefixes=prefix_sorted,
out_folder=subject_out_folder,
out_filename=f'{subject}_combined.csv',
verbose=verbose
)
batchdisp(
f'{inspect.currentframe().f_code.co_name} complete '
f'for {len(subjects)} file(s) in {time.time() - start_time:.2f}s',
level=1,verbose=verbose
)