Source code for revrt.routing.base

"""reVRt base routing functionality"""

import json
import logging
from warnings import warn
from functools import cached_property
from itertools import pairwise

import rasterio
import numpy as np
import xarray as xr
import dask.array as da
from shapely.geometry import Point
from shapely.geometry.linestring import LineString

from revrt import simplify_using_slopes
from revrt.models.cost_layers import BarrierLayer
from revrt.models.routing import (
    validate_driver_configs,
    validate_routing_options,
    validate_transition_cost_configs,
)
from revrt.routing.utilities import compute_lens
from revrt.utilities.parsing import parse_comparison_values
from revrt.exceptions import revrtKeyError
from revrt.warn import revrtWarning, revrtDeprecationWarning


logger = logging.getLogger(__name__)
LCP_AGG_COST_LAYER_NAME = "lcp_agg_costs"
"""Special name reserved for internally-built cost layer"""

ROUTE_SOLUTION_LEN = 3
MIN_ROUTE_POINTS_FOR_TRANSITION_COST = 2


[docs] class RoutingScenario: """Container for routing scenario configuration""" def __init__( self, cost_fpath, routing_options, tracked_layers=None, drivers=None, transition_costs=None, invalid_costs_block_routing=True, algorithm="bidirectional_long_range_dijkstra", ): """ Parameters ---------- cost_fpath : path-like Path to the cost layer Zarr store used for routing. tracked_layers : list, optional List of dictionaries defining layers to summarize along the route after applying optional multiplier inputs. See :class:`~revrt.models.routing.TrackedLayer` for the canonical schema. routing_options : dict Mapping of routing-option names to dictionaries containing cost, friction, and barrier definitions. See :class:`~revrt.models.routing.RoutingOptionConfig` for the canonical schema that is serialized for the Rust routing core. drivers : dict, optional Optional driver-rule configuration keyed by routing option. See :class:`~revrt.models.routing.DriverConfig` and :class:`~revrt.models.routing.DriverZoneConfig`. transition_costs : dict, optional Optional transition-cost configuration between routing options. See :class:`~revrt.models.routing.TransitionCostsConfig` and :class:`~revrt.models.routing.TransitionCostRule`. invalid_costs_block_routing : bool, optional Flag indicating whether non-positive costs block traversal. algorithm : str, default="bidirectional_long_range_dijkstra" Routing algorithm implementation to use. Supported values are ``"astar"``, ``"long_range_astar"``, ``"long_range_dijkstra"``, ``"bidirectional_long_range_dijkstra"``, and ``"dijkstra"``. ``"astar"`` and ``"dijkstra"`` are in-memory implementations that do not respect the memory limit. Prefer a long-range option unless you know for a fact that your route computations will not need much memory and speed is very important to you. By default, ``"bidirectional_long_range_dijkstra"``. """ self.cost_fpath = cost_fpath self.routing_options = validate_routing_options(routing_options) self.drivers = validate_driver_configs(drivers, self.routing_options) self.transition_costs = validate_transition_cost_configs( transition_costs, self.routing_options ) self.tracked_layers = tracked_layers or [] self.invalid_costs_block_routing = invalid_costs_block_routing self.algorithm = algorithm def __repr__(self): return ( "RoutingScenario:" f"\n\t- routing_options: {self.routing_options}" f"\n\t- drivers: {self.drivers}" f"\n\t- transition_costs: {self.transition_costs}" f"\n\t- algorithm: {self.algorithm}" ) @property def routing_option_names(self): """list: Routing option names in solver index order""" return list(self.routing_options)
[docs] @cached_property def cost_function_json(self): """str: JSON string describing configured cost layers""" payload = { "invalid_costs_block_routing": self.invalid_costs_block_routing, "routing_options": self._routing_options_for_rust(), } if self.drivers is not None: payload["drivers"] = _drivers_for_rust(self.drivers) if self.transition_costs is not None: payload["transition_costs"] = self.transition_costs return json.dumps(payload)
def _routing_options_for_rust(self): """dict: Routing options formatted for Rust ingestion""" return { option_name: { "cost_layers": list( _cost_layers_for_rust(option_config.get("cost_layers", [])) ), "friction_layers": list( _friction_layers_for_rust( option_config.get("friction_layers", []) ) ), "barrier_layers": list( _barrier_layers_for_rust( option_config.get("barrier_layers", []) ) ), "cost_multiplier_layer": option_config.get( "cost_multiplier_layer" ), } for option_name, option_config in self.routing_options.items() }
[docs] class RoutingLayerManager: """Class to build routing layers from user input""" def __init__(self, routing_scenario, chunks="auto"): """ Parameters ---------- routing_scenario : RoutingScenario Scenario containing cost, friction, and tracking metadata. chunks : str or mapping, default="auto" Chunk specification forwarded to ``xarray.open_dataset``. By default, ``"auto"``. """ self.routing_scenario = routing_scenario self._layer_fh = xr.open_dataset( self.routing_scenario.cost_fpath, chunks=chunks, consolidated=False, engine="zarr", ) self.tracked_layers = [] self.transform = self._layer_fh.rio.transform() self.full_shape = self._layer_fh.rio.shape self.cost_crs = self._layer_fh.rio.crs self._layers = set(self._layer_fh.variables) self.costs = {} self.li_costs = {} self.final_routing_layers = {} def __repr__(self): return f"RoutingLayerManager for {self.routing_scenario!r}" @property def latitudes(self): """xarray.DataArray: Latitude coordinates for the cost grid""" return self._layer_fh["latitude"] @property def longitudes(self): """xarray.DataArray: Longitude coordinates for the cost grid""" return self._layer_fh["longitude"] def _verify_layer_exists(self, layer_name): """Verify that layer exists in cost file""" if layer_name not in self._layers: msg = ( f"Did not find layer {layer_name!r} in cost " f"file {str(self.routing_scenario.cost_fpath)!r}" ) raise revrtKeyError(msg)
[docs] def close(self): """Close the underlying xarray file handle""" self._layer_fh.close()
[docs] def build(self): """Build lazy routing arrays from layered file""" logger.debug("Building %r", self) self._build_cost_layers() self._build_tracked_layers() return self
def _build_cost_layers(self): """Build a coarse validation layer across routing options""" if self.costs: return for option, config in self.routing_scenario.routing_options.items(): self._build_cost_layer_from_option(option, config) def _empty_cost_layer_data_array(self): """xarray.DataArray: Empty routing-cost layer template""" template = self.latitudes.astype(np.float32).copy(deep=False) template.values = da.zeros(self.full_shape, dtype=np.float32) return template def _build_cost_layer_from_option(self, option, config): """Build a single routing option's cost layer""" option_cost = self._empty_cost_layer_data_array() option_li_cost = self._empty_cost_layer_data_array() option_untracked_cost = self._empty_cost_layer_data_array() for layer_info in config.get("cost_layers", []): cost = self._extract_and_scale_layer(layer_info) cost.values = da.where(cost > 0, cost, 0) is_li = layer_info.get("is_invariant", False) if layer_info.get("include_in_final_cost", True): if is_li: option_li_cost += cost else: option_cost += cost else: option_untracked_cost += cost if layer_info.get("include_in_report", True): layer_name = f"{layer_info['layer_name']}_{option}" self.tracked_layers.append( CharacterizedLayer( layer_name, cost, option=option, is_length_invariant=is_li, ) ) mult = config.get("cost_multiplier_scalar", 1) or 1 option_cost *= mult option_li_cost *= mult option_untracked_cost *= mult multiplier_layer_name = config.get("cost_multiplier_layer") if multiplier_layer_name: multiplier = self._extract_layer(multiplier_layer_name) option_cost *= multiplier option_li_cost *= multiplier option_untracked_cost *= multiplier self.costs[option] = option_cost self.li_costs[option] = option_li_cost self._build_final_routing_layer_from_option( option, config, option_cost + option_li_cost + option_untracked_cost, ) def _build_final_routing_layer_from_option( self, option, config, option_layer ): frictions = da.zeros(self.full_shape, dtype=np.float32) for layer_info in config.get("friction_layers", []): layer_name = layer_info.get("multiplier_layer") friction_layer = self._extract_and_scale_friction_layer( layer_name, layer_info ) if layer_info.get("include_in_report", False): self.tracked_layers.append( CharacterizedLayer( f"{layer_name}_{option}", friction_layer, option=option ) ) frictions += friction_layer frictions = da.where(frictions <= -1, -1.0 + 1e-7, frictions) option_layer *= 1 + frictions barrier_mask = da.zeros(self.full_shape, dtype=bool) for layer_info in config.get("barrier_layers", []): if layer_info.get("barrier_importance") is not None: continue barrier_mask |= self._extract_barrier_layer( BarrierLayer(**layer_info).to_routing_dict() ) option_layer.values = da.where( option_layer <= 0, -1 if self.routing_scenario.invalid_costs_block_routing else 1e10, option_layer, ) option_layer.values = da.where( barrier_mask, da.nan, option_layer.values, ) self.final_routing_layers[option] = option_layer def _extract_and_scale_layer(self, layer_info): """Extract layer based on name and scale according to input""" cost = self._extract_layer(layer_info["layer_name"]) multiplier_layer_name = layer_info.get("multiplier_layer") if multiplier_layer_name: cost *= self._extract_layer(multiplier_layer_name) cost *= layer_info.get("multiplier_scalar", 1) return cost def _extract_and_scale_friction_layer(self, mask_layer_name, layer_info): """Extract layer based on name and scale according to input""" if not mask_layer_name: msg = ( "Friction layers must specify a 'mask' or " "'multiplier_layer' key!" ) raise revrtKeyError(msg) cost = self._extract_layer(mask_layer_name) cost *= layer_info.get("multiplier_scalar", 1) return cost def _extract_barrier_layer(self, layer_info): """Extract one barrier layer mask from the layered file""" layer = self._extract_layer(layer_info["layer_name"]) layer_data = getattr(layer, "data", layer) threshold = layer_info["barrier_threshold"] operator = layer_info["barrier_operator"] if operator == "gt": return layer_data > threshold if operator == "ge": return layer_data >= threshold if operator == "lt": return layer_data < threshold if operator == "le": return layer_data <= threshold if operator == "ne": return layer_data != threshold if operator == "eq": return layer_data == threshold msg = ( "Did not recognize barrier operator " f"{operator!r} for layer {layer_info['layer_name']!r}" ) raise revrtKeyError(msg) def _extract_layer(self, layer_name): """Extract layer based on name""" self._verify_layer_exists(layer_name) return self._layer_fh[layer_name].isel(band=0).astype(np.float32) def _build_tracked_layers(self): """Build out a dictionary of tracked layers""" for tracked_layer_info in self.routing_scenario.tracked_layers: tracked_layer = tracked_layer_info["layer_name"] method = tracked_layer_info.get("agg_method") if method is None: msg = ( f"Tracked layer {tracked_layer!r} must specify an " "'agg_method' key! Skipping..." ) warn(msg, revrtWarning) continue if getattr(da, method, None) is None: msg = ( f"Did not find method {method!r} in dask.array! " f"Skipping tracked layer {tracked_layer!r}" ) warn(msg, revrtWarning) continue if tracked_layer not in self._layers: msg = ( f"Did not find layer {tracked_layer!r} in cost file " f"{str(self.routing_scenario.cost_fpath)!r}. " "Skipping..." ) warn(msg, revrtWarning) continue layer = self._extract_and_scale_layer(tracked_layer_info) is_li = tracked_layer_info.get("is_invariant", False) for option in self.routing_scenario.routing_option_names: self.tracked_layers.append( CharacterizedLayer( f"{tracked_layer}_{option}", layer, option=option, is_length_invariant=is_li, agg_method=method, ) )
[docs] class CharacterizedLayer: """Encapsulate tracked routing layer metadata""" def __init__( self, name, layer, option, is_length_invariant=False, agg_method=None, ): """ Parameters ---------- name : str Identifier used when reporting layer-derived metrics. layer : xarray.DataArray or dask.array.Array Data values sampled from the routing stack. option : str Routing option this layer should characterize. When set, metrics only use matching route cells. is_length_invariant : bool, default=False Flag signaling cost values should ignore segment length. By default, ``False``. agg_method : str, optional Name of dask aggregation used when tracking statistics. By default, ``None``. """ self.name = name self.layer = layer self.option = option self.is_length_invariant = is_length_invariant self.agg_method = agg_method def __repr__(self): return ( f"CharacterizedLayer(name={self.name!r}, " f"option={self.option!r}, " f"is_length_invariant={self.is_length_invariant}, " f"agg_method={self.agg_method!r})" )
[docs] def compute(self, route, cell_size, point_lens): """Compute layer metrics along a route Parameters ---------- route : sequence Ordered ``(row, col, option)`` indices describing the path. cell_size : float Raster cell size in meters for distance calculations. point_lens : array-like Per-route-cell distances aligned with ``route``. Returns ------- dict Mapping of metric names to aggregated layer values. """ route_points = [] filtered_point_lens = [] for p, point_len in zip(route, point_lens, strict=True): *point, point_option = p if point_option != self.option: continue route_points.append(point) filtered_point_lens.append(point_len) if not route_points: return self._empty_metrics() route_points = np.asarray(route_points) filtered_point_lens = np.asarray(filtered_point_lens) rows, cols = np.array(route_points).T layer_values = self.layer.isel( y=xr.DataArray(rows, dims="points"), x=xr.DataArray(cols, dims="points"), ) if self.agg_method is None: return self._compute_total_and_length( layer_values, route_points, cell_size, filtered_point_lens, ) return self._compute_agg(layer_values)
def _empty_metrics(self): """dict: Empty metrics for routes that skip this option""" if self.agg_method is None: return { f"{self.name}_cost": 0, f"{self.name}_length_km": 0, } return {f"{self.name}_{self.agg_method}": np.float32(np.nan)} def _compute_total_and_length( self, layer_values, route, cell_size, point_lens ): """Compute total cost and length metrics for the layer""" if len(route) == 0: return { f"{self.name}_cost": 0, f"{self.name}_length_km": 0, } layer_data = getattr(layer_values, "data", layer_values) if not isinstance(layer_data, da.Array): # pragma: no cover layer_data = da.asarray(layer_data) point_lens = da.asarray(point_lens) if self.is_length_invariant: layer_cost = da.sum(layer_data) else: layer_cost = da.sum(layer_data * point_lens) layer_length = da.sum(point_lens[layer_data > 0]) * cell_size / 1000 return { f"{self.name}_cost": layer_cost.astype(np.float32).compute(), f"{self.name}_length_km": ( layer_length.astype(np.float32).compute() ), } def _compute_agg(self, layer_values): """Compute aggregated statistic for tracked layer""" aggregate = getattr(da, self.agg_method)(layer_values).astype( np.float32 ) return {f"{self.name}_{self.agg_method}": aggregate.compute()}
[docs] class RouteMetrics: """Class to compute route characteristics given layer cost maps""" def __init__( self, routing_layers, route, optimized_objective, add_geom=False, attrs=None, ): """ Parameters ---------- routing_layers : RoutingLayerManager Routing layer manager containing cost and tracker arrays. route : list Ordered row and column indices defining the path. optimized_objective : float Objective value returned by the routing solver. add_geom : bool, default=False Include shapely geometry in the output when ``True``. By default, ``False``. attrs : dict, optional Additional metadata merged into the result dictionary. By default, ``None``. """ self._routing_layers = routing_layers self._route = route self._optimized_objective = optimized_objective self.__lens = self._total_path_length = None self._by_layer_results = {} self._add_geom = add_geom self._attrs = attrs or {} @property def cell_size(self): """float: Raster cell size in meters""" return abs(self._routing_layers.transform.a) @cached_property def _route_options(self): """list | None: Route option ids aligned with route points""" return np.asarray([p[-1] for p in self._route]) @cached_property def _route_row_col(self): """list: List of (row, col) tuples defining the path""" return [x[:2] for x in self._route] @property def _lens(self): """array-like: Cached per-cell travel distances""" if self.__lens is None: self._compute_path_length() return self.__lens @property def total_path_length(self): """float: Total path length in kilometers""" if self._total_path_length is None: self._compute_path_length() return self._total_path_length @property def cost(self): """float: Optimized objective evaluated over the route""" rows, cols = np.array(self._route_row_col).T point_lens = xr.DataArray(self._lens, dims="points") total_cost = da.zeros((), dtype=np.float32) for option in np.unique(self._route_options): mask = self._route_options == option option_rows = xr.DataArray(rows[mask], dims="points") option_cols = xr.DataArray(cols[mask], dims="points") cell_costs = self._routing_layers.costs[option].isel( y=option_rows, x=option_cols ) total_cost += da.sum(cell_costs * point_lens[mask]) inv_cell_costs = self._routing_layers.li_costs[option].isel( y=option_rows, x=option_cols ) total_cost += da.sum(inv_cell_costs) return total_cost.compute() + self._transition_cost() def _transition_cost(self): """float: Total transition cost implied by option changes""" if len(self._route) < MIN_ROUTE_POINTS_FOR_TRANSITION_COST: return 0.0 default_cost, pairwise_costs = _transition_cost_lookup( self._routing_layers.routing_scenario.transition_costs ) return sum( pairwise_costs.get((src, dst), default_cost) for src, dst in pairwise(self._route_options) ) @property def end_lat(self): """float: Latitude of the terminal path cell""" row, col = self._route_row_col[-1] return ( self._routing_layers.latitudes.isel(y=row, x=col) .astype(np.float32) .compute() .item() ) @property def end_lon(self): """float: Longitude of the terminal path cell""" row, col = self._route_row_col[-1] return ( self._routing_layers.longitudes.isel(y=row, x=col) .astype(np.float32) .compute() .item() ) @property def geom(self): """shapely.GeometryType: Geometry for the computed path""" rows, cols = np.array(self._route_row_col).T x, y = rasterio.transform.xy( self._routing_layers.transform, rows, cols ) if len(self._route) == 1: return Point(x, y) return LineString(simplify_using_slopes(list(zip(x, y, strict=True))))
[docs] def compute(self): """Assemble route metrics and optional geometry payload""" results = { "length_km": self.total_path_length, "cost": self.cost, "poi_lat": self.end_lat, "poi_lon": self.end_lon, "start_row": self._route[0][0], "start_col": self._route[0][1], "end_row": self._route[-1][0], "end_col": self._route[-1][1], "optimized_objective": self._optimized_objective, } results.update(self._attrs) for layer in self._routing_layers.tracked_layers: layer_result = layer.compute( self._route, self.cell_size, self._lens ) results.update(layer_result) if self._add_geom: results["geometry"] = self.geom return results
def _compute_path_length(self): """Compute the total length and cell by cell length of LCP""" self.__lens, self._total_path_length = compute_lens( self._route_row_col, self.cell_size )
def _transition_cost_lookup(transition_costs): """Build the directed transition-cost lookup used in reports""" if not transition_costs: return 0.0, {} default_cost = transition_costs.get("default", 0) or 0 pairwise_costs = {} for rule in transition_costs.get("pairwise", []): src, dst = rule["between"] pairwise_costs[(src, src)] = 0 pairwise_costs[(dst, dst)] = 0 pairwise_costs[(src, dst)] = rule["cost"] pairwise_costs[(dst, src)] = rule["cost"] return default_cost, pairwise_costs def _cost_layers_for_rust(layers): """Cost layers formatted for Rust ingestion""" for layer in layers: out_layer = layer.copy() out_layer.pop("include_in_report", None) out_layer.pop("include_in_final_cost", None) yield out_layer def _friction_layers_for_rust(layers): """Friction layers formatted for Rust ingestion""" for layer in layers: out_layer = layer.copy() if "layer_name" in out_layer: msg = ( "Specifying `layer_name` for a friction layer is " "deprecated! The default behavior of friction layers is " "to be multiplied to the aggregated cost layer. Please " "remove this key in order to silence this warning." ) warn(msg, revrtDeprecationWarning) out_layer.pop("layer_name") if "mask" in out_layer: out_layer["multiplier_layer"] = out_layer.pop("mask") out_layer.pop("include_in_report", None) yield out_layer def _barrier_layers_for_rust(layers): """Barrier layers formatted for Rust ingestion""" for layer in layers: out_layer = BarrierLayer(**layer).to_routing_dict() if out_layer.get("barrier_importance") is None: out_layer.pop("barrier_importance") yield out_layer def _drivers_for_rust(drivers): """Driver rules formatted for Rust ingestion""" out_drivers = drivers.copy() if "zones" in drivers: out_drivers["zones"] = list( _driver_zones_for_rust(drivers.get("zones", [])) ) return out_drivers def _driver_zones_for_rust(zones): """Driver zones formatted for Rust ingestion""" for zone in zones: out_zone = zone.copy() where = out_zone.pop("where", None) if where is not None: mask_operator, mask_threshold = parse_comparison_values(where) out_zone["mask_operator"] = mask_operator out_zone["mask_threshold"] = mask_threshold yield out_zone