"""reVRt routing module utilities"""
import contextlib
import logging
from collections.abc import Iterable, Mapping
from itertools import batched, product
from pathlib import Path
from warnings import warn
import dask
import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
from shapely.geometry import Point
from revrt.exceptions import revrtRuntimeError, revrtValueError
from revrt.utilities.base import region_mapper, transform_xy
from revrt.utilities.handlers import IncrementalWriter
from revrt.warn import revrtWarning
logger = logging.getLogger(__name__)
_WHILE_LOOP_ITER_MAX = 10_000
[docs]
class PointToFeatureMapper:
"""Map points to features within specified regions and/or radii"""
def __init__(
self,
crs,
features_fp,
regions=None,
region_identifier_column="rid",
connection_identifier_column="end_feat_id",
batch_size=500,
max_workers=None,
):
"""
Parameters
----------
crs : str or pyproj.crs.CRS
Coordinate reference system to use for all spatial data.
features_fp : path-like
File path to transmission features to map points to.
regions : geopandas.GeoDataFrame or path-like, optional
Regions to use for clipping features around points. Features
will not extend beyond these regions, and point will only
map to features within their own region. This input can be
paired with the `radius` parameter (e.g. to forbid any
connections to cross state boundaries, even if that
connection would be shorter than some desired length). At
least one of `regions` or `radius` must be provided;
otherwise, an error is raised. By default, ``None``.
region_identifier_column : str, default="rid"
Column in `regions` data to use as the identifier for that
region. If not given, a simple index will be put under the
`rid` column. By default, ``"rid"``.
connection_identifier_column : str, optional
Column in output data (both features and points) that will
be used to link the points to the features that should be
routed to. By default, ``"end_feat_id"``.
batch_size : int, default=500
Number of points to process in each batch when writing
clipped features to file. By default, ``500``.
max_workers : int, optional
Number of parallel workers to use for point-to-feature
clipping. If ``None`` or >1, clipping is performed in
parallel using Dask's process scheduler.
By default, ``None``, which uses all CPU cores.
"""
self._crs = crs
self._features_fp = features_fp
self._regions = None
self._rid_column = region_identifier_column
self._conn_id_column = connection_identifier_column
self._set_regions(regions)
self._batch_size = batch_size
self._max_workers = max_workers
def _set_regions(self, regions):
"""Set the regions GeoDataFrame"""
if regions is None:
return
try:
self._regions = regions.to_crs(self._crs)
except AttributeError:
self._regions = gpd.read_file(regions).to_crs(self._crs)
if self._rid_column not in self._regions.columns:
self._regions[self._rid_column] = range(len(self._regions))
[docs]
def map_points(
self,
points,
feature_out_fp,
route_table_out_fp=None,
radius=None,
expand_radius=True,
clip_points_to_regions=False,
voltages=None,
polarities=None,
):
"""Map points to features within the point region
Parameters
----------
points : geopandas.GeoDataFrame
Points to map to features.
feature_out_fp : path-like
File path to save clipped features to. This should end in
'.gpkg' to ensure proper format. If not, the extension will
be added automatically. This file will contain all features
clipped to each point (with a feature ID column added to
link back to the points).
route_table_out_fp : path-like, optional
Optional file path to incrementally persist mapped point
assignments. If the route table and feature outputs already
exist, the mapper resumes from the previously completed
points. By default, ``None``.
radius : float or str, optional
Radius (in CRS units) around each point to clip features to.
If `str`, the column in `points` to use for radius values.
If ``None``, only regions are used for clipping.
By default, ``None``.
expand_radius : bool, optional
If ``True``, the radius is expanded until at least one
feature is found. By default, ``True``.
clip_points_to_regions : bool, default=False
If ``True``, points are clipped to the given regions before
mapping to features. If the `regions` input is not set,
this parameter has no effect. If ``False``, all points are
used as-is, which means points outside of the regions domain
are mapped to the closest region. By default, ``False``.
voltages : dict of list of str or int, optional
Voltage values to assign to the output route table, keyed
by routing option. If provided, each mapped
point-to-feature connection is duplicated once per
per-option voltage combination and explicit
``voltage_<option>`` columns are added for each configured
routing option. By default, ``None``.
polarities : dict of list of str, optional
Polarity values to assign to the output route table, keyed
by routing option. If provided, each mapped
point-to-feature connection is duplicated once per
per-option polarity combination and explicit
``polarity_<option>`` columns are added for each configured
routing option. By default, ``None``.
Returns
-------
geopandas.GeoDataFrame
Input points with added column giving mapped feature IDs.
"""
if self._regions is None and radius is None:
msg = (
"Must provide either `regions` or a radius to map points "
"to features!"
)
raise revrtValueError(msg)
points = points.to_crs(self._crs).copy(deep=True)
points[self._conn_id_column] = np.nan
if self._regions is not None:
points = self._map_points_to_nearest_region(
points, clip_points_to_regions
)
points, next_conn_id = _resume_points(
points,
feature_out_fp,
route_table_out_fp,
self._conn_id_column,
self._rid_column,
)
writer = _init_streaming_writer(feature_out_fp)
route_writer = None
if route_table_out_fp is not None:
route_writer = IncrementalWriter(route_table_out_fp)
pending_points = points[points[self._conn_id_column].isna()]
if pending_points.empty:
logger.info("All points already mapped; nothing to resume")
total_pending_points = len(pending_points)
if total_pending_points:
logger.info(
"Mapping %d point(s) to nearby features in batches of %d "
"using %s workers",
total_pending_points,
self._batch_size,
"all"
if self._max_workers in {None, 0}
else f"{self._max_workers:,d}",
)
processed_points = 0
for point_batch in self._iter_point_batches(
pending_points, start_conn_id=next_conn_id
):
self._run_batch(
points,
point_batch,
writer,
radius,
expand_radius,
route_writer=route_writer,
)
processed_points += len(point_batch)
logger.info(
"%d/%d (%.2f%%) points processed",
processed_points,
total_pending_points,
processed_points / total_pending_points * 100,
)
route_table = self._drop_unpaired_points(points)
route_table = _add_voltage_polarity(route_table, voltages, polarities)
if route_table_out_fp is not None:
route_table.drop(columns="geometry").to_csv(
route_table_out_fp, index=False
)
return route_table
def _iter_point_batches(self, points, start_conn_id=0):
"""Generate batches of points for processing"""
jobs = (
(ind, row_ind, point)
for ind, (row_ind, point) in enumerate(
points.iterrows(), start=start_conn_id
)
)
yield from batched(jobs, self._batch_size)
def _run_batch(
self,
points,
point_batch,
writer,
radius,
expand_radius,
route_writer=None,
):
"""Process one point batch and flush complete feature chunks"""
results = []
routed_point_indices = []
for row_ind, conn_id, clipped_features in self._compute_point_batch(
point_batch, radius, expand_radius
):
if clipped_features.empty:
continue
clipped_features[self._conn_id_column] = conn_id
points.loc[row_ind, self._conn_id_column] = conn_id
results.append(clipped_features)
routed_point_indices.append(row_ind)
if not results:
return
logger.debug("Dumping batch of %d features to file...", len(results))
writer.save(pd.concat(results))
if route_writer is None:
return
route_table_batch = points.loc[routed_point_indices].drop(
columns="geometry"
)
route_writer.save(route_table_batch)
def _compute_point_batch(self, point_batch, radius, expand_radius):
"""Compute clipped features for a batch of points"""
if self._max_workers == 1:
return [
self._clip_point_job(job, radius, expand_radius)
for job in point_batch
]
num_workers = (
None if self._max_workers in {None, 0} else self._max_workers
)
logger.debug(
"Processing %d point(s) in parallel with Dask num_workers=%r",
len(point_batch),
num_workers,
)
tasks = [
dask.delayed(self._clip_point_job)(job, radius, expand_radius)
for job in point_batch
]
return list(
dask.compute(
*tasks,
scheduler="processes",
num_workers=num_workers,
)
)
def _clip_point_job(self, job, radius, expand_radius):
"""Clip features for one point and return point metadata"""
conn_id, row_ind, point = job
clipped_features = self._clip_to_point(point, radius, expand_radius)
return row_ind, conn_id, clipped_features
def _map_points_to_nearest_region(self, points, clip_points_to_regions):
"""Map points to nearest region; optionally clip to regions"""
if clip_points_to_regions:
logger.info("Clipping points to regions...")
logger.debug("\t- Initial number of points: %d", len(points))
points = gpd.sjoin(
points,
self._regions[["geometry"]],
predicate="within",
how="inner",
).drop(columns="index_right")
logger.debug("\t- Final number of points: %d", len(points))
map_func = region_mapper(self._regions, self._rid_column)
points[self._rid_column] = points.geometry.apply(map_func)
return points
def _clip_to_point(self, point, radius=None, expand_radius=True):
"""Clip features to be within the point region"""
logger.debug("Clipping features to point:\n%s", point)
features = None
if self._regions is not None:
features = self._clip_to_region(point)
if radius is not None:
features = self._clip_to_radius(
point, radius, features, expand_radius
)
return _filter_transmission_features(features)
def _clip_to_region(self, point):
"""Clip features to region and record the region ID"""
rid = point[self._rid_column]
mask = self._regions[self._rid_column] == rid
region = self._regions[mask]
logger.debug(" - Clipping features to region: %s", rid)
features = self._clipped_features(region.geometry, features=None)
features[self._rid_column] = rid
logger.debug(
"%d transmission feature(s) found in region with id %s ",
len(features),
rid,
)
return features
def _clip_to_radius(
self, point, radius, input_features=None, expand_radius=True
):
"""Clip features to radius
If no features are found within the initial radius, it is
expanded (linearly by incrementally increasing the clipping
buffer) until at least one connection feature is found.
"""
if radius is None:
return input_features
if input_features is not None and len(input_features) == 0:
return input_features
if isinstance(radius, str):
radius = point[radius]
clipped_features = self._clipped_features(
point.geometry.buffer(radius), features=input_features
)
clipping_buffer = 1
ind = 0
while expand_radius and len(clipped_features) <= 0:
clipping_buffer += 0.05
clipped_features = self._clipped_features(
point.geometry.buffer(radius * clipping_buffer),
input_features,
)
ind += 1
if ind >= _WHILE_LOOP_ITER_MAX: # pragma: no cover
msg = "Maximum iterations reached when expanding radius!"
raise revrtRuntimeError(msg)
logger.info(
"%d transmission features found in clipped area with radius %.2f",
len(clipped_features),
radius * clipping_buffer,
)
return clipped_features.copy(deep=True)
def _clipped_features(self, region, features=None):
"""Clip features to region"""
if features is None:
features = gpd.read_file(self._features_fp, mask=region).to_crs(
self._crs
)
return gpd.clip(features, region)
def _drop_unpaired_points(self, points):
"""Drop points that did not map to any features"""
feature_ids = points[self._conn_id_column]
if not feature_ids.isna().any():
return points
empty_point_indices = feature_ids[feature_ids.isna()].index.tolist()
msg = (
f"No features found for {len(empty_point_indices)} point(s) "
f"at the following indices: {empty_point_indices}"
)
warn(msg, revrtWarning)
points = points[~feature_ids.isna()]
return points.reset_index(drop=True)
[docs]
def map_to_costs(route_points, crs, transform, shape):
"""Map route table to cost indices and drop out-of-bounds rows
Parameters
----------
route_points : pandas.DataFrame
Route definitions table with at least `start_lat`, `start_lon`,
`end_lat`, and `end_lon` coordinate columns.
crs : str or pyproj.crs.CRS
Coordinate reference system for the cost raster.
transform : affine.Affine
Rasterio affine transform giving pixel origin and resolution.
shape : tuple
Raster height and width for bounds checking.
Returns
-------
pandas.DataFrame
Updated route table filtered to routes within the cost domain.
"""
logger.debug("Map %d routes to cost raster", len(route_points))
logger.debug("First few routes:\n%s", route_points.head())
logger.debug("Transform:\n%s", transform)
route_points = convert_lat_lon_to_row_col(
route_points,
crs,
transform,
lat_col="start_lat",
lon_col="start_lon",
out_row_name="start_row",
out_col_name="start_col",
)
route_points = convert_lat_lon_to_row_col(
route_points,
crs,
transform,
lat_col="end_lat",
lon_col="end_lon",
out_row_name="end_row",
out_col_name="end_col",
)
logger.debug("Mapping done!")
return filter_points_outside_cost_domain(route_points, shape)
[docs]
def make_rev_sc_points(excl_rows, excl_cols, crs, transform, resolution=128):
"""Generate reV supply curve points for a given exclusion grid size
Parameters
----------
excl_rows, excl_cols : int
Dimensions of the exclusion grid.
crs : str or pyproj.crs.CRS
Coordinate reference system for the exclusion raster.
transform : affine.Affine
Affine transform object representing the exclusion raster
transformation.
resolution : int, default=128
Resolution of the supply curve grid w.r.t the exclusion grid
(i.e. reV aggregation factor). By default, ``128``.
Returns
-------
geopandas.GeoDataFrame
Supply curve points with geometry and start row/column indices.
"""
n_rows = int(np.ceil(excl_rows / resolution))
n_cols = int(np.ceil(excl_cols / resolution))
sc_col_ind, sc_row_ind = np.meshgrid(np.arange(n_cols), np.arange(n_rows))
sc_points = pd.DataFrame(
{
"sc_row_ind": sc_row_ind.flatten().copy(),
"sc_col_ind": sc_col_ind.flatten().copy(),
}
)
sc_points.index.name = "gid"
sc_points["sc_point_gid"] = sc_points.index.to_numpy()
row = np.round(sc_points["sc_row_ind"] * resolution + resolution / 2)
sc_points["start_row"] = row.astype(int)
col = np.round(sc_points["sc_col_ind"] * resolution + resolution / 2)
sc_points["start_col"] = col.astype(int)
x, y = rasterio.transform.xy(
transform, sc_points["start_row"].values, sc_points["start_col"].values
)
geo = [Point(xy) for xy in zip(x, y, strict=True)]
sc_points = gpd.GeoDataFrame(sc_points, crs=crs, geometry=geo)
lon, lat = transform_xy(crs, "EPSG:4326", x, y)
sc_points["latitude"] = lat
sc_points["longitude"] = lon
return gpd.GeoDataFrame(sc_points, crs=crs, geometry=geo)
[docs]
def points_csv_to_geo_dataframe(points, crs, transform):
"""Convert points CSV or DataFrame to GeoDataFrame
Parameters
----------
points : path-like or pandas.DataFrame
Points CSV file path or DataFrame. If a file path is given, the
file is read as a CSV. The CSV or DataFrame must have `latitude`
and `longitude` columns.
crs : str or pyproj.crs.CRS
Coordinate reference system for the exclusion raster.
transform : affine.Affine
Affine transform object representing the exclusion raster
transformation.
Returns
-------
geopandas.GeoDataFrame
Points as a GeoDataFrame in the given CRS. Will contain
`start_row` and `start_col` columns giving the row and column
indices in the cost raster.
"""
with contextlib.suppress(TypeError, UnicodeDecodeError):
points = pd.read_csv(points)
points = convert_lat_lon_to_row_col(
points,
crs,
transform,
lat_col="latitude",
lon_col="longitude",
out_row_name="start_row",
out_col_name="start_col",
)
return gpd.GeoDataFrame(
points,
geometry=gpd.points_from_xy(points.longitude, points.latitude),
crs="EPSG:4326",
).to_crs(crs)
[docs]
def convert_lat_lon_to_row_col(
points,
crs,
transform,
lat_col="latitude",
lon_col="longitude",
out_row_name="start_row",
out_col_name="start_col",
):
"""Convert latitude and longitude columns to row and column indices
Parameters
----------
points : pandas.DataFrame or geopandas.GeoDataFrame
Data with latitude and longitude columns that need to be
converted to row and column indices.
crs : str or pyproj.crs.CRS
Coordinate reference system for the target grid.
transform : affine.Affine
Affine transform for the target grid.
lat_col : str, default="latitude"
Name of latitude column in input `point`.
By default, ``"latitude"``.
lon_col : str, default="longitude"
Name of longitude column in input `point`.
By default, ``"longitude"``.
out_row_name : str, default="start_row"
Name of output column for row indices.
By default, ``"start_row"``.
out_col_name : str, default="start_col"
Name of output column for column indices.
By default, ``"start_col"``.
Returns
-------
pandas.DataFrame or geopandas.GeoDataFrame
Input `points` with added row and column index columns.
"""
start_lat = points[lat_col].astype("float32")
start_lon = points[lon_col].astype("float32")
start_row, start_col = _transform_lat_lon_to_row_col(
transform, crs, start_lat, start_lon
)
points[out_row_name] = start_row.astype("int32")
points[out_col_name] = start_col.astype("int32")
return points
[docs]
def filter_points_outside_cost_domain(route_table, shape):
"""Drop routes whose indices fall outside the cost domain
Parameters
----------
route_table : pandas.DataFrame or geopandas.GeoDataFrame
Route definitions table with at least `start_row`, `start_col`.
Can also optionally have `end_row`, and `end_col` columns.
shape : tuple
Raster height and width for bounds checking.
Returns
-------
pandas.DataFrame or geopandas.GeoDataFrame
Input `points` filtered to only those within the cost domain.
"""
logger.debug("Filtering out points outside cost domain...")
mask = route_table["start_row"] >= 0
mask &= route_table["start_row"] < shape[0]
mask &= route_table["start_col"] >= 0
mask &= route_table["start_col"] < shape[1]
if "end_row" in route_table.columns:
mask &= route_table["end_row"] >= 0
mask &= route_table["end_row"] < shape[0]
if "end_col" in route_table.columns:
mask &= route_table["end_col"] >= 0
mask &= route_table["end_col"] < shape[1]
logger.debug("Mask computed!")
if any(~mask):
msg = (
f"The following {sum(~mask)} feature(s) are outside of the cost "
f"domain and will be dropped:\n{route_table.loc[~mask]}"
)
warn(msg, revrtWarning)
route_table = route_table.loc[mask].reset_index(drop=True)
return route_table
def compute_lens(route, cell_size):
"""[NOT PUBLIC API] Compute lengths of LCP"""
# Use Pythagorean theorem to calculate length between cells (km)
# Use c**2 = a**2 + b**2 to determine length of individual paths
lens = np.sqrt(np.sum(np.diff(route, axis=0) ** 2, axis=1))
total_path_length = np.sum(lens) * cell_size / 1000
# Need to determine distance coming into and out of any cell.
# Assume paths start and end at the center of a cell. Therefore,
# distance traveled in the cell is half the distance entering it
# and half the distance exiting it. Duplicate all lengths,
# pad 0s on ends for start and end cells, and divide all
# distance by half.
lens = np.repeat(lens, 2)
lens = np.insert(np.append(lens, 0), 0, 0)
lens /= 2
# Group entrance and exits distance together, and add them
lens = lens.reshape((int(lens.shape[0] / 2), 2))
lens = np.sum(lens, axis=1)
return lens, total_path_length
def _transform_lat_lon_to_row_col(transform, cost_crs, lat, lon):
"""Convert WGS84 coordinates to cost grid row and column arrays"""
x, y = transform_xy("EPSG:4326", cost_crs, lon, lat)
row, col = rasterio.transform.rowcol(transform, x, y)
row = np.array(row)
col = np.array(col)
return row, col
def _init_streaming_writer(feature_out_fp):
"""Initialize an incremental writer for clipped features"""
feature_out_fp = Path(feature_out_fp)
if feature_out_fp.suffix.lower() != ".gpkg":
msg = (
"Output feature file should have a '.gpkg' extension to "
f"ensure proper format! Got input file: '{feature_out_fp}'. "
"Adding '.gpkg' extension... "
)
warn(msg, revrtWarning)
feature_out_fp = feature_out_fp.with_suffix(".gpkg")
return IncrementalWriter(feature_out_fp)
def _filter_transmission_features(features):
"""Filter loaded transmission features"""
features = features.drop(
columns=["bgid", "egid", "cap_left"], errors="ignore"
)
features = features.rename(columns={"gid": "trans_gid"})
if "category" in features.columns:
features = _drop_empty_categories(features)
return features.reset_index(drop=True)
def _drop_empty_categories(features):
"""Drop features with empty category field"""
mask = features["category"].isna()
if mask.any():
msg = f"Dropping {mask.sum():,} feature(s) with NaN category!"
warn(msg, revrtWarning)
features = features[~mask].reset_index(drop=True)
return features
def _resume_points(
points, feature_out_fp, route_table_out_fp, conn_id_column, rid_column
):
"""Recover previously completed mappings from prior outputs"""
if feature_out_fp is None or route_table_out_fp is None:
return points, 0
feature_out_fp = Path(feature_out_fp)
route_table_out_fp = Path(route_table_out_fp)
feature_output_missing = not feature_out_fp.exists()
route_table_missing = not route_table_out_fp.exists()
if feature_output_missing or route_table_missing:
feature_out_fp.unlink(missing_ok=True)
route_table_out_fp.unlink(missing_ok=True)
logger.info(
"Feature output missing: %r; route table missing: %r. "
"Restarting point mapping...",
feature_output_missing,
route_table_missing,
)
return points, 0
route_table = _read_route_table(route_table_out_fp)
if route_table.empty:
feature_out_fp.unlink(missing_ok=True)
route_table_out_fp.unlink(missing_ok=True)
logger.info("Existing route table is empty; restarting point mapping")
return points, 0
key_columns = _resume_key_columns(points, route_table, rid_column)
existing_route_table = route_table[[*key_columns, conn_id_column]]
existing_route_table = existing_route_table.drop_duplicates(
subset=key_columns, keep="last"
)
conn_ids = pd.to_numeric(
existing_route_table[conn_id_column], errors="coerce"
)
if conn_ids.isna().any():
msg = (
"Cannot resume point-to-feature mapping because the "
f"existing route table contains invalid "
f"'{conn_id_column}' values"
)
raise revrtValueError(msg)
existing_lookup = existing_route_table.set_index(key_columns)[
conn_id_column
]
point_keys = pd.MultiIndex.from_frame(points[key_columns])
points[conn_id_column] = existing_lookup.reindex(point_keys).to_numpy()
completed = int(points[conn_id_column].notna().sum())
logger.info(
"Resuming point-to-feature mapping from %d completed point(s)",
completed,
)
next_conn_id = int(conn_ids.max()) + 1
return points, next_conn_id
def _resume_key_columns(points, route_table, rid_column):
"""Determine columns used to match existing route rows"""
key_columns = ["start_row", "start_col"]
missing_columns = [
col
for col in key_columns
if col not in points.columns or col not in route_table.columns
]
if missing_columns:
msg = (
"Cannot resume point-to-feature mapping because the "
"existing route table is missing required columns: "
f"{missing_columns}"
)
raise revrtValueError(msg)
if rid_column in points.columns and rid_column in route_table.columns:
key_columns.append(rid_column)
return key_columns
def _read_route_table(route_table_out_fp):
"""Load a route table from disk, tolerating empty files"""
try:
return pd.read_csv(route_table_out_fp)
except pd.errors.EmptyDataError:
return pd.DataFrame()
def _add_voltage_polarity(route_table, voltages, polarities):
"""Add explicit per-option voltage and polarity columns"""
voltages_by_option = _normalize_route_value_mapping(
voltages, value_name="voltages"
)
polarities_by_option = _normalize_route_value_mapping(
polarities, value_name="polarities"
)
routing_options = list(
dict.fromkeys([*voltages_by_option, *polarities_by_option])
)
if not routing_options:
return route_table
per_option_values = []
for option in routing_options:
voltage_values = voltages_by_option.get(option) or ["unknown"]
polarity_values = polarities_by_option.get(option) or ["unknown"]
per_option_values.append(
[
{
f"voltage_{option}": voltage,
f"polarity_{option}": polarity,
}
for voltage, polarity in product(
voltage_values, polarity_values
)
]
)
route_tables = []
for option_values in product(*per_option_values):
route_value_columns = {}
for value_mapping in option_values:
route_value_columns.update(value_mapping)
route_tables.append(route_table.assign(**route_value_columns))
return pd.concat(route_tables, ignore_index=True)
def _normalize_route_value_mapping(route_values, value_name):
"""Validate and normalize per-option route value mappings"""
if route_values is None:
return {}
if not isinstance(route_values, Mapping):
msg = (
f"`{value_name}` must be a dict mapping routing options to "
"lists of values"
)
raise revrtValueError(msg)
normalized = {}
for option, values in route_values.items():
if isinstance(values, (str, bytes)) or not isinstance(
values, Iterable
):
msg = (
f"`{value_name}` values must be lists of route values. "
f"Got {values!r} for routing option {option!r}"
)
raise revrtValueError(msg)
normalized[option] = list(values)
return normalized