"""Helper functions for the analysis methods."""
import itertools
from collections import defaultdict
from typing import List, Optional, Tuple
import numpy as np
import pandas as pd
import shapely
from scipy.spatial import Voronoi
from shapely import LineString, Polygon
from pedpy.data.geometry import Geometry
from pedpy.data.trajectory_data import TrajectoryData
[docs]def is_trajectory_valid(*, traj: TrajectoryData, geometry: Geometry) -> bool:
"""Checks if all trajectory data points lie within the given geometry.
Args:
traj (TrajectoryData): trajectory data
geometry (Geometry): geometry
Returns:
All points lie within geometry
"""
return get_invalid_trajectory(traj=traj, geometry=geometry).empty
[docs]def get_invalid_trajectory(
*, traj: TrajectoryData, geometry: Geometry
) -> pd.DataFrame:
"""Returns all trajectory data points outside the given geometry.
Args:
traj (TrajectoryData): trajectory data
geometry (Geometry): geometry
Returns:
DataFrame showing all data points outside the given geometry
"""
return traj.data.loc[
~shapely.within(traj.data.points, geometry.walkable_area)
]
[docs]def compute_frame_range_in_area(
*, traj_data: pd.DataFrame, measurement_line: LineString, width: float
) -> Tuple[pd.DataFrame, Polygon]:
"""Compute the frame ranges for each pedestrian inside the measurement area.
Note:
Only pedestrians passing the complete measurement area will be
considered. Meaning they need to cross measurement_line and the line
with the given offset in one go. If leaving the area between two lines
through the same line will be ignored.
As passing we define the frame the pedestrians enter the area and
then moves through the complete area without leaving it. Hence,
doing a closed analysis of the movement area with several measuring
ranges underestimates the actual movement time.
Args:
traj_data (pd.DataFrame): trajectory data
measurement_line (shapely.LineString): measurement line
width (float): distance to the second measurement line
Returns:
DataFrame containing the columns: 'ID', 'frame_start', 'frame_end' and
the created measurement area
"""
assert len(shapely.get_coordinates(measurement_line)) == 2, (
f"The measurement line needs to be a straight line but has more "
f"coordinates (expected 2, got "
f"{len(shapely.get_coordinates(measurement_line))})"
)
# Create the second with the given offset
second_line = shapely.offset_curve(measurement_line, distance=width)
# Reverse the order of the coordinates for the second line string to
# create a rectangular area between the lines
measurement_area = Polygon(
[
*measurement_line.coords,
*second_line.coords[::-1],
]
)
inside_range = _get_continuous_parts_in_area(traj_data, measurement_area)
crossing_frames_first = _compute_crossing_frames(
traj_data, measurement_line
)
crossing_frames_second = _compute_crossing_frames(traj_data, second_line)
start_crossed_1 = _check_crossing_in_frame_range(
inside_range, crossing_frames_first, "frame_start", "start_crossed_1"
)
end_crossed_1 = _check_crossing_in_frame_range(
inside_range, crossing_frames_first, "frame_end", "end_crossed_1"
)
start_crossed_2 = _check_crossing_in_frame_range(
inside_range, crossing_frames_second, "frame_start", "start_crossed_2"
)
end_crossed_2 = _check_crossing_in_frame_range(
inside_range, crossing_frames_second, "frame_end", "end_crossed_2"
)
frame_range_between_lines = (
start_crossed_1.merge(
start_crossed_2, how="outer", on=["ID", "frame_start", "frame_end"]
)
.merge(
end_crossed_1, how="outer", on=["ID", "frame_start", "frame_end"]
)
.merge(
end_crossed_2, how="outer", on=["ID", "frame_start", "frame_end"]
)
)
frame_range_between_lines = frame_range_between_lines[
(
frame_range_between_lines.start_crossed_1
& frame_range_between_lines.end_crossed_2
)
| (
frame_range_between_lines.start_crossed_2
& frame_range_between_lines.end_crossed_1
)
]
return (
frame_range_between_lines.loc[:, ("ID", "frame_start", "frame_end")],
measurement_area,
)
[docs]def compute_neighbors(individual_voronoi_data: pd.DataFrame) -> pd.DataFrame:
"""Compute the neighbors of each pedestrian based on the Voronoi cells.
Computation of the neighborhood of each pedestrian per frame. Every other
pedestrian is a neighbor if the Voronoi cells of both pedestrian intersect
and some point.
Args:
individual_voronoi_data (pd.DataFrame): individual voronoi data, needs
to contain a column 'individual voronoi', which holds
shapely.Polygon information
Returns:
DataFrame containing the columns: 'ID', 'frame' and 'neighbors', where
neighbors are a list of the neighbor's IDs
"""
neighbor_df = []
for frame, frame_data in individual_voronoi_data.groupby("frame"):
touching = shapely.dwithin(
np.array(frame_data["individual voronoi"])[:, np.newaxis],
np.array(frame_data["individual voronoi"])[np.newaxis, :],
1e-9, # Voronoi cells as close as 1 mm are touching
)
# the peds are not neighbors of themselves
for i in range(len(touching)):
touching[i, i] = False
# create matrix with ped IDs
ids = np.outer(
np.ones_like(frame_data["ID"].values),
frame_data["ID"].values.reshape(1, -1),
)
neighbors = np.where(touching, ids, np.nan)
neighbors_list = [
np.array(l)[~np.isnan(np.array(l))].astype(int).tolist()
for l in neighbors
]
frame_df = pd.DataFrame(
zip(
frame_data["ID"].values, itertools.repeat(frame), neighbors_list
),
columns=["ID", "frame", "neighbors"],
)
neighbor_df.append(frame_df)
return pd.concat(neighbor_df)
[docs]def compute_individual_voronoi_polygons(
*,
traj_data: pd.DataFrame,
geometry: Geometry,
cut_off: Optional[Tuple[float, int]] = None,
use_blind_points: bool = False,
) -> pd.DataFrame:
"""Compute the individual voronoi cells for each person and frame.
Args:
traj_data (pd.DataFrame): trajectory data
geometry (Geometry): bounding area, where pedestrian are supposed to be
cut_off (Tuple[float, int]): radius of max extended voronoi cell (in
m), number of linear segments in the approximation of circular
arcs, needs to be divisible by 4!
use_blind_points (bool): adds extra 4 points outside the geometry to
also compute voronoi cells when less than 4 peds are in the
geometry
Returns:
DataFrame containing the columns: 'ID', 'frame' and 'individual
voronoi'.
"""
dfs = []
bounds = geometry.walkable_area.bounds
x_diff = abs(bounds[2] - bounds[0])
y_diff = abs(bounds[3] - bounds[1])
clipping_diameter = 2 * max(x_diff, y_diff)
if use_blind_points:
blind_points = np.array(
[
[100 * (bounds[0] - x_diff), 100 * (bounds[1] - y_diff)],
[100 * (bounds[2] + x_diff), 100 * (bounds[1] - y_diff)],
[100 * (bounds[0] - x_diff), 100 * (bounds[3] + y_diff)],
[100 * (bounds[2] + x_diff), 100 * (bounds[3] + y_diff)],
]
)
for _, peds_in_frame in traj_data.groupby(traj_data.frame):
points = peds_in_frame[["X", "Y"]].values
if use_blind_points:
points = np.concatenate([points, blind_points])
# if blind points were added, len(points) will always be > 4
if len(points) < 4:
continue
vor = Voronoi(points)
voronoi_polygons = _clip_voronoi_polygons(vor, clipping_diameter)
if use_blind_points:
voronoi_polygons = voronoi_polygons[:-4]
voronoi_in_frame = peds_in_frame.loc[:, ("ID", "frame", "points")]
# Compute the intersecting area with the walkable area
voronoi_in_frame["individual voronoi"] = shapely.intersection(
voronoi_polygons, geometry.walkable_area
)
if cut_off is not None:
num_edges = cut_off[1]
radius = cut_off[0]
quad_edges = int(num_edges / 4)
voronoi_in_frame["individual voronoi"] = shapely.intersection(
voronoi_in_frame["individual voronoi"],
shapely.buffer(
peds_in_frame["points"], radius, quad_segs=quad_edges
),
)
# Only consider the parts of a multipolygon which contain the position
# of the pedestrian
voronoi_in_frame.loc[
shapely.get_type_id(voronoi_in_frame["individual voronoi"]) != 3,
"individual voronoi",
] = voronoi_in_frame.loc[
shapely.get_type_id(voronoi_in_frame["individual voronoi"]) != 3, :
].apply(
lambda x: shapely.get_parts(x["individual voronoi"])[
shapely.within(
x["points"], shapely.get_parts(x["individual voronoi"])
)
][0],
axis=1,
)
dfs.append(voronoi_in_frame)
return pd.concat(dfs)[["ID", "frame", "individual voronoi"]]
[docs]def compute_intersecting_polygons(
individual_voronoi_data: pd.DataFrame, measurement_area: Polygon
) -> pd.DataFrame:
"""Compute the intersection of the voronoi cells with the measurement area.
Args:
individual_voronoi_data (pd.DataFrame): individual voronoi data, needs
to contain a column 'individual voronoi' which holds
shapely.Polygon information
measurement_area (shapely.Polygon):
Returns:
DataFrame containing the columns: 'ID', 'frame' and
'intersection voronoi'.
"""
df_intersection = individual_voronoi_data[["ID", "frame"]].copy()
df_intersection["intersection voronoi"] = shapely.intersection(
individual_voronoi_data["individual voronoi"], measurement_area
)
return df_intersection
def _clip_voronoi_polygons(
voronoi: Voronoi, diameter: float
) -> List[shapely.Polygon]:
"""Generate Polygons from the Voronoi diagram.
Generate shapely.geometry.Polygon objects corresponding to the
regions of a scipy.spatial.Voronoi object, in the order of the
input points. The polygons for the infinite regions are large
enough that all points within a distance 'diameter' of a Voronoi
vertex are contained in one of the infinite polygons.
from: https://stackoverflow.com/a/52727406/9601068
"""
polygons = []
centroid = voronoi.points.mean(axis=0)
# Mapping from (input point index, Voronoi point index) to list of
# unit vectors in the directions of the infinite ridges starting
# at the Voronoi point and neighbouring the input point.
ridge_direction = defaultdict(list)
for (p, q), rv in zip(voronoi.ridge_points, voronoi.ridge_vertices):
u, v = sorted(rv)
if u == -1:
# Infinite ridge starting at ridge point with index v,
# equidistant from input points with indexes p and q.
t = voronoi.points[q] - voronoi.points[p] # tangent
n = np.array([-t[1], t[0]]) / np.linalg.norm(t) # normal
midpoint = voronoi.points[[p, q]].mean(axis=0)
direction = np.sign(np.dot(midpoint - centroid, n)) * n
ridge_direction[p, v].append(direction)
ridge_direction[q, v].append(direction)
for i, r in enumerate(voronoi.point_region):
region = voronoi.regions[r]
if -1 not in region:
# Finite region.
polygons.append(shapely.polygons(voronoi.vertices[region]))
continue
# Infinite region.
inf = region.index(-1) # Index of vertex at infinity.
j = region[(inf - 1) % len(region)] # Index of previous vertex.
k = region[(inf + 1) % len(region)] # Index of next vertex.
if j == k:
# Region has one Voronoi vertex with two ridges.
dir_j, dir_k = ridge_direction[i, j]
else:
# Region has two Voronoi vertices, each with one ridge.
(dir_j,) = ridge_direction[i, j]
(dir_k,) = ridge_direction[i, k]
# Length of ridges needed for the extra edge to lie at least
# 'diameter' away from all Voronoi vertices.
length = 2 * diameter / np.linalg.norm(dir_j + dir_k)
# Polygon consists of finite part plus an extra edge.
finite_part = voronoi.vertices[region[inf + 1 :] + region[:inf]]
extra_edge = np.array(
[
voronoi.vertices[j] + dir_j * length,
voronoi.vertices[k] + dir_k * length,
]
)
polygons.append(
shapely.polygons(np.concatenate((finite_part, extra_edge)))
)
return polygons
def _compute_individual_movement(
traj_data: pd.DataFrame, frame_step: int, bidirectional: bool = True
) -> pd.DataFrame:
"""Compute the individual movement in the time interval frame_step.
The movement is computed for the interval [frame - frame_step: frame +
frame_step], if one of the boundaries is not contained in the trajectory
frame will be used as boundary. Hence, the intervals become [frame,
frame + frame_step], or [frame - frame_step, frame] respectively.
Args:
traj_data (pd.DataFrame): trajectory data
frame_step (int): how many frames back and forwards are used to compute
the movement.
bidirectional (bool): if True also the prev. frame_step points will
be used to determine the movement
Returns:
DataFrame containing the columns: 'ID', 'frame', 'start', 'end',
'start_frame, and 'end_frame'. Where 'start'/'end' are the points
where the movement start/ends, and 'start_frame'/'end_frame' are the
corresponding frames.
"""
df_movement = traj_data.copy(deep=True)
df_movement["start"] = (
df_movement.groupby("ID")["points"]
.shift(frame_step)
.fillna(df_movement["points"])
)
df_movement["start_frame"] = (
df_movement.groupby("ID")["frame"]
.shift(frame_step)
.fillna(df_movement["frame"])
)
if bidirectional:
df_movement["end"] = (
df_movement.groupby("ID")["points"]
.shift(-frame_step)
.fillna(df_movement["points"])
)
df_movement["end_frame"] = (
df_movement.groupby("ID")["frame"]
.shift(-frame_step)
.fillna(df_movement["frame"])
)
else:
df_movement["end"] = df_movement["points"]
df_movement["end_frame"] = df_movement["frame"]
return df_movement[
["ID", "frame", "start", "end", "start_frame", "end_frame"]
]
def _compute_crossing_frames(
traj_data: pd.DataFrame, measurement_line: LineString
) -> pd.DataFrame:
"""Compute the frames at the pedestrians pass the measurement line.
As crossing we define a movement that moves across the measurement line.
When the movement ends on the line, the line is not crossed. When it
starts on the line, it counts as crossed.
Note:
Due to oscillations it may happen that a pedestrian crosses the
measurement line multiple time in a small time interval.
Args:
traj_data (pd.DataFrame): trajectory data
measurement_line (shapely.LineString):
Returns:
DataFrame containing the columns: 'ID', 'frame', where 'frame' are
the frames where the measurement line is crossed.
"""
# stack is used to get the coordinates in the correct order, as pygeos
# does not support creating linestring from points directly. The
# resulting array looks as follows:
# [[[x_0_start, y_0_start], [x_0_end, y_0_end]],
# [[x_1_start, y_1_start], [x_1_end, y_1_end]], ... ]
df_movement = _compute_individual_movement(traj_data, 1, False)
df_movement["movement"] = shapely.linestrings(
np.stack(
[
shapely.get_coordinates(df_movement["start"]),
shapely.get_coordinates(df_movement["end"]),
],
axis=1,
)
)
# crossing means, the current movement crosses the line and the end point
# of the movement is not on the line. The result is sorted by frame number
crossing_frames = df_movement.loc[
shapely.intersects(df_movement["movement"], measurement_line)
][["ID", "frame"]]
return crossing_frames
def _get_continuous_parts_in_area(
traj_data: pd.DataFrame, measurement_area: Polygon
) -> pd.DataFrame:
"""Compute the time-continuous parts of each pedestrian in the area.
Compute the time-continuous parts in which the pedestrians are inside
the given measurement area. As leaving the first frame outside the area is
considered.
Args:
traj_data (pd.DataFrame): trajectory data
measurement_area (shapely.Polygon): area which is considered
Returns:
DataFrame containing the columns: 'ID', 'frame_start', 'frame_end'
"""
inside = traj_data.loc[
shapely.within(traj_data.points, measurement_area), :
].copy()
inside.loc[:, "g"] = inside.groupby("ID", group_keys=False)["frame"].apply(
lambda x: x.diff().ge(2).cumsum()
)
inside_range = (
inside.groupby(["ID", "g"])
.agg(
frame_start=("frame", "first"),
frame_end=("frame", "last"),
)
.reset_index()[["ID", "frame_start", "frame_end"]]
)
inside_range["frame_end"] += 1
return inside_range
def _check_crossing_in_frame_range(
inside_range: pd.DataFrame,
crossing_frames: pd.DataFrame,
check_column: str,
column_name: str,
) -> pd.DataFrame:
"""Returns rows of inside_range which are also in crossing_frames.
Args:
inside_range (pd.DataFrame): DataFrame containing the columns 'ID'
and check_column
crossing_frames (pd.DataFrame): DataFrame containing the columns 'ID'
and 'frame'
check_column (str): name of the column in inside_range which represents
a frame value. Needs to be 'frame_start' or 'frame_end'
column_name (str): name of the result column
Returns:
DataFrame containing the columns 'ID', 'frame_start', 'frame_end',
column_name
"""
assert check_column in (
"frame_start",
"frame_end",
), "check_column needs to be 'frame_start' or 'frame_end'"
crossed = pd.merge(
inside_range,
crossing_frames,
left_on=["ID", check_column],
right_on=["ID", "frame"],
how="left",
indicator=column_name,
)[["ID", "frame_start", "frame_end", column_name]]
crossed[column_name] = crossed[column_name] == "both"
crossed = crossed[crossed[column_name]]
return crossed