"""reVRt point-to-feature routing CLI command"""
import logging
import re
from collections import abc
from pathlib import Path
from warnings import warn
import rasterio
import numpy as np
import geopandas as gpd
from gaps.cli import CLICommandFromFunction
from gaps.utilities import TAG
from gaps.pipeline import parse_previous_status
from revrt.routing.cli.base import (
run_lcp,
route_points_subset,
split_routes,
RouteToDefinitionConverter,
)
from revrt.utilities import strip_path_keys, strip_path, log_runtime
from revrt.routing.utilities import map_to_costs
from revrt.costs.config import parse_config
from revrt.utilities.raster import integer_dimension_window
from revrt.exceptions import revrtConfigurationError
from revrt.warn import revrtWarning
logger = logging.getLogger(__name__)
class PointToFeatureRouteDefinitionConverter(RouteToDefinitionConverter):
    """Convert route points DataFrame to route definition for Rust

    Routes are grouped by connection identifier and end option; the end
    points of each group are the cost-grid cells covered by the
    matching features read from `features_fpath`.
    """

    def __init__(
        self,
        cost_fpath,
        route_points,
        features_fpath,
        out_fp,
        routing_options,
        transmission_config=None,
        drivers=None,
        transition_costs=None,
        connection_identifier_column="end_feat_id",
    ):
        """

        Parameters
        ----------
        cost_fpath : path-like
            Path to layered Zarr file containing cost and other required
            routing layers.
        route_points : pandas.DataFrame
            DataFrame defining the points to be routed. This DataFrame
            should contain route definitions to be transformed and
            passed down to the Rust routing algorithm.
        features_fpath : path-like
            Path to the vector file holding the features that routes
            connect to. The file is read with ``geopandas.read_file``
            and filtered on `connection_identifier_column`.
        out_fp : path-like
            Path to output file where computed routes will be saved.
            This file will be checked for existing routes to avoid
            recomputation.
        routing_options : dict
            Mapping of routing-option names to dictionaries describing
            the cost, friction, barrier, and option-level multiplier
            inputs for each option. See
            :class:`~revrt.models.routing.RoutingOptionConfig`.
        transmission_config : path-like or dict, optional
            Dictionary of transmission cost configuration values, or
            path to JSON/JSON5 file containing this dictionary. See the
            description of
            :func:`revrt.routing.cli.point_to_point.compute_lcp_routes`
            for more details.
        drivers : dict, optional
            Driver-rule configuration. Forwarded unchanged to the base
            converter. By default, ``None``.
        transition_costs : dict, optional
            Transition-cost configuration. Forwarded unchanged to the
            base converter. By default, ``None``.
        connection_identifier_column : str, default="end_feat_id"
            Name of the column used to pair route points with features.
            It is read from the route table and used in the filter
            applied to the features file.
            By default, ``"end_feat_id"``.
        """
        super().__init__(
            cost_fpath=cost_fpath,
            route_points=route_points,
            out_fp=out_fp,
            routing_options=routing_options,
            transmission_config=transmission_config,
            drivers=drivers,
            transition_costs=transition_costs,
        )
        self.features_fpath = features_fpath
        self.connection_identifier_column = connection_identifier_column

    def _rp_with_expected_cols(self):
        """Add start row/col indices when missing, then defer to base"""
        known_cols = self._input_route_points.columns
        has_grid_indices = (
            "start_row" in known_cols and "start_col" in known_cols
        )
        if not has_grid_indices:
            logger.info("Mapping route start points to cost grid...")
            meta = self.cost_metadata
            self._input_route_points = map_to_costs(
                self._input_route_points,
                crs=meta["crs"],
                transform=meta["transform"],
                shape=meta["shape"],
            )
        return super()._rp_with_expected_cols()

    def _route_as_tuple(self, row):
        """Build the tuple used to detect already-computed routes"""
        start_row = int(row["start_row"])
        start_col = int(row["start_col"])
        # Rows without an explicit start option use the default option
        start_option = str(
            row.get("start_option", self._routing_options.default)
        )
        feature_id = str(row[self.connection_identifier_column])
        return (
            start_row,
            start_col,
            start_option,
            feature_id,
            *self._route_value_signature(row),
        )

    def _convert_to_route_definitions(self, routes):
        """Convert route DataFrame to route definitions format

        Returns
        -------
        route_definitions : list
            One ``(route_id, start_points, end_points)`` tuple per
            (connection identifier, end option) group that has at least
            one matching feature.
        route_attrs : dict
            Maps ``(route_id, start_point)`` to the dictionary form of
            the route-table row that produced that start point.
        """
        id_col = self.connection_identifier_column
        start_cols = ["start_row", "start_col"]
        definitions = []
        attrs_by_route = {}
        n_rows, n_cols = self.cost_metadata["shape"]

        grouped = routes.groupby([id_col, "end_option"])
        for route_id, ((feat_id, end_option), group) in enumerate(grouped):
            end_feats = gpd.read_file(
                self.features_fpath, where=f"{id_col} == {feat_id}"
            )
            if end_feats.empty:
                # Nothing to route to; warn and move on to the next group
                msg = (
                    f"No features found with "
                    f"{id_col} == {feat_id}!"
                )
                warn(msg, revrtWarning)
                continue

            rows, cols = self._end_feats_to_row_col(end_feats)

            start_points = []
            for __, info in group.iterrows():
                start_idx = (
                    *info[start_cols].astype("int32"),
                    info["start_option"],
                )
                attrs_by_route[(route_id, start_idx)] = info.to_dict()
                start_points.append(start_idx)

            # Only cells that fall inside the cost grid are valid ends
            end_points = [
                (int(r), int(c), str(end_option))
                for r, c in zip(rows, cols, strict=True)
                if 0 <= r < n_rows and 0 <= c < n_cols
            ]
            definitions.append((route_id, start_points, end_points))

        return definitions, attrs_by_route

    def _end_feats_to_row_col(self, end_feats):
        """Convert end features to row/col indices in cost grid"""
        cost_transform = self.cost_metadata["transform"]
        window = integer_dimension_window(
            bounds=end_feats.total_bounds, transform=cost_transform
        )
        window_transform = rasterio.windows.transform(
            window=window, transform=cost_transform
        )
        # invert=True marks the cells covered by the geometry as True
        covered = rasterio.features.geometry_mask(
            [end_feats.union_all()],
            out_shape=(window.height, window.width),
            transform=window_transform,
            invert=True,
        )
        rows, cols = np.where(covered)
        # Shift window-relative indices to full cost-grid indices
        rows += window.row_off
        cols += window.col_off
        return rows, cols
def compute_lcp_routes(  # noqa: PLR0913, PLR0917
    cost_fpath,
    out_dir,
    job_name,
    routing_options,
    route_table_fpath="PIPELINE",
    features_fpath="PIPELINE",
    drivers=None,
    transition_costs=None,
    tracked_layers=None,
    transmission_config=None,
    save_paths=False,
    save_routing_layer=False,
    invalid_costs_block_routing=False,
    connection_identifier_column="end_feat_id",
    algorithm="bidirectional_long_range_dijkstra",
    memory_utilization_limit=0.9,
    system_mem_limit_gb=5,
    _split_params=None,
):
    r"""Run least-cost path routing for points mapped to features

    Given a table that defines each route as a start point (via latitude
    and longitude input or preferably a row/column index into the data)
    and a feature ID representing the feature to connect to, compute the
    least-cost paths (LCPs) for each route using the routing layers
    defined in `routing_options`.

    Parameters
    ----------
    cost_fpath : path-like
        Path to layered Zarr file containing cost and other required
        routing layers.
    out_dir : path-like
        Directory where routing outputs should be written.
    job_name : str
        Label used to name the generated output file.
    route_table_fpath : path-like, str, or list, default="PIPELINE"
        Route-table input defining all route start points and end
        features.

        If set to ``"PIPELINE"``, then ``features_fpath`` must also
        be ``"PIPELINE"`` and the route tables will be pulled from
        the previous pipeline step. Pipeline inputs require previous
        outputs containing matching CSV route tables and GPKG
        feature files.

        Otherwise, provide either a single CSV path or a list of
        CSV paths. When a list is provided, ``features_fpath`` must
        also be a list with the same length, and each route table
        is paired with the feature file at the same list index.

        Each route table must have the following columns:

            - "start_lat": Starting point latitude (can alternatively
              use "start_row" to define the start point row index in
              the cost raster).
            - "start_lon": Starting point longitude (can alternatively
              use "start_col" to define the start point column index
              in the cost raster).
            - `connection_identifier_column`: ID of the feature that
              should be mapped to. This ID should match at least one of
              the feature IDs in the `features_fpath` input; otherwise,
              no route will be computed for that point.

        You can also specify `polarity` and `voltage` columns
        which apply to every routing option. If you want to
        provide per-option polarity and voltage, use
        `polarity_<option>` and `voltage_<option>`. Options that
        are omitted will use `polarity` and `voltage` column
        values.
    features_fpath : path-like, str, or list, default="PIPELINE"
        Feature input containing the vector geometries to map points to.

        If set to ``"PIPELINE"``, then ``route_table_fpath`` must
        also be ``"PIPELINE"`` and the feature files will be pulled
        from the previous pipeline step.

        Otherwise, provide either a single vector path or a list of
        vector paths. When a list is provided,
        ``route_table_fpath`` must also be a list with the same
        length, and each feature file is paired with the route table
        at the same list index.

        Each feature file must have a column matching the
        `connection_identifier_column` parameter that maps each
        feature back to the starting points defined in the
        `route_table`.
    routing_options : dict
        Mapping of routing-option names to dictionaries describing the
        cost, friction, barrier, and option-level multiplier inputs for
        each option. See
        :class:`~revrt.models.routing.RoutingOptionConfig` for details.
    drivers : dict, optional
        Optional driver-rule configuration keyed by routing option. See
        :class:`~revrt.models.routing.DriverConfig` for details.
    transition_costs : dict, optional
        Optional transition-cost configuration between routing
        options. See
        :class:`~revrt.models.routing.TransitionCostsConfig` for
        details.
    tracked_layers : list, optional
        List of dictionaries defining route-characterization layers.
        These layers do not influence the routing objective and are
        only summarized for output characterization. See
        :class:`~revrt.models.routing.TrackedLayer` for details.
    transmission_config : path-like or dict, optional
        Dictionary of transmission cost configuration values, or
        path to JSON/JSON5 file containing this dictionary. The
        dictionary should have a subset of the following keys:

            - base_line_costs
            - iso_lookup
            - iso_multipliers
            - land_use_classes
            - new_substation_costs
            - power_classes
            - power_to_voltage
            - transformer_costs
            - upgrade_substation_costs
            - voltage_polarity_mult
            - row_width

        Each of these keys should point to another dictionary or
        path to JSON/JSON5 file containing a dictionary of
        configurations for each section. For the expected contents
        of each dictionary, see the default config. If ``None``,
        values from the default config are used.
        By default, ``None``.
    save_paths : bool, default=False
        Save outputs as a GeoPackage with path geometries when ``True``.
        Defaults to ``False``.
    save_routing_layer : bool, default=False
        Save Rust routing layer outputs to ``out_dir/extra_outputs``
        when ``True``. Defaults to ``False``.
    invalid_costs_block_routing : bool, optional
        Optional flag to treat any invalid cost values (<= 0) as
        impassable (i.e. no paths can ever cross this). If ``False``,
        invalid cost values (<= 0) are set to a large value to simulate
        a strong but permeable "quasi-barrier". By default, ``False``.
    connection_identifier_column : str, default="end_feat_id"
        Column in the `features_fpath` data used to uniquely identify
        each feature. This column is also expected to be in the
        `route_table` input to map points to features. If a column name
        is given that does not exist in the data, an error will be
        raised. By default, ``"end_feat_id"``.
    algorithm : str, default="bidirectional_long_range_dijkstra"
        Routing algorithm implementation to use. Supported values
        are ``"astar"``, ``"long_range_astar"``,
        ``"long_range_dijkstra"``,
        ``"bidirectional_long_range_dijkstra"``, and
        ``"dijkstra"``. ``"astar"`` and ``"dijkstra"`` are
        in-memory implementations that do not respect the memory
        limit. Prefer a long-range option unless you know for a fact
        that your route computations will not need much memory and
        speed is very important to you.
        By default, ``"bidirectional_long_range_dijkstra"``.
    memory_utilization_limit : float, default=0.9
        Fraction of `system_mem_limit_gb` to utilize for routing. Should
        be a value between 0 and 1. By default, ``0.9``.
    system_mem_limit_gb : int or float, default=5
        Maximum amount of system memory (in GB) to utilize for routing.
        This is used in conjunction with `memory_utilization_limit` to
        determine the memory limit for routing. By default, ``5`` GB.

    Returns
    -------
    str or None
        Path to the output table if any routes were computed.
        ``None`` when the route table contains no routes to process.

    See Also
    --------
    revrt.routing.cli.point_to_point.compute_lcp_routes
        Compute LCP routes between pairs of points.
    revrt.routing.cli.build_route_table.point_to_feature_route_table
        Helper function to build a routing table for points mapped to
        features.
    """
    with log_runtime("LCP processing"):
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)

        logger.debug("Tracked layers input: %r", tracked_layers)
        logger.debug("Transmission config input: %r", transmission_config)
        transmission_config = parse_config(config=transmission_config)

        route_points = route_points_subset(
            route_table_fpath, split_params=_split_params
        )
        if len(route_points) == 0:
            logger.info("No routes to process!")
            return None

        # Path geometries need a GeoPackage; plain tables are CSV
        extension = ".gpkg" if save_paths else ".csv"
        out_fp = out_dir / f"{job_name}{extension}"

        routes_to_compute = PointToFeatureRouteDefinitionConverter(
            cost_fpath=cost_fpath,
            route_points=route_points,
            features_fpath=features_fpath,
            out_fp=out_fp,
            routing_options=routing_options,
            transmission_config=transmission_config,
            drivers=drivers,
            transition_costs=transition_costs,
            connection_identifier_column=connection_identifier_column,
        )

        # Memory budget (GB) = allowed fraction of the system limit
        user_mem_limit_gb = memory_utilization_limit * system_mem_limit_gb
        run_lcp(
            cost_fpath,
            out_fp=out_fp,
            routes_to_compute=routes_to_compute,
            job_name=job_name,
            tracked_layers=tracked_layers,
            invalid_costs_block_routing=invalid_costs_block_routing,
            user_mem_limit_gb=user_mem_limit_gb,
            save_routing_layer=save_routing_layer,
            algorithm=algorithm,
        )

    return str(out_fp)
def _prep_config(config, nodes, project_dir, command_name):
    """Pre-process config inputs for point-to-feature routing

    Applies the route split first, then resolves the route table and
    feature inputs, and finally runs ``strip_path_keys`` on the
    ``cost_fpath`` and ``out_dir`` entries.
    """
    split_config = split_routes(config, nodes)
    resolved_config = _handle_route_table_and_features_input(
        split_config, project_dir, command_name
    )
    path_keys = {"cost_fpath", "out_dir"}
    return strip_path_keys(resolved_config, keys_to_fix=path_keys)
def _handle_route_table_and_features_input(config, project_dir, command_name):
    """Handle route table and features input from user

    Parameters
    ----------
    config : dict
        Run configuration. The ``"route_table_fpath"`` and
        ``"features_fpath"`` entries are read and then overwritten with
        lists of file paths. A missing entry is treated as
        ``"PIPELINE"``, matching the defaults of ``compute_lcp_routes``.
    project_dir : path-like
        Project directory passed to ``parse_previous_status`` to look
        up outputs of the previous pipeline step.
    command_name : str
        Command name passed to ``parse_previous_status``.

    Returns
    -------
    dict
        The input `config`, updated in place.

    Raises
    ------
    revrtConfigurationError
        If only one of the two inputs is set to ``"PIPELINE"``.
    """
    # Use the documented defaults instead of raising ``KeyError`` when
    # the user leaves these keys out of the config
    rt_is_pipeline = config.get("route_table_fpath", "PIPELINE") == "PIPELINE"
    f_is_pipeline = config.get("features_fpath", "PIPELINE") == "PIPELINE"

    if rt_is_pipeline != f_is_pipeline:
        msg = (
            "Both `route_table_fpath` and `features_fpath` must be set "
            "to 'PIPELINE' for pipeline runs."
        )
        raise revrtConfigurationError(msg)

    if not rt_is_pipeline:
        return _handle_non_pipeline_input(config)

    files = [
        strip_path(fp)
        for fp in parse_previous_status(project_dir, command_name)
    ]
    route_tables, feature_files = _split_pipeline_route_inputs(files)
    config["route_table_fpath"] = route_tables
    config["features_fpath"] = feature_files
    return config
def _split_pipeline_route_inputs(files):
    """Split and align pipeline route-table and feature outputs

    Files ending in ``.csv`` are route tables and files ending in
    ``.gpkg`` are feature files (suffix match is case-insensitive);
    anything else is ignored. With more than one pair, the feature
    files are reordered to line up with the route tables.
    """
    route_tables = []
    feature_files = []
    for fp in files:
        suffix = Path(fp).suffix.lower()
        if suffix == ".csv":
            route_tables.append(fp)
        elif suffix == ".gpkg":
            feature_files.append(fp)

    if not (route_tables and feature_files):
        msg = (
            "Pipeline route-features input requires previous outputs with "
            "both CSV route tables and GPKG feature files."
        )
        raise revrtConfigurationError(msg)

    if len(route_tables) != len(feature_files):
        msg = (
            "Pipeline route-features input requires the same number of "
            "CSV route tables and GPKG feature files."
        )
        raise revrtConfigurationError(msg)

    # A single pair needs no tag matching
    if len(route_tables) > 1:
        feature_map = _match_pipeline_feature_files(
            route_tables, feature_files
        )
        feature_files = [feature_map[fp] for fp in route_tables]

    return route_tables, feature_files
def _match_pipeline_feature_files(route_tables, feature_files):
    """Match route tables to feature files using the GAPs split tag

    Returns a dictionary mapping each route-table path to the feature
    file carrying the same tag. Raises ``revrtConfigurationError`` when
    a file has no tag, when two files of the same kind share a tag, or
    when the two sets of tags differ.
    """
    route_tags = {_pipeline_file_tag(fp): fp for fp in route_tables}
    feature_tags = {_pipeline_file_tag(fp): fp for fp in feature_files}

    has_untagged = None in route_tags or None in feature_tags
    # Fewer tags than files means at least two files share a tag
    has_duplicates = len(route_tags) != len(route_tables) or len(
        feature_tags
    ) != len(feature_files)
    if has_untagged or has_duplicates:
        msg = "Pipeline route-features inputs are ambiguously tagged."
        raise revrtConfigurationError(msg)

    if route_tags.keys() != feature_tags.keys():
        msg = (
            "Could not align pipeline route-table CSV outputs with "
            "mapped-feature GPKG outputs using the GAPs split tag."
        )
        raise revrtConfigurationError(msg)

    return {route_tags[tag]: feature_tags[tag] for tag in sorted(route_tags)}
def _pipeline_file_tag(fp):
    """Extract the GAPs split tag from a pipeline output filepath

    The tag is ``TAG`` followed by one or more digits at the end of the
    file stem. Returns ``None`` when the stem does not end that way.
    """
    tag_pattern = rf"({re.escape(TAG)}\d+)$"
    found = re.search(tag_pattern, Path(fp).stem)
    return found.group(1) if found is not None else None
def _handle_non_pipeline_input(config):
    """Handle non-pipeline input from user

    Normalizes ``config["route_table_fpath"]`` and
    ``config["features_fpath"]`` to lists of paths processed with
    ``strip_path``. Both inputs must be strings, or both must be
    non-string sequences of equal length; otherwise a
    ``revrtConfigurationError`` is raised.
    """

    def _is_non_str_sequence(value):
        """True for sequences other than ``str``"""
        return not isinstance(value, str) and isinstance(value, abc.Sequence)

    type_msg = (
        "`route_table_fpath` and `features_fpath` must both "
        "be sequences or both be strings."
    )
    rt_fp = config["route_table_fpath"]
    f_fp = config["features_fpath"]
    rt_is_sequence = _is_non_str_sequence(rt_fp)
    f_is_sequence = _is_non_str_sequence(f_fp)

    # Exactly one of the two being a sequence is a type mismatch
    if rt_is_sequence != f_is_sequence:
        raise revrtConfigurationError(type_msg)

    if rt_is_sequence:
        if len(rt_fp) != len(f_fp):
            msg = (
                "`route_table_fpath` and `features_fpath` sequences "
                "must be the same length."
            )
            raise revrtConfigurationError(msg)
        config["route_table_fpath"] = [strip_path(p) for p in rt_fp]
        config["features_fpath"] = [strip_path(p) for p in f_fp]
        return config

    if not (isinstance(rt_fp, str) and isinstance(f_fp, str)):
        raise revrtConfigurationError(type_msg)

    # Single paths are wrapped so downstream code always sees lists
    config["route_table_fpath"] = [strip_path(rt_fp)]
    config["features_fpath"] = [strip_path(f_fp)]
    return config
# CLI command object exposing ``compute_lcp_routes`` under the name
# "route-features"; the config is pre-processed by ``_prep_config``.
# NOTE(review): ``split_keys`` presumably tells GAPs which inputs to
# distribute across jobs, with the route-table/feature tuple split as a
# pair -- confirm against the GAPs documentation.
# NOTE(review): ``skip_doc_params`` presumably hides
# ``system_mem_limit_gb`` from the generated CLI docs -- confirm.
route_features_command = CLICommandFromFunction(
    compute_lcp_routes,
    name="route-features",
    add_collect=False,
    split_keys=[("route_table_fpath", "features_fpath"), "_split_params"],
    config_preprocessor=_prep_config,
    skip_doc_params=["system_mem_limit_gb"],
)