"""reVRt rasterization utilities"""
import logging
from math import ceil, hypot
import rasterio
import geopandas as gpd
import numpy as np
from shapely import make_valid
from shapely.geometry import box
from revrt.constants import DEFAULT_DTYPE
from revrt.exceptions import revrtValueError
from revrt.utilities.monitoring import log_runtime
logger = logging.getLogger(__name__)
DEFAULT_RASTERIZE_TILE_SIZE = 2048
"""int: Default tile size to use for rasterization"""
[docs]
def rasterize_shape_file( # noqa: PLR0913, PLR0917
fname,
width,
height,
transform,
tile_size=None,
buffer_dist=None,
all_touched=False,
dest_crs=None,
burn_value=1,
boundary_only=False,
simply_before_rasterize=False,
dtype=DEFAULT_DTYPE,
fill=0,
):
"""Rasterize a vector layer
Parameters
----------
fname : str
Full path to GPKG or shp file.
width : int
Width of output raster.
height : int
Height of output raster.
transform : affine.Affine
Affine transform for output raster.
tile_size : int, optional
Size of tiles to use for rasterization. If None, uses a default
tile size. By default, ``None``.
buffer_dist : float, optional
Distance to buffer features in fname by. Same units as the
template raster. By default, ``None``.
all_touched : bool, default=False
Set all cells touched by vector to 1. False results in less
cells being set to 1. By default, ``False``.
reproject_vector : bool, default=True
Reproject CRS of vector to match template raster if ``True``.
By default, ``True``.
burn_value : int, float, or str, default=1
Value used to burn vectors into raster, or the name of a
column containing per-feature values to burn.
By default, ``1``.
boundary_only : bool, default=False
If ``True``, rasterize boundary of vector.
By default, ``False``.
simply_before_rasterize : bool, default=False
If ``True``, simplify geometries to half of the raster cell size
before rasterization. By default, ``False``.
dtype : np.dtype, default="float32"
Datatype to use. By default, ``float32``.
fill : int, float, or None, default=0
Value used to fill raster cells not burned by vector. If None,
uses np.nan. By default, ``0``.
Returns
-------
array-like
Rasterized vector data
"""
gdf = gpd.read_file(fname)
if dest_crs is not None:
logger.debug("Reprojecting vector")
gdf = gdf.to_crs(crs=dest_crs)
with log_runtime(f"Rasterization of {fname}"):
return rasterize(
gdf,
width,
height,
transform,
tile_size=tile_size,
buffer_dist=buffer_dist,
all_touched=all_touched,
burn_value=burn_value,
boundary_only=boundary_only,
simply_before_rasterize=simply_before_rasterize,
dtype=dtype,
fill=fill,
)
[docs]
def rasterize( # noqa: PLR0913, PLR0917
gdf,
width,
height,
transform,
tile_size=None,
buffer_dist=None,
all_touched=False,
burn_value=1,
boundary_only=False,
simply_before_rasterize=False,
dtype=DEFAULT_DTYPE,
fill=0,
):
"""Rasterize a vector layer
Parameters
----------
gdf : DataFrame
Geopandas DataFrame contains shapes to rasterize.
width : int
Width of output raster.
height : int
Height of output raster.
transform : affine.Affine
Affine transform for output raster.
tile_size : int, optional
Size of tiles to use for rasterization. If None, uses a default
tile size. By default, ``None``.
buffer_dist : float, optional
Distance to buffer features in fname by. Same units as the
template raster. By default, ``None``.
all_touched : bool, default=False
Set all cells touched by vector to 1. False results in less
cells being set to 1. By default, ``False``.
burn_value : int, float, or str, default=1
Value used to burn vectors into raster, or the name of a
column containing per-feature values to burn.
By default, ``1``.
boundary_only : bool, default=False
If ``True``, rasterize boundary of vector.
By default, ``False``.
simply_before_rasterize : bool, default=False
If ``True``, simplify geometries to half of the raster cell size
before rasterization. By default, ``False``.
dtype : np.dtype, default="float32"
Datatype to use. By default, ``float32``.
fill : int, float, or None, default=0
Value used to fill raster cells not burned by vector. If None,
uses np.nan. By default, ``0``.
Returns
-------
array-like
Rasterized vector data
"""
gdf = _clean_shapes_for_rasterize(gdf)
if simply_before_rasterize:
gdf = simplify_shapes(gdf, transform)
if buffer_dist is not None:
gdf = gdf.copy()
logger.debug("Buffering shapes by %s", buffer_dist)
gdf.geometry = gdf.geometry.buffer(buffer_dist)
logger.debug("Buffering done. %d features before cleaning.", len(gdf))
gdf = gdf[~gdf.is_empty] # Negative buffer may result in empty feats
logger.debug("%d features after removing empty features.", len(gdf))
logger.debug("Rasterizing %d shape(s)", len(gdf))
with log_runtime("Rasterization of shape(s)"):
return _tile_rasterize(
gdf,
width,
height,
transform,
tile_size or DEFAULT_RASTERIZE_TILE_SIZE,
all_touched=all_touched,
burn_value=_resolve_burn_values(gdf, burn_value),
boundary_only=boundary_only,
dtype=dtype,
fill=fill,
)
[docs]
def integer_dimension_window(bounds, transform):
"""Make valid-size window with integer dimensions
This method guarantees the window dimensions are integers >= 1.
Window offsets are also floored to the nearest integer.
Parameters
----------
bounds : tuple
4-tuple defining bounding box (left, bottom, right, top).
transform : affine.Affine
Affine transform for raster for which to generate window.
Returns
-------
rasterio.windows.Window
Window with integer offsets and non-zero integer dimensions.
"""
window = rasterio.windows.from_bounds(*bounds, transform=transform)
return rasterio.windows.Window(
ceil(window.col_off - 1),
ceil(window.row_off - 1),
max(1, ceil(window.width)) + 1,
max(1, ceil(window.height)) + 1,
)
[docs]
def simplify_shapes(gdf, transform):
"""Simplify geometries to half of the raster cell size
Parameters
----------
gdf : geopandas.GeoDataFrame
GeoDataFrame containing the geometries to simplify.
transform : affine.Affine
Affine transform used to derive the raster cell size.
Returns
-------
geopandas.GeoDataFrame
Input ``gdf`` with simplified geometries and empty features
removed.
"""
tolerance = _simplify_tolerance(transform)
logger.debug(
"Simplifying %d shape(s) with tolerance %s", len(gdf), tolerance
)
gdf.geometry = gdf.geometry.simplify(tolerance, preserve_topology=True)
gdf = gdf[~gdf.is_empty]
logger.debug("%d shape(s) remain after simplification", len(gdf))
return gdf
def _tile_rasterize(
gdf,
width,
height,
transform,
tile_size,
all_touched,
burn_value,
boundary_only,
dtype,
fill,
):
"""Rasterize shapes window by window into a preallocated array"""
out = np.zeros((height, width), dtype=dtype)
shapes = gdf.boundary if boundary_only else gdf.geometry
use_feature_values = hasattr(burn_value, "loc")
for window in _iter_tile_windows(width, height, tile_size=int(tile_size)):
bounds = rasterio.windows.bounds(window, transform)
tile_geom = box(*bounds)
intersects = shapes.intersects(tile_geom)
if not intersects.any():
continue
tile_shapes = shapes.loc[intersects].intersection(tile_geom)
tile_shapes = tile_shapes[~tile_shapes.is_empty]
if tile_shapes.empty:
continue
rasterize_kwargs = {
"out_shape": (window.height, window.width),
"fill": np.nan if fill is None else fill,
"out": None,
"transform": rasterio.windows.transform(window, transform),
"all_touched": all_touched,
"dtype": dtype,
}
if use_feature_values:
tile_values = burn_value.loc[tile_shapes.index]
rasterize_shapes = zip(tile_shapes, tile_values, strict=True)
else:
rasterize_shapes = tile_shapes
rasterize_kwargs["default_value"] = burn_value
out[*window.toslices()] = rasterio.features.rasterize(
rasterize_shapes,
**rasterize_kwargs,
)
return out
def _iter_tile_windows(width, height, tile_size):
"""Yield raster windows covering the full output extent"""
num_iterations = ceil(width / tile_size) * ceil(height / tile_size)
progress_markers = _tile_window_progress_markers(num_iterations)
ind = 0
for row_off in range(0, height, tile_size):
win_height = min(tile_size, height - row_off)
for col_off in range(0, width, tile_size):
win_width = min(tile_size, width - col_off)
yield rasterio.windows.Window(
col_off, row_off, win_width, win_height
)
ind += 1
progress = progress_markers.get(ind)
if progress_markers.get(ind) is not None:
logger.debug(
"Rasterized %d of %d tiles (%d%%)",
ind,
num_iterations,
progress,
)
def _tile_window_progress_markers(num_iterations):
"""Return milestone percentages keyed by tile-window count"""
progress_markers = {}
for progress in (25, 50, 75, 100):
marker = ceil(num_iterations * progress / 100)
progress_markers[marker] = progress
return progress_markers
def _simplify_tolerance(transform):
"""Return simplification tolerance from the raster cell size"""
x_res = hypot(transform.a, transform.b)
y_res = hypot(transform.d, transform.e)
return max(x_res, y_res) * 0.5
def _resolve_burn_values(gdf, burn_value):
"""Resolve scalar or column-based burn values for rasterization"""
if not isinstance(burn_value, str):
return burn_value
if burn_value not in gdf.columns:
msg = (
"Rasterize value column "
f"{burn_value!r} was not found in vector data. "
f"Available columns: {sorted(gdf.columns)!r}"
)
raise revrtValueError(msg)
return gdf[burn_value]
def _clean_shapes_for_rasterize(gdf):
"""Repair invalid shapes and drop null or empty geometries"""
gdf = gdf[gdf.geometry.notna()]
if gdf.empty:
return gdf
invalid = ~gdf.geometry.is_valid
if invalid.any():
logger.debug(
"Repairing %d invalid shape(s) before rasterization",
int(invalid.sum()),
)
gdf.loc[:, "geometry"] = make_valid(gdf.geometry)
return gdf[~gdf.is_empty]