Source code for pedpy.methods.profile_calculator

"""Module containing functions to compute profiles."""
from typing import List, Tuple

import numpy as np
import numpy.typing as npt
import pandas as pd
import shapely
from aenum import Enum
from shapely import Polygon


[docs]class VelocityMethod(Enum): """Identifier for the method used to compute the mean velocity.""" _init_ = "value __doc__" ARITHMETIC = 0, "arithmetic mean velocity" VORONOI = 1, "voronoi velocity"
[docs]def compute_profiles( *, individual_voronoi_velocity_data: pd.DataFrame, walkable_area: Polygon, grid_size: float, velocity_method: VelocityMethod, ) -> Tuple[List[npt.NDArray[np.float64]], List[npt.NDArray[np.float64]]]: """Computes the density and velocity profiles. Note: As this is a quite compute heavy operation, it is suggested to reduce the geometry to the important areas. Args: individual_voronoi_velocity_data (pd.DataFrame): individual voronoi and velocity data, needs to contain a column 'individual voronoi' which holds shapely.Polygon information and a column 'speed' which holds a floating point value walkable_area (shapely.Polygon): geometry for which the profiles are computed grid_size (float): resolution of the grid used for computing the profiles velocity_method (VelocityMethod): velocity method used to compute the velocity Returns: (List of density profiles, List of velocity profiles) """ grid_cells, rows, cols = _get_grid_cells(walkable_area, grid_size) density_profiles = [] velocity_profiles = [] for _, frame_data in individual_voronoi_velocity_data.groupby("frame"): grid_intersections_area = shapely.area( shapely.intersection( np.array(grid_cells)[:, np.newaxis], np.array(frame_data["individual voronoi"])[np.newaxis, :], ) ) # Compute density density = ( np.sum( grid_intersections_area * (1 / shapely.area(frame_data["individual voronoi"].values)), axis=1, ) / grid_cells[0].area ) # Compute velocity if velocity_method == VelocityMethod.VORONOI: velocity = _compute_voronoi_velocity( frame_data, grid_intersections_area, grid_cells[0].area ) elif velocity_method == VelocityMethod.ARITHMETIC: velocity = _compute_arithmetic_velocity( frame_data, grid_intersections_area ) else: raise ValueError("velocity method not accepted") density_profiles.append(density.reshape(rows, cols)) velocity_profiles.append(velocity.reshape(rows, cols)) return density_profiles, velocity_profiles
def _compute_arithmetic_velocity( frame_data: npt.NDArray[np.float64], grid_intersections_area: npt.NDArray[np.float64], ) -> npt.NDArray[np.float64]: """Compute the arithmetic mean velocity per grid cell. Args: frame_data (npt.NDArray[np.float64]): all relevant data in a specific frame grid_intersections_area (npt.NDArray[np.float64]): intersection areas for each pedestrian with each grid cells Returns: Arithmetic mean velocity per grid cell """ cells_with_peds = np.where(grid_intersections_area > 1e-16, 1, 0) accumulated_velocity = np.sum( cells_with_peds * frame_data["speed"].values, axis=1 ) num_peds = np.count_nonzero(cells_with_peds, axis=1) velocity = np.where( num_peds > 0, accumulated_velocity / num_peds, 0, ) return velocity def _compute_voronoi_velocity( frame_data: npt.NDArray[np.float64], grid_intersections_area: npt.NDArray[np.float64], grid_area: float, ) -> npt.NDArray[np.float64]: """Compute the Voronoi velocity per grid cell. Args: frame_data (npt.NDArray[np.float64]): all relevant data in a specific frame grid_intersections_area (npt.NDArray[np.float64]): intersection areas for each pedestrian with each grid cells grid_area (float): area of one grid cell Returns: Voronoi velocity per grid cell """ velocity = ( np.sum(grid_intersections_area * frame_data["speed"].values, axis=1) ) / grid_area return velocity def _get_grid_cells( walkable_area: Polygon, grid_size: float ) -> Tuple[npt.NDArray[np.float64], int, int]: """Creates a list of square grid cells covering the geometry. Args: walkable_area (shapely.Polygon): geometry for which the profiles are computed. grid_size (float): resolution of the grid used for computing the profiles. Returns: (List of grid cells, number of grid rows, number of grid columns) """ bounds = walkable_area.bounds min_x = bounds[0] min_y = bounds[1] max_x = bounds[2] max_y = bounds[3] x = np.arange(min_x, max_x + grid_size, grid_size) y = np.arange(max_y, min_y - grid_size, -grid_size) grid_cells = [] for j in range(len(y) - 1): for i in range(len(x) - 1): grid_cell = shapely.box(x[i], y[j], x[i + 1], y[j + 1]) grid_cells.append(grid_cell) return np.array(grid_cells), len(y) - 1, len(x) - 1