"""Source code for routee.transit.gtfs_processing."""

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from nrel.routee.compass.io.generate_dataset import HookParameters

import datetime
import logging
import multiprocessing as mp
from functools import partial

from pathlib import Path
import geopandas as gpd
import pandas as pd
from geopy.distance import great_circle
from gtfsblocks import Feed
import importlib.resources
import tomlkit

logger = logging.getLogger("gtfs_processing")

KM_TO_METERS = 1000
FT_TO_METERS = 0.3048
FT_TO_MILES = 0.000189394


[docs] def write_gtfs_stops(params: "HookParameters", feed: Feed) -> None: """ Hook to write GTFS stop-to-edge mapping after dataset generation. Args: params: Parameters from generate_compass_dataset feed: GTFS feed object """ # 1. Join stops to edges spatially edges_gdf = params.edges stops_gdf = gpd.GeoDataFrame( feed.stops, geometry=gpd.points_from_xy(feed.stops.stop_lon, feed.stops.stop_lat), crs="EPSG:4326", ) # Find nearest edge for each stop stops_projected = stops_gdf.to_crs("EPSG:3857") edges_projected = edges_gdf.to_crs("EPSG:3857") matched_stops = stops_projected.sjoin_nearest( edges_projected[["edge_id", "geometry"]], distance_col="dist" ) # 2. Map stops to trip_id using stop_times # stop_times maps trip_id -> stop_id # matched_stops maps stop_id -> edge_id stop_to_edge = matched_stops.set_index("stop_id")["edge_id"].to_dict() stop_edge_records = [] for _, row in feed.stop_times.iterrows(): stop_id = row["stop_id"] trip_id = row["trip_id"] if stop_id in stop_to_edge: stop_edge_records.append( {"trip_id": trip_id, "edge_id": stop_to_edge[stop_id]} ) stop_edge_df = pd.DataFrame(stop_edge_records).drop_duplicates() # 3. Write to CSV output_path = params.output_directory / "gtfs_stops.csv" stop_edge_df.to_csv(output_path, index=False) logger.info(f"Wrote {len(stop_edge_df)} stop-edge mappings to {output_path}")
[docs] def copy_transit_config( params: "HookParameters", vehicle_models: list[str] | None = None ) -> None: """ Hook to copy the transit_energy.toml from package resources to the output directory. Args: params: Parameters from generate_compass_dataset vehicle_models: Optional list of vehicle models to include. If None, all are included. """ with importlib.resources.path( "routee.transit.resources", "transit_energy.toml" ) as config_path: with open(config_path, "r") as f: config = tomlkit.load(f) # Filter vehicle_models if requested if vehicle_models is not None: vehicle_set = set(vehicle_models) search = config.get("search", {}) traversal = search.get("traversal", {}) models = traversal.get("models", []) for model in models: if model.get("type") == "transit_energy" and "vehicle_input_files" in model: model["vehicle_input_files"] = [ p for p in model["vehicle_input_files"] if Path(p).stem in vehicle_set ] output_path = params.output_directory / "transit_energy.toml" with open(output_path, "w") as f: tomlkit.dump(config, f) logger.info(f"Copied transit_energy.toml to {output_path}")
[docs] def upsample_shape(shape_df: pd.DataFrame) -> pd.DataFrame: """Upsample a GTFS shape DataFrame to generate a roughly 1 Hz GPS trace. Interpolates latitude, longitude, and distance traveled, assuming a constant speed. The function performs the following steps: * Calculates the distance between consecutive shape points using great-circle distance * Computes the cumulative distance traveled along the shape * Assigns timestamps based on constant speed (30 km/h) * Resamples and interpolates the shape to 1-second intervals * Returns DataFrame with interpolated coordinates, timestamps, and distances Args: shape_df: DataFrame containing GTFS shape points with columns 'shape_pt_lat', 'shape_pt_lon', and 'shape_id'. Returns: Upsampled DataFrame with columns 'shape_pt_lat', 'shape_pt_lon', 'shape_dist_traveled', 'timestamp', and 'shape_id', sampled at 1 Hz. """ # Shift latitude and longitude to get previous point shape_df["prev_latitude"] = shape_df["shape_pt_lat"].shift() shape_df["prev_longitude"] = shape_df["shape_pt_lon"].shift() # Calculate the distance between consecutive points using great_circle # TODO: move away from apply() for speed shape_df["distance_km"] = shape_df.apply( lambda row: ( great_circle( (row["prev_latitude"], row["prev_longitude"]), # Previous point (row["shape_pt_lat"], row["shape_pt_lon"]), # Current point ).kilometers if pd.notnull(row["prev_latitude"]) else 0 ), axis=1, ) # Calculate total distance total_distance_km = shape_df["distance_km"].sum() # Use calculated total distance instead of shape_dist_traveled shape_df["shape_dist_traveled"] = shape_df["distance_km"].cumsum() # Speed is assumed to be 30 km/h, which is about 10 (8.33) m per second/node shape_df["segment_duration_delta"] = ( shape_df["shape_dist_traveled"] / shape_df["shape_dist_traveled"].max() * datetime.timedelta(seconds=round(total_distance_km / 30 * 3600)) ) shape_df["segment_duration_delta"] = shape_df["segment_duration_delta"].apply( lambda x: 
datetime.timedelta(seconds=round(x.total_seconds())) ) # Define an arbitrary date to convert from timedelta to datetime date_tmp = pd.Timestamp(datetime.datetime(2023, 9, 3)) shape_df["timestamp"] = date_tmp + shape_df["segment_duration_delta"] # Upsample to 1s shape_id_tmp = shape_df.shape_id.iloc[0] shape_df = ( shape_df[["shape_pt_lat", "shape_pt_lon", "timestamp", "shape_dist_traveled"]] .drop_duplicates(subset=["timestamp"]) .set_index("timestamp") .resample("1s") .interpolate(method="linear") ) # Now we have the 1 Hz gps trace for each trip with timestamp shape_df = shape_df.reset_index(drop=True) shape_df["shape_id"] = shape_id_tmp return shape_df
[docs] def add_stop_flags_to_shape( trip_shape_df: pd.DataFrame, stop_times_ext: pd.DataFrame ) -> gpd.GeoDataFrame: """Attach stop information to a DataFrame of shape points for a specific trip. Given a DataFrame of shape points (`trip_shape_df`) and a DataFrame of stop times (`stop_times_ext`) joined with stop locations, this function identifies which shape points correspond to stops for the trip and annotates them. Parameters ---------- trip_shape_df : pd.DataFrame DataFrame containing shape points for a single trip. Must include columns 'trip_id', 'shape_pt_lon', 'shape_pt_lat', and 'coordinate_id'. stop_times_ext : pd.DataFrame DataFrame containing stop times with extended information. Must include columns 'trip_id', 'stop_lon', and 'stop_lat'. Returns ------- pd.DataFrame The input DataFrame with an additional column 'with_stop', where 1 indicates the shape point is nearest to a stop, and 0 otherwise. Notes ----- - Uses spatial join to find the nearest shape point for each stop. """ # Confirm we're only getting a single trip id if trip_shape_df["trip_id"].nunique() > 1: raise ValueError( f"trip_shape_df should only contain data for a single trip, but the " f"input includes {trip_shape_df.trip_id.nunique()} different trip IDs." 
) # Filter down stop_times to only the given trip from trip_id = trip_shape_df["trip_id"].iloc[0] trip_stop_times = stop_times_ext[stop_times_ext.trip_id == trip_id] # Convert DFs to GeoDataFrame for spatial join trip_gdf = gpd.GeoDataFrame( trip_shape_df, geometry=gpd.points_from_xy( trip_shape_df.shape_pt_lon, trip_shape_df.shape_pt_lat ), ) stop_times_gdf = gpd.GeoDataFrame( trip_stop_times, geometry=gpd.points_from_xy(trip_stop_times.stop_lon, trip_stop_times.stop_lat), ) # TODO: handle downstream effects of this warning, or change to an error if (~trip_gdf.geometry.is_valid).any(): logger.warning(f"Invalid geometry detected for trip {trip_id}") stop_times_gdf = stop_times_gdf.sjoin_nearest( trip_gdf[["geometry", "coordinate_id"]] ) trip_gdf["with_stop"] = 0 trip_gdf.loc[ trip_gdf.coordinate_id.isin(stop_times_gdf.coordinate_id.to_list()), "with_stop" ] = 1 df_tmp = trip_gdf.drop(["geometry"], axis=1) return df_tmp
[docs] def estimate_trip_timestamps(trip_shape_df: pd.DataFrame) -> pd.DataFrame: """Estimate timestamps for each shape point of a trip based on distance traveled. Args: trip_shape_df (pd.DataFrame): DataFrame containing trip shape data with columns: - 'shape_dist_traveled': Cumulative distance traveled along the shape. - 'start_time': Origin time (datetime) of the trip. - 'end_time': Destination time (datetime) of the trip. Returns: pd.DataFrame: Modified DataFrame with additional columns: - 'segment_duration_delta': Estimated duration for each segment as timedelta. - 'timestamp': Estimated timestamp for each segment. - 'Datetime_nearest5': Timestamp rounded to the nearest 5 minutes. - 'hour': Hour component of the rounded timestamp. - 'minute': Minute component of the rounded timestamp. """ start_times = pd.to_timedelta(trip_shape_df["start_time"]) end_times = pd.to_timedelta(trip_shape_df["end_time"]) trip_shape_df["segment_duration_delta"] = ( trip_shape_df["shape_dist_traveled"] / (trip_shape_df["shape_dist_traveled"].max() + 0.0001) * (end_times - start_times) ) trip_shape_df["segment_duration_delta"] = trip_shape_df[ "segment_duration_delta" ].apply(lambda x: datetime.timedelta(seconds=round(x.total_seconds()))) trip_shape_df["timestamp"] = start_times + trip_shape_df["segment_duration_delta"] ## get hour and minute of gps timestamp trip_shape_df["Datetime_nearest5"] = trip_shape_df["timestamp"].dt.round("5min") trip_shape_df["hour"] = trip_shape_df["Datetime_nearest5"].dt.components["hours"] trip_shape_df["minute"] = trip_shape_df["Datetime_nearest5"].dt.components[ "minutes" ] return trip_shape_df
[docs] def extend_trip_traces( trips_df: pd.DataFrame, matched_shapes_df: pd.DataFrame, feed: Feed, add_stop_flag: bool = False, n_processes: int | None = mp.cpu_count(), ) -> pd.DataFrame: """Extend trip shapes with stop details and estimated timestamps from GTFS. This function processes GTFS trip and shape data to: * Summarize stop times for each trip (first/last stop and times) * Merge stop time summaries into the trips DataFrame * Attach stop coordinates to stop times * Merge trip and shape data to create ordered trip traces * Optionally, attach stop indicators to shape trace points * Estimate timestamps for each trace point based on scheduled trip duration and distance Args: trips_df: DataFrame containing trip information, including 'trip_id' and 'shape_id'. matched_shapes_df: DataFrame with shape points matched to trips, including 'shape_id' and 'shape_dist_traveled'. feed: GTFS feed object containing 'stop_times' and 'stops' DataFrames. add_stop_flag: If True, attaches stop indicators to shape trace points. Defaults to False. n_processes: Number of processes to run in parallel using multiprocessing. Defaults to mp.cpu_count(). Returns: A single concatenated DataFrame with extended trace information for all trips, including estimated timestamps. """ # Add stop coordinates to stop_times stop_times_ext = feed.stop_times[["trip_id", "stop_sequence", "stop_id"]].merge( feed.stops[["stop_id", "stop_lat", "stop_lon"]], on="stop_id" ) # Calculate approximate timestamps for each trip trip_shape = pd.merge( trips_df[["trip_id", "shape_id", "start_time", "end_time"]], matched_shapes_df, how="left", on="shape_id", ) trip_shape = trip_shape.sort_values( by=["trip_id", "shape_dist_traveled"] ).reset_index(drop=True) trip_shapes_list = [item for _, item in trip_shape.groupby("trip_id")] # Attach stops to shape traces. Note that this just adds a dummy variable column # indicating whether or not a stop is located at a given point on the shape. 
if add_stop_flag: attach_stop_partial = partial( add_stop_flags_to_shape, stop_times_ext=stop_times_ext ) with mp.Pool(n_processes) as pool: trip_shapes_list = pool.map(attach_stop_partial, trip_shapes_list) # Attach timestamps to each trip. These are simply based on the scheduled trip # duration and shape_dist_traveled, assuming a constant speed for the entire trip. # TODO: improve timestamp estimates with mp.Pool(n_processes) as pool: trips_with_timestamps_list = pool.map( estimate_trip_timestamps, trip_shapes_list ) logger.info("Finished attaching timestamps") return pd.concat(trips_with_timestamps_list)