"""reVRt routing module utilities"""
import logging
import contextlib
from pathlib import Path
from warnings import warn
import rasterio
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from revrt.warn import revrtWarning
from revrt.utilities.base import region_mapper
from revrt.utilities.handlers import IncrementalWriter
from revrt.exceptions import revrtValueError, revrtRuntimeError
logger = logging.getLogger(__name__)
_WHILE_LOOP_ITER_MAX = 10_000
class PointToFeatureMapper:
    """Map points to features within specified regions and/or radii"""

    def __init__(
        self,
        crs,
        features_fp,
        regions=None,
        region_identifier_column="rid",
        connection_identifier_column="end_feat_id",
    ):
        """
        Parameters
        ----------
        crs : str or pyproj.crs.CRS
            Coordinate reference system to use for all spatial data.
        features_fp : path-like
            File path to transmission features to map points to.
        regions : geopandas.GeoDataFrame or path-like, optional
            Regions to use for clipping features around points. Features
            will not extend beyond these regions, and point will only
            map to features within their own region. This input can be
            paired with the `radius` parameter (e.g. to forbid any
            connections to cross state boundaries, even if that
            connection would be shorter than some desired length). At
            least one of `regions` or `radius` must be provided;
            otherwise, an error is raised. By default, ``None``.
        region_identifier_column : str, default="rid"
            Column in `regions` data to use as the identifier for that
            region. If not given, a simple index will be put under the
            `rid` column. By default, ``"rid"``.
        connection_identifier_column : str, optional
            Column in output data (both features and points) that will
            be used to link the points to the features that should be
            routed to. By default, ``"end_feat_id"``.
        """
        self._crs = crs
        self._features_fp = features_fp
        # Stays ``None`` unless a `regions` input is given; several
        # methods branch on this to decide whether region clipping runs
        self._regions = None
        self._rid_column = region_identifier_column
        self._conn_id_column = connection_identifier_column
        self._set_regions(regions)

    def _set_regions(self, regions):
        """Load the regions input (GeoDataFrame or file) into `self._regions`"""
        if regions is None:
            return
        # EAFP: treat the input as a GeoDataFrame first; fall back to
        # reading it from file if it has no `to_crs` attribute
        try:
            self._regions = regions.to_crs(self._crs)
        except AttributeError:
            self._regions = gpd.read_file(regions).to_crs(self._crs)
        # Guarantee the region identifier column exists so downstream
        # lookups by region ID never KeyError
        if self._rid_column not in self._regions.columns:
            self._regions[self._rid_column] = range(len(self._regions))
    def map_points(
        self,
        points,
        feature_out_fp,
        radius=None,
        expand_radius=True,
        clip_points_to_regions=False,
        batch_size=500,
    ):
        """Map points to features within the point region

        Parameters
        ----------
        points : geopandas.GeoDataFrame
            Points to map to features.
        feature_out_fp : path-like
            File path to save clipped features to. This should end in
            '.gpkg' to ensure proper format. If not, the extension will
            be added automatically. This file will contain all features
            clipped to each point (with a feature ID column added to
            link back to the points).
        radius : float or str, optional
            Radius (in CRS units) around each point to clip features to.
            If `str`, the column in `points` to use for radius values.
            If ``None``, only regions are used for clipping.
            By default, ``None``.
        expand_radius : bool, optional
            If ``True``, the radius is expanded until at least one
            feature is found. By default, ``True``.
        clip_points_to_regions : bool, default=False
            If ``True``, points are clipped to the given regions before
            mapping to features. If the `regions` input is not set,
            this parameter has no effect. If ``False``, all points are
            used as-is, which means points outside of the regions domain
            are mapped to the closest region. By default, ``False``.
        batch_size : int, default=500
            Number of points to process in each batch when writing
            clipped features to file. By default, ``500``.

        Returns
        -------
        geopandas.GeoDataFrame
            Input points with added column giving mapped feature IDs.

        Raises
        ------
        revrtValueError
            If neither `regions` (set at init) nor `radius` is given.
        """
        # Without at least one clipping criterion there is nothing to
        # bound the feature search, so fail fast
        if self._regions is None and radius is None:
            msg = (
                "Must provide either `regions` or a radius to map points "
                "to features!"
            )
            raise revrtValueError(msg)
        points = points.to_crs(self._crs)
        # Initialize connection IDs to NaN so points that never match a
        # feature can be detected (and dropped) after the loop
        points[self._conn_id_column] = np.nan
        if self._regions is not None:
            points = self._map_points_to_nearest_region(
                points, clip_points_to_regions
            )
        writer = _init_streaming_writer(feature_out_fp)
        batches = []
        for ind, (row_ind, point) in enumerate(points.iterrows()):
            clipped_features = self._clip_to_point(
                point, radius, expand_radius
            )
            if clipped_features.empty:
                # Connection ID stays NaN; dropped by
                # `_drop_unpaired_points` below
                continue
            # `ind` is the shared key linking this point to its
            # clipped features in the output file
            clipped_features[self._conn_id_column] = ind
            points.loc[row_ind, self._conn_id_column] = ind
            batches.append(clipped_features)
            # Stream to disk every `batch_size` points to bound memory
            if len(batches) >= batch_size:
                logger.debug("Dumping batch of features to file...")
                writer.save(pd.concat(batches))
                batches = []
        # Flush any leftover features from a partial final batch
        if batches:
            logger.debug("Dumping batch of features to file...")
            writer.save(pd.concat(batches))
        return self._drop_unpaired_points(points)
def _map_points_to_nearest_region(self, points, clip_points_to_regions):
"""Map points to nearest region; optionally clip to regions"""
if clip_points_to_regions:
logger.info("Clipping points to regions...")
logger.debug("\t- Initial number of points: %d", len(points))
points = gpd.sjoin(
points,
self._regions[["geometry"]],
predicate="within",
how="inner",
).drop(columns="index_right")
logger.debug("\t- Final number of points: %d", len(points))
map_func = region_mapper(self._regions, self._rid_column)
points[self._rid_column] = points.centroid.apply(map_func)
return points
def _clip_to_point(self, point, radius=None, expand_radius=True):
"""Clip features to be within the point region"""
logger.debug("Clipping features to point:\n%s", point)
features = None
if self._regions is not None:
features = self._clip_to_region(point)
if radius is not None:
features = self._clip_to_radius(
point, radius, features, expand_radius
)
return _filter_transmission_features(features)
def _clip_to_region(self, point):
"""Clip features to region and record the region ID"""
rid = point[self._rid_column]
mask = self._regions[self._rid_column] == rid
region = self._regions[mask]
logger.debug(" - Clipping features to region: %s", rid)
features = self._clipped_features(region.geometry, features=None)
features[self._rid_column] = rid
logger.debug(
"%d transmission features found in region with id %s ",
len(features),
rid,
)
return features
    def _clip_to_radius(
        self, point, radius, input_features=None, expand_radius=True
    ):
        """Clip features to radius

        If no features are found within the initial radius, it is
        expanded (linearly by incrementally increasing the clipping
        buffer) until at least one connection feature is found.
        """
        if radius is None:
            return input_features
        # If an upstream clip (e.g. to the region) already yielded zero
        # features, no radius can recover any - bail out early
        if input_features is not None and len(input_features) == 0:
            return input_features
        # A string radius names a column in `points` holding per-point
        # radius values
        if isinstance(radius, str):
            radius = point[radius]
        clipped_features = self._clipped_features(
            point.geometry.buffer(radius), features=input_features
        )
        # Grow the buffer by 5% of the base radius per iteration until
        # at least one feature falls inside it (bounded to avoid an
        # infinite loop when no features exist anywhere)
        clipping_buffer = 1
        ind = 0
        while expand_radius and len(clipped_features) <= 0:
            clipping_buffer += 0.05
            clipped_features = self._clipped_features(
                point.geometry.buffer(radius * clipping_buffer),
                input_features,
            )
            ind += 1
            if ind >= _WHILE_LOOP_ITER_MAX:  # pragma: no cover
                msg = "Maximum iterations reached when expanding radius!"
                raise revrtRuntimeError(msg)
        logger.info(
            "%d transmission features found in clipped area with radius %.2f",
            len(clipped_features),
            radius * clipping_buffer,
        )
        return clipped_features.copy(deep=True)
def _clipped_features(self, region, features=None):
"""Clip features to region"""
if features is None:
features = gpd.read_file(self._features_fp, mask=region).to_crs(
self._crs
)
return gpd.clip(features, region)
def _drop_unpaired_points(self, points):
"""Drop points that did not map to any features"""
feature_ids = points[self._conn_id_column]
if not feature_ids.isna().any():
return points
empty_point_indices = feature_ids[feature_ids.isna()].index.tolist()
msg = (
f"No features found for {len(empty_point_indices)} point(s) "
f"at the following indices: {empty_point_indices}"
)
warn(msg, revrtWarning)
points = points[~feature_ids.isna()]
return points.reset_index(drop=True)
def map_to_costs(route_points, crs, transform, shape):
    """Map route table to cost indices and drop out-of-bounds rows

    Parameters
    ----------
    route_points : pandas.DataFrame
        Route definitions table with at least `start_lat`, `start_lon`,
        `end_lat`, and `end_lon` coordinate columns.
    crs : str or pyproj.crs.CRS
        Coordinate reference system for the cost raster.
    transform : affine.Affine
        Rasterio affine transform giving pixel origin and resolution.
    shape : tuple
        Raster height and width for bounds checking.

    Returns
    -------
    pandas.DataFrame
        Updated route table filtered to routes within the cost domain.
    """
    logger.debug("Map %d routes to cost raster", len(route_points))
    logger.debug("First few routes:\n%s", route_points.head())
    logger.debug("Transform:\n%s", transform)

    # Convert both route endpoints (start_* and end_* columns) to raster
    # row/col indices using the same lat/lon -> row/col transform
    for prefix in ("start", "end"):
        route_points = convert_lat_lon_to_row_col(
            route_points,
            crs,
            transform,
            lat_col=f"{prefix}_lat",
            lon_col=f"{prefix}_lon",
            out_row_name=f"{prefix}_row",
            out_col_name=f"{prefix}_col",
        )

    logger.debug("Mapping done!")
    return filter_points_outside_cost_domain(route_points, shape)
def make_rev_sc_points(excl_rows, excl_cols, crs, transform, resolution=128):
    """Generate reV supply curve points for a given exclusion grid size

    Parameters
    ----------
    excl_rows, excl_cols : int
        Dimensions of the exclusion grid.
    crs : str or pyproj.crs.CRS
        Coordinate reference system for the exclusion raster.
    transform : affine.Affine
        Affine transform object representing the exclusion raster
        transformation.
    resolution : int, default=128
        Resolution of the supply curve grid w.r.t the exclusion grid
        (i.e. reV aggregation factor). By default, ``128``.

    Returns
    -------
    geopandas.GeoDataFrame
        Supply curve points with geometry and start row/column indices.
    """
    # Number of SC grid cells needed to fully cover the exclusion grid
    # (ceil so a partial trailing cell still gets a point)
    n_rows = int(np.ceil(excl_rows / resolution))
    n_cols = int(np.ceil(excl_cols / resolution))
    sc_col_ind, sc_row_ind = np.meshgrid(np.arange(n_cols), np.arange(n_rows))
    sc_points = pd.DataFrame(
        {
            "sc_row_ind": sc_row_ind.flatten().copy(),
            "sc_col_ind": sc_col_ind.flatten().copy(),
        }
    )
    sc_points.index.name = "gid"
    sc_points["sc_point_gid"] = sc_points.index.to_numpy()

    # Center of each SC cell in exclusion-grid (row, col) indices.
    # NOTE(review): for a partial trailing cell this center can fall
    # outside the exclusion domain - assumed acceptable downstream.
    row = np.round(sc_points["sc_row_ind"] * resolution + resolution / 2)
    sc_points["start_row"] = row.astype(int)
    col = np.round(sc_points["sc_col_ind"] * resolution + resolution / 2)
    sc_points["start_col"] = col.astype(int)

    # Project the cell centers to x/y coordinates and attach geometry
    x, y = rasterio.transform.xy(
        transform, sc_points["start_row"].values, sc_points["start_col"].values
    )
    geo = [Point(xy) for xy in zip(x, y, strict=True)]
    sc_points = gpd.GeoDataFrame(sc_points, crs=crs, geometry=geo)

    # WGS84 lat/lon columns for convenience
    lat, lon = list(
        zip(
            *((p.y, p.x) for p in sc_points.to_crs("EPSG:4326").geometry),
            strict=True,
        )
    )
    sc_points["latitude"] = lat
    sc_points["longitude"] = lon
    # `sc_points` is already a GeoDataFrame in the target CRS with the
    # correct geometry; the original redundantly re-constructed it here
    return sc_points
def points_csv_to_geo_dataframe(points, crs, transform):
    """Convert points CSV or DataFrame to GeoDataFrame

    Parameters
    ----------
    points : path-like or pandas.DataFrame
        Points CSV file path or DataFrame. If a file path is given, the
        file is read as a CSV. The CSV or DataFrame must have `latitude`
        and `longitude` columns.
    crs : str or pyproj.crs.CRS
        Coordinate reference system for the exclusion raster.
    transform : affine.Affine
        Affine transform object representing the exclusion raster
        transformation.

    Returns
    -------
    geopandas.GeoDataFrame
        Points as a GeoDataFrame in the given CRS. Will contain
        `start_row` and `start_col` columns giving the row and column
        indices in the cost raster.
    """
    # If `points` is not a readable CSV source, leave it untouched.
    # NOTE(review): assumes `pd.read_csv` raises TypeError or
    # UnicodeDecodeError for non-path inputs - verify for DataFrame
    # inputs across pandas versions.
    with contextlib.suppress(TypeError, UnicodeDecodeError):
        points = pd.read_csv(points)

    points = convert_lat_lon_to_row_col(
        points,
        crs,
        transform,
        lat_col="latitude",
        lon_col="longitude",
        out_row_name="start_row",
        out_col_name="start_col",
    )

    geometry = gpd.points_from_xy(points.longitude, points.latitude)
    wgs84_points = gpd.GeoDataFrame(points, geometry=geometry, crs="EPSG:4326")
    return wgs84_points.to_crs(crs)
def convert_lat_lon_to_row_col(
    points,
    crs,
    transform,
    lat_col="latitude",
    lon_col="longitude",
    out_row_name="start_row",
    out_col_name="start_col",
):
    """Convert latitude and longitude columns to row and column indices

    Parameters
    ----------
    points : pandas.DataFrame or geopandas.GeoDataFrame
        Data with latitude and longitude columns that need to be
        converted to row and column indices. Modified in place (row and
        column index columns are added) and also returned.
    crs : str or pyproj.crs.CRS
        Coordinate reference system for the target grid.
    transform : affine.Affine
        Affine transform for the target grid.
    lat_col : str, default="latitude"
        Name of latitude column in input `point`.
        By default, ``"latitude"``.
    lon_col : str, default="longitude"
        Name of longitude column in input `point`.
        By default, ``"longitude"``.
    out_row_name : str, default="start_row"
        Name of output column for row indices.
        By default, ``"start_row"``.
    out_col_name : str, default="start_col"
        Name of output column for column indices.
        By default, ``"start_col"``.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Input `points` with added row and column index columns.
    """
    lat_vals = points[lat_col].astype("float32")
    lon_vals = points[lon_col].astype("float32")
    rows, cols = _transform_lat_lon_to_row_col(
        transform, crs, lat_vals, lon_vals
    )
    points[out_row_name] = rows.astype("int32")
    points[out_col_name] = cols.astype("int32")
    return points
def filter_points_outside_cost_domain(route_table, shape):
    """Drop routes whose indices fall outside the cost domain

    Parameters
    ----------
    route_table : pandas.DataFrame or geopandas.GeoDataFrame
        Route definitions table with at least `start_row`, `start_col`.
        Can also optionally have `end_row`, and `end_col` columns.
    shape : tuple
        Raster height and width for bounds checking.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Input `points` filtered to only those within the cost domain.
    """
    logger.debug("Filtering out points outside cost domain...")
    mask = route_table["start_row"] >= 0
    mask &= route_table["start_row"] < shape[0]
    mask &= route_table["start_col"] >= 0
    mask &= route_table["start_col"] < shape[1]
    # End-point columns are optional (point-only tables have none)
    if "end_row" in route_table.columns:
        mask &= route_table["end_row"] >= 0
        mask &= route_table["end_row"] < shape[0]
    if "end_col" in route_table.columns:
        mask &= route_table["end_col"] >= 0
        mask &= route_table["end_col"] < shape[1]
    logger.debug("Mask computed!")
    # Vectorized `.any()`/`.sum()` instead of the builtin `any()`/
    # `sum()` the original used, which iterate the Series element by
    # element in Python (same results, faster and idiomatic pandas)
    out_of_bounds = ~mask
    if out_of_bounds.any():
        msg = (
            f"The following {out_of_bounds.sum()} feature(s) are outside of the cost "
            f"domain and will be dropped:\n{route_table.loc[out_of_bounds]}"
        )
        warn(msg, revrtWarning)
        route_table = route_table.loc[mask].reset_index(drop=True)
    return route_table
def _transform_lat_lon_to_row_col(transform, cost_crs, lat, lon):
    """Convert WGS84 coordinates to cost grid row and column arrays"""
    wgs84_points = gpd.GeoDataFrame(
        geometry=[Point(x, y) for x, y in zip(lon, lat, strict=True)]
    )
    projected = wgs84_points.set_crs("EPSG:4326").to_crs(cost_crs)
    coords = projected["geometry"].centroid
    row, col = rasterio.transform.rowcol(
        transform, coords.x.values, coords.y.values
    )
    # Newer rasterio returns arrays, older versions lists; normalize
    return np.array(row), np.array(col)
def _init_streaming_writer(feature_out_fp):
    """Initialize an incremental writer for clipped features

    Warns and appends a '.gpkg' suffix if the output path has any other
    extension.
    """
    out_fp = Path(feature_out_fp)
    if out_fp.suffix.lower() == ".gpkg":
        return IncrementalWriter(out_fp)
    msg = (
        "Output feature file should have a '.gpkg' extension to "
        f"ensure proper format! Got input file: '{out_fp}'. "
        "Adding '.gpkg' extension... "
    )
    warn(msg, revrtWarning)
    return IncrementalWriter(out_fp.with_suffix(".gpkg"))
def _filter_transmission_features(features):
"""Filter loaded transmission features"""
features = features.drop(
columns=["bgid", "egid", "cap_left"], errors="ignore"
)
features = features.rename(columns={"gid": "trans_gid"})
if "category" in features.columns:
features = _drop_empty_categories(features)
return features.reset_index(drop=True)
def _drop_empty_categories(features):
"""Drop features with empty category field"""
mask = features["category"].isna()
if mask.any():
msg = f"Dropping {mask.sum():,} feature(s) with NaN category!"
warn(msg, revrtWarning)
features = features[~mask].reset_index(drop=True)
return features