"""reVRt routing CLI functions and helpers"""
import time
import logging
import contextlib
from math import ceil
from pathlib import Path
from copy import deepcopy
from abc import ABC, abstractmethod
from functools import cached_property
import pandas as pd
import geopandas as gpd
import xarray as xr
from revrt.routing.base import BatchRouteProcessor, RoutingScenario
from revrt.exceptions import revrtKeyError
logger = logging.getLogger(__name__)
_MILLION_USD_PER_MILE_TO_USD_PER_PIXEL = 55923.40730136006
"""Conversion from million dollars/mile to $/pixel
1,000,000 [$/million dollars]
* 90 [meters/pixel]
/ 1609.344 [meters/mile]
= 55923.40730136006 [$/pixel]
"""
[docs]
class RouteToDefinitionConverter(ABC):
"""Abstract base class for route definition converters"""
_GROUP_COLS = ["polarity", "voltage"]
def __init__(
self,
cost_fpath,
route_points,
out_fp,
cost_layers,
friction_layers=None,
transmission_config=None,
):
"""
Parameters
----------
cost_fpath : path-like
Path to layered Zarr file containing cost and other required
routing layers.
route_points : pandas.DataFrame
DataFrame defining the points to be routed. This DataFrame
should contain route definitions to be transformed and
passed down to the Rust routing algorithm.
out_fp : path-like
Path to output file where computed routes will be saved.
This file will be checked for existing routes to avoid
recomputation.
cost_layers : list
List of dictionaries defining the layers that are summed to
determine total costs raster used for routing. Each layer is
pre-processed before summation according to the user input.
See the description of
:func:`revrt.routing.cli.point_to_point.compute_lcp_routes`
for more details.
friction_layers : list
Layers to be multiplied onto the aggregated cost layer to
influence routing but NOT be reported in final cost
(i.e. friction, barriers, etc.). See the description of
:func:`revrt.routing.cli.point_to_point.compute_lcp_routes`
for more details.
transmission_config : path-like or dict, optional
Dictionary of transmission cost configuration values, or
path to JSON/JSON5 file containing this dictionary. See the
description of
:func:`revrt.routing.cli.point_to_point.compute_lcp_routes`
for more details.
"""
self.cost_fpath = cost_fpath
self.route_points = route_points
self.out_fp = Path(out_fp)
self.cost_layers = cost_layers
self.friction_layers = friction_layers or []
self.transmission_config = transmission_config
@property
def num_routes(self):
"""int: Number of routes to be computed"""
return len(self.route_points)
[docs]
@cached_property
def existing_routes(self):
"""set: Already computed routes in the output file"""
if self.out_fp is None or not self.out_fp.exists():
return set()
if self.out_fp.suffix.lower() == ".gpkg":
existing_df = gpd.read_file(self.out_fp)
else:
existing_df = pd.read_csv(self.out_fp)
return {
self._route_as_tuple(row) for __, row in existing_df.iterrows()
}
def __iter__(self):
if self.num_routes == 0:
return
for polarity, voltage, routes in self._paths_to_compute:
logger.info(
"Computing routes for %d points with polarity: %r and "
"voltage: %r",
len(routes),
polarity,
voltage,
)
route_cl = self._update_cl(polarity, voltage)
route_fl = self._update_fl(polarity, voltage)
route_definitions, route_attrs = (
self._convert_to_route_definitions(routes)
)
yield route_cl, route_fl, route_definitions, route_attrs
@property
def _paths_to_compute(self):
"""Generator that yields route groups to be computed"""
self._validate_route_points()
for group_info, routes in self.route_points.groupby(self._GROUP_COLS):
if self.existing_routes:
mask = routes.apply(
lambda row: (
self._route_as_tuple(row) not in self.existing_routes
),
axis=1,
)
routes = routes[mask] # noqa: PLW2901
if routes.empty:
continue
yield *group_info, routes
def _validate_route_points(self):
"""Ensure route points has required columns"""
for check_col in self._GROUP_COLS:
if check_col not in self.route_points.columns:
self.route_points[check_col] = "unknown"
def _update_cl(self, polarity, voltage):
"""Update multipliers for cost layers"""
return update_multipliers(
self.cost_layers, polarity, voltage, self.transmission_config
)
def _update_fl(self, polarity, voltage):
"""Update multipliers for friction layers"""
return update_multipliers(
self.friction_layers, polarity, voltage, self.transmission_config
)
@abstractmethod
def _route_as_tuple(self, row):
"""Convert route row to a tuple for existing route checking"""
raise NotImplementedError
@abstractmethod
def _convert_to_route_definitions(self, routes):
"""Convert route DataFrame to route definitions format"""
raise NotImplementedError
def run_lcp(
cost_fpath,
out_fp,
routes_to_compute,
cost_multiplier_layer=None,
cost_multiplier_scalar=1,
tracked_layers=None,
ignore_invalid_costs=True,
):
"""[NOT PUBLIC API] Run LCP routing and save to output file"""
ts = time.monotonic()
out_fp = Path(out_fp)
save_paths = out_fp.suffix.lower() == ".gpkg"
logger.info(
"Computing best routes for %d point pairs",
routes_to_compute.num_routes,
)
for route_batch in routes_to_compute:
route_cl, route_fl, route_definitions, route_attrs = route_batch
scenario = RoutingScenario(
cost_fpath=cost_fpath,
cost_layers=route_cl,
friction_layers=route_fl,
tracked_layers=tracked_layers,
cost_multiplier_layer=cost_multiplier_layer,
cost_multiplier_scalar=cost_multiplier_scalar,
ignore_invalid_costs=ignore_invalid_costs,
)
route_computer = BatchRouteProcessor(
routing_scenario=scenario,
route_definitions=route_definitions,
route_attrs=route_attrs,
)
route_computer.process(out_fp=out_fp, save_paths=save_paths)
time_elapsed = f"{(time.monotonic() - ts) / 3600:.4f} hour(s)"
logger.info(
"Routing for %d points completed in %s",
routes_to_compute.num_routes,
time_elapsed,
)
def route_points_subset(route_points, split_params):
"""[NOT PUBLIC API] Get indices of points sorted by location"""
with contextlib.suppress(TypeError, UnicodeDecodeError):
route_points = pd.read_csv(route_points)
sort_cols = ["start_lat", "start_lon"]
if not set(sort_cols).issubset(route_points.columns):
sort_cols = ["start_row", "start_col"]
route_points = route_points.sort_values(sort_cols).reset_index(drop=True)
start_ind, n_chunks = split_params or (0, 1)
chunk_size = ceil(len(route_points) / n_chunks)
return route_points.iloc[
start_ind * chunk_size : (start_ind + 1) * chunk_size
]
def split_routes(config):
"""[NOT PUBLIC API] Compute route split params inside of config"""
exec_control = config.get("execution_control", {})
if exec_control.get("option") == "local":
num_nodes = 1
else:
num_nodes = exec_control.pop("nodes", 1)
config["_split_params"] = [(i, num_nodes) for i in range(num_nodes)]
return config
def update_multipliers(layers, polarity, voltage, transmission_config):
"""[NOT PUBLIC API] Update layer multipliers based on user input"""
output_layers = deepcopy(layers)
polarity = "unknown" if polarity in {None, "unknown"} else str(polarity)
voltage = "unknown" if voltage in {None, "unknown"} else str(int(voltage))
for layer in output_layers:
if layer.pop("apply_row_mult", False):
row_multiplier = _get_row_multiplier(transmission_config, voltage)
layer["multiplier_scalar"] = (
layer.get("multiplier_scalar", 1) * row_multiplier
)
if layer.pop("apply_polarity_mult", False):
polarity_multiplier = _get_polarity_multiplier(
transmission_config, voltage, polarity
)
layer["multiplier_scalar"] = (
layer.get("multiplier_scalar", 1)
* polarity_multiplier
* _MILLION_USD_PER_MILE_TO_USD_PER_PIXEL
)
return output_layers
def _get_row_multiplier(transmission_config, voltage):
"""Get right-of-way width multiplier for a given voltage"""
try:
row_widths = transmission_config["row_width"]
except KeyError as e:
msg = (
"`apply_row_mult` was set to `True`, but 'row_width' "
"not found in transmission config!"
)
raise revrtKeyError(msg) from e
try:
row_multiplier = row_widths[voltage]
except KeyError as e:
msg = (
"`apply_row_mult` was set to `True`, but voltage ' "
f"{voltage}' not found in transmission config "
"'row_width' settings. Available voltages: "
f"{list(row_widths)}"
)
raise revrtKeyError(msg) from e
return row_multiplier
def _get_polarity_multiplier(transmission_config, voltage, polarity):
"""Get multiplier for a given voltage and polarity"""
try:
polarity_config = transmission_config["voltage_polarity_mult"]
except KeyError as e:
msg = (
"`apply_polarity_mult` was set to `True`, but "
"'voltage_polarity_mult' not found in transmission config!"
)
raise revrtKeyError(msg) from e
try:
polarity_voltages = polarity_config[voltage]
except KeyError as e:
msg = (
"`apply_polarity_mult` was set to `True`, but voltage ' "
f"{voltage}' not found in polarity config. Available voltages: "
f"{list(polarity_config)}"
)
raise revrtKeyError(msg) from e
try:
polarity_multiplier = polarity_voltages[polarity]
except KeyError as e:
msg = (
"`apply_polarity_mult` was set to `True`, but polarity ' "
f"{polarity}' not found in voltage config. Available polarities: "
f"{list(polarity_voltages)}"
)
raise revrtKeyError(msg) from e
return polarity_multiplier