"""
Processing utilities for preparing geothermal favorability layers.
This module provides classes and functions to transform raw geospatial data
into gridded datasets suitable for consistent spatial analysis. Operations largely
act on the `pfa` dictionary (built with the data_readers module) and include
data cleaning, interpolation, and other various preprocessing functions for
PFA layer combination.
"""
import time
import warnings
import geopandas as gpd
import pandas as pd
import scipy
import numpy as np
import math
import shapely
from shapely.geometry.polygon import Polygon, LineString, Point
from shapely.ops import unary_union
from pykrige.ok3d import OrdinaryKriging3D
from scipy.interpolate import griddata
from osgeo import osr
from itertools import starmap
from scipy.interpolate import LinearNDInterpolator
from scipy.spatial import cKDTree
from geopfa.transformation import transform
from geopfa.extrapolation import backfill_gdf, drop_z_from_geometry
try:
# Shapely 2.0 vectorized accessors (fast path)
from shapely import get_z
_HAS_GET_Z = True
except Exception:
_HAS_GET_Z = False
# [docs]  (documentation-build artifact kept as a comment so the module imports)
class Cleaners:
"""Class of functions for use in processing data into models"""
@staticmethod
def set_crs(pfa, target_crs=3857):
    """Project every data layer in the PFA config onto one shared CRS.

    Walks the nested criteria -> components -> layers structure and
    reprojects each layer's GeoDataFrame via ``to_crs``, storing the
    result back in place.

    Parameters
    ----------
    pfa : dict
        Config specifying criteria, components, and data layers'
        relationship to one another. Each layer holds a GeoDataFrame
        under its ``"data"`` key.
    target_crs : int
        EPSG code of the desired CRS. Defaults to 3857 (Web Mercator).

    Returns
    -------
    pfa : dict
        The same config with every layer's GeoDataFrame reprojected onto
        ``target_crs``.
    """
    for criterion_cfg in pfa["criteria"].values():
        for component_cfg in criterion_cfg["components"].values():
            layers = component_cfg["layers"]
            for layer_name in layers:
                layers[layer_name]["data"] = layers[layer_name][
                    "data"
                ].to_crs(target_crs)
    return pfa
@staticmethod
def clean_unnamed_columns(pfa):
    """
    Drop spreadsheet-artifact 'Unnamed:' columns from every layer model.

    Loops through the `pfa` dictionary, finds GeoDataFrames stored under
    each layer's ``"model"`` key, and drops columns whose names start with
    'Unnamed:' (typically produced by CSV round-trips of an index column).

    Parameters
    ----------
    pfa : dict
        Nested dictionary structure containing criteria, components, and layers.

    Returns
    -------
    pfa : dict
        The same config with each cleaned model stored back in place.
    """
    for criteria in pfa["criteria"]:
        print(criteria)
        for component in pfa["criteria"][criteria]["components"]:
            print("\t" + component)
            layers = pfa["criteria"][criteria]["components"][component][
                "layers"
            ]
            for layer in layers:
                print("\t\t" + layer)
                # Access the layer's model
                gdf = layers[layer]["model"]
                # Only GeoDataFrame models are cleaned; other model
                # types pass through untouched.
                if isinstance(gdf, gpd.GeoDataFrame):
                    unnamed_columns = [
                        col
                        for col in gdf.columns
                        if col.startswith("Unnamed:")
                    ]
                    if unnamed_columns:
                        print(
                            f"\t\t\tDropping columns: {', '.join(unnamed_columns)}"
                        )
                        # Store the cleaned frame back into the config.
                        # (Previously the drop was assigned to a local
                        # variable only, so it was silently lost.)
                        layers[layer]["model"] = gdf.drop(
                            columns=unnamed_columns
                        )
                    else:
                        print("\t\t\tNo 'Unnamed:' columns found.")
    return pfa
@staticmethod
def convert_z_measurements(gdf, z_meas, target_z_meas):
    """
    Converts depth or elevation measurements from one reference system to another using GDAL Python Bindings.

    Unit-to-unit conversions ('m-msl' <-> 'ft-msl') are handled with a
    constant factor; EPSG-to-EPSG conversions go through GDAL's
    CoordinateTransformation. Any other combination leaves Z unchanged.

    Parameters
    ----------
    gdf (GeoDataFrame): GeoDataFrame containing point geometry and Z values in the geometry.
    z_meas (str): Current measurement reference (e.g., 'm-msl', 'epsg:####', or 'ft-msl').
    target_z_meas (str): Target measurement reference (e.g., 'm-msl', 'epsg:####', or 'ft-msl').

    Returns
    -------
    GeoDataFrame: A GeoDataFrame with updated geometry where the Z value is converted to the target reference.
    """
    METERS_TO_FEET = 3.28084
    FEET_TO_METERS = 1 / METERS_TO_FEET
    # Set up source and target spatial references; they are only
    # initialized when the reference string is an EPSG code.
    source_srs = osr.SpatialReference()
    if z_meas.startswith("epsg:"):
        source_srs.ImportFromEPSG(int(z_meas.split(":")[1]))
        print("\t\t successful import")
    target_srs = osr.SpatialReference()
    if target_z_meas.startswith("epsg:"):
        target_srs.ImportFromEPSG(int(target_z_meas.split(":")[1]))
        print("\t\t successful import")
    # Coordinate transformation
    # NOTE(review): this local name shadows the module-level `transform`
    # imported from geopfa.transformation. When the references are not
    # EPSG codes the SRS objects are left uninitialized — presumably the
    # transformation is then never used; confirm with callers.
    transform = osr.CoordinateTransformation(source_srs, target_srs)
    # Function to update Z values based on input and target references
    def convert_z(geom):
        # Returns a new Point with the same XY and a converted Z.
        current_z = geom.z
        new_z = current_z
        if z_meas == "m-msl" and target_z_meas == "ft-msl":
            new_z = current_z * METERS_TO_FEET
        elif z_meas == "ft-msl" and target_z_meas == "m-msl":
            new_z = current_z * FEET_TO_METERS
        elif z_meas.startswith("epsg:") and target_z_meas.startswith(
            "epsg:"
        ):
            print("\t\t ", "transforming ", z_meas, " to ", target_z_meas)
            _x, _y, z = transform.TransformPoint(geom.x, geom.y, current_z)
            new_z = z  # Updated Z from the transformation
        return shapely.geometry.Point(geom.x, geom.y, new_z)
    # Apply Z conversion to every point geometry (in place on the input gdf)
    gdf["geometry"] = gdf.geometry.apply(convert_z)
    return gdf
@staticmethod
def filter(data, quantile=0.9):
    """Cap values above a given quantile at that quantile value.

    Note that the input series is modified in place and also returned;
    use :meth:`filter_series` for a non-mutating variant.

    Parameters
    ----------
    data : pandas.Series
        Series of data values to cap.
    quantile : float
        Quantile threshold; values above it are treated as outliers and
        set to the quantile value.

    Returns
    -------
    data : pandas.Series
        The capped series (the same object that was passed in).
    """
    cap = data.quantile(quantile)
    data[data > cap] = cap
    return data
@staticmethod
def filter_series(series, quantile=0.9):
    """Return a copy of *series* with values above a quantile capped at it.

    Unlike :meth:`filter`, the input series is left untouched.

    Parameters
    ----------
    series : pandas.Series
        Series of data values to filter.
    quantile : float
        Quantile threshold; values above it are treated as outliers and
        set to the quantile value.

    Returns
    -------
    pandas.Series
        New series with values above the specified quantile set to that
        quantile.
    """
    capped = series.copy()
    limit = capped.quantile(quantile)
    capped[capped > limit] = limit
    return capped
@staticmethod
def filter_geodataframe(gdf, column, quantile=0.9):
    """Cap one column of a GeoDataFrame at a given quantile.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing the data to filter.
    column : str
        Name of the column to cap.
    quantile : float
        Quantile threshold passed through to :meth:`filter_series`.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame with the specified column capped. A missing column
        is reported and the frame returned unchanged.
    """
    # Guard clause: nothing to do when the column is absent.
    if column not in gdf.columns:
        print(
            f"Column '{column}' could not be filtered because it is not in the dataframe."
        )
        return gdf
    gdf[column] = Cleaners.filter_series(gdf[column], quantile)
    return gdf
@staticmethod
def get_extent(gdf: gpd.GeoDataFrame, dim="auto"):
    """
    Compute the spatial extent (bounding box) of a GeoDataFrame.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing Point, LineString, Polygon,
        Multi*, or GeometryCollection geometries.
    dim : {"auto", 2, 3}, optional
        Controls dimensionality of returned extent.
        - "auto" (default): returns 2D extent if all geometries
          are 2D. Returns 3D extent if ANY geometries contain Z
          values; warns if mixed 2D/3D.
        - 2: always return 2D extent [xmin, ymin, xmax, ymax],
          even if geometries contain Z values.
        - 3: force 3D extent [xmin, ymin, zmin, xmax, ymax, zmax].
          Raises ValueError if no Z values are present.

    Returns
    -------
    extent : list
        2D: [xmin, ymin, xmax, ymax]
        3D: [xmin, ymin, zmin, xmax, ymax, zmax]

    Raises
    ------
    ValueError
        If GeoDataFrame is empty.
        If dim=3 but no Z values exist.
        If dim is invalid.

    Warns
    -----
    UserWarning
        If mixed 2D and 3D geometries are present and dim="auto".
        Z bounds will be computed using geometries with Z only.

    Notes
    -----
    - XY bounds are computed using GeoDataFrame.total_bounds.
    - Z bounds are computed by iterating over all coordinates.
    - Multi* and GeometryCollection geometries are handled recursively.
    """
    # Refuse empty input: total_bounds would be meaningless (NaNs).
    if gdf.empty or gdf.geometry.is_empty.all():
        raise ValueError("Cannot compute extent of empty GeoDataFrame.")
    if dim not in ("auto", 2, 3):
        raise ValueError("dim must be one of {'auto', 2, 3}.")
    # XY bounds come straight from GeoPandas.
    xmin, ymin, xmax, ymax = gdf.total_bounds
    # Always return 2D if explicitly requested
    if dim == 2:
        return [xmin, ymin, xmax, ymax]
    # Detect Z presence per geometry (getattr guards odd/None entries).
    has_z_flags = gdf.geometry.apply(
        lambda geom: getattr(geom, "has_z", False)
    )
    any_z = has_z_flags.any()
    all_z = has_z_flags.all()
    if not any_z:
        # No Z anywhere: 3D was impossible, "auto" degrades to 2D.
        if dim == 3:
            raise ValueError(
                "dim=3 requested but geometries do not contain Z values."
            )
        return [xmin, ymin, xmax, ymax]
    if dim == "auto" and not all_z:
        warnings.warn(
            "Mixed 2D and 3D geometries detected. "
            "Z extent will be computed using geometries with Z values only.",
            UserWarning,
            stacklevel=2,
        )
    def extract_z(geom):
        # Collect every Z coordinate of a geometry, recursing into
        # multi-part geometries and polygon exterior/interior rings.
        if geom.is_empty:
            return []
        zs = []
        def recurse(g):
            if hasattr(g, "geoms"):
                # Multi* / GeometryCollection: descend into parts.
                for part in g.geoms:
                    recurse(part)
            else:
                try:
                    coords = list(g.coords)
                except Exception:
                    # Polygons expose coords via rings, not directly.
                    try:
                        coords = list(g.exterior.coords)
                        for ring in g.interiors:
                            coords += list(ring.coords)
                    except Exception:
                        return
                for c in coords:
                    # Only 3-tuples carry a Z value.
                    if len(c) == 3:
                        zs.append(c[2])
        recurse(geom)
        return zs
    z_values = []
    for geom in gdf.geometry:
        z_values.extend(extract_z(geom))
    if not z_values:
        # has_z was set but no concrete Z coordinates were found.
        if dim == 3:
            raise ValueError(
                "dim=3 requested but no valid Z coordinates found."
            )
        return [xmin, ymin, xmax, ymax]
    return [xmin, ymin, min(z_values), xmax, ymax, max(z_values)]
@staticmethod
def set_extent(gdf: gpd.GeoDataFrame, extent):
    """
    Clip a GeoDataFrame to a 2D or 3D extent.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to clip.
    extent : list or tuple
        - Length 4: [xmin, ymin, xmax, ymax]
        - Length 6: [xmin, ymin, zmin, xmax, ymax, zmax]

    Returns
    -------
    geopandas.GeoDataFrame
        Clipped GeoDataFrame.

    Raises
    ------
    ValueError
        If extent length is not 4 or 6, or if a 3D extent is
        mis-ordered (min >= max on any axis).

    Warns
    -----
    UserWarning
        If a 3D extent is provided but geometries do not
        contain Z values.

    Notes
    -----
    - 2D clipping uses GeoPandas .clip() with a shapely box.
    - For 3D extents:
        1. Geometries are clipped in XY.
        2. Z filtering is applied based on bounding-box overlap.
    - Geometries are NOT sliced in Z. They are either
      retained or discarded based on Z intersection.
    - Only 3D extents are validated for min/max ordering; 2D extents
      are passed straight to the clip (historical behavior).
    """
    if len(extent) not in (4, 6):
        raise ValueError("Extent must be length 4 (2D) or 6 (3D).")
    if len(extent) == 4:
        xmin, ymin, xmax, ymax = extent
    else:
        xmin, ymin, zmin, xmax, ymax, zmax = extent
        if xmax <= xmin or ymax <= ymin or zmax <= zmin:
            raise ValueError(
                "Invalid extent: xmax/xmin, ymax/ymin, or zmin/zmax "
                "ordering is incorrect."
            )
    # XY clip shared by the 2D and 3D paths. (The previous version
    # repeated this clip for 3D extents, doing the work twice.)
    bbox = shapely.geometry.box(xmin, ymin, xmax, ymax)
    gdf_xy = gdf.clip(bbox)
    if len(extent) == 4:
        return gdf_xy
    # 3D case: drop geometries whose Z range misses [zmin, zmax].
    has_z_flags = gdf_xy.geometry.apply(
        lambda geom: getattr(geom, "has_z", False) if geom else False
    )
    if not has_z_flags.any():
        warnings.warn(
            "3D extent provided but geometries do not contain Z values. "
            "Only XY clipping was applied.",
            UserWarning,
            stacklevel=2,
        )
        return gdf_xy
    def z_overlap(geom):
        # True when the geometry's Z range intersects [zmin, zmax];
        # 2D geometries (no Z) are discarded in the 3D path.
        if not geom.has_z:
            return False
        zs = []
        def recurse(g):
            if hasattr(g, "geoms"):
                for part in g.geoms:
                    recurse(part)
            else:
                try:
                    coords = list(g.coords)
                except Exception:
                    # Polygons expose coords via rings, not directly.
                    try:
                        coords = list(g.exterior.coords)
                        for ring in g.interiors:
                            coords += list(ring.coords)
                    except Exception:
                        return
                for c in coords:
                    if len(c) == 3:
                        zs.append(c[2])
        recurse(geom)
        if not zs:
            return False
        return not (max(zs) < zmin or min(zs) > zmax)
    mask = gdf_xy.geometry.apply(z_overlap)
    return gdf_xy[mask].copy()
# [docs]  (documentation-build artifact kept as a comment so the module imports)
class Exclusions:
"""Class of functions to handle exclusion areas in a PFA area"""
@staticmethod
def mask_exclusion_areas(
    gdf_points, gdf_exclusion_areas, value_col="value", set_to=0
):
    """
    Mask points within exclusion areas by overwriting their values.

    Parameters
    ----------
    gdf_points : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries and values. Modified in
        place and returned.
    gdf_exclusion_areas : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries representing exclusion areas.
    value_col : str, optional
        The column name in gdf_points that contains the values to be masked, by default 'value'.
    set_to : int or float, optional
        Value written into ``value_col`` for points inside an exclusion
        area, by default 0.

    Returns
    -------
    GeoDataFrame
        Updated GeoDataFrame with points within exclusion areas set to ``set_to``.
    """
    # Ensure both GeoDataFrames have the same CRS before the spatial join
    if gdf_points.crs != gdf_exclusion_areas.crs:
        gdf_exclusion_areas = gdf_exclusion_areas.to_crs(gdf_points.crs)
    # Spatial join marks points falling within any exclusion polygon
    joined = gpd.sjoin(
        gdf_points, gdf_exclusion_areas, how="left", predicate="within"
    )
    # A point inside several polygons appears multiple times in the join;
    # de-duplicate and reindex so the boolean mask aligns with
    # gdf_points' index. (The previous version indexed .loc with a
    # possibly longer, duplicated mask.)
    joined = joined[~joined.index.duplicated(keep="first")]
    is_within = joined.index_right.notna().reindex(
        gdf_points.index, fill_value=False
    )
    # Mask points within exclusion areas
    gdf_points.loc[is_within, value_col] = set_to
    return gdf_points
@staticmethod
def add_exclusions(pfa, pr_label="pr"):
    """
    Apply every exclusion layer to the probability/favorability points.

    Iterates over the exclusion components and layers in
    ``pfa['exclusions']`` and masks points of ``pfa[pr_label]`` that fall
    inside each exclusion geometry, writing the component's ``set_to``
    value into the points' 'favorability' column. The masked result is
    accumulated in ``pfa['pr_excl']``, so each exclusion is applied on
    top of the previous ones.

    Parameters
    ----------
    pfa : dict
        Dictionary containing the point data under ``pfa[pr_label]`` and
        an 'exclusions' entry whose components each provide exclusion
        layers (with a 'model' GeoDataFrame) and a ``set_to`` value.
    pr_label : str, optional
        Key of the probability/favorability GeoDataFrame in ``pfa``.
        Defaults to 'pr'.

    Returns
    -------
    dict
        The updated ``pfa`` dictionary, where ``pfa['pr_excl']`` holds
        the values after all exclusion masks have been applied.

    Raises
    ------
    ValueError
        If an exclusion layer does not provide a 'model' entry.
    """
    first_pass = True
    for component_cfg in pfa["exclusions"]["components"].values():
        set_to = component_cfg["set_to"]
        for layer, layer_entry in component_cfg["layers"].items():
            # Transition approach to be cleaned once model usage is consolidated.
            if "model" not in layer_entry:
                raise ValueError(
                    f"Exclusion layer {layer} does not contain 'model' key."
                )
            exclusion_gdf = layer_entry["model"]
            # First pass starts from a copy of the raw points; later
            # passes keep masking the accumulated result.
            source_points = (
                pfa[pr_label].copy() if first_pass else pfa["pr_excl"]
            )
            pfa["pr_excl"] = Exclusions.mask_exclusion_areas(
                gdf_points=source_points,
                gdf_exclusion_areas=exclusion_gdf,
                value_col="favorability",
                set_to=set_to,
            )
            first_pass = False
    return pfa
@staticmethod
def buffer_distance(
    gdf_points,
    gdf_exclusion_areas,
    buffer_distance,
    value_col="value",
    set_to=0,
):
    """
    Mask points within exclusion areas (defined by buffers around points)
    by overwriting their values.

    Parameters
    ----------
    gdf_points : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries and values. Modified in
        place and returned.
    gdf_exclusion_areas : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries representing exclusion areas.
    buffer_distance : float
        The distance to buffer around exclusion points.
    value_col : str, optional
        The column name in gdf_points that contains the values to be masked, by default 'value'.
    set_to : int or float, optional
        Value written into ``value_col`` for points inside a buffer,
        by default 0 (preserving the previous hard-coded behavior and
        matching :meth:`mask_exclusion_areas`).

    Returns
    -------
    GeoDataFrame
        Updated GeoDataFrame with points within exclusion buffers set to ``set_to``.
    """
    # Ensure both GeoDataFrames have the same CRS
    if gdf_points.crs != gdf_exclusion_areas.crs:
        gdf_exclusion_areas = gdf_exclusion_areas.to_crs(gdf_points.crs)
    # Create buffers around exclusion points
    gdf_exclusion_buffers = gdf_exclusion_areas.copy()
    gdf_exclusion_buffers["geometry"] = gdf_exclusion_areas.buffer(
        buffer_distance
    )
    # Perform a spatial join to find points within exclusion buffers
    joined = gpd.sjoin(
        gdf_points, gdf_exclusion_buffers, how="left", predicate="within"
    )
    # Ensure no duplicate indices in the joined DataFrame (a point inside
    # several buffers appears multiple times)
    joined = joined[~joined.index.duplicated(keep="first")]
    # Create a boolean index aligned with gdf_points' index
    is_within = joined.index_right.notna().reindex(
        gdf_points.index, fill_value=False
    )
    # Mask points within exclusion buffers
    gdf_points.loc[is_within, value_col] = set_to
    return gdf_points
# [docs]  (documentation-build artifact kept as a comment so the module imports)
class Processing:
"""Class of functions for use in processing data into models"""
# ------ 2D-specific functions (from _Processing2D) ------
@staticmethod
def interpolate_points_2d(
    pfa,
    criteria,
    component,
    layer,
    nx,
    ny,
    extent=None,
    interp_method="linear",
):
    """Interpolate a layer's point data onto a regular 2D grid.

    Parameters
    ----------
    pfa : dict
        Config specifying criteria, components, and data layers' relationship
        to one another. Includes data layers' associated GeoDataFrames.
    criteria : str
        Criteria associated with data to interpolate.
    component : str
        Component associated with data to interpolate.
    layer : str
        Layer associated with data to interpolate.
    nx : int
        Number of points in the x-direction.
    ny : int
        Number of points in the y-direction.
    extent : list, optional
        Length-4 bounding box [x_min, y_min, x_max, y_max]. When None,
        the bounds of the input points are used.
    interp_method : str
        Method passed to scipy.interpolate.griddata. Can be one of:
        'nearest' (KNN), 'linear', or 'cubic'.

    Returns
    -------
    pfa : dict
        Updated pfa config with the interpolated grid stored as the
        layer's 'model' (column 'value_interpolated').
    """
    layer_cfg = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    gdf = layer_cfg["data"]
    data_col = layer_cfg["data_col"]
    if gdf.geometry.type.iloc[0] == "Polygon":
        print(
            "Notice: interpolate_points() function received GeoDataFrame with geometry type 'Polygon.' Converting geometry to 'Point' geometry using centroids."
        )
        # NOTE: this overwrites the geometry on the input GeoDataFrame.
        gdf.geometry = gdf.geometry.centroid
    # Extract coordinates and values from the GeoDataFrame
    x = gdf.geometry.x
    y = gdf.geometry.y
    values = gdf[data_col]
    # Define the grid for interpolation
    if extent is None:
        x_min = min(x)
        x_max = max(x)
        y_min = min(y)
        y_max = max(y)
    else:
        x_min, y_min, x_max, y_max = extent
    x_grid = np.linspace(x_min, x_max, nx)
    y_grid = np.linspace(y_min, y_max, ny)
    xv, yv = np.meshgrid(x_grid, y_grid)
    # TODO: add a proper IDW (inverse-distance weighting) option with a
    # `power` parameter; earlier attempts did not work and were removed.
    # Clean out any rows with NaNs before interpolation
    valid = ~(x.isna() | y.isna() | values.isna())
    x, y, values = x[valid], y[valid], values[valid]
    grid = scipy.interpolate.griddata(
        (x, y), values, (xv, yv), method=interp_method
    )
    # Create a new GeoDataFrame with the interpolated values
    interpolated_gdf = gpd.GeoDataFrame(
        {"value_interpolated": grid.flatten()},
        geometry=gpd.points_from_xy(xv.flatten(), yv.flatten()),
        crs=gdf.crs,
    )
    layer_cfg["model"] = interpolated_gdf
    layer_cfg["model_data_col"] = "value_interpolated"
    layer_cfg["model_units"] = layer_cfg["units"]
    return pfa
@staticmethod
def mark_buffer_areas(
    pfa,
    criteria,
    component,
    layer,
    extent,
    nx,
    ny,
    buffer_distance,
    polygon_value,
    buffer_value,
    background_value,
):
    """Classify grid points as inside a polygon, inside its buffer, or outside.

    Builds an nx-by-ny grid of points over *extent* and assigns each point
    one of three values:

    - ``polygon_value``: the point lies inside one of the layer's polygons
    - ``buffer_value``: the point lies inside a ``buffer_distance`` buffer
      around a polygon but not inside the polygon itself
    - ``background_value``: the point lies outside both

    The resulting point GeoDataFrame (with a 'classification' column) is
    stored as the layer's model in the returned ``pfa`` dict.

    Parameters
    ----------
    pfa : dict
        Config holding the polygon layer under
        ``pfa['criteria'][criteria]['components'][component]['layers'][layer]['data']``.
    criteria : str
        Key of the criterion holding the polygon data.
    component : str
        Key of the component holding the data layer.
    layer : str
        Key of the layer holding the polygon geometries.
    extent : tuple of float
        (x_min, y_min, x_max, y_max) bounding box for the grid.
    nx : int
        Number of grid points along x.
    ny : int
        Number of grid points along y.
    buffer_distance : float
        Buffer radius around the polygons.
    polygon_value : float
        Classification value for points inside a polygon.
    buffer_value : float
        Classification value for points inside a buffer only.
    background_value : float
        Classification value for points outside polygon and buffer.

    Returns
    -------
    dict
        The updated ``pfa`` dictionary with the classification model
        stored on the layer.
    """
    layer_cfg = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    gdf_polygons = layer_cfg["data"]
    # Build the lattice of candidate points over the extent.
    x_min, y_min, x_max, y_max = extent
    xs = np.linspace(x_min, x_max, nx)
    ys = np.linspace(y_min, y_max, ny)
    xv, yv = np.meshgrid(xs, ys)
    coords = np.c_[xv.ravel(), yv.ravel()]
    gdf_points = gpd.GeoDataFrame(
        geometry=[shapely.geometry.Point(c) for c in coords],
        crs=gdf_polygons.crs,
    )
    # Buffer zones around the polygons.
    buffers = gdf_polygons.buffer(buffer_distance)
    # Point-in-polygon / point-in-buffer membership tests.
    in_polygon = gdf_points.geometry.apply(
        lambda pt: gdf_polygons.contains(pt).any()
    )
    in_buffer = gdf_points.geometry.apply(
        lambda pt: buffers.contains(pt).any()
    )
    # Polygon membership wins over buffer membership.
    gdf_points["classification"] = np.where(
        in_polygon,
        polygon_value,
        np.where(in_buffer, buffer_value, background_value),
    )
    layer_cfg["model"] = gdf_points
    layer_cfg["model_data_col"] = "classification"
    layer_cfg["model_units"] = (
        f"classification ({polygon_value}=polygon, {buffer_value}=buffer, {background_value}=outside)"
    )
    return pfa
@staticmethod
def polygons_to_points(pfa, criteria, component, layer, extent, nx, ny):
    """Calculate aggregated polygon values (sum or average) within a specified grid.

    Each grid cell receives the area-weighted average of all polygon
    values that overlap it; the result is reduced to one point per cell
    (the cell centroid).

    Parameters
    ----------
    pfa : dict
        Configuration dictionary specifying relationships between criteria, components,
        and data layers.
    criteria : str
        Criteria associated with Polygon data.
    component : str
        Component associated with Polygon data.
    layer : str
        Layer associated with Polygon data.
    extent : list
        List of length 4 containing the extent [x_min, y_min, x_max, y_max].
    nx : int
        Number of grid cells in the x direction.
    ny : int
        Number of grid cells in the y direction.

    Returns
    -------
    pfa : dict
        Updated pfa config which includes the aggregated polygon values as a point model.
    """
    # Extract GeoDataFrame containing polygons
    gdf_polygons = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]["data"]
    col = pfa["criteria"][criteria]["components"][component]["layers"][
        layer
    ]["data_col"]
    # Define the extent
    x_min, y_min, x_max, y_max = extent
    # Calculate cell size from nx and ny
    cell_size_x = (x_max - x_min) / nx
    cell_size_y = (y_max - y_min) / ny
    # Create a grid of rectangular cells over the specified extent
    grid_cells = []
    for i in range(nx):
        for j in range(ny):
            x_start = x_min + i * cell_size_x
            y_start = y_min + j * cell_size_y
            grid_cell = shapely.geometry.box(
                x_start,
                y_start,
                x_start + cell_size_x,
                y_start + cell_size_y,
            )
            grid_cells.append(grid_cell)
    # Create GeoDataFrame from grid cells
    grid_gdf = gpd.GeoDataFrame(geometry=grid_cells, crs=gdf_polygons.crs)
    # "sum" accumulates area-weighted values and "count" the overlapping
    # area, so sum/count below is an area-weighted average per cell.
    grid_gdf["sum"] = 0.0
    grid_gdf["count"] = 0
    # Use spatial indexing to speed up the intersection checks
    sindex = gdf_polygons.sindex
    # Iterate over each grid cell to calculate the aggregated values
    for grid_idx, grid_cell in grid_gdf.iterrows():
        # Bounding-box candidates from the spatial index; exact
        # intersection is checked below.
        possible_matches_index = list(
            sindex.intersection(grid_cell.geometry.bounds)
        )
        possible_matches = gdf_polygons.iloc[possible_matches_index]
        for _, polygon in possible_matches.iterrows():
            poly = polygon.geometry
            value = polygon[col]
            if poly.intersects(grid_cell.geometry):
                intersection = poly.intersection(grid_cell.geometry)
                intersection_area = intersection.area
                grid_gdf.at[grid_idx, "sum"] += value * intersection_area
                grid_gdf.at[grid_idx, "count"] += intersection_area
    # Reduce each cell to its centroid carrying the averaged value;
    # cells with no polygon overlap get 0.
    points = []
    for _, row in grid_gdf.iterrows():
        centroid = row.geometry.centroid
        if row["count"] > 0:
            average_value = row["sum"] / row["count"]
        else:
            average_value = 0
        points.append((centroid, average_value))
    # Create a GeoDataFrame with point representation
    point_geometries = [
        shapely.geometry.Point(xy[0].x, xy[0].y) for xy in points
    ]
    values = [xy[1] for xy in points]
    point_gdf = gpd.GeoDataFrame(
        {"geometry": point_geometries, "value": values},
        crs=gdf_polygons.crs,
    )
    # Update the pfa dictionary with the new point representation model
    pfa["criteria"][criteria]["components"][component]["layers"][layer][
        "model"
    ] = point_gdf
    pfa["criteria"][criteria]["components"][component]["layers"][layer][
        "model_data_col"
    ] = "value"
    pfa["criteria"][criteria]["components"][component]["layers"][layer][
        "model_units"
    ] = "aggregated value"
    return pfa
@staticmethod
def generate_grid_points(extent, nx, ny, crs):
    """
    Generate a regular nx-by-ny lattice of points within a given extent.

    Points are placed at evenly spaced positions spanning the extent,
    edges included, with x varying slowest in the output order.

    Args:
        extent (tuple): A tuple of (xmin, ymin, xmax, ymax) defining the bounding box.
        nx (int): Number of grid points in the x direction.
        ny (int): Number of grid points in the y direction.
        crs: Coordinate reference system for the GeoDataFrame.

    Returns:
        GeoDataFrame containing the grid points.
    """
    xmin, ymin, xmax, ymax = extent
    xs = np.linspace(xmin, xmax, nx)
    ys = np.linspace(ymin, ymax, ny)
    nodes = [shapely.geometry.Point(x, y) for x in xs for y in ys]
    return gpd.GeoDataFrame(geometry=nodes, crs=crs)
@staticmethod
def distance_from_lines(pfa, criteria, component, layer, extent, nx, ny):
    """
    Compute each grid point's minimum distance to the layer's lines.

    Generates a regular nx-by-ny grid of points over *extent* and records,
    for every point, the shortest distance to any LineString in the
    layer's GeoDataFrame. The result is stored as the layer's model with
    a 'distance' column.

    Parameters
    ----------
    pfa : dict
        Data structure specifying criteria, components, and layers; the
        target layer's 'data' must hold LineString geometries.
    criteria : str
        Criteria name.
    component : str
        Component name.
    layer : str
        Layer name (which holds the GeoDataFrame to measure distance from).
    extent : list
        [x_min, y_min, x_max, y_max] bounding box for the grid points.
    nx : int
        Number of points along the x-direction.
    ny : int
        Number of points along the y-direction.

    Returns
    -------
    pfa : dict
        Updated dictionary with the distance model stored under the
        layer's 'model' key.
    """
    layer_cfg = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    gdf_lines = layer_cfg["data"]
    # Regular lattice of sample points over the extent.
    x_min, y_min, x_max, y_max = extent
    xs = np.linspace(x_min, x_max, nx)
    ys = np.linspace(y_min, y_max, ny)
    lattice = [shapely.geometry.Point(x, y) for x in xs for y in ys]
    gdf_distances = gpd.GeoDataFrame(geometry=lattice, crs=gdf_lines.crs)
    # Merge all lines into one (multi-)geometry so each point needs only
    # a single distance query.
    merged_lines = gdf_lines.geometry.unary_union
    gdf_distances["distance"] = [
        pt.distance(merged_lines) for pt in gdf_distances.geometry
    ]
    layer_cfg["model"] = gdf_distances
    layer_cfg["model_data_col"] = "distance"
    layer_cfg["model_units"] = "distance (m)"
    return pfa
@staticmethod
def calculate_intersections(gdf_lines):
    """
    Calculate intersection points between 2D line geometries in a GeoDataFrame.

    Every unordered pair of geometries is tested once. Depending on the
    intersection geometry:
    - Point / MultiPoint: all points are included
    - LineString / MultiLineString: the centroid of each overlapping segment is used
    - GeometryCollection: points are kept as-is and line-like members
      contribute their centroid

    Parameters
    ----------
    gdf_lines : geopandas.GeoDataFrame
        GeoDataFrame containing 2D line geometries in the 'geometry' column.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame of intersection points, using the same CRS as the input.
    """
    geoms = gdf_lines.geometry.tolist()
    found = []
    def record(result):
        # Translate one pairwise intersection result into zero or more
        # representative points.
        if isinstance(result, shapely.geometry.Point):
            found.append(result)
        elif isinstance(result, shapely.geometry.MultiPoint):
            found.extend(list(result.geoms))
        elif isinstance(result, shapely.geometry.LineString):
            found.append(result.centroid)
        elif isinstance(result, shapely.geometry.MultiLineString):
            found.extend(part.centroid for part in result.geoms)
        elif isinstance(result, shapely.geometry.GeometryCollection):
            for member in result.geoms:
                if isinstance(member, shapely.geometry.Point):
                    found.append(member)
                elif isinstance(
                    member,
                    (
                        shapely.geometry.LineString,
                        shapely.geometry.MultiLineString,
                    ),
                ):
                    # Whole-geometry centroid, even for multi-part lines.
                    found.append(member.centroid)
    for idx, first in enumerate(geoms):
        for second in geoms[idx + 1 :]:
            if first.intersects(second):
                record(first.intersection(second))
    return gpd.GeoDataFrame(geometry=found, crs=gdf_lines.crs)
@staticmethod
def vectorized_distance_calculation(
    gdf_points, tree, intersection_tree=None
):
    """Calculate the nearest line and intersection distances for each point
    using STRtree spatial indexes.

    For every point in `gdf_points` the shortest distance to the nearest
    geometry in `tree` is computed. If `intersection_tree` is provided, the
    shortest distance to the nearest intersection geometry is computed as
    well; otherwise those distances are set to infinity.

    Parameters
    ----------
    gdf_points : GeoDataFrame
        GeoDataFrame containing point geometries for which distances will
        be calculated.
    tree : STRtree
        Spatial index built over the line geometries.
    intersection_tree : STRtree, optional
        Spatial index built over intersection geometries. If None, the
        intersection distances are all infinity.

    Returns
    -------
    tuple of pandas.Series
        (nearest_line_distances, nearest_intersection_distances), aligned
        with the order of `gdf_points`.

    Notes
    -----
    `STRtree.query` only returns candidates whose *bounding boxes* overlap
    the query geometry, which for a point query is almost always empty, so
    `STRtree.nearest` is used here instead. Shapely 2.x's `nearest` returns
    an integer index into `tree.geometries`, while Shapely 1.8 returns the
    geometry itself; both conventions are handled.
    """

    def nearest_distance(point, spatial_tree):
        # Distance from `point` to its nearest geometry in `spatial_tree`.
        try:
            nearest = spatial_tree.nearest(point)
        except ValueError:
            # Some shapely versions raise on an empty tree.
            return float("inf")
        if nearest is None:  # Shapely 2.x returns None for an empty tree
            return float("inf")
        if isinstance(nearest, (int, np.integer)):
            # Shapely 2.x: integer index into the tree's geometry array
            nearest = spatial_tree.geometries[nearest]
        elif hasattr(nearest, "geometry"):
            # Defensive unpacking in case a wrapping record was stored
            nearest = nearest.geometry
        return point.distance(nearest)

    # Nearest line distance for every input point
    nearest_line_distances = gdf_points.geometry.apply(
        lambda point: nearest_distance(point, tree)
    )
    # Nearest intersection distance (infinity when no tree was supplied)
    if intersection_tree is not None:
        nearest_intersection_distances = gdf_points.geometry.apply(
            lambda point: nearest_distance(point, intersection_tree)
        )
    else:
        nearest_intersection_distances = pd.Series(
            [float("inf")] * len(gdf_points)
        )
    return nearest_line_distances, nearest_intersection_distances
@staticmethod
def distance_from_lines_with_intersections(
    pfa,
    criteria,
    component,
    layer,
    extent,
    nx,
    ny,
    weight_line=1.0,
    weight_intersection=0.5,
):
    """
    Calculate the weighted distances from points to the nearest lines and
    intersections, and update the provided PFA dictionary with the computed
    distance model.

    This function creates a grid of points over a specified spatial extent,
    computes the distance from each point to the nearest line and nearest
    line intersection, and then combines the distances using specified
    weights. The result is stored in the 'pfa' dictionary under the
    specified criteria, component, and layer.

    Parameters
    ----------
    pfa : dict
        The PFA dictionary that contains geospatial data for various
        criteria, components, and layers.
    criteria : str
        The criteria key within the PFA dictionary to access the desired data.
    component : str
        The component key within the criteria to access the desired layer data.
    layer : str
        The layer key within the component to access the lines for distance
        calculation.
    extent : tuple
        (x_min, y_min, x_max, y_max) spatial extent for the grid of points.
    nx : int
        Number of points along the x-axis for the grid.
    ny : int
        Number of points along the y-axis for the grid.
    weight_line : float, optional (default=1.0)
        The weight assigned to the distance from the nearest line.
    weight_intersection : float, optional (default=0.5)
        The weight assigned to the distance from the nearest line intersection.

    Returns
    -------
    pfa : dict
        The updated PFA dictionary with a new distance model stored in the
        specified layer.
    """
    # Extract lines and derive their pairwise intersections
    gdf_lines = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]["data"]
    gdf_intersections = Processing.calculate_intersections(gdf_lines)
    # Match the CRS of the source lines
    crs = gdf_lines.crs
    # Generate the grid of points within the given extent
    gdf_points = Processing.generate_grid_points(extent, nx, ny, crs)
    # Spatial indexes for lines and (if any exist) intersections.
    # calculate_intersections always returns a GeoDataFrame, so emptiness
    # — not None-ness — is the correct test here.
    tree = shapely.strtree.STRtree(gdf_lines.geometry)
    intersection_tree = (
        shapely.strtree.STRtree(gdf_intersections.geometry)
        if not gdf_intersections.empty
        else None
    )
    # Per-point distances to nearest line and nearest intersection
    line_distances, intersection_distances = (
        Processing.vectorized_distance_calculation(
            gdf_points, tree, intersection_tree
        )
    )
    # Weighted combination of the two distance fields
    combined_distances = (
        weight_line * line_distances
        + weight_intersection * intersection_distances
    )
    gdf_points["distance"] = combined_distances
    # Store the model and its metadata back on the layer
    layer_dict = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    layer_dict["model"] = gdf_points
    layer_dict["model_data_col"] = "distance"
    layer_dict["model_units"] = "distance (weighted by line and intersection proximity)"
    return pfa
@staticmethod
def extrapolate_2d(
    pfa,
    criteria,
    component,
    layer,
    dataset="model",
    *,
    data_col="value_interpolated",
    training_size=0.2,
    verbose=False,
):
    """Extrapolate a 2D field to the max extent grid using Gaussian Process
    Regression.

    Parameters
    ----------
    pfa : dict
        Config specifying criteria, components, and data layers' relationship
        to one another. Includes data layers' associated GeoDataFrames.
    criteria : str
        Criteria associated with data to interpolate.
    component : str
        Component associated with data to interpolate.
    layer : str
        Layer associated with data to interpolate.
    dataset : str
        Layer data source for extrapolation.
    data_col : str
        Column in `dataset` to use for extrapolation input observations.
    training_size : float
        Percent of randomly selected input observations to train on.
    verbose : bool
        Display training progress, assessment metrics, and final plots.

    Returns
    -------
    pfa : dict
        Updated pfa config which includes the extrapolated layer.
    """
    layer_dict = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    # Strip any Z coordinate before fitting the 2D extrapolator
    source_gdf = drop_z_from_geometry(
        layer_dict[dataset], geom_col="geometry"
    )
    # backfill_gdf expects a held-out fraction, the complement of training_size
    extrapolated_gdf = backfill_gdf(
        source_gdf,
        value_col=data_col,
        z_value=None,
        verbose=verbose,
        test_size=1 - training_size,
    )
    # Record the extrapolated model plus its metadata on the layer
    layer_dict["model"] = extrapolated_gdf
    layer_dict["model_data_col"] = "value_extrapolated"
    layer_dict["model_units"] = layer_dict["units"]
    return pfa
@staticmethod
def distance_from_polygons(
    pfa, criteria, component, layer, extent, nx, ny
):
    """
    Calculate the true distance from a grid of points to a set of polygons,
    using Shapely's distance against the union of all polygons.
    A point inside any polygon will get a distance of 0.

    Parameters
    ----------
    pfa : dict
        Configuration dictionary specifying relationships between criteria,
        components, and data layers.
    criteria : str
        Criteria associated with Polygon data to calculate distances from.
    component : str
        Component associated with Polygon data to calculate distances from.
    layer : str
        Layer associated with Polygon data to calculate distances from.
    extent : list
        [x_min, y_min, x_max, y_max] bounding box for the grid.
    nx : int
        Number of points in the x-direction.
    ny : int
        Number of points in the y-direction.

    Returns
    -------
    pfa : dict
        Updated pfa config, with a new GeoDataFrame in
        pfa['criteria'][criteria]['components'][component]['layers'][layer]['distance_model']
        that contains the grid points plus a 'distance' column.
    """
    gdf_polygons = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]["data"]
    # Merge all polygons into one geometry. union_all() matches the usage
    # elsewhere in this module and replaces the deprecated
    # GeoSeries.unary_union property.
    polygons_union = gdf_polygons.geometry.union_all()
    # Create a grid of points within the specified extent
    x_min, y_min, x_max, y_max = extent
    x_points = np.linspace(x_min, x_max, nx)
    y_points = np.linspace(y_min, y_max, ny)
    points = [
        shapely.geometry.Point(x, y) for x in x_points for y in y_points
    ]
    # Convert that list of points to a GeoDataFrame
    grid_gdf = gpd.GeoDataFrame(geometry=points, crs=gdf_polygons.crs)
    # Vectorized distance from each grid point to the polygon union;
    # points inside a polygon yield 0.
    distance_gdf = grid_gdf.copy()
    distance_gdf["distance"] = grid_gdf.geometry.distance(polygons_union)
    # Save the result in the pfa dictionary (under 'distance_model')
    layer_dict = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    layer_dict["distance_model"] = distance_gdf
    layer_dict["model_data_col"] = "distance"
    layer_dict["model_units"] = "distance (m)"
    return pfa
@staticmethod
def distance_from_points(pfa, criteria, component, layer, extent, nx, ny):
    """Function to calculate distance from Point objects.

    Parameters
    ----------
    pfa : dict
        Config specifying criteria, components, and data layers' relationship
        to one another. Includes data layers' associated GeoDataFrames,
        particularly the gdf with Point geometry to calculate distances from.
    criteria : str
        Criteria associated with Point data to calculate distances from.
    component : str
        Component associated with Point data to calculate distances from.
    layer : str
        Layer associated with Point data to calculate distances from.
    extent : list
        List of length 4 containing the extent (i.e., bounding box) to use to
        produce the distance model, in this order:
        [x_min, y_min, x_max, y_max]
    nx : int
        Number of points in the x-direction
    ny : int
        Number of points in the y-direction

    Returns
    -------
    pfa : dict
        Updated pfa config whose layer 'model' is a grid GeoDataFrame with a
        'distance' column holding the distance to the nearest input point.

    Raises
    ------
    ValueError
        If a non-point geometry is encountered or no points are found.
    """
    gdf_points = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]["data"]
    # Ensure we have only simple Point geometries
    points = []
    for geom in gdf_points.geometry:
        if geom.geom_type == "Point":
            points.append(geom)
        elif geom.geom_type == "MultiPoint":
            points.extend(geom.geoms)
        else:
            raise ValueError(
                f"Unsupported geometry type: {geom.geom_type}"
            )
    if not points:
        raise ValueError(
            "No valid Point geometries found in the GeoDataFrame"
        )
    # 2D coordinates only; any z-dimension is intentionally ignored
    points_coords = np.array([(point.x, point.y) for point in points])
    # Create a grid of points within the spatial extent
    x_min, y_min, x_max, y_max = extent
    x_points = np.linspace(x_min, x_max, nx)
    y_points = np.linspace(y_min, y_max, ny)
    grid_points = [
        shapely.geometry.Point(x, y) for x in x_points for y in y_points
    ]
    # Create a GeoDataFrame from the generated points
    gdf_grid_points = gpd.GeoDataFrame(
        geometry=grid_points, crs=gdf_points.crs
    )
    # Convert grid points to a numpy array of coordinates
    grid_coords = np.array(
        [(point.x, point.y) for point in gdf_grid_points.geometry]
    )
    # Nearest-neighbour lookup via a KD-tree: avoids materializing the full
    # (n_grid x n_points) matrix that scipy.spatial.distance.cdist builds,
    # and answers each query in O(log n) instead of O(n).
    nearest_distances, _ = cKDTree(points_coords).query(grid_coords, k=1)
    # Store minimum distances in a GeoDataFrame
    gdf_distances = gdf_grid_points.copy()
    gdf_distances["distance"] = nearest_distances
    # Update pfa dictionary with the distance model
    layer_dict = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    layer_dict["model"] = gdf_distances
    layer_dict["model_data_col"] = "distance"
    layer_dict["model_units"] = "distance (m)"
    return pfa
@staticmethod
def weighted_distance_from_points(
    pfa,
    criteria,
    component,
    layer,
    extent,
    nx,
    ny,
    alpha=1000.0,
    weight_points=True,
    weight_min=1.0,
    weight_max=2.0,
):
    """
    Compute a "weighted distance score" from a set of points, where each
    point has a data value in the layer's `data_col`. That data value is
    min-max scaled into [weight_min, weight_max] to produce a point weight
    w_i, then each cell's score is sum_i [ w_i * exp(-dist_i / alpha) ].

    Parameters
    ----------
    pfa : dict
        The PFA dictionary, which must contain:
        pfa['criteria'][criteria]['components'][component]['layers'][layer]['data']
        ...a GeoDataFrame of points. If the layer defines 'data_col', that
        column provides the per-point values to weight by; otherwise all
        weights default to 1.0.
    criteria : str
        E.g. 'geologic'
    component : str
        E.g. 'fluid'
    layer : str
        E.g. 'hot_springs'
    extent : list of float
        [x_min, y_min, x_max, y_max] bounding box for the grid
    nx, ny : int
        Number of points along x and y in the final grid
    alpha : float, default=1000
        Decay constant for distance (in meters).
    weight_points : bool, default=True
        Whether to apply data-based point weighting or not.
    weight_min : float, default=1.0
        Minimum weight for a transformed point.
    weight_max : float, default=2.0
        Maximum weight for a transformed point.

    Returns
    -------
    pfa : dict
        The updated dictionary, storing a new GeoDataFrame with column
        "weighted_point_score" in:
        pfa['criteria'][criteria]['components'][component]['layers'][layer]['model'].
    """

    def normalize_point_weights(values, out_min=1.0, out_max=5.0):
        # Min-max scale a 1D array into the positive range [out_min, out_max].
        vals = values.copy()
        valid_mask = ~np.isnan(vals)
        valid_data = vals[valid_mask]
        if valid_data.size == 0:
            # No usable data: fall back to the minimum weight everywhere
            return np.full_like(vals, out_min)
        vmin = valid_data.min()
        vmax = valid_data.max()
        rng = vmax - vmin
        if rng == 0:
            # Constant data: use the midpoint so all points weigh equally
            midpoint = (out_min + out_max) / 2.0
            return np.full_like(vals, midpoint)
        # Normal minmax to [0..1] then scale to [out_min..out_max]
        mm = (valid_data - vmin) / rng
        scaled = np.full_like(vals, np.nan)
        scaled[valid_mask] = mm * (out_max - out_min) + out_min
        return scaled

    layer_dict = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    data_col = layer_dict.get("data_col", None)
    gdf_points = layer_dict["data"].copy()
    if data_col is None:
        print("No data_col => defaulting all weights=1.0")
        gdf_points["temp_val"] = 1.0
        data_col = "temp_val"
    # Collect simple Points (expanding MultiPoints) with their data values
    point_list, data_list = [], []
    for geom, val in zip(gdf_points.geometry, gdf_points[data_col]):
        if geom.geom_type == "Point":
            point_list.append(geom)
            data_list.append(val)
        elif geom.geom_type == "MultiPoint":
            for subg in geom.geoms:
                point_list.append(subg)
                data_list.append(val)
        else:
            raise ValueError(f"Unsupported geometry: {geom.geom_type}")
    if not point_list:
        print("No valid points found.")
        return pfa
    # NaN data values are treated as zero-valued observations before scaling
    arr_1d = np.nan_to_num(np.array(data_list, dtype=float), nan=0)
    if not weight_points:
        print("User not weighting points. Defaulting all weights=1.0")
        out_min = out_max = 1.0  # all points get equal weight
    else:
        out_min = weight_min
        out_max = weight_max
    # Scale once with the chosen range
    weights_1d = normalize_point_weights(
        arr_1d,
        out_min=out_min,
        out_max=out_max,
    )
    # Create grid, compute distances
    x_min, y_min, x_max, y_max = extent
    x_coords = np.linspace(x_min, x_max, nx)
    y_coords = np.linspace(y_min, y_max, ny)
    grid_points = [
        shapely.geometry.Point(x, y) for x in x_coords for y in y_coords
    ]
    gdf_grid = gpd.GeoDataFrame(geometry=grid_points, crs=gdf_points.crs)
    point_coords = np.array([(p.x, p.y) for p in point_list])
    grid_coords = np.array([(pt.x, pt.y) for pt in gdf_grid.geometry])
    dist_matrix = scipy.spatial.distance.cdist(
        grid_coords, point_coords
    )  # shape=(nx*ny, n_points)
    # Weighted sum => sum_i [ w_i * exp(-dist_ij / alpha) ]
    decays = np.exp(-dist_matrix / alpha)
    score_1d = (decays * weights_1d).sum(axis=1)  # broadcasts over points
    gdf_grid["weighted_point_score"] = score_1d
    layer_dict["model"] = gdf_grid
    layer_dict["model_data_col"] = "weighted_point_score"
    layer_dict["model_units"] = "score (0-∞)"
    return pfa
@staticmethod
def point_density(
    pfa, criteria, component, layer, extent, cell_size, nx, ny
):
    """Calculate point density within a specified extent and store it as a
    GeoDataFrame on the layer.

    Parameters
    ----------
    pfa : dict
        Config specifying criteria, components, and data layers' relationship
        to one another. Includes data layers' associated GeoDataFrames,
        particularly the gdf with Point geometry to calculate density from.
    criteria : str
        Criteria associated with Point data to calculate density from.
    component : str
        Component associated with Point data to calculate density from.
    layer : str
        Layer associated with Point data to calculate density from.
    extent : list
        List of length 4 containing the extent (i.e., bounding box) to use to
        produce the density model: [x_min, y_min, x_max, y_max]
    cell_size : float
        Size of each cell in the grid used for density calculation.
        Example Cell Sizes for EPSG:3857
        High Detail: 50 meters
        Moderate Detail: 100 meters
        Lower Detail: 500 meters - 1 kilometer
    nx : int
        Number of cells in the x direction for the output grid.
    ny : int
        Number of cells in the y direction for the output grid.

    Returns
    -------
    pfa : dict
        Updated pfa config whose layer 'model' holds a GeoDataFrame of grid
        cell centers with a 'density' column (points per cell).
    """
    # Extract GeoDataFrame from pfa dictionary
    gdf = pfa["criteria"][criteria]["components"][component]["layers"][
        layer
    ]["data"]
    # Define the extent
    xmin, ymin, xmax, ymax = extent
    # Create the coarse counting grid using cell_size
    x_cells = int((xmax - xmin) / cell_size)
    y_cells = int((ymax - ymin) / cell_size)
    density_grid = np.zeros((y_cells, x_cells))
    for geom in gdf.geometry:
        # Check if geometry is Point or MultiPoint
        if isinstance(geom, shapely.geometry.Point):
            points = [geom]
        elif isinstance(geom, shapely.geometry.MultiPoint):
            points = geom.geoms
        else:
            points = []  # non-point geometries are skipped
        # Count each point into its coarse cell
        for point in points:
            # Ignore the z component by only considering x and y coordinates
            if (xmin <= point.x <= xmax) and (ymin <= point.y <= ymax):
                x_index = int((point.x - xmin) / cell_size)
                y_index = int((point.y - ymin) / cell_size)
                # Points exactly on the max edge would index past the grid
                if x_index > x_cells - 1 or y_index > y_cells - 1:
                    continue
                density_grid[y_index, x_index] += 1
    # Calculate the size of each cell in the high-resolution output grid
    x_step = (xmax - xmin) / nx
    y_step = (ymax - ymin) / ny
    # Initialize high-resolution density grid
    high_res_density = np.zeros((ny, nx))
    # Broadcast each coarse-cell count onto the fine cells it covers
    for i in range(x_cells):
        for j in range(y_cells):
            value = density_grid[j, i]
            x_start = int(i * cell_size / x_step)
            y_start = int(j * cell_size / y_step)
            x_end = int((i + 1) * cell_size / x_step)
            y_end = int((j + 1) * cell_size / y_step)
            high_res_density[y_start:y_end, x_start:x_end] = value
    # Create cell-center points for the high-resolution grid
    points = [
        shapely.geometry.Point(
            xmin + (i + 0.5) * x_step, ymin + (j + 0.5) * y_step
        )
        for j in range(ny)
        for i in range(nx)
    ]
    # Flatten the high-resolution density grid to match the points
    densities = high_res_density.flatten()
    # Ensure the lengths match
    assert len(points) == len(densities), (
        f"Points length {len(points)} and densities length {len(densities)} do not match."
    )
    # Create the GeoDataFrame
    density_gdf = gpd.GeoDataFrame(
        {"geometry": points, "density": densities}
    )
    density_gdf = density_gdf.set_crs(gdf.crs)
    # Update the pfa dictionary with the new layer
    layer_dict = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    layer_dict["model"] = density_gdf
    layer_dict["model_data_col"] = "density"
    # Space after "per" matches the units string used by polygon_density
    layer_dict["model_units"] = "density per " + str(cell_size) + " m^2"
    return pfa
@staticmethod
def polygon_density(
    pfa, criteria, component, layer, extent, cell_size, nx, ny
):
    """Calculate polygon density within a specified extent and return as a GeoDataFrame.

    Density is approximate: each polygon increments every coarse cell whose
    range overlaps the polygon's *bounding box* (``polygon.bounds``), not an
    exact polygon/cell intersection, so irregular polygons over-count cells
    near their bbox corners.

    Parameters
    ----------
    pfa : dict
        Configuration specifying criteria, components, and data layers' relationship.
    criteria : str
        Criteria associated with Polygon data to calculate density from.
    component : str
        Component associated with Polygon data to calculate density from.
    layer : str
        Layer associated with Polygon data to calculate density from.
    extent : list
        List of length 4 containing the extent [x_min, y_min, x_max, y_max].
    cell_size : float
        Size of each cell in the grid used for density calculation.
    nx : int
        Number of cells in the x direction for the output grid.
    ny : int
        Number of cells in the y direction for the output grid.

    Returns
    -------
    pfa : dict
        Updated pfa config which includes polygon density as a GeoDataFrame.
    """
    # Extract GeoDataFrame containing polygons
    gdf_polygons = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]["data"]
    # Define the extent
    xmin, ymin, xmax, ymax = extent
    # Create coarse counting grid (rows = y, cols = x) using cell_size
    x_cells = int((xmax - xmin) / cell_size)
    y_cells = int((ymax - ymin) / cell_size)
    density_grid = np.zeros((y_cells, x_cells))
    for geom in gdf_polygons.geometry:
        # Check if geometry is Polygon or MultiPolygon
        if isinstance(geom, shapely.geometry.Polygon):
            polygons = [geom]
        elif isinstance(geom, shapely.geometry.MultiPolygon):
            polygons = geom.geoms
        else:
            polygons = []  # other geometry types are silently skipped
        # Iterate through each polygon in the geometry
        for polygon in polygons:
            # Get bounding box of polygon (bbox approximation — see docstring)
            bbox = polygon.bounds
            xmin_poly, ymin_poly, xmax_poly, ymax_poly = bbox
            # Coarse-cell index range covered by the bbox, clamped to the grid
            x_start = max(0, int((xmin_poly - xmin) / cell_size))
            x_end = min(x_cells, int((xmax_poly - xmin) / cell_size)) + 1
            y_start = max(0, int((ymin_poly - ymin) / cell_size))
            y_end = min(y_cells, int((ymax_poly - ymin) / cell_size)) + 1
            # Increment density count for intersecting grid cells
            density_grid[y_start:y_end, x_start:x_end] += 1
    # Calculate the size of each cell in the high-resolution grid
    x_step = (xmax - xmin) / nx
    y_step = (ymax - ymin) / ny
    # Initialize high-resolution density grid
    high_res_density = np.zeros((ny, nx))
    # Broadcast each coarse-cell count onto the fine cells it covers
    for i in range(x_cells):
        for j in range(y_cells):
            value = density_grid[j, i]
            x_start = int(i * cell_size / x_step)
            y_start = int(j * cell_size / y_step)
            x_end = int((i + 1) * cell_size / x_step)
            y_end = int((j + 1) * cell_size / y_step)
            high_res_density[y_start:y_end, x_start:x_end] = value
    # Create cell-center points for the high-resolution grid
    points = [
        shapely.geometry.Point(
            xmin + (i + 0.5) * x_step, ymin + (j + 0.5) * y_step
        )
        for j in range(ny)
        for i in range(nx)
    ]
    # Flatten the high-resolution density grid to match the points
    densities = high_res_density.flatten()
    # Ensure the lengths match
    assert len(points) == len(densities), (
        f"Points length {len(points)} and densities length {len(densities)} do not match."
    )
    # Create the GeoDataFrame
    density_gdf = gpd.GeoDataFrame(
        {"geometry": points, "density": densities}
    )
    density_gdf = density_gdf.set_crs(gdf_polygons.crs)
    # Update the pfa dictionary with the new layer
    pfa["criteria"][criteria]["components"][component]["layers"][layer][
        "model"
    ] = density_gdf
    pfa["criteria"][criteria]["components"][component]["layers"][layer][
        "model_data_col"
    ] = "density"
    pfa["criteria"][criteria]["components"][component]["layers"][layer][
        "model_units"
    ] = "density per " + str(cell_size) + " m^2"
    return pfa
@staticmethod
def extract_fault_traces_slice(
    pfa, criteria, component, layer, min_z=None, tolerance=1e6
):
    """
    Extract a 2D representation of faults by taking a slice at the bottom
    of a 3D fault model.

    The layer's 'data' must be a GeoDataFrame of 3D Point geometries, and
    the layer's 'id_col' entry names the column that groups points into
    individual faults. Points whose Z lies within `tolerance` of the slice
    depth are connected (sorted by x, y) into one LineString per fault, and
    the layer's 'data' is replaced with those traces.

    Parameters
    ----------
    pfa : dict
        Config specifying criteria, components, and data layers' relationship
        to one another.
    criteria : str
        Criteria associated with the 3D fault data.
    component : str
        Component associated with the 3D fault data.
    layer : str
        Layer associated with the 3D fault data.
    min_z : float, optional
        Z-value at which to take the slice. If None, the minimum Z found in
        the dataset is used.
    tolerance : float, default=1e6
        Points with |z - min_z| <= tolerance are included in the slice.

    Returns
    -------
    pfa : dict
        Updated config whose layer 'data' is a GeoDataFrame of LineString
        fault traces (columns: 'fault_id', 'geometry').

    Raises
    ------
    ValueError
        If geometries are not 3D Points, or no fault trace with at least two
        points exists at the slice depth.
    """
    # Extract GeoDataFrame containing 3D data
    gdf_3d = pfa["criteria"][criteria]["components"][component]["layers"][
        layer
    ]["data"]
    fault_id_col = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]["id_col"]
    # Ensure the geometry column contains 3D Points
    if not all(gdf_3d.geometry.geom_type == "Point"):
        raise ValueError("All geometries must be Point geometries.")
    if not all(gdf_3d.geometry.apply(lambda geom: geom.has_z)):
        raise ValueError("All geometries must be 3D (with Z-coordinates).")
    if min_z is None:
        # Find the minimum Z value in the dataset
        min_z = gdf_3d.geometry.apply(lambda geom: geom.z).min()
    # Filter points at exactly min_z (or within the tolerance)
    bottom_points = gdf_3d[
        gdf_3d.geometry.apply(
            lambda geom: abs(geom.z - min_z) <= tolerance
        )
    ]
    # Extract fault traces (LineStrings) for each unique fault ID
    fault_traces = []
    for fault_id, group in bottom_points.groupby(fault_id_col):
        # A LineString needs at least two points
        if len(group) > 1:
            # Sort points by X and Y for proper LineString creation
            group = group.sort_values(
                by=["geometry"],
                key=lambda col: col.apply(lambda geom: (geom.x, geom.y)),
            )
            trace = shapely.geometry.LineString(
                group.geometry.apply(
                    lambda geom: (geom.x, geom.y, geom.z)
                ).tolist()
            )
            fault_traces.append({"fault_id": fault_id, "geometry": trace})
    if not fault_traces:
        raise ValueError(
            "No valid fault traces found at the minimum Z value."
        )
    # Create a GeoDataFrame from the fault traces
    fault_traces_gdf = gpd.GeoDataFrame(
        fault_traces, geometry="geometry", crs=gdf_3d.crs
    )
    # Replace the raw 3D points with the extracted 2D traces
    pfa["criteria"][criteria]["components"][component]["layers"][layer][
        "data"
    ] = fault_traces_gdf
    return pfa
@staticmethod
def process_faults(
    pfa,
    criteria,
    component,
    layer,
    extent,
    nx,
    ny,
    alpha_fault=1000.0,
    alpha_intersection=500.0,
    weight_fault=0.7,
    weight_intersection=0.3,
    use_intersections=True,
):
    """
    Process faults by:
    1) Computing distance from faults (via distance_from_lines).
    2) (Optional) Finding fault intersections, computing distance to those intersections.
    3) Applying exponential decays to fault distance and (optionally) intersection distance.
    4) Combining via a weighted sum to produce a final favorability.

    Parameters
    ----------
    pfa : dict
        The PFA dictionary containing fault data in:
        pfa['criteria'][criteria]['components'][component]['layers'][layer]['data']
        Must have valid line geometry for distance_from_lines to work.
    criteria : str
        E.g. 'geologic'
    component : str
        E.g. 'fluid'
    layer : str
        E.g. 'intersecting_faults'
    extent : list
        [x_min, y_min, x_max, y_max] bounding box
    nx, ny : int
        Grid dimensions in x and y directions
    alpha_fault : float
        Decay constant for fault distance
    alpha_intersection : float
        Decay constant for intersection distance
    weight_fault : float
        Weight for the fault distance favorability
    weight_intersection : float
        Weight for the intersection distance favorability
    use_intersections : bool
        If False, ignore intersection distances (treat them as zero).
        If True, do the intersection calculation + decay.

    Returns
    -------
    pfa : dict
        Updated dictionary with a new 'model' containing 'fault score'
        under pfa['criteria'][criteria]['components'][component]['layers'][layer].
    """
    # Compute distance from faults
    pfa = Processing.distance_from_lines(
        pfa, criteria, component, layer, extent, nx, ny
    )
    fault_layer_dict = pfa["criteria"][criteria]["components"][component][
        "layers"
    ][layer]
    fault_model_gdf = fault_layer_dict.get("model", None)
    dist_col = fault_layer_dict.get("model_data_col", None)
    if fault_model_gdf is None or dist_col != "distance":
        print(
            "ERROR: distance_from_lines did not produce a valid 'model' with 'distance' column."
        )
        return pfa
    fault_dist_1d = fault_model_gdf["distance"].to_numpy()
    try:
        fault_dist_2d = fault_dist_1d.reshape((ny, nx))
    except ValueError as e:
        print("ERROR reshaping fault distance array:", e)
        return pfa
    # find fault intersections
    gdf_lines = fault_layer_dict["data"]
    if not use_intersections or gdf_lines.empty:
        # skip intersection logic
        intersect_score_2d = np.zeros((ny, nx))
        if not use_intersections:
            print(
                "use_intersections=False => intersection favorability is zero."
            )
        else:
            print(
                "WARNING: No line geometry found, skipping intersections."
            )
    else:
        gdf_intersections = Processing.calculate_intersections(gdf_lines)
        if gdf_intersections.empty:
            print(
                "No fault intersections found => intersection favorability is zero."
            )
            # Must assign intersect_score_2d (not a differently named
            # variable) or the weighted sum below raises NameError.
            intersect_score_2d = np.zeros((ny, nx))
        else:
            intersections_union = gdf_intersections.geometry.union_all()
            grid_points = Processing.generate_grid_points(
                extent, nx, ny, crs=gdf_lines.crs
            )
            dist_intersect_1d = np.array(
                [
                    pt.distance(intersections_union)
                    for pt in grid_points.geometry
                ]
            )
            try:
                dist_intersect_2d = dist_intersect_1d.reshape((ny, nx))
            except ValueError as e:
                print("ERROR reshaping intersection distances:", e)
                dist_intersect_2d = np.zeros((ny, nx))
            # Exponential decay for intersection distance
            intersect_score_2d = np.exp(
                -dist_intersect_2d / alpha_intersection
            )
    # Exponential decay for fault distances
    fault_score_2d = np.exp(-fault_dist_2d / alpha_fault)
    # Combine with weights in a weighted sum
    composite_2d = (
        weight_fault * fault_score_2d
        + weight_intersection * intersect_score_2d
    )
    composite_1d = composite_2d.flatten()
    fault_model_gdf = fault_model_gdf.copy()
    fault_model_gdf["fault score"] = composite_1d
    fault_layer_dict["model"] = fault_model_gdf
    fault_layer_dict["model_data_col"] = "fault score"
    fault_layer_dict["model_units"] = "fault score (0-1)"
    return pfa
# ------ 3D-specific functions (from _Processing3D) ------
@staticmethod
def interpolate_points_3d(
pfa,
criteria,
component,
layer,
nx,
ny,
nz,
extent=None,
method="linear",
):
"""
Function to interpolate, or go from points to a 3D image (voxel grid).
Parameters
----------
pfa : dict
Config specifying criteria, components, and data layers' relationship to one another.
Includes data layers' associated GeoDataFrames.
criteria : str
Criteria associated with data to interpolate.
component : str
Component associated with data to interpolate.
layer : str
Layer associated with data to interpolate.
nx : int
Number of points in the x-direction.
ny : int
Number of points in the y-direction.
nz : int
Number of points in the z-direction.
extent : list or tuple, optional
List of length 6 containing the 3D extent (i.e., bounding box) of the grid,
in this order: [x_min, y_min, z_min, x_max, y_max, z_max].
If None, the extent is inferred from the input data.
method : str
Method to use for interpolation. Can be one of:
'nearest', 'linear', or 'cubic'.
Passed directly to scipy.interpolate.griddata.
Returns
-------
pfa : dict
Updated pfa config which includes the interpolated 3D model
stored in the specified layer under:
- layer["model"]
- layer["model_data_col"] = "value_interpolated"
Notes
-----
- Uses scipy.interpolate.griddata to perform interpolation.
- Builds a full 3D meshgrid in memory before evaluation.
- Suitable for small to moderate grid sizes.
- For very large grids (e.g., > ~1e6 voxels), consider using
fast_interpolate_points_3d() for improved performance
and reduced memory usage.
"""
total_pts = nx * ny * nz
if total_pts > 1_000_000:
warnings.warn(
"Large 3D grid detected (>1e6 cells). "
"If computation is prohibitively slow, consider "
"fast_interpolate_points_3d() for improved "
"performance and lower memory usage.",
RuntimeWarning,
stacklevel=2,
)
gdf = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["data"]
data_col = pfa["criteria"][criteria]["components"][component][
"layers"
][layer]["data_col"]
# Convert polygons → centroids if needed
if gdf.geometry.type.iloc[0] == "Polygon":
t1 = time.time()
gdf.geometry = gdf.geometry.centroid
# Extract coordinates + values
t2 = time.time()
x = gdf.geometry.x
y = gdf.geometry.y
z = gdf.geometry.apply(lambda geom: geom.z)
values = gdf[data_col]
# Determine grid extents
t3 = time.time()
if extent is None:
x_min, x_max = x.min(), x.max()
y_min, y_max = y.min(), y.max()
z_min, z_max = z.min(), z.max()
else:
x_min, y_min, z_min, x_max, y_max, z_max = extent
# Build grid
t4 = time.time()
x_grid = np.linspace(x_min, x_max, nx)
y_grid = np.linspace(y_min, y_max, ny)
z_grid = np.linspace(z_min, z_max, nz)
xv, yv, zv = np.meshgrid(x_grid, y_grid, z_grid, indexing="ij")
# Interpolation
t5 = time.time()
points = np.vstack((x.to_numpy(), y.to_numpy(), z.to_numpy())).T
grid_values = griddata(
points, values.values, (xv, yv, zv), method=method
)
# Construct GeoDataFrame
t6 = time.time()
interpolated_points = np.vstack((xv.ravel(), yv.ravel(), zv.ravel())).T
interpolated_gdf = gpd.GeoDataFrame(
{"value_interpolated": grid_values.ravel()},
geometry=gpd.points_from_xy(
interpolated_points[:, 0],
interpolated_points[:, 1],
z=interpolated_points[:, 2],
),
crs=gdf.crs,
)
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model"
] = interpolated_gdf
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_data_col"
] = "value_interpolated"
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_units"
] = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["units"]
return pfa
    @staticmethod
    def fast_interpolate_points_3d(
        pfa,
        criteria,
        component,
        layer,
        nx,
        ny,
        nz,
        extent=None,
        method="linear", # "linear" or "nearest"
        build_gdf=True, # set False to skip heavy GeoDataFrame creation
        chunk_points=2_000_000, # max # of grid points per chunk to evaluate
        use_representative_point=True, # faster/safer than centroid for polygons
        dtype=np.float32, # memory saver vs float64
    ):
        """
        Function to interpolate, or go from points to a 3D image (voxel grid),
        using a memory-efficient and scalable approach.
        Parameters
        ----------
        pfa : dict
            Config specifying criteria, components, and data layers' relationship to one another.
            Includes data layers' associated GeoDataFrames.
        criteria : str
            Criteria associated with data to interpolate.
        component : str
            Component associated with data to interpolate.
        layer : str
            Layer associated with data to interpolate.
        nx : int
            Number of points in the x-direction.
        ny : int
            Number of points in the y-direction.
        nz : int
            Number of points in the z-direction.
        extent : list or tuple, optional
            List of length 6 containing the 3D extent (i.e., bounding box) of the grid,
            in this order: [x_min, y_min, z_min, x_max, y_max, z_max].
            If None, the extent is inferred from the input data.
        method : str
            Method to use for interpolation. Can be one of:
            'nearest' or 'linear'.
            - 'nearest' uses scipy.spatial.cKDTree (fast KNN lookup).
            - 'linear' uses scipy.interpolate.LinearNDInterpolator.
        build_gdf : bool, optional
            If True (default), builds a GeoDataFrame of 3D Points for the output.
            If False, stores compact NumPy arrays instead to reduce memory usage.
        chunk_points : int, optional
            Maximum number of grid points evaluated per chunk.
            Controls memory usage during interpolation.
        use_representative_point : bool, optional
            If True, converts Polygon geometries to representative_point()
            before interpolation (faster and safer than centroid for complex shapes).
        dtype : numpy dtype, optional
            Data type used for coordinate and value arrays (default: float32).
            Lower precision reduces memory usage.
        Returns
        -------
        pfa : dict
            Updated pfa config which includes the interpolated 3D model
            stored in the specified layer. Depending on build_gdf:
            - If True: layer["model"] is a GeoDataFrame of 3D Points.
            - If False: layer["model"] is a dictionary containing
              coordinate arrays and a 3D values array.
        Notes
        -----
        - Designed for large 3D grids where full meshgrid construction
          would be memory-intensive.
        - Performs interpolation in chunks to reduce peak memory usage.
        - Typically much faster and more scalable than interpolate_points_3d()
          for large grids.
        - Does not support cubic interpolation.
        """
        t0 = time.time()
        node = pfa["criteria"][criteria]["components"][component]["layers"][
            layer
        ]
        gdf = node["data"]
        data_col = node["data_col"]
        # If polygons: convert to points once.
        # set_geometry returns a new GeoDataFrame, so the stored layer data
        # is left untouched (unlike interpolate_points_3d's in-place update).
        if gdf.geometry.type.iloc[0] == "Polygon":
            print(
                "Converting polygons to points "
                f"via {'representative_point()' if use_representative_point else 'centroid'}"
            )
            if use_representative_point:
                gdf = gdf.set_geometry(gdf.geometry.representative_point())
            else:
                gdf = gdf.set_geometry(gdf.geometry.centroid)
        # Extract coordinates (vectorized for x,y; z via shapely.get_z if available)
        x = gdf.geometry.x.to_numpy(dtype=dtype, copy=False)
        y = gdf.geometry.y.to_numpy(dtype=dtype, copy=False)
        # --- z extraction (vectorized when Shapely 2 is present) ---
        geoms = gdf.geometry.to_numpy() # a GeometryArray
        if _HAS_GET_Z:
            # Shapely 2.x: just pass the array; no ".data"
            try:
                z = get_z(geoms).astype(dtype, copy=False)
            except Exception:
                # If any geometry lacks Z, fall back gracefully.
                # NOTE(review): 2D geometries yield NaN z here, and NaN input
                # coordinates will poison the interpolation — confirm inputs
                # are true 3D points.
                z = np.fromiter(
                    (getattr(geom, "z", np.nan) for geom in geoms),
                    count=len(geoms),
                    dtype=dtype,
                )
        else:
            # Shapely 1.x fallback (still vector-ish, avoids .apply)
            z = np.fromiter(
                (getattr(geom, "z", np.nan) for geom in geoms),
                count=len(geoms),
                dtype=dtype,
            )
        values = gdf[data_col].to_numpy(dtype=dtype, copy=False)
        # Define grid
        if extent is None:
            x_min, x_max = float(x.min()), float(x.max())
            y_min, y_max = float(y.min()), float(y.max())
            z_min, z_max = float(z.min()), float(z.max())
        else:
            x_min, y_min, z_min, x_max, y_max, z_max = map(float, extent)
        x_grid = np.linspace(x_min, x_max, nx, dtype=dtype)
        y_grid = np.linspace(y_min, y_max, ny, dtype=dtype)
        z_grid = np.linspace(z_min, z_max, nz, dtype=dtype)
        print(f"\t\tGrid resolution: {nx} x {ny} x {nz}")
        # Construct grid coordinate arrays lazily to avoid a huge full mesh in memory
        # We'll evaluate in chunks of flattened (x,y,z) points.
        total_pts = nx * ny * nz
        # Build interpolator / index once
        t1 = time.time()
        if method == "nearest":
            tree = cKDTree(np.column_stack((x, y, z)))
            interp_obj = tree # alias
        elif method == "linear":
            # NOTE: grid points outside the convex hull of the samples get NaN
            # (fill_value=np.nan below).
            interp_obj = LinearNDInterpolator(
                np.column_stack((x, y, z)), values, fill_value=np.nan
            )
        else:
            raise ValueError(
                "method must be 'linear' or 'nearest' for this optimized version"
            )
        print(f"\t\tInterpolator built in {time.time() - t1:.3f}s")
        # Preallocate output
        out = np.empty(total_pts, dtype=dtype)
        # Helper to evaluate a chunk of linear indices
        def eval_chunk(flat_idx_slice):
            # Map flat indices -> (ix, iy, iz). This flattened order matches a
            # C-ordered (nx, ny, nz) array (iz varies fastest), which is the
            # same order meshgrid(..., indexing="ij").ravel() produces below.
            inds = np.arange(
                flat_idx_slice.start, flat_idx_slice.stop, dtype=np.int64
            )
            iz = inds % nz
            iy = (inds // nz) % ny
            ix = inds // (ny * nz)
            X = x_grid[ix]
            Y = y_grid[iy]
            Z = z_grid[iz]
            pts = np.column_stack((X, Y, Z))
            if method == "nearest":
                # dists is unused; only the nearest-neighbor indices matter
                dists, locs = interp_obj.query(pts, k=1, workers=-1)
                vals = values[locs].astype(dtype, copy=False)
                return vals
            vals = interp_obj(pts)
            # interp_obj returns float64 by default; cast down
            return vals.astype(dtype, copy=False)
        # Chunked evaluation to keep peak memory in check
        t2 = time.time()
        if chunk_points is None or chunk_points >= total_pts:
            out[:] = eval_chunk(slice(0, total_pts))
        else:
            start = 0
            while start < total_pts:
                end = min(start + int(chunk_points), total_pts)
                out[start:end] = eval_chunk(slice(start, end))
                start = end
        print(f"\t\tInterpolation evaluated in {time.time() - t2:.3f}s")
        # Optionally build GeoDataFrame (slow for very large grids)
        if build_gdf:
            t3 = time.time()
            # Create coordinates for geometry creation (still heavy; consider skipping for huge grids)
            xv, yv, zv = np.meshgrid(x_grid, y_grid, z_grid, indexing="ij")
            interpolated_points = np.column_stack(
                (xv.ravel(), yv.ravel(), zv.ravel())
            )
            interpolated_gdf = gpd.GeoDataFrame(
                {"value_interpolated": out},
                geometry=gpd.points_from_xy(
                    interpolated_points[:, 0],
                    interpolated_points[:, 1],
                    z=interpolated_points[:, 2],
                ),
                crs=gdf.crs,
            )
            print(f"\t\tGeoDataFrame built in {time.time() - t3:.3f}s")
            model_obj = interpolated_gdf
            model_col = "value_interpolated"
        else:
            # Store compact arrays—let downstream code build a GDF only if truly needed
            model_obj = {
                "x": x_grid,
                "y": y_grid,
                "z": z_grid,
                "values": out.reshape(nx, ny, nz),
                "crs": gdf.crs,
            }
            model_col = None # not applicable
        # Update PFA
        node["model"] = model_obj
        node["model_data_col"] = (
            model_col if model_col is not None else "values"
        )
        node["model_units"] = node["units"]
        print(f"\t\tTotal time: {time.time() - t0:.3f}s")
        return pfa
@staticmethod
def kriging_3d(
pfa,
criteria,
component,
layer,
nx,
ny,
nz,
extent=None,
variogram_model="linear",
):
"""Function to interpolate 3D points to a 3D grid using PyKrige.
Parameters
----------
pfa : dict
Config specifying criteria, components, and data layers' relationship to one another.
Includes data layers' associated GeoDataFrames.
criteria : str
Criteria associated with data to interpolate.
component : str
Component associated with data to interpolate.
layer : str
Layer associated with data to interpolate.
nx, ny, nz : int
Number of grid points in the x, y, and z directions.
extent : list
List of length 6 containing the 3D extent of the grid,
in this order: [x_min, y_min, z_min, x_max, y_max, z_max].
variogram_model : str
Variogram model to use for kriging. Default is 'linear'.
Returns
-------
pfa : dict
Updated pfa config which includes interpolation results.
"""
gdf = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["data"]
data_col = pfa["criteria"][criteria]["components"][component][
"layers"
][layer]["data_col"]
if gdf.geometry.type.iloc[0] == "Polygon":
print(
"Notice: interpolate_points_3d() received GeoDataFrame with geometry type 'Polygon.' Converting geometry to 'Point' geometry using centroids."
)
gdf.geometry = gdf.geometry.centroid
# Extract coordinates and values from the GeoDataFrame
x = gdf.geometry.x
y = gdf.geometry.y
z = gdf.geometry.apply(
lambda geom: geom.z
) # Extract z directly from the geometry
values = gdf[data_col]
# Define the 3D grid for interpolation
if extent is None:
x_min, x_max = x.min(), x.max()
y_min, y_max = y.min(), y.max()
z_min, z_max = z.min(), z.max()
else:
x_min, y_min, z_min, x_max, y_max, z_max = extent
x_grid = np.linspace(x_min, x_max, nx)
y_grid = np.linspace(y_min, y_max, ny)
z_grid = np.linspace(z_min, z_max, nz)
xv, yv, zv = np.meshgrid(x_grid, y_grid, z_grid, indexing="ij")
# Perform 3D Kriging interpolation
kriging = OrdinaryKriging3D(
x.values,
y.values,
z.values,
values.values,
variogram_model=variogram_model,
)
grid, _ = kriging.execute("grid", x_grid, y_grid, z_grid)
# Create a new GeoDataFrame with the interpolated values
interpolated_points = np.vstack((xv.ravel(), yv.ravel(), zv.ravel())).T
interpolated_gdf = gpd.GeoDataFrame(
{"value_interpolated": grid.ravel()},
geometry=gpd.points_from_xy(
interpolated_points[:, 0],
interpolated_points[:, 1],
z=interpolated_points[:, 2],
),
crs=gdf.crs,
)
# Update the PFA dictionary with interpolation results
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model"
] = interpolated_gdf
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_data_col"
] = "value_interpolated"
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_units"
] = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["units"]
return pfa
@staticmethod
def distance_from_lines_3d(
pfa, criteria, component, layer, extent, nx, ny, nz
):
"""Function to calculate distance from LineString objects in 3D.
Parameters
----------
pfa : dict
Config specifying criteria, components, and data layers' relationship to one another.
Includes data layers' associated GeoDataFrames, particularly the gdf with LineString
geometry to calculate distances from.
criteria : str
Criteria associated with LineString data to calculate distances from.
component : str
Component associated with LineString data to calculate distances from.
layer : str
Layer associated with LineString data to calculate distances from.
extent : list
List of length 6 containing the 3D extent (i.e., bounding box) to use to produce the
distance model. Order: [x_min, y_min, z_min, x_max, y_max, z_max].
nx, ny, nz : int
Number of points in the x, y, and z directions.
Returns
-------
pfa : dict
Updated pfa config which includes distances from LineString objects.
"""
gdf_linestrings = pfa["criteria"][criteria]["components"][component][
"layers"
][layer]["data"]
# Define extent
x_min, y_min, z_min, x_max, y_max, z_max = extent
# Generate 3D grid of points
x_points = np.linspace(x_min, x_max, nx)
y_points = np.linspace(y_min, y_max, ny)
z_points = np.linspace(z_min, z_max, nz)
grid_points = [
shapely.geometry.Point(x, y, z)
for x in x_points
for y in y_points
for z in z_points
]
# Create a GeoDataFrame for grid points
gdf_points = gpd.GeoDataFrame(
geometry=grid_points, crs=gdf_linestrings.crs
)
# Convert LineString geometries to 3D if not already 3D
def ensure_3d(geom):
if geom.has_z:
return geom
return shapely.geometry.LineString(
[(x, y, 0) for x, y in geom.coords]
)
gdf_linestrings["geometry"] = gdf_linestrings.geometry.apply(ensure_3d)
# Calculate 3D distances
distances = []
for point in gdf_points.geometry:
min_distance = float("inf")
for line in gdf_linestrings.geometry:
# Compute distance between the point and the LineString in 3D
distance = line.distance(point)
min_distance = min(min_distance, distance)
distances.append(min_distance)
# Add distances to the points GeoDataFrame
gdf_points["distance"] = distances
# Update the PFA dictionary with the distance results
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model"
] = gdf_points
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_data_col"
] = "distance"
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_units"
] = "distance (m)"
return pfa
@staticmethod
def distance_from_3d_solids(
pfa,
*,
criteria: str,
component: str,
layer: str,
extent: tuple,
nx: int,
ny: int,
nz: int,
):
"""
Compute a 3D distance field to the 3D Polygon geometries whose vertices have
z-coordinates.
For each horizontal slice:
1. Extract 2D footprints from the 3D solids at the given z-level.
2. Union the footprints and calculate 2D planar distances on a grid.
3. For slices without geometry, propagate distances from the nearest valid layer
while accounting for vertical gaps.
The result is stored as a 3D GeoDataFrame of grid points with a 'distance' attribute,
saved to `layer['model']` in the PFA dictionary.
Parameters
----------
pfa : dict
PFA dictionary.
criteria : str
Name of the criteria level in the PFA hierarchy.
component : str
Name of the component within the criteria.
layer : str
Layer name containing the 3D solid geometries.
extent : tuple
Bounding box [xmin, ymin, zmin, xmax, ymax, zmax] in same CRS units.
nx : int
Number of grid cells in the x-direction.
ny : int
Number of grid cells in the y-direction.
nz : int
Number of grid layers in the z-direction.
Returns
-------
pfa : dict
Updated PFA dictionary with distance model.
"""
layer_dict = pfa["criteria"][criteria]["components"][component][
"layers"
][layer]
gdf3 = layer_dict["data"]
xmin, ymin, zmin, xmax, ymax, zmax = extent
xs = np.linspace(xmin, xmax, nx)
ys = np.linspace(ymin, ymax, ny)
zs = np.linspace(zmin, zmax, nz)
dz = zs[1] - zs[0] if nz > 1 else 0
ztol = dz * 0.5 + 1e-9
# initialize distance volume
D = np.full((nz, ny, nx), np.inf)
valid = []
for iz, zv in enumerate(zs):
fps = []
for geom3d in gdf3.geometry:
fp = Processing.slice_geometry_at_z(geom3d, zv, z_tol=ztol)
if fp is None or fp.is_empty:
continue
fps.append(fp)
if not fps:
continue
uni = unary_union(fps)
pts2 = [shapely.geometry.Point(x, y) for y in ys for x in xs]
dist2 = np.array([pt.distance(uni) for pt in pts2]).reshape(ny, nx)
D[iz] = dist2
valid.append(iz)
# vertical propagation
if valid:
for iz in range(nz):
if iz not in valid:
nearest = min(valid, key=lambda v: abs(v - iz))
D[iz] = D[nearest] + abs(zs[nearest] - zs[iz])
# flatten to GeoDataFrame
all_pts, all_d = [], []
for iz in range(nz):
for iy in range(ny):
for ix in range(nx):
all_pts.append(
shapely.geometry.Point(xs[ix], ys[iy], zs[iz])
)
all_d.append(D[iz, iy, ix])
gdf_out = gpd.GeoDataFrame(
{"distance": all_d}, geometry=all_pts, crs=gdf3.crs
)
layer_dict["model"] = gdf_out
layer_dict["model_data_col"] = "distance"
layer_dict["model_units"] = "m"
return pfa
@staticmethod
def weighted_distance_from_points_3d(
pfa,
criteria,
component,
layer,
extent,
nx,
ny,
nz,
alpha=1000.0,
weight_points=True,
weight_min=1.0,
weight_max=2.0,
mode="max",
k_nearest=None,
):
"""
Compute a weighted influence field from 3D points over a 3D grid.
For each voxel centroid, calculates an exponential decay function
of distance to all 3D points, optionally weighted by transformed
attribute values. Influence scores across points are aggregated
using the specified mode.
The result is stored as a 3D GeoDataFrame of voxel centroids with
'weighted_point_score', saved to `layer['model']` in the PFA dictionary.
Aggregation Modes
-----------------
- "mean" :
Averages the influence from all points at each voxel.
Useful for sparse datasets or when you want an overall
picture of distributed influence.
- "max" :
Selects only the single strongest influence at each voxel.
Helps highlight individual points outside clusters, avoiding
dilution by nearby high-density areas.
- "knearest" :
Sums the influence from the k nearest points to each voxel.
Provides a balance between focusing on dominant single points
and capturing the effect of clusters. Setting k=1 behaves
like "max"; higher values blend influence from multiple neighbors.
Parameters
----------
pfa : dict
PFA dictionary.
criteria : str
Name of the criteria level in the PFA hierarchy.
component : str
Name of the component within the criteria.
layer : str
Name of the layer containing point geometries whose influence is computed.
Attribute values for weighting are read from `layer_dict['data']` using
the column specified in `layer_dict['data_col']` if it exists.
extent : tuple
Bounding box [xmin, ymin, zmin, xmax, ymax, zmax] in same CRS units.
nx : int
Number of grid cells in the x-direction.
ny : int
Number of grid cells in the y-direction.
nz : int
Number of grid layers in the z-direction.
alpha : float, optional
Decay length scale in the same units as the CRS (e.g. meters). Default is 1000.
A larger value gives slower decay, making the influence spread farther.
weight_points : bool, optional
Whether to apply weighting based on attribute values. Default is True.
weight_min : float, optional
Minimum weight after normalization. Default is 1.0.
weight_max : float, optional
Maximum weight after normalization. Default is 2.0.
mode : str, optional
Aggregation mode: one of 'mean', 'max', or 'knearest'. Default is 'max'.
k_nearest : int, optional
Number of nearest neighbors to consider if mode='knearest'.
Returns
-------
pfa : dict
Updated PFA dictionary with weighted influence model.
"""
# helper
def normalize(vals, lo=1.0, hi=5.0):
a = vals.astype(float)
mask = ~np.isnan(a)
if not mask.any():
return np.full_like(a, lo)
vmin, vmax = a[mask].min(), a[mask].max()
if vmin == vmax:
return np.full_like(a, (lo + hi) / 2.0)
a[mask] = (a[mask] - vmin) / (vmax - vmin) * (hi - lo) + lo
return a
layer_dict = pfa["criteria"][criteria]["components"][component][
"layers"
][layer]
gdf_pts = layer_dict["data"]
data_col = layer_dict.get("data_col")
# collect points and values
pts, vals = [], []
for geom, val in zip(
gdf_pts.geometry,
gdf_pts[data_col] if data_col else np.ones(len(gdf_pts)),
):
if geom.geom_type == "Point":
pts.append(geom)
vals.append(val if data_col else 1.0)
elif geom.geom_type == "MultiPoint":
for sub in geom.geoms:
pts.append(sub)
vals.append(val if data_col else 1.0)
if not pts:
print("No valid points - pfa unchanged.")
return pfa
arr_vals = np.nan_to_num(np.asarray(vals, dtype=float))
weights = (
normalize(arr_vals, weight_min, weight_max)
if weight_points
else np.ones_like(arr_vals)
)
# build grid
xmin, ymin, zmin, xmax, ymax, zmax = extent
xs = np.linspace(xmin, xmax, nx)
ys = np.linspace(ymin, ymax, ny)
zs = np.linspace(zmin, zmax, nz)
grid_xyz = np.array(
[(x, y, z) for z in zs for y in ys for x in xs]
) # (n_grid, 3)
# distances & decays
pt_xyz = np.array([(p.x, p.y, p.z) for p in pts])
dmat = scipy.spatial.distance.cdist(
grid_xyz, pt_xyz
) # (n_grid, n_pts)
decays = np.exp(-dmat / alpha)
# aggregation modes
if mode == "mean":
score = np.mean(decays * weights, axis=1)
elif mode == "max":
score = np.max(decays * weights, axis=1)
elif mode == "knearest":
if k_nearest is None:
raise ValueError("k_nearest must be set for mode='knearest'")
idx = np.argpartition(dmat, k_nearest, axis=1)[:, :k_nearest]
rows = np.repeat(
np.arange(dmat.shape[0])[:, None], k_nearest, axis=1
)
dk = dmat[rows, idx]
wk = weights[idx]
score = np.sum(np.exp(-dk / alpha) * wk, axis=1)
else:
raise ValueError("mode must be one of ['mean','max','knearest']")
# store results
gdf_grid = gpd.GeoDataFrame(
geometry=list(starmap(shapely.geometry.Point, grid_xyz)),
data={"weighted_point_score": score},
crs=gdf_pts.crs,
)
layer_dict["model"] = gdf_grid
layer_dict["model_data_col"] = "weighted_point_score"
layer_dict["model_units"] = "score (0-inf)"
return pfa
@staticmethod
def point_density_3d_grid(
pfa,
criteria,
component,
layer,
extent,
cell_size_x,
cell_size_y,
cell_size_z,
):
"""Compute point density on a 3D voxel grid.
Counts the number of points in each cell of a grid defined by
cell_size_x, cell_size_y, and cell_size_z within the given extent.
Returns a GeoDataFrame of voxel centroids with density values.
"""
# Extract GeoDataFrame from pfa dictionary
gdf = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["data"]
# Define the extent
xmin, ymin, zmin, xmax, ymax, zmax = extent
# Create grid for density calculation
x_cells = int((xmax - xmin) / cell_size_x)
y_cells = int((ymax - ymin) / cell_size_y)
z_cells = int((zmax - zmin) / cell_size_z)
density_grid = np.zeros((z_cells, y_cells, x_cells))
# Count points in each 3D cell
for geom in gdf.geometry:
if isinstance(geom, shapely.geometry.Point):
points = [geom]
elif isinstance(geom, shapely.geometry.MultiPoint):
points = geom.geoms
else:
points = []
for point in points:
# Check if the point is within the extent
if (
(xmin <= point.x <= xmax)
and (ymin <= point.y <= ymax)
and (zmin <= point.z <= zmax)
):
# Calculate indices
x_index = int((point.x - xmin) / cell_size_x)
y_index = int((point.y - ymin) / cell_size_y)
z_index = int((point.z - zmin) / cell_size_z)
# Ensure indices are within bounds
if (
0 <= x_index < x_cells
and 0 <= y_index < y_cells
and 0 <= z_index < z_cells
):
density_grid[z_index, y_index, x_index] += 1
# Create a uniform grid-based GeoDataFrame
points = []
densities = []
for z_idx in range(z_cells):
for y_idx in range(y_cells):
for x_idx in range(x_cells):
# Calculate centroid of each grid cell
x_coord = xmin + (x_idx + 0.5) * cell_size_x
y_coord = ymin + (y_idx + 0.5) * cell_size_y
z_coord = zmin + (z_idx + 0.5) * cell_size_z
# Append geometry and density
points.append(
shapely.geometry.Point(x_coord, y_coord, z_coord)
)
densities.append(density_grid[z_idx, y_idx, x_idx])
# Create the GeoDataFrame
density_gdf = gpd.GeoDataFrame(
{"geometry": points, "density": densities}
)
density_gdf = density_gdf.set_crs(gdf.crs)
# Update the pfa dictionary with the new layer
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model"
] = density_gdf
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_data_col"
] = "density"
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_units"
] = f"density per {cell_size_x}x{cell_size_y}x{cell_size_z} m^3"
return pfa
@staticmethod
def point_density_3d_projected(
pfa,
criteria,
component,
layer,
extent,
cell_size_x,
cell_size_y,
cell_size_z,
nx,
ny,
nz,
):
"""Project coarse 3D point density onto a finer grid.
First counts points on a coarse voxel grid (cell_size_*),
then assigns those densities to a higher-resolution block
model defined by nx, ny, nz. Each fine voxel inherits the
density of its containing coarse cell.
"""
# Extract GeoDataFrame from pfa dictionary
gdf = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["data"]
# Define the extent
xmin, ymin, zmin, xmax, ymax, zmax = extent
# Calculate lower-resolution grid dimensions
x_cells = int((xmax - xmin) / cell_size_x)
y_cells = int((ymax - ymin) / cell_size_y)
z_cells = int((zmax - zmin) / cell_size_z)
# Initialize lower-resolution density grid
density_grid = np.zeros((z_cells, y_cells, x_cells))
# Count points in each lower-resolution cell
for geom in gdf.geometry:
if isinstance(geom, shapely.geometry.Point):
points = [geom]
elif isinstance(geom, shapely.geometry.MultiPoint):
points = geom.geoms
else:
points = []
for point in points:
# Check if the point is within the extent
if (
(xmin <= point.x <= xmax)
and (ymin <= point.y <= ymax)
and (zmin <= point.z <= zmax)
):
# Calculate indices for the lower-resolution grid
x_index = int((point.x - xmin) / cell_size_x)
y_index = int((point.y - ymin) / cell_size_y)
z_index = int((point.z - zmin) / cell_size_z)
# Ensure indices are within bounds
if (
0 <= x_index < x_cells
and 0 <= y_index < y_cells
and 0 <= z_index < z_cells
):
density_grid[z_index, y_index, x_index] += 1
# Initialize higher-resolution grid
higher_resolution_densities = np.zeros((nz, ny, nx))
higher_resolution_points = []
# Calculate higher-resolution cell sizes
cell_size_x_hr = (xmax - xmin) / nx
cell_size_y_hr = (ymax - ymin) / ny
cell_size_z_hr = (zmax - zmin) / nz
# Assign densities from lower-resolution grid to higher-resolution grid
for z_idx_hr in range(nz):
for y_idx_hr in range(ny):
for x_idx_hr in range(nx):
# Determine the centroid of the higher-resolution cell
x_coord = xmin + (x_idx_hr + 0.5) * cell_size_x_hr
y_coord = ymin + (y_idx_hr + 0.5) * cell_size_y_hr
z_coord = zmin + (z_idx_hr + 0.5) * cell_size_z_hr
# Determine which lower-resolution cell this higher-resolution cell falls into
x_idx_lr = int((x_coord - xmin) / cell_size_x)
y_idx_lr = int((y_coord - ymin) / cell_size_y)
z_idx_lr = int((z_coord - zmin) / cell_size_z)
# Assign density from lower-resolution cell if within bounds
if (
0 <= x_idx_lr < x_cells
and 0 <= y_idx_lr < y_cells
and 0 <= z_idx_lr < z_cells
):
density = density_grid[z_idx_lr, y_idx_lr, x_idx_lr]
else:
density = 0 # Assign zero if outside bounds
higher_resolution_densities[
z_idx_hr, y_idx_hr, x_idx_hr
] = density
higher_resolution_points.append(
shapely.geometry.Point(x_coord, y_coord, z_coord)
)
# Create the GeoDataFrame with the higher-resolution grid
density_gdf = gpd.GeoDataFrame(
{
"geometry": higher_resolution_points,
"density": higher_resolution_densities.flatten(),
}
)
density_gdf = density_gdf.set_crs(gdf.crs)
# Update the pfa dictionary with the new layer
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model"
] = density_gdf
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_data_col"
] = "density"
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"model_units"
] = f"density per {cell_size_x}x{cell_size_y}x{cell_size_z} m^3"
return pfa
# ------ General processing functions (not strictly 2D or 3D) ------
    @staticmethod
    def extrude_2d_to_3d(
        pfa,
        criteria,
        component,
        layer,
        extent,
        nz,
        strike=None,
        dip=None,
        target_z_meas=None,
    ):
        """
        Extrude 2D/3D geometries into 3D solids using a 3D extent and optional dip.

        Parameters
        ----------
        pfa : dict
            Project/frame archive structure.
        criteria, component, layer : str
            Keys into pfa["criteria"][criteria]["components"][component]["layers"][layer].
        extent : list or tuple
            [xmin, ymin, zmin, xmax, ymax, zmax].
            The smallest of (zmin, zmax) is treated as the bottom,
            the largest as the top of the extrusion. x/y are used to clip geometry.
        nz : int
            Number of vertical layers (stored as metadata).
        strike : float, optional
            Global strike azimuth in degrees, clockwise from North.
            If None or used with dip=None, extrusion is vertical.
        dip : float, optional
            Global dip angle in degrees from horizontal (0 to 90).
            If None or dip ~ 90°, extrusion is vertical.
        target_z_meas : any, optional
            Stored in layer_dict["z_meas"] for downstream use.

        Returns
        -------
        pfa : dict
            Updated pfa config: layer["data"] is replaced by a GeoDataFrame of
            3D Polygon "solids" (the original data is preserved in
            layer["old_data"]); extrusion metadata is stored in gdf.attrs.

        Notes
        -----
        - Lines become "fault wall" polygons; polygons become prism-like
          polygons whose bottom ring is offset along the dip direction.
        - The resulting wall polygons are generally non-planar; Shapely
          stores them, but its planar (2D) operations ignore z —
          NOTE(review): confirm downstream consumers handle this.
        """
        # Unpack extent (note: your convention is [xmin, ymin, zmin, xmax, ymax, zmax])
        xmin, ymin, z0, xmax, ymax, z1 = extent
        # Enforce consistent vertical ordering
        z_min = min(z0, z1)
        z_max = max(z0, z1)
        # Build XY clipping box
        xy_box = shapely.geometry.box(xmin, ymin, xmax, ymax)
        layer_dict = pfa["criteria"][criteria]["components"][component][
            "layers"
        ][layer]
        gdf2 = layer_dict["data"]
        # Backup original data
        layer_dict["old_data"] = gdf2.copy()
        # NOTE(review): z_meas is assigned again at the end of this function;
        # the two assignments are redundant but harmless.
        layer_dict["z_meas"] = target_z_meas
        def _dip_offset(z_top, z_bot, strike_deg, dip_deg):
            """
            Compute (dx, dy) offset for the bottom trace along dip direction over
            the vertical span from z_top to z_bot.
            """
            if strike_deg is None or dip_deg is None:
                return 0.0, 0.0
            # Treat near-vertical as vertical extrusion
            if dip_deg >= 89.9:
                return 0.0, 0.0
            dz = z_top - z_bot # positive if z_top > z_bot
            if dz == 0:
                return 0.0, 0.0
            dip_rad = math.radians(dip_deg)
            # tan(dip) = vertical / horizontal => horizontal = vertical / tan(dip)
            horiz = abs(dz) / math.tan(dip_rad)
            strike_rad = math.radians(strike_deg)
            # Dip direction is 90 degrees clockwise from strike
            dip_az = strike_rad + math.pi / 2.0 # dip direction
            # azimuth convention: x = East, y = North
            dx = horiz * math.sin(dip_az)
            dy = horiz * math.cos(dip_az)
            return dx, dy
        # Global offset for bottom surfaces (same z_min/z_max for all features)
        dx_dip, dy_dip = _dip_offset(z_max, z_min, strike, dip)
        # Helper to safely get x,y from a coordinate (supports 2D or 3D coords)
        def _xy(coord):
            return coord[0], coord[1]
        geoms3 = []
        for geom in gdf2.geometry:
            if geom.is_empty:
                continue
            # Clip to XY box first (works for 2D or 3D; z is dropped by the
            # intersection — NOTE(review): Shapely 2.x may retain/interpolate
            # z, but _xy() below discards it either way)
            geom_clipped = geom.intersection(xy_box)
            if geom_clipped.is_empty:
                continue
            # Lines → fault walls
            if geom_clipped.geom_type in ("LineString", "MultiLineString"):
                parts = (
                    geom_clipped.geoms
                    if geom_clipped.geom_type == "MultiLineString"
                    else [geom_clipped]
                )
                for line in parts:
                    coords = [_xy(c) for c in line.coords]
                    # Top trace at z_max
                    top = [(x, y, z_max) for x, y in coords]
                    # Bottom trace at z_min, shifted along dip direction.
                    # Reversing the bottom trace makes top + bottom a closed
                    # ring that walks the wall's perimeter.
                    bot = [
                        (x + dx_dip, y + dy_dip, z_min)
                        for x, y in reversed(coords)
                    ]
                    ring = top + bot
                    geoms3.append(shapely.geometry.Polygon(ring))
            # Polygons → 3D prisms (bottom offset along dip)
            elif geom_clipped.geom_type in ("Polygon", "MultiPolygon"):
                polys = (
                    geom_clipped.geoms
                    if geom_clipped.geom_type == "MultiPolygon"
                    else [geom_clipped]
                )
                for poly in polys:
                    # Exterior ring: top at z_max + reversed bottom at z_min
                    ext = [_xy(c) for c in poly.exterior.coords]
                    top_ext = [(x, y, z_max) for x, y in ext]
                    bot_ext = [
                        (x + dx_dip, y + dy_dip, z_min)
                        for x, y in reversed(ext)
                    ]
                    # Interior rings (holes) get the same top+bottom treatment
                    holes3 = []
                    for hole in poly.interiors:
                        hc = [_xy(c) for c in hole.coords]
                        top_h = [(x, y, z_max) for x, y in hc]
                        bot_h = [
                            (x + dx_dip, y + dy_dip, z_min)
                            for x, y in reversed(hc)
                        ]
                        holes3.append(top_h + bot_h)
                    geoms3.append(
                        shapely.geometry.Polygon(
                            top_ext + bot_ext, holes=holes3
                        )
                    )
            # Skip points and unsupported geometry types
            else:
                continue
        gdf3 = gpd.GeoDataFrame(geometry=geoms3, crs=gdf2.crs)
        # Mark how it was extruded
        gdf3.attrs["extruded"] = True
        gdf3.attrs["z_min"] = z_min
        gdf3.attrs["z_max"] = z_max
        gdf3.attrs["nz"] = nz
        if strike is not None:
            gdf3.attrs["strike"] = strike
        if dip is not None:
            gdf3.attrs["dip"] = dip
        # Replace layer data with 3D solids
        layer_dict["data"] = gdf3
        layer_dict["data_col"] = "None"
        layer_dict["units"] = ""
        layer_dict["z_meas"] = target_z_meas
        return pfa
@staticmethod
def create_fault_surfaces_from_points(gdf_points, fault_number_col):
"""Create surfaces representing faults from point data.
Parameters
----------
gdf_points : GeoDataFrame
GeoDataFrame containing point data with x, y, z coordinates and fault numbers.
fault_number_col : str
Column name in the GeoDataFrame that contains fault numbers.
Returns
-------
gdf_surfaces : GeoDataFrame
GeoDataFrame containing surfaces (Polygons or MultiPolygons) for each fault.
"""
fault_surfaces = []
# Group points by fault number
grouped = gdf_points.groupby(fault_number_col)
for fault_number, group in grouped:
points = group.geometry
coords = [(point.x, point.y, point.z) for point in points]
# Create a surface (e.g., convex hull) for the fault
try:
surface = shapely.geometry.MultiPoint(coords).convex_hull
fault_surfaces.append(
{"fault_number": fault_number, "geometry": surface}
)
except Exception as e:
print(f"Error creating surface for fault {fault_number}: {e}")
# Create GeoDataFrame for surfaces
gdf_surfaces = gpd.GeoDataFrame(fault_surfaces)
gdf_surfaces = gdf_surfaces.set_crs(gdf_points.crs)
return gdf_surfaces
@staticmethod
def slice_geometry_at_z(geom3d, z, z_tol=1e-8):
"""
Extract the 2D footprint of a 3D polygon geometry at a specific elevation.
If the specified z-level lies outside the vertical extent of the geometry,
returns None. Degenerate or invalid polygon slices are converted to their
boundary LineString.
Parameters
----------
geom3d : shapely Polygon
3D solid polygon geometry with z-coordinates in its vertices.
z : float
Target elevation for slicing the geometry.
z_tol : float, optional
Vertical tolerance for including slices near the target elevation.
Returns
-------
shapely geometry or None
2D Polygon or LineString footprint at elevation z, or None if outside range.
"""
if not hasattr(geom3d, "exterior"):
return None
coords3 = list(geom3d.exterior.coords)
zs = [c[2] for c in coords3]
zlo, zhi = min(zs), max(zs)
if z < zlo - z_tol or z > zhi + z_tol:
return None
xy = [(x, y) for (x, y, _) in coords3]
# attempt full polygon
try:
fp = shapely.geometry.Polygon(xy)
except Exception:
fp = shapely.geometry.LineString(xy)
# convert zero-area or invalid polygons to boundary
if fp.geom_type == "Polygon" and (fp.area == 0 or not fp.is_valid):
fp = fp.boundary
return fp
@staticmethod
def convert_3d_to_2d(pfa, criteria, component, layer):
"""
Convert a 3D GeoDataFrame layer to a 2D representation by collapsing along the Z-dimension.
This function processes a 3D layer stored in the `pfa` dictionary structure, filtering out empty geometries,
sorting the data by X, Y, and Z coordinates, and aggregating data by grouping on X and Y. The resulting
2D GeoDataFrame replaces the original 3D data in the `pfa` dictionary.
Parameters:
----------
pfa : dict
A nested dictionary containing geospatial data organized by criteria, components, and layers.
The specific layer to be processed is accessed using the given `criteria`, `component`, and `layer` keys.
criteria : str
The key in `pfa['criteria']` identifying the specific criteria containing the 3D layer.
component : str
The key in `pfa['criteria'][criteria]['components']` identifying the specific component containing the 3D layer.
layer : str
The key in `pfa['criteria'][criteria]['components'][component]['layers']` identifying the specific 3D layer.
Returns:
-------
dict
The updated `pfa` dictionary with the 3D layer converted to a 2D representation. The resulting 2D GeoDataFrame
is stored in `pfa['criteria'][criteria]['components'][component]['layers'][layer]['data']`.
Raises:
------
ValueError
If the `geometry` column in the GeoDataFrame is not of type `Point` or contains invalid geometries.
Notes:
------
- Empty geometries (e.g., `POINT EMPTY`) are filtered out before processing.
- Sorting is performed by X, Y, and Z coordinates. Empty points are handled gracefully.
- Aggregation sums the values in the specified data column while keeping a representative geometry for each (X, Y) pair.
- The units of the data are updated to reflect the aggregation performed.
"""
# Extract GeoDataFrame containing 3D data
gdf_3d = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["data"]
col = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["data_col"]
units = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["units"]
crs = gdf_3d.crs
# Ensure the geometry column is of Point type
if not gdf_3d.geometry.geom_type.isin(["Point"]).all():
raise ValueError("Geometry column must contain Point geometries.")
# Filter out empty geometries
gdf_3d = gdf_3d[~gdf_3d.geometry.is_empty].reset_index(drop=True)
# Sort geometries by X, Y, Z, handling empty points gracefully
gdf_3d = gdf_3d.sort_values(
by=["geometry"],
key=lambda col: col.apply(
lambda geom: (
(geom.x, geom.y, geom.z)
if not geom.is_empty
else (float("nan"), float("nan"), float("nan"))
)
),
).reset_index(drop=True)
# Step 1: Aggregate by unique (X, Y, Z) points
gdf_3d = gdf_3d.groupby(
gdf_3d.geometry.apply(lambda geom: (geom.x, geom.y, geom.z)),
as_index=False,
).agg(
{
"geometry": "first", # Keep the representative geometry
col: "sum", # Sum the data column for unique (X, Y, Z)
}
)
gdf_3d = gdf_3d.set_geometry("geometry")
gdf_3d.set_crs(crs, inplace=True)
# Step 2: Collapse Z dimension to (X, Y)
gdf_2d = gdf_3d.groupby(
gdf_3d.geometry.apply(lambda geom: (geom.x, geom.y)),
as_index=False,
).agg(
{
"geometry": "first", # Keep the representative geometry
col: "sum", # Sum across Z for unique (X, Y)
}
)
gdf_2d = gdf_2d.set_geometry("geometry")
gdf_2d.set_crs(crs, inplace=True)
# Update the pfa dictionary with the new layer
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"data"
] = gdf_2d
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"units"
] = "summed " + units
return pfa
@staticmethod
def extract_fault_traces_top(pfa, criteria, component, layer):
"""
Create a 2D representation of 3D faults by extracting the trace at the top of each fault.
Parameters:
----------
gdf_3d : GeoDataFrame
A GeoDataFrame containing 3D fault data with geometries (Polygons or MultiPolygons).
fault_id_col : str
The name of the column in `gdf_3d` that uniquely identifies faults.
Returns:
-------
GeoDataFrame
A GeoDataFrame containing 2D fault traces (LineStrings) for the top of each fault.
Notes:
------
- The Z-coordinate is extracted for the "top" of each fault.
- The resulting GeoDataFrame is 2D (ignoring Z-coordinates in the output geometries).
"""
# Extract GeoDataFrame containing 3D data
gdf_3d = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["data"]
fault_id_col = pfa["criteria"][criteria]["components"][component][
"layers"
][layer]["id_col"]
# Check if the geometry is 3D
if not gdf_3d.geometry.apply(lambda geom: geom.has_z).all():
raise ValueError(
"All geometries in the GeoDataFrame must have Z-coordinates."
)
# Extract Z-coordinates as a separate column
gdf_3d["z"] = gdf_3d.geometry.apply(lambda geom: geom.z)
# Extract the top trace for each fault
fault_traces = []
for fault_id, group in gdf_3d.groupby(fault_id_col):
# Sort by Z to get the topmost points
top_points = group.sort_values(by="z", ascending=False)
# Create a LineString from the topmost points
trace = shapely.geometry.LineString(
[
(point.geometry.x, point.geometry.y)
for _, point in top_points.iterrows()
]
)
fault_traces.append({"fault_id": fault_id, "geometry": trace})
# Create a new GeoDataFrame with the 2D fault traces
gdf_2d = gpd.GeoDataFrame(
fault_traces, geometry="geometry", crs=gdf_3d.crs
)
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"data"
] = gdf_2d
return pfa
@staticmethod
def extract_fault_traces_bottom(
pfa, criteria, component, layer, min_z=None, tolerance=1e6
):
"""
Extracts a 2D representation of faults by taking a slice at the bottom of the model.
Parameters:
----------
gdf : GeoDataFrame
A GeoDataFrame containing 3D fault geometries (Point geometries).
fault_id_col : str
The column name representing fault IDs to group points into separate faults.
bottom_z_threshold : float
A Z-value threshold used to select the bottom slice of the model. Points with
Z-values within this threshold from the minimum Z will be included.
Returns:
-------
GeoDataFrame
A GeoDataFrame with LineString geometries representing the bottom traces of each fault.
"""
# Extract GeoDataFrame containing 3D data
gdf_3d = pfa["criteria"][criteria]["components"][component]["layers"][
layer
]["data"]
fault_id_col = pfa["criteria"][criteria]["components"][component][
"layers"
][layer]["id_col"]
# Ensure the geometry column contains 3D Points
if not all(gdf_3d.geometry.geom_type == "Point"):
raise ValueError("All geometries must be Point geometries.")
if not all(gdf_3d.geometry.apply(lambda geom: geom.has_z)):
raise ValueError("All geometries must be 3D (with Z-coordinates).")
if min_z is None:
# Find the minimum Z value in the dataset
min_z = gdf_3d.geometry.apply(lambda geom: geom.z).min()
print(min_z)
# Filter points at exactly min_z (or within a small tolerance)
bottom_points = gdf_3d[
gdf_3d.geometry.apply(
lambda geom: abs(geom.z - min_z) <= tolerance
)
]
# Extract fault traces (LineStrings) for each unique fault ID
fault_traces = []
for fault_id, group in bottom_points.groupby(fault_id_col):
# Ensure there are enough points to create a LineString
if len(group) > 1:
# Sort points by X and Y for proper LineString creation
group = group.sort_values(
by=["geometry"],
key=lambda col: col.apply(lambda geom: (geom.x, geom.y)),
)
trace = shapely.geometry.LineString(
group.geometry.apply(
lambda geom: (geom.x, geom.y, geom.z)
).tolist()
)
fault_traces.append({"fault_id": fault_id, "geometry": trace})
if not fault_traces:
raise ValueError(
"No valid fault traces found at the minimum Z value."
)
# Create a GeoDataFrame from the fault traces
fault_traces_gdf = gpd.GeoDataFrame(
fault_traces, geometry="geometry", crs=gdf_3d.crs
)
pfa["criteria"][criteria]["components"][component]["layers"][layer][
"data"
] = fault_traces_gdf
return pfa
@staticmethod
def interpolate_points(
pfa,
criteria,
component,
layer,
nx,
ny,
nz=None,
extent=None,
interp_method="linear",
):
"""
Function to interpolate, or go from points to either a 2D image
or a 3D image (voxel grid), depending on user input.
This function serves as a unified interface for interpolation.
If `nz` is provided, 3D interpolation is performed.
If `nz` is None, 2D interpolation is performed.
Parameters
----------
pfa : dict
Config specifying criteria, components, and data layers' relationship to one another.
Includes data layers' associated GeoDataFrames.
criteria : str
Criteria associated with data to interpolate.
component : str
Component associated with data to interpolate.
layer : str
Layer associated with data to interpolate.
nx : int
Number of points in the x-direction.
ny : int
Number of points in the y-direction.
nz : int, optional
Number of points in the z-direction.
If provided, 3D interpolation is performed.
If None (default), 2D interpolation is performed.
extent : list or tuple, optional
Spatial extent defining the interpolation grid.
- For 2D interpolation: length 4
[x_min, y_min, x_max, y_max]
- For 3D interpolation: length 6
[x_min, y_min, z_min, x_max, y_max, z_max]
If None, the extent is inferred from the input data.
interp_method : str
Method to use for interpolation.
For 2D: supports 'nearest', 'linear', or 'cubic'.
For 3D: supported methods depend on the underlying 3D function
(typically 'nearest' or 'linear').
Returns
-------
pfa : dict
Updated pfa config which includes the interpolated model
stored in the specified layer.
Notes
-----
- This function dispatches internally to either
`interpolate_points_2d()` or `interpolate_points_3d()`.
- Dimensionality is determined solely by whether `nz` is provided.
- For large 3D grids, consider using
`fast_interpolate_points_3d()` directly for improved
performance and memory efficiency.
"""
if nz is not None:
if extent is not None and len(extent) != 6:
raise ValueError(
"3D interpolation requires extent of length 6 "
"[xmin, ymin, zmin, xmax, ymax, zmax]"
)
return Processing.interpolate_points_3d(
pfa,
criteria,
component,
layer,
nx,
ny,
nz=nz,
extent=extent,
method=interp_method,
)
if extent is not None and len(extent) != 4:
raise ValueError(
"2D interpolation requires extent of length 4 "
"[xmin, ymin, xmax, ymax]"
)
return Processing.interpolate_points_2d(
pfa,
criteria,
component,
layer,
nx,
ny,
extent=extent,
interp_method=interp_method,
)