# Source code for revrt.routing.utilities

"""reVRt routing module utilities"""

import logging
import contextlib
from pathlib import Path
from warnings import warn

import rasterio
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point

from revrt.warn import revrtWarning
from revrt.utilities.base import region_mapper
from revrt.utilities.handlers import IncrementalWriter
from revrt.exceptions import revrtValueError, revrtRuntimeError


logger = logging.getLogger(__name__)
_WHILE_LOOP_ITER_MAX = 10_000


class PointToFeatureMapper:
    """Map points to features within specified regions and/or radii"""

    def __init__(
        self,
        crs,
        features_fp,
        regions=None,
        region_identifier_column="rid",
        connection_identifier_column="end_feat_id",
    ):
        """
        Parameters
        ----------
        crs : str or pyproj.crs.CRS
            Coordinate reference system to use for all spatial data.
        features_fp : path-like
            File path to transmission features to map points to.
        regions : geopandas.GeoDataFrame or path-like, optional
            Regions to use for clipping features around points.
            Features will not extend beyond these regions, and point
            will only map to features within their own region. This
            input can be paired with the `radius` parameter (e.g. to
            forbid any connections to cross state boundaries, even if
            that connection would be shorter than some desired length).
            At least one of `regions` or `radius` must be provided;
            otherwise, an error is raised. By default, ``None``.
        region_identifier_column : str, default="rid"
            Column in `regions` data to use as the identifier for that
            region. If not given, a simple index will be put under the
            `rid` column. By default, ``"rid"``.
        connection_identifier_column : str, optional
            Column in output data (both features and points) that will
            be used to link the points to the features that should be
            routed to. By default, ``"end_feat_id"``.
        """
        self._crs = crs
        self._features_fp = features_fp
        self._regions = None
        self._rid_column = region_identifier_column
        self._conn_id_column = connection_identifier_column
        self._set_regions(regions)

    def _set_regions(self, regions):
        """Set the regions GeoDataFrame"""
        if regions is None:
            return
        # EAFP: `regions` may already be a GeoDataFrame (has `.to_crs`);
        # otherwise treat it as a file path and load it
        try:
            self._regions = regions.to_crs(self._crs)
        except AttributeError:
            self._regions = gpd.read_file(regions).to_crs(self._crs)
        # Fall back to a simple positional index as the region ID
        if self._rid_column not in self._regions.columns:
            self._regions[self._rid_column] = range(len(self._regions))

    def map_points(
        self,
        points,
        feature_out_fp,
        radius=None,
        expand_radius=True,
        clip_points_to_regions=False,
        batch_size=500,
    ):
        """Map points to features within the point region

        Parameters
        ----------
        points : geopandas.GeoDataFrame
            Points to map to features.
        feature_out_fp : path-like
            File path to save clipped features to. This should end in
            '.gpkg' to ensure proper format. If not, the extension will
            be added automatically. This file will contain all features
            clipped to each point (with a feature ID column added to
            link back to the points).
        radius : float or str, optional
            Radius (in CRS units) around each point to clip features
            to. If `str`, the column in `points` to use for radius
            values. If ``None``, only regions are used for clipping.
            By default, ``None``.
        expand_radius : bool, optional
            If ``True``, the radius is expanded until at least one
            feature is found. By default, ``True``.
        clip_points_to_regions : bool, default=False
            If ``True``, points are clipped to the given regions before
            mapping to features. If the `regions` input is not set,
            this parameter has no effect. If ``False``, all points are
            used as-is, which means points outside of the regions
            domain are mapped to the closest region.
            By default, ``False``.
        batch_size : int, default=500
            Number of points to process in each batch when writing
            clipped features to file. By default, ``500``.

        Returns
        -------
        geopandas.GeoDataFrame
            Input points with added column giving mapped feature IDs.
        """
        if self._regions is None and radius is None:
            msg = (
                "Must provide either `regions` or a radius to map points "
                "to features!"
            )
            raise revrtValueError(msg)

        points = points.to_crs(self._crs)
        # NaN marks points with no mapped features; filled with the
        # sequential link ID below and pruned in `_drop_unpaired_points`
        points[self._conn_id_column] = np.nan
        if self._regions is not None:
            points = self._map_points_to_nearest_region(
                points, clip_points_to_regions
            )

        writer = _init_streaming_writer(feature_out_fp)
        batches = []
        for ind, (row_ind, point) in enumerate(points.iterrows()):
            clipped_features = self._clip_to_point(
                point, radius, expand_radius
            )
            if clipped_features.empty:
                continue
            # `ind` links the point row and its clipped features
            clipped_features[self._conn_id_column] = ind
            points.loc[row_ind, self._conn_id_column] = ind
            batches.append(clipped_features)
            # Stream features to disk in batches to bound memory use
            if len(batches) >= batch_size:
                logger.debug("Dumping batch of features to file...")
                writer.save(pd.concat(batches))
                batches = []

        # Flush any remaining partial batch
        if batches:
            logger.debug("Dumping batch of features to file...")
            writer.save(pd.concat(batches))

        return self._drop_unpaired_points(points)

    def _map_points_to_nearest_region(self, points, clip_points_to_regions):
        """Map points to nearest region; optionally clip to regions"""
        if clip_points_to_regions:
            logger.info("Clipping points to regions...")
            logger.debug("\t- Initial number of points: %d", len(points))
            # Inner spatial join keeps only points falling within a region
            points = gpd.sjoin(
                points,
                self._regions[["geometry"]],
                predicate="within",
                how="inner",
            ).drop(columns="index_right")
            logger.debug("\t- Final number of points: %d", len(points))

        map_func = region_mapper(self._regions, self._rid_column)
        points[self._rid_column] = points.centroid.apply(map_func)
        return points

    def _clip_to_point(self, point, radius=None, expand_radius=True):
        """Clip features to be within the point region"""
        logger.debug("Clipping features to point:\n%s", point)
        features = None
        if self._regions is not None:
            features = self._clip_to_region(point)
        # Radius clipping further restricts region-clipped features
        # (or loads features fresh if no regions were given)
        if radius is not None:
            features = self._clip_to_radius(
                point, radius, features, expand_radius
            )
        return _filter_transmission_features(features)

    def _clip_to_region(self, point):
        """Clip features to region and record the region ID"""
        rid = point[self._rid_column]
        mask = self._regions[self._rid_column] == rid
        region = self._regions[mask]
        logger.debug(" - Clipping features to region: %s", rid)
        features = self._clipped_features(region.geometry, features=None)
        features[self._rid_column] = rid
        logger.debug(
            "%d transmission features found in region with id %s ",
            len(features),
            rid,
        )
        return features

    def _clip_to_radius(
        self, point, radius, input_features=None, expand_radius=True
    ):
        """Clip features to radius

        If no features are found within the initial radius, it is
        expanded (linearly by incrementally increasing the clipping
        buffer) until at least one connection feature is found.
        """
        if radius is None:
            return input_features
        # Nothing to clip if region clipping already yielded no features
        if input_features is not None and len(input_features) == 0:
            return input_features
        # A string radius refers to a per-point column value
        if isinstance(radius, str):
            radius = point[radius]

        clipped_features = self._clipped_features(
            point.geometry.buffer(radius), features=input_features
        )
        clipping_buffer = 1
        ind = 0
        # Grow the buffer by 5% of the base radius per iteration until
        # at least one feature is found (bounded to avoid infinite loop)
        while expand_radius and len(clipped_features) <= 0:
            clipping_buffer += 0.05
            clipped_features = self._clipped_features(
                point.geometry.buffer(radius * clipping_buffer),
                input_features,
            )
            ind += 1
            if ind >= _WHILE_LOOP_ITER_MAX:  # pragma: no cover
                msg = "Maximum iterations reached when expanding radius!"
                raise revrtRuntimeError(msg)

        logger.info(
            "%d transmission features found in clipped area with radius %.2f",
            len(clipped_features),
            radius * clipping_buffer,
        )
        return clipped_features.copy(deep=True)

    def _clipped_features(self, region, features=None):
        """Clip features to region"""
        # Load from disk (masked to the region) only when no candidate
        # features were passed in
        if features is None:
            features = gpd.read_file(self._features_fp, mask=region).to_crs(
                self._crs
            )
        return gpd.clip(features, region)

    def _drop_unpaired_points(self, points):
        """Drop points that did not map to any features"""
        feature_ids = points[self._conn_id_column]
        if not feature_ids.isna().any():
            return points
        empty_point_indices = feature_ids[feature_ids.isna()].index.tolist()
        msg = (
            f"No features found for {len(empty_point_indices)} point(s) "
            f"at the following indices: {empty_point_indices}"
        )
        warn(msg, revrtWarning)
        points = points[~feature_ids.isna()]
        return points.reset_index(drop=True)
def map_to_costs(route_points, crs, transform, shape):
    """Map route table to cost indices and drop out-of-bounds rows

    Parameters
    ----------
    route_points : pandas.DataFrame
        Route definitions table with at least `start_lat`,
        `start_lon`, `end_lat`, and `end_lon` coordinate columns.
    crs : str or pyproj.crs.CRS
        Coordinate reference system for the cost raster.
    transform : affine.Affine
        Rasterio affine transform giving pixel origin and resolution.
    shape : tuple
        Raster height and width for bounds checking.

    Returns
    -------
    pandas.DataFrame
        Updated route table filtered to routes within the cost domain.
    """
    logger.debug("Map %d routes to cost raster", len(route_points))
    logger.debug("First few routes:\n%s", route_points.head())
    logger.debug("Transform:\n%s", transform)

    # Convert both route endpoints ("start_*" and "end_*" columns) to
    # raster row/col indices
    for endpoint in ("start", "end"):
        route_points = convert_lat_lon_to_row_col(
            route_points,
            crs,
            transform,
            lat_col=f"{endpoint}_lat",
            lon_col=f"{endpoint}_lon",
            out_row_name=f"{endpoint}_row",
            out_col_name=f"{endpoint}_col",
        )

    logger.debug("Mapping done!")
    return filter_points_outside_cost_domain(route_points, shape)
def make_rev_sc_points(excl_rows, excl_cols, crs, transform, resolution=128):
    """Generate reV supply curve points for a given exclusion grid size

    Parameters
    ----------
    excl_rows, excl_cols : int
        Dimensions of the exclusion grid.
    crs : str or pyproj.crs.CRS
        Coordinate reference system for the exclusion raster.
    transform : affine.Affine
        Affine transform object representing the exclusion raster
        transformation.
    resolution : int, default=128
        Resolution of the supply curve grid w.r.t the exclusion grid
        (i.e. reV aggregation factor). By default, ``128``.

    Returns
    -------
    geopandas.GeoDataFrame
        Supply curve points with geometry and start row/column indices.
    """
    # Number of supply curve cells needed to cover the exclusion grid
    # (partial cells at the edges still count as one cell)
    n_rows = int(np.ceil(excl_rows / resolution))
    n_cols = int(np.ceil(excl_cols / resolution))
    sc_col_ind, sc_row_ind = np.meshgrid(np.arange(n_cols), np.arange(n_rows))

    # `.flatten()` already returns a copy, so no extra `.copy()` needed
    sc_points = pd.DataFrame(
        {
            "sc_row_ind": sc_row_ind.flatten(),
            "sc_col_ind": sc_col_ind.flatten(),
        }
    )
    sc_points.index.name = "gid"
    sc_points["sc_point_gid"] = sc_points.index.to_numpy()

    # Center pixel of each supply curve cell in exclusion-grid indices
    row = np.round(sc_points["sc_row_ind"] * resolution + resolution / 2)
    sc_points["start_row"] = row.astype(int)
    col = np.round(sc_points["sc_col_ind"] * resolution + resolution / 2)
    sc_points["start_col"] = col.astype(int)

    # Convert raster indices to coordinates in the exclusion CRS
    x, y = rasterio.transform.xy(
        transform, sc_points["start_row"].values, sc_points["start_col"].values
    )
    sc_points = gpd.GeoDataFrame(
        sc_points, crs=crs, geometry=gpd.points_from_xy(x, y)
    )

    # Vectorized lat/lon extraction instead of a per-point Python loop
    wgs84_geometry = sc_points.to_crs("EPSG:4326").geometry
    sc_points["latitude"] = wgs84_geometry.y.to_numpy()
    sc_points["longitude"] = wgs84_geometry.x.to_numpy()
    # `sc_points` is already a GeoDataFrame in `crs`; no need to rebuild
    return sc_points
def points_csv_to_geo_dataframe(points, crs, transform):
    """Convert points CSV or DataFrame to GeoDataFrame

    Parameters
    ----------
    points : path-like or pandas.DataFrame
        Points CSV file path or DataFrame. If a file path is given, the
        file is read as a CSV. The CSV or DataFrame must have
        `latitude` and `longitude` columns.
    crs : str or pyproj.crs.CRS
        Coordinate reference system for the exclusion raster.
    transform : affine.Affine
        Affine transform object representing the exclusion raster
        transformation.

    Returns
    -------
    geopandas.GeoDataFrame
        Points as a GeoDataFrame in the given CRS. Will contain
        `start_row` and `start_col` columns giving the row and column
        indices in the cost raster.
    """
    # If `points` is already a DataFrame, `read_csv` raises one of
    # these exceptions and the input is used as-is
    with contextlib.suppress(TypeError, UnicodeDecodeError):
        points = pd.read_csv(points)

    points = convert_lat_lon_to_row_col(
        points,
        crs,
        transform,
        lat_col="latitude",
        lon_col="longitude",
        out_row_name="start_row",
        out_col_name="start_col",
    )

    geometry = gpd.points_from_xy(points.longitude, points.latitude)
    wgs84_points = gpd.GeoDataFrame(points, geometry=geometry, crs="EPSG:4326")
    return wgs84_points.to_crs(crs)
def convert_lat_lon_to_row_col(
    points,
    crs,
    transform,
    lat_col="latitude",
    lon_col="longitude",
    out_row_name="start_row",
    out_col_name="start_col",
):
    """Convert latitude and longitude columns to row and column indices

    Parameters
    ----------
    points : pandas.DataFrame or geopandas.GeoDataFrame
        Data with latitude and longitude columns that need to be
        converted to row and column indices.
    crs : str or pyproj.crs.CRS
        Coordinate reference system for the target grid.
    transform : affine.Affine
        Affine transform for the target grid.
    lat_col : str, default="latitude"
        Name of latitude column in input `point`.
        By default, ``"latitude"``.
    lon_col : str, default="longitude"
        Name of longitude column in input `point`.
        By default, ``"longitude"``.
    out_row_name : str, default="start_row"
        Name of output column for row indices.
        By default, ``"start_row"``.
    out_col_name : str, default="start_col"
        Name of output column for column indices.
        By default, ``"start_col"``.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Input `points` with added row and column index columns.
    """
    lats = points[lat_col].astype("float32")
    lons = points[lon_col].astype("float32")
    rows, cols = _transform_lat_lon_to_row_col(transform, crs, lats, lons)
    points[out_row_name] = rows.astype("int32")
    points[out_col_name] = cols.astype("int32")
    return points
def filter_points_outside_cost_domain(route_table, shape):
    """Drop routes whose indices fall outside the cost domain

    Parameters
    ----------
    route_table : pandas.DataFrame or geopandas.GeoDataFrame
        Route definitions table with at least `start_row`, `start_col`.
        Can also optionally have `end_row`, and `end_col` columns.
    shape : tuple
        Raster height and width for bounds checking.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Input `points` filtered to only those within the cost domain.
    """
    logger.debug("Filtering out points outside cost domain...")
    mask = route_table["start_row"] >= 0
    mask &= route_table["start_row"] < shape[0]
    mask &= route_table["start_col"] >= 0
    mask &= route_table["start_col"] < shape[1]
    # End-point columns are optional (e.g. supply curve point tables)
    if "end_row" in route_table.columns:
        mask &= route_table["end_row"] >= 0
        mask &= route_table["end_row"] < shape[0]
    if "end_col" in route_table.columns:
        mask &= route_table["end_col"] >= 0
        mask &= route_table["end_col"] < shape[1]

    logger.debug("Mask computed!")
    # Vectorized `.any()`/`.sum()` instead of Python-level
    # `any(~mask)`/`sum(~mask)`, which iterate the Series element-wise
    out_of_bounds = ~mask
    if out_of_bounds.any():
        msg = (
            f"The following {out_of_bounds.sum()} feature(s) are outside of the cost "
            f"domain and will be dropped:\n{route_table.loc[out_of_bounds]}"
        )
        warn(msg, revrtWarning)
        route_table = route_table.loc[mask].reset_index(drop=True)

    return route_table
def _transform_lat_lon_to_row_col(transform, cost_crs, lat, lon):
    """Convert WGS84 coordinates to cost grid row and column arrays"""
    wgs84_points = gpd.GeoDataFrame(
        geometry=[Point(x, y) for x, y in zip(lon, lat, strict=True)]
    )
    projected = wgs84_points.set_crs("EPSG:4326").to_crs(cost_crs)
    coords = projected["geometry"].centroid
    row, col = rasterio.transform.rowcol(
        transform, coords.x.values, coords.y.values
    )
    return np.array(row), np.array(col)


def _init_streaming_writer(feature_out_fp):
    """Initialize an incremental writer for clipped features"""
    out_fp = Path(feature_out_fp)
    if out_fp.suffix.lower() != ".gpkg":
        warning_message = (
            "Output feature file should have a '.gpkg' extension to "
            f"ensure proper format! Got input file: '{out_fp}'. "
            "Adding '.gpkg' extension... "
        )
        warn(warning_message, revrtWarning)
        out_fp = out_fp.with_suffix(".gpkg")
    return IncrementalWriter(out_fp)


def _filter_transmission_features(features):
    """Filter loaded transmission features"""
    # Drop bookkeeping columns if present and expose `gid` as the
    # transmission feature ID
    features = features.drop(
        columns=["bgid", "egid", "cap_left"], errors="ignore"
    ).rename(columns={"gid": "trans_gid"})
    if "category" in features.columns:
        features = _drop_empty_categories(features)
    return features.reset_index(drop=True)


def _drop_empty_categories(features):
    """Drop features with empty category field"""
    missing_category = features["category"].isna()
    if not missing_category.any():
        return features
    msg = f"Dropping {missing_category.sum():,} feature(s) with NaN category!"
    warn(msg, revrtWarning)
    return features[~missing_category].reset_index(drop=True)