Source code for pedpy.io.trajectory_loader

"""Load trajectories to the internal trajectory data format."""

import pathlib
from typing import Any, Optional, Tuple

import pandas as pd

from pedpy.data.trajectory_data import TrajectoryData, TrajectoryUnit


[docs]def load_trajectory( *, trajectory_file: pathlib.Path, default_frame_rate: Optional[float] = None, default_unit: Optional[TrajectoryUnit] = None, ) -> TrajectoryData: """Loads the trajectory file in the internal trajectory data format. Loads the relevant data: trajectory data, frame rate, and type of trajectory from the given trajectory file. If the file does not contain some data, defaults can be submitted. Args: trajectory_file (pathlib.Path): file containing the trajectory default_frame_rate (float): frame rate of the file, None if frame rate from file is used default_unit (TrajectoryUnit): unit in which the coordinates are stored in the file, None if unit should be parsed from the file Returns: Tuple containing: trajectory data, frame rate, and type of trajectory. """ if not trajectory_file.exists(): raise IOError(f"{trajectory_file} does not exist.") if not trajectory_file.is_file(): raise IOError(f"{trajectory_file} is not a file.") traj_frame_rate, traj_unit = _load_trajectory_meta_data( trajectory_file=trajectory_file, default_frame_rate=default_frame_rate, default_unit=default_unit, ) traj_dataframe = _load_trajectory_data( trajectory_file=trajectory_file, unit=traj_unit ) return TrajectoryData( data=traj_dataframe, frame_rate=traj_frame_rate, file=trajectory_file )
def _load_trajectory_data( *, trajectory_file: pathlib.Path, unit: TrajectoryUnit ) -> pd.DataFrame: """Parse the trajectory file for trajectory data. Args: trajectory_file (pathlib.Path): file containing the trajectory default_unit (TrajectoryUnit): unit in which the coordinates are stored in the file, None if unit should be parsed from the file Returns: The trajectory data as data frame, the coordinates are converted to meter (m). """ try: data = pd.read_csv( trajectory_file, sep=r"\s+", comment="#", header=None, names=["ID", "frame", "X", "Y", "Z"], usecols=[0, 1, 2, 3, 4], dtype={ "ID": "int64", "frame": "int64", "X": "float64", "Y": "float64", "Z": "float64", }, ) if data.empty: raise ValueError( "The given trajectory file seem to be empty. It should " "contain at least 5 columns: ID, frame, X, Y, Z. The values " "should be separated by any white space. Comment line may " "start with a '#' and will be ignored. " f"Please check your trajectory file: {trajectory_file}." ) if unit == TrajectoryUnit.CENTIMETER: data["X"] = data["X"].div(100) data["Y"] = data["Y"].div(100) data["Z"] = data["Z"].div(100) return data except pd.errors.ParserError: raise ValueError( "The given trajectory file could not be parsed. It should " "contain at least 5 columns: ID, frame, X, Y, Z. The values " "should be separated by any white space. Comment line may start " "with a '#' and will be ignored. " f"Please check your trajectory file: {trajectory_file}." ) def _load_trajectory_meta_data( *, trajectory_file: pathlib.Path, default_frame_rate: Optional[float], default_unit: Optional[TrajectoryUnit], ) -> Tuple[float, TrajectoryUnit]: parsed_frame_rate: Any = None parsed_unit: Any = None with open(trajectory_file, "r") as file_content: for line in file_content: if not line.startswith("#"): break if "framerate" in line: for substring in line.split(): try: if parsed_frame_rate is None: parsed_frame_rate = float(substring) except ValueError: continue if "x/cm" in line.lower() or "in cm" in line.lower(): parsed_unit = TrajectoryUnit.CENTIMETER if "x/m" in line.lower() or "in m" in line.lower(): parsed_unit = TrajectoryUnit.METER frame_rate = parsed_frame_rate if parsed_frame_rate is None and default_frame_rate is not None: if default_frame_rate <= 0: raise ValueError( f"Default frame needs to be positive but is " f"{default_frame_rate}" ) frame_rate = default_frame_rate if parsed_frame_rate is None and default_frame_rate is None: raise ValueError( "Frame rate is needed, but none could be found in the trajectory " "file. " f"Please check your trajectory file: {trajectory_file} or provide " "a default frame rate." ) if parsed_frame_rate is not None and default_frame_rate is None: if parsed_frame_rate <= 0: raise ValueError( "Frame rate needs to be a positive value, but is " f"{parsed_frame_rate}. " "Please check your trajectory file: {trajectory_file}." ) if parsed_frame_rate is not None and default_frame_rate is not None: if parsed_frame_rate != default_frame_rate: raise ValueError( "The given default frame rate seems to differ from the frame " "rate given in the trajectory file: " f"{default_frame_rate} != {parsed_frame_rate}" ) unit = parsed_unit if parsed_unit is None and default_unit is not None: unit = default_unit if parsed_unit is None and default_unit is None: raise ValueError( "Unit is needed, but none could be found in the trajectory file. " f"Please check your trajectory file: {trajectory_file} or provide " "a default unit." ) if parsed_unit is not None and default_unit is not None: if parsed_unit != default_unit: raise ValueError( "The given default unit seems to differ from the unit given " "in the trajectory file: " f"{default_unit} != {parsed_unit}" ) return (frame_rate, unit)