"""
Set of various methods to weight and combine data layers for use in PFA.
The methods included in this module are based on those outlined by the PFA
Best Practices Report (Pauling et al. 2023).
"""
import warnings
import numpy as np
from geopfa import transformation
import geopandas as gpd
def detect_geom_dimension(gdf: gpd.GeoDataFrame) -> int:
    """Return 2 or 3 according to the dimensionality of *gdf* geometries.

    A geometry counts as 3D when it carries a z coordinate (``has_z``);
    ``None`` geometries are ignored.

    Raises
    ------
    ValueError
        If ``gdf`` is empty or ``None``, if every geometry is ``None``, or
        if 2D and 3D geometries are mixed within the same GeoDataFrame.
    """
    if gdf is None or len(gdf) == 0:
        raise ValueError(
            "Cannot detect geometry dimension from an empty GeoDataFrame."
        )
    seen = set()
    for geometry in gdf.geometry:
        if geometry is None:
            continue
        # shapely geometries expose .has_z; default to 2D when absent.
        seen.add(3 if getattr(geometry, "has_z", False) else 2)
        if len(seen) > 1:
            # Mixed dimensionality already proven; no need to scan further.
            break
    if not seen:
        raise ValueError(
            "Could not determine geometry dimension; all geometries are None."
        )
    if len(seen) > 1:
        raise ValueError(
            "Mixed 2D and 3D geometries found within a single GeoDataFrame. "
            "All geometries for a given layer must be consistently 2D or 3D."
        )
    return seen.pop()
def detect_pfa_dimension(pfa: dict) -> int:
    """Return the spatial dimension (2 or 3) shared by all PFA layer models.

    Walks every criteria -> component -> layer model in the PFA config and
    checks that all non-empty layer GeoDataFrames agree on dimensionality.

    Raises
    ------
    ValueError
        If layers mix 2D and 3D geometries, or if no non-empty layer
        models exist to detect a dimension from.
    """
    detected = None
    for criteria_cfg in pfa.get("criteria", {}).values():
        for component_cfg in criteria_cfg.get("components", {}).values():
            for layer_cfg in component_cfg["layers"].values():
                model = layer_cfg.get("model")
                # Skip layers with no usable model.
                if model is None or len(model) == 0:
                    continue
                layer_dim = detect_geom_dimension(model)
                if detected is None:
                    detected = layer_dim
                elif detected != layer_dim:
                    raise ValueError(
                        "PFA configuration mixes 2D and 3D geometries across layers. "
                        "All layers used in voter-veto must be consistently 2D or 3D."
                    )
    if detected is None:
        raise ValueError(
            "Could not detect geometry dimension from PFA. "
            "No non-empty layer models were found."
        )
    return detected
class VoterVeto:
    """
    Class for combining geospatial evidence layers into favorability models
    using the voter-veto framework.

    This class implements the full PFA aggregation workflow, including layer,
    component, and criteria aggregation using voter and veto logic. The
    implementation is dimension-agnostic (2D or 3D).
    """

    @staticmethod
    def get_w0(Pr0):
        """
        Derive the w0 value from a reference (prior) 'favorability' using the
        logit function. Is specific to a required component of a resource.

        Parameters
        ----------
        Pr0 : float
            Reference 'favorability', or prior 'favorability', in the open
            interval (0, 1). Can be defined using expert opinion or other
            means. Is specific to a required component of a resource.

        Returns
        -------
        w0 : float
            Value used to incorporate a reference 'favorability' (prior
            'favorability') into the voter equation (generalized linear
            model). Is specific to a required component of a resource.
        """
        # logit(Pr0): diverges as Pr0 approaches 0 or 1 (callers must pass
        # a strictly interior probability).
        return np.log(Pr0 / (1 - Pr0))

    @staticmethod
    def voter(w, z, w0):
        """
        Generalized voter for N-dimensional grids.

        Combine processed, transformed, and scaled data layers into a
        'favorability' grid for a specific required resource component using
        a generalized linear model (logistic combination).

        Parameters
        ----------
        w : array_like
            Weights, one per data layer, shape ``(n_layers,)``.
        z : array_like
            Processed, transformed, and scaled data layers, shape
            ``(n_layers, *spatial_shape)``.
        w0 : float
            Prior favorability term.

        Returns
        -------
        PrX : numpy.ndarray
            Rasterized array of 'favorabilities' for an individual required
            resource component, shape ``spatial_shape``.
        """
        w = np.asarray(w)
        # Accept any array-like stack of layers (list/tuple), not only ndarray.
        z = np.asarray(z)
        if w.ndim > 1:
            warnings.warn(
                "Weights array should be 1D, i.e. one value per data layer. "
                "Squeezing to 1D.",
                stacklevel=2,
            )
            w = w.squeeze()
        # squeeze() can yield a 0-d array for a single weight; restore 1D so
        # w.shape[0] below is always valid.
        w = np.atleast_1d(w)
        if z.ndim < 2:  # noqa: PLR2004
            raise ValueError(
                "z must have at least 2 dimensions: (n_layers, *spatial_dims)."
            )
        if w.shape[0] != z.shape[0]:
            raise ValueError(
                f"Number of weights ({w.shape[0]}) must match number of data layers ({z.shape[0]})."
            )
        # Broadcast weights over any number of spatial dimensions
        spatial_dims = (1,) * (z.ndim - 1)
        w_broadcast = w.reshape((w.shape[0], *spatial_dims))
        # Logistic model: PrX = sigmoid(w0 + sum_i w_i * z_i); NaN layer
        # values are ignored by nansum (treated as zero contribution).
        e = -w0 - np.nansum(w_broadcast * z, axis=0)
        PrX = 1 / (1 + np.exp(e))
        return PrX

    @staticmethod
    def veto(PrXs):
        """
        Original veto: element-wise product of component favorability grids.

        Parameters
        ----------
        PrXs : sequence of numpy.ndarray
            Component 'favorability' grids of identical shape.

        Returns
        -------
        PrR : numpy.ndarray
            Element-wise product of all grids; any zero-favorability
            component zeroes (vetoes) the combined result.
        """
        PrR = np.asarray(PrXs[0]).copy()
        for component_grid in PrXs[1:]:
            PrR = np.multiply(PrR, component_grid)
        return PrR

    @staticmethod
    def modified_veto(PrXs, w, veto=True):
        """
        Combine component 'favorability' grids into a resource 'favorability'
        model, optionally vetoing areas where any one component is not present
        (0% 'favorability'). This method combines component 'favorability'
        grids using a weighted sum, and then normalizing.

        Parameters
        ----------
        PrXs : numpy.ndarray
            Array of rasterized 'favorability' arrays for each required
            component or criteria of a resource.
        w : numpy.ndarray
            Array of weights for each component or criteria of a resource.
        veto : boolean
            Boolean value indicating whether or not the function should set
            indices to zero where one component or criteria does not exist.

        Returns
        -------
        PrR : numpy.ndarray
            Array of rasterized 'favorability' arrays of a resource being
            present, taking into account all components (i.e., heat, fluid,
            perm, etc.).
        """
        PrR = np.zeros_like(PrXs[0])
        # Running product of components; its max bounds the rescale below.
        max_PrR = PrXs[0].copy()
        for i, c in enumerate(PrXs):
            if i > 0:
                # First component is already seeded into max_PrR above.
                max_PrR = np.multiply(max_PrR, c)
            PrR += w[i] * c
            if veto:
                # NOTE(review): zeroing the running sum here can be undone by
                # later components adding back into those pixels; a strict
                # veto would mask once after the loop. Preserved as-is —
                # confirm intended semantics with PFA Best Practices Report.
                PrR[c == 0] = 0
        # Normalize and scale to maintain valid 'favorability' distribution.
        # np.nanmax raises ValueError only for empty arrays; for all-NaN it
        # returns NaN with a RuntimeWarning, so test both cases explicitly
        # instead of relying on try/except (the original except never fired
        # for the all-NaN case).
        if PrR.size == 0 or np.all(np.isnan(PrR)):
            warnings.warn(
                "modified_veto: all values are NaN; normalization skipped.",
                stacklevel=2,
            )
            return PrR
        max_val = np.nanmax(PrR)
        if max_val > 0:
            # Rescale so the peak matches the peak of the pure product model.
            PrR = PrR / max_val * np.nanmax(max_PrR)
        else:
            warnings.warn(
                "modified_veto: maximum non-NaN of PrR is <= 0; normalization skipped.",
                stacklevel=2,
            )
        return PrR

    @staticmethod
    def prepare_for_combination(arr, nan_mode="propagate_shared"):
        """
        Prepare a stack of layers/components for combination by handling NaNs.

        Parameters
        ----------
        arr : numpy.ndarray
            Array of shape ``(n_items, *spatial_shape)``, where ``n_items``
            is the number of layers (at layer→component level), components
            (at component→criteria), or criteria (at criteria→final).
        nan_mode : {"propagate_shared", "propagate_any"}
            Strategy for handling NaN values during aggregation:

            ``"propagate_shared"``
                Propagates NaNs through combined layers only where *all*
                contributing inputs are NaN. NaNs present in only a subset
                of inputs are treated as neutral (non-contributing) evidence.
            ``"propagate_any"``
                Propagates NaNs through combined layers whenever *any*
                contributing input is NaN.

        Returns
        -------
        filled : numpy.ndarray
            Same shape as ``arr``, with NaNs replaced according to
            ``nan_mode`` for the purpose of numerical combination.
        mask_nan : numpy.ndarray
            Boolean mask of shape ``spatial_shape`` indicating where the
            final combined result should be set to NaN.
        """
        arr = np.asarray(arr, dtype=float)
        if arr.ndim < 2:  # noqa: PLR2004
            raise ValueError(
                "Input to prepare_for_combination must have at least 2 dimensions: "
                "(n_items, *spatial_dims)."
            )
        # mask_valid: True where that item has data at that pixel
        mask_valid = ~np.isnan(arr)
        # at least one item has data at the pixel
        has_data_any = np.any(mask_valid, axis=0)
        if nan_mode == "propagate_shared":
            # NaN in the output only where no item has data.
            mask_nan = ~has_data_any
        elif nan_mode == "propagate_any":
            # NaN in the output wherever any item lacks data.
            mask_nan = ~np.all(mask_valid, axis=0)
        else:
            raise ValueError(
                f"Invalid nan_mode '{nan_mode}'. "
                "Must be one of {'propagate_shared', 'propagate_any'}."
            )
        # The neutral fill below is identical for both modes (for
        # propagate_any it only keeps arithmetic stable; those pixels are
        # masked out via mask_nan anyway), so a single loop serves both.
        filled = arr.copy()
        for i in range(filled.shape[0]):
            layer = filled[i]
            # pixels where this layer is NaN but at least one other has data
            layer_nan = np.isnan(layer) & has_data_any
            if not np.any(layer_nan):
                continue
            # Use median as a neutral fill so missing data contributes neither
            # positively nor negatively after transformation + normalization.
            # TODO: Revisit whether mean makes more sense in certain cases.
            neutral = np.nanmedian(layer)
            if np.isnan(neutral):
                # entire layer is NaN wherever there is coverage; fall back to 0
                neutral = 0.0
            layer[layer_nan] = neutral
            filled[i] = layer
        return filled, mask_nan

    @classmethod
    def do_voter_veto(  # noqa: PLR0915, PLR0914, PLR0913, PLR0917, PLR0912
        cls,
        pfa,
        normalize_method,
        component_veto=False,
        criteria_veto=True,
        normalize=True,
        norm_to=5,
        nan_mode="propagate_shared",
    ):
        # TODO: refactor into helpers to satisfy ignored Ruff hits
        """
        Combine individual data layers into a resource 'favorability' model,
        using the unified 2D/3D voter-veto implementation with NaN handling.

        Parameters
        ----------
        pfa : dict
            Config specifying criteria, components, and data layers'
            relationship to one another. Includes data layers' associated
            GeoDataFrames to weight and combine into 'favorability' models.
        normalize_method : str
            Method to use to normalize data layers. Can be one of
            ['minmax','mad'].
        component_veto : bool
            Whether to apply veto at the component level.
        criteria_veto : bool
            Whether to apply veto at the criteria level.
        normalize : bool
            Whether to normalize output favorability GeoDataFrames.
        norm_to : float
            Max value for normalization of favorability in GeoDataFrames.
        nan_mode : {"propagate_shared", "propagate_any"}
            NaN-propagation strategy; see ``prepare_for_combination``.

        Returns
        -------
        pfa : dict
            Updated PFA config with component, criteria, and final
            favorability models stored under the ``"pr"`` (and, when
            ``normalize`` is True, ``"pr_norm"``) keys.
        """
        dim = detect_pfa_dimension(pfa)
        print(f"Combining {dim}D PFA layers with the voter-veto method. ")
        if nan_mode != "propagate_shared":
            print(f"Nan mode: {nan_mode}.")
        # Select the raster round-trip helpers matching the detected dimension.
        if dim == 2:  # noqa: PLR2004
            rasterize = transformation.rasterize_model_2d
            derasterize = transformation.derasterize_model_2d
        elif dim == 3:  # noqa: PLR2004
            rasterize = transformation.rasterize_model_3d
            derasterize = transformation.derasterize_model_3d
        else:
            raise ValueError(
                "Invalid PFA dimensionality (must be 2D or 3D). "
                "Check input layers. "
            )
        PrRs = []
        w_criteria = []
        ref_shape = None  # grid shape every rasterized layer must match
        last_model_geom = None  # geometry template for derasterization
        for criteria in pfa["criteria"]:
            print(f"criterion: {criteria}")
            criteria_cfg = pfa["criteria"][criteria]
            PrXs = []
            w_components = []
            for component in criteria_cfg["components"]:
                print(f"  component: {component}")
                component_cfg = criteria_cfg["components"][component]
                z_layers = []
                w_layers = []
                w0 = cls.get_w0(component_cfg["pr0"])
                for layer in component_cfg["layers"]:
                    print(f"    layer: {layer}")
                    layer_cfg = component_cfg["layers"][layer]
                    model = layer_cfg["model"]
                    col = layer_cfg["model_data_col"]
                    transformation_method = layer_cfg["transformation_method"]
                    # keep reference for later derasterization
                    last_model_geom = model.copy()
                    model_array = rasterize(model, col)
                    if ref_shape is None:
                        ref_shape = model_array.shape
                    elif model_array.shape != ref_shape:
                        raise ValueError("Layer grid shape mismatch.")
                    # transform
                    if transformation_method not in {"none", "None"}:
                        model_array = transformation.transform(
                            model_array, transformation_method
                        )
                        print(
                            f"      - Transformed with method: {transformation_method}"
                        )
                    # normalize
                    model_array = transformation.normalize_array(
                        model_array, method=normalize_method
                    )
                    print(
                        f"      - Normalized with method: {normalize_method}"
                    )
                    z_layers.append(model_array)
                    w_layers.append(layer_cfg["weight"])
                # NaN handling at layer -> component level
                z_filled, mask_layers = cls.prepare_for_combination(
                    np.array(z_layers), nan_mode=nan_mode
                )
                # voter combination
                PrX = cls.voter(np.array(w_layers), z_filled, w0)
                # apply propagated NaNs
                PrX[mask_layers] = np.nan
                # derasterize to GeoDataFrame (uses the last layer's geometry)
                dr = derasterize(PrX, model)
                component_cfg["pr"] = dr
                if normalize:
                    component_cfg["pr_norm"] = transformation.normalize_gdf(
                        dr,
                        col="favorability",
                        norm_to=norm_to,
                    )
                PrXs.append(PrX)
                w_components.append(component_cfg["weight"])
            # NaN handling at component -> criteria level
            PrXs_filled, mask_components = cls.prepare_for_combination(
                np.array(PrXs), nan_mode=nan_mode
            )
            PrR_criteria = cls.modified_veto(
                PrXs_filled, np.array(w_components), veto=component_veto
            )
            PrR_criteria[mask_components] = np.nan
            # derasterize criteria-level favorability
            dr = derasterize(PrR_criteria, last_model_geom)
            criteria_cfg["pr"] = dr
            if normalize:
                criteria_cfg["pr_norm"] = transformation.normalize_gdf(
                    criteria_cfg["pr"],
                    col="favorability",
                    norm_to=norm_to,
                )
            PrRs.append(PrR_criteria)
            w_criteria.append(criteria_cfg["weight"])
        # NaN handling at criteria -> final level
        PrRs_filled, mask_criteria = cls.prepare_for_combination(
            np.array(PrRs), nan_mode=nan_mode
        )
        # final resource favorability
        PrR_final = cls.modified_veto(
            PrRs_filled, np.array(w_criteria), veto=criteria_veto
        )
        PrR_final[mask_criteria] = np.nan
        # derasterize final favorability
        dr = derasterize(PrR_final, last_model_geom)
        pfa["pr"] = dr
        if normalize:
            pfa["pr_norm"] = transformation.normalize_gdf(
                pfa["pr"], col="favorability", norm_to=norm_to
            )
        return pfa