Source code for nrel.routee.compass.compass_app

from __future__ import annotations

import logging
from tempfile import TemporaryDirectory

from pathlib import Path
from typing import Any, List, Optional, Union, Callable, TYPE_CHECKING, cast
from nrel.routee.compass.routee_compass_py import (
    CompassAppWrapper,
)
from nrel.routee.compass.io.generate_dataset import (
    GeneratePipelinePhase,
    generate_compass_dataset,
)

if TYPE_CHECKING:
    from shapely.geometry import Polygon, MultiPolygon
    from nrel.routee.compass.utils.type_alias import (
        Config,
        OSMNXQuery,
        CompassQuery,
        Result,
        Results,
    )
    import networkx as nx

import tomlkit
import json


log = logging.getLogger(__name__)


[docs] class CompassApp: """ The CompassApp holds everything needed to run a route query. """ _app: CompassAppWrapper def __init__(self, app: CompassAppWrapper, config: Config): self._app = app self._config = config
[docs] @classmethod def get_constructor(cls) -> CompassAppWrapper: """ Return the underlying constructor for the application. This allows a child class to inherit the CompassApp python class and implement its own rust based app constructor, while still using the original python methods. """ return CompassAppWrapper
[docs] @classmethod def from_config_file( cls, config_file: Union[str, Path], parallelism: Optional[int] = None, ) -> CompassApp: """ Build a CompassApp from a config file Args: config_file: Path to the config file parallelism: optional number of threads to use for parallel query execution. Overrides the value in the config file. Returns: app: A CompassApp object Example: >>> from nrel.routee.compass import CompassApp >>> app = CompassApp.from_config_file("config.toml") """ config_path = Path(config_file) if not config_path.is_file(): raise ValueError(f"Config file {str(config_path)} does not exist") with open(config_path) as f: toml_config = tomlkit.load(f) return cls.from_dict(toml_config, config_path, parallelism=parallelism)
@classmethod def _get_default_config_file(cls, phases: List[GeneratePipelinePhase]) -> str: """ Internal helper to get the default config file name based on the phases. """ if GeneratePipelinePhase.POWERTRAIN in phases: return "osm_default_energy.toml" else: return "osm_default_speed.toml"
[docs] @classmethod def from_dict( cls, config: tomlkit.TOMLDocument, working_dir: Optional[Path] = None, parallelism: Optional[int] = None, ) -> CompassApp: """ Build a CompassApp from a configuration object Args: config: Configuration dictionary working_dir: optional path to working directory parallelism: optional number of threads to use for parallel query execution. Overrides the value in the config. Returns: app: a CompassApp object Example: >>> from nrel.routee.compass import CompassApp >>> conf = { "parallelism": 2 } >>> app = CompassApp.from_config(conf) """ if parallelism is not None: config["parallelism"] = parallelism path_str = str(working_dir.absolute()) if working_dir is not None else "" toml_string = tomlkit.dumps(config) app = cls.get_constructor()._from_config_toml_string(toml_string, path_str) return cls(app, config)
[docs] @classmethod def from_graph( cls, graph: nx.MultiDiGraph, config_file: Optional[str] = None, cache_dir: Optional[Union[str, Path]] = None, hwy_speeds: Optional[dict[str, Any]] = None, fallback: Optional[float] = None, agg: Optional[Callable[[Any], Any]] = None, phases: List[GeneratePipelinePhase] = GeneratePipelinePhase.default(), raster_resolution_arc_seconds: Union[str, int] = 1, vehicle_models: Optional[List[str]] = None, parallelism: Optional[int] = None, overwrite: bool = False, ) -> CompassApp: """ Build a CompassApp from a networkx graph. Args: graph: the networkx graph to build the CompassApp from. This is assumed to be the direct output of an osmnx download. config_file: optional name of the config file to load from the generated dataset. If not set, it will default to 'osm_default_energy.toml' if the powertrain phase is included, otherwise 'osm_default_speed.toml'. cache_dir: optional path to save necessary files to build the CompassApp. If not set, TemporaryDirectory will be used instead. Defaults to None. hwy_speeds: OSM highway types and values = typical speeds (km per hour) to assign to edges of that highway type for any edges missing speed data. Any edges with highway type not in `hwy_speeds` will be assigned the mean preexisting speed value of all edges of that highway type. Defaults to None. fallback: Default speed value (km per hour) to assign to edges whose highway type did not appear in `hwy_speeds` and had no preexisting speed values on any edge. Defaults to None. agg: Aggregation function to impute missing values from observed values. The default is numpy.mean, but you might also consider for example numpy.median, numpy.nanmedian, or your own custom function. Defaults to numpy.mean. phases (List[GeneratePipelinePhase]): of the overall generate pipeline, which phases of the pipeline to run. Defaults to all (["graph", "grade", "config", "powertrain"]) raster_resolution_arc_seconds: If grade is added, the resolution (in arc-seconds) of the tiles to download (either 1 or 1/3). Defaults to 1. vehicle_models: If provided, only download and configure the listed vehicle models (by name, e.g. ``["2017_CHEVROLET_Bolt", "2016_TOYOTA_Camry_4cyl_2WD"]``). Use :func:`list_available_vehicle_models` to see valid names. When ``None`` (the default) all available models are included. parallelism: optional number of threads to use for parallel query execution. Overrides the value in the config file. overwrite: if True, will overwrite any existing files in the cache_dir. Defaults to False. Returns: CompassApp: a CompassApp object """ if cache_dir is None: temp_dir = TemporaryDirectory() cache_dir = Path(temp_dir.name) else: cache_dir = Path(cache_dir) if config_file is None: config_file = cls._get_default_config_file(phases) config_path = cache_dir / config_file if not overwrite and config_path.exists(): log.info(f"Using existing dataset found in {cache_dir}") else: generate_compass_dataset( graph, output_directory=cache_dir, hwy_speeds=hwy_speeds, fallback=fallback, agg=agg, phases=phases, raster_resolution_arc_seconds=raster_resolution_arc_seconds, default_config=True, vehicle_models=vehicle_models, ) if not config_path.exists(): raise FileNotFoundError( f"Config file {config_file} not found in {cache_dir}. " "Make sure the requested phases generated this config." ) return cls.from_config_file(config_path, parallelism=parallelism)
[docs] @classmethod def from_place( cls, query: OSMNXQuery, network_type: str = "drive", parallelism: Optional[int] = None, cache_dir: Optional[Union[str, Path]] = None, overwrite: bool = False, **kwargs: Any, ) -> CompassApp: """ Build a CompassApp from a place Args: query: the query or queries to geocode to get place boundary polygon(s) network_type: what type of street network. Default to drive List of options: ["all", "all_public", "bike", "drive", "drive_service", "walk"] parallelism: optional number of threads to use for parallel query execution. Overrides the value in the config file. cache_dir: optional path to save necessary files to build the CompassApp. If not set, TemporaryDirectory will be used instead. Defaults to None. overwrite: if True, will overwrite any existing files in the cache_dir. Defaults to False. **kwargs: additional arguments to pass to `from_graph` and `generate_compass_dataset` Returns: CompassApp: a CompassApp object Example: >>> from nrel.routee.compass import CompassApp >>> app = CompassApp.from_place("Denver, Colorado, USA") """ if cache_dir is not None: cache_path = Path(cache_dir) config_file = kwargs.get("config_file") if config_file is None: phases = kwargs.get("phases", GeneratePipelinePhase.default()) config_file = cls._get_default_config_file(phases) config_path = cache_path / config_file if not overwrite and config_path.exists(): log.info(f"Using existing dataset found in {cache_dir}") return cls.from_config_file(config_path, parallelism=parallelism) try: import osmnx as ox except ImportError: raise ImportError("requires osmnx to be installed. Try 'pip install osmnx'") graph = ox.graph_from_place(query, network_type=network_type) return cls.from_graph( graph, parallelism=parallelism, cache_dir=cache_dir, overwrite=overwrite, **kwargs, )
[docs] @classmethod def from_polygon( cls, polygon: Union["Polygon" | "MultiPolygon"], network_type: str = "drive", parallelism: Optional[int] = None, cache_dir: Optional[Union[str, Path]] = None, overwrite: bool = False, **kwargs: Any, ) -> CompassApp: """ Build a CompassApp from a polygon Args: polygon: the shape to get network data within. coordinates should be in unprojected latitude-longitude degrees network_type: what type of street network. Default to drive List of options: ["all", "all_public", "bike", "drive", "drive_service", "walk"] parallelism: optional number of threads to use for parallel query execution. Overrides the value in the config file. cache_dir: optional path to save necessary files to build the CompassApp. If not set, TemporaryDirectory will be used instead. Defaults to None. overwrite: if True, will overwrite any existing files in the cache_dir. Defaults to False. **kwargs: additional arguments to pass to `from_graph` and `generate_compass_dataset` Returns: CompassApp: a CompassApp object Example: >>> from nrel.routee.compass import CompassApp >>> from shapely import geometry >>> p1 = geometry.Point(0,0) >>> p2 = geometry.Point(1,0) >>> p3 = geometry.Point(1,1) >>> p4 = geometry.Point(0,1) >>> pointList = [p1, p2, p3, p4] >>> poly = geometry.Polygon(pointList) >>> app = CompassApp.from_polygon(poly) """ if cache_dir is not None: cache_path = Path(cache_dir) config_file = kwargs.get("config_file") if config_file is None: phases = kwargs.get("phases", GeneratePipelinePhase.default()) config_file = cls._get_default_config_file(phases) config_path = cache_path / config_file if not overwrite and config_path.exists(): log.info(f"Using existing dataset found in {cache_dir}") return cls.from_config_file(config_path, parallelism=parallelism) try: import osmnx as ox except ImportError: raise ImportError("requires osmnx to be installed. Try 'pip install osmnx'") graph = ox.graph_from_polygon(polygon, network_type=network_type) return cls.from_graph( graph, parallelism=parallelism, cache_dir=cache_dir, overwrite=overwrite, **kwargs, )
[docs] def run( self, query: Union[CompassQuery, List[CompassQuery]], config: Optional[Config] = None, ) -> Union[Result, Results]: """ Run a query (or multiple queries) against the CompassApp Args: query: A query or list of queries to run config: optional configuration Returns: results: A list of results (or a single result if a single query was passed) Example: >>> from nrel.routee.compass import CompassApp >>> app = CompassApp.from_config_file("config.toml") >>> query = { "origin_name": "NREL", "destination_name": "Comrade Brewing Company", "origin_x": -105.1710052, "origin_y": 39.7402804, "destination_x": -104.9009913, "destination_y": 39.6757025 } >>> result = app.run(query) """ if isinstance(query, dict): queries = [query] single_query = True elif isinstance(query, list): queries = query single_query = False else: raise ValueError( f"Query must be a dict or list of dicts, not {type(query)}" ) queries_str = list(map(json.dumps, queries)) config_str = json.dumps(config) if config is not None else None results_json: List[str] = self._app._run_queries(queries_str, config_str) results: Results = list(map(json.loads, results_json)) if single_query and len(results) == 1: return results[0] return results
[docs] def graph_edge_origin(self, edge_id: int) -> int: """ get the origin vertex id for some edge Args: edge_id: the id of the edge Returns: vertex_id: the vertex id at the source of the edge """ return cast(int, self._app.graph_edge_origin(edge_id))
[docs] def graph_edge_destination(self, edge_id: int) -> int: """ get the destination vertex id for some edge Args: edge_id: the id of the edge Returns: vertex_id: the vertex id at the destination of the edge """ return cast(int, self._app.graph_edge_destination(edge_id))
[docs] def graph_edge_distance( self, edge_id: int, distance_unit: Optional[str] = None ) -> float: """ get the distance for some edge Args: edge_id: the id of the edge distance_unit: distance unit, by default meters Returns: dist: the distance covered by traversing the edge """ return cast(float, self._app.graph_edge_distance(edge_id, distance_unit))
[docs] def graph_get_out_edge_ids(self, vertex_id: int) -> List[int]: """ get the list of edge ids that depart from some vertex Args: vertex_id: the id of the vertex Returns: edges: the edge ids of edges departing from this vertex """ return cast(List[int], self._app.graph_get_out_edge_ids(vertex_id))
[docs] def graph_get_in_edge_ids(self, vertex_id: int) -> List[int]: """ get the list of edge ids that arrive from some vertex Args: vertex_id: the id of the vertex Returns: edges: the edge ids of edges arriving at this vertex """ return cast(List[int], self._app.graph_get_in_edge_ids(vertex_id))
[docs] def map_match( self, query: Union[CompassQuery, List[CompassQuery]], ) -> Union[Result, Results]: """ Run a map matching query (or multiple queries) against the CompassApp Args: query: A query or list of queries to run Returns: results: A list of results (or a single result if a single query was passed) Example: >>> from nrel.routee.compass import CompassApp >>> app = CompassApp.from_config_file("config.toml") >>> query = { "trace": [ {"x": -105.1710052, "y": 39.7402804}, {"x": -105.1710052, "y": 39.7402804} ] } >>> result = app.map_match(query) """ if isinstance(query, dict): queries = [query] single_query = True elif isinstance(query, list): queries = query single_query = False else: raise ValueError( f"Query must be a dict or list of dicts, not {type(query)}" ) queries_str = list(map(json.dumps, queries)) results_json: List[str] = self._app._map_match(queries_str) results: Results = list(map(json.loads, results_json)) if single_query and len(results) == 1: return results[0] return results
[docs] def run_calculate_path( self, query: Union[CompassQuery, List[CompassQuery]], config: Optional[Config] = None, ) -> Union[Result, Results]: """ Run a path evaluation query (or multiple queries) against the CompassApp Args: query: A query or list of queries to run. Each query must have 'path'. config: optional configuration Returns: results: A list of results (or a single result if a single query was passed) Example: >>> from nrel.routee.compass import CompassApp >>> app = CompassApp.from_config_file("config.toml") >>> query = { "path": [{"edge_id": 0}, {"edge_id": 2}], } >>> result = app.run_calculate_path(query) """ if isinstance(query, dict): queries = [query] single_query = True elif isinstance(query, list): queries = query single_query = False else: raise ValueError( f"Query must be a dict or list of dicts, not {type(query)}" ) queries_str = list(map(json.dumps, queries)) config_str = json.dumps(config) if config is not None else None results_json: List[str] = self._app._run_calculate_path(queries_str, config_str) results: Results = list(map(json.loads, results_json)) if single_query and len(results) == 1: return results[0] return results