Source code for geopfa.extrapolation

#!/usr/bin/env python3
"""
exptrapolation.py - Gaussian Process Regression (GPR) for extrapolating values on a 2D grid over multiple Z slices.

This module provides functions for:
- **I/O and Data Handling**: reading data into GeoDataFrames and saving/loading GPy models.
- **Pre-processing**: preparing data for modeling (extracting slices, standardizing).
- **Modeling**: building Gaussian Process models (with composite kernels) and making predictions.
- **Validation/Evaluation**: assessing model performance and diagnostic tests on residuals.
- **Visualization**: plotting residuals, uncertainty, and comparison of predictions vs true values.

"""

import os
import sys
import logging
from pathlib import Path
from functools import reduce
import operator
import warnings

import geopandas as gpd
import GPy
import joblib
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

from mpl_toolkits.axes_grid1 import make_axes_locatable
from numpy.typing import ArrayLike
from scipy.spatial import cKDTree
from scipy.stats import shapiro, normaltest, jarque_bera, levene
from shapely.geometry import Point
from shapely.geometry.base import BaseGeometry
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from statsmodels.stats.diagnostic import acorr_ljungbox
from tqdm import trange
from pprint import pprint

# Suppress specific warnings or verbose logs (e.g., from paramz transformation in GPy)
logging.getLogger("paramz.transformations").setLevel(logging.ERROR)

# === I/O and Data Handling Functions ===


[docs] def save_gpy_model(model: "GPy.core.GP", filepath: str) -> None: """ Save a GPy model to disk using joblib. Parameters ---------- model : GPy.core.GP Trained GPy model object to save. filepath : str Destination file path (e.g., ``'my_model.joblib'``). Parent directories are created automatically. Returns ------- None """ # Ensure parent directories exist os.makedirs(os.path.dirname(filepath), exist_ok=True) # Save model with joblib joblib.dump(model, filepath) # Preserve original behavior: print confirmation print(f"GPy model saved to: {filepath}")
[docs] def load_gpy_model(filepath: str) -> "GPy.core.GP": """ Load a serialized GPy model from disk using joblib. Parameters ---------- filepath : str File path to the saved ``.joblib`` GPy model. Returns ------- GPy.core.GP Loaded GPy model instance. Raises ------ FileNotFoundError If the file does not exist at the given path. """ file_path = Path(filepath) if not file_path.is_file(): raise FileNotFoundError(f"No file found at {filepath}") # Load model model = joblib.load(filepath) # Preserve original behavior: print confirmation print(f"GPy model loaded from: {filepath}") return model
[docs] def prepare_slice( gdf: gpd.GeoDataFrame, *, value_col: str, z_value: float | None = None, z_tol: float = 1e-6, ) -> pd.DataFrame: """ Prepare a tidy slice of (x, y, value) for an optional z-level filter. Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame containing either explicit ``x``/``y`` columns or a geometry column with Point coordinates. May also contain a ``z`` column if filtering by height is required. value_col : str Name of the column whose values will be extracted and renamed to ``value``. z_value : float or None, optional If provided, only rows satisfying ``|z - z_value| <= z_tol`` are kept. z_tol : float, optional Absolute tolerance for matching z-values. Returns ------- pandas.DataFrame A tidy DataFrame with columns ``['x', 'y', 'value']``. Raises ------ ValueError If z filtering is requested but the GeoDataFrame lacks a ``z`` column. If neither explicit x/y nor geometry is present. """ # ------------------------------------------------------------------ # Filter by z-value if provided # ------------------------------------------------------------------ if z_value is not None: if "z" not in gdf.columns: raise ValueError( "`z_value` was provided but no 'z' column exists in gdf." ) mask = np.abs(gdf["z"] - z_value) <= z_tol df_slice = gdf.loc[mask].copy() else: df_slice = gdf.copy() # ------------------------------------------------------------------ # Ensure x and y exist # ------------------------------------------------------------------ if "x" not in df_slice.columns or "y" not in df_slice.columns: if "geometry" not in df_slice.columns: raise ValueError( "GeoDataFrame must contain x/y or geometry for coordinate extraction." ) df_slice["x"] = df_slice.geometry.x df_slice["y"] = df_slice.geometry.y # ------------------------------------------------------------------ # Final tidy result # ------------------------------------------------------------------ return df_slice[["x", "y", value_col]].rename(columns={value_col: "value"})
[docs] def grid_from_tidy( df: pd.DataFrame, ) -> tuple[pd.DataFrame, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Convert tidy (x, y, value) data into a rectangular grid and aligned meshgrids. Parameters ---------- df : pandas.DataFrame A tidy DataFrame with columns ``'x'``, ``'y'``, and ``'value'``. Returns ------- grid : pandas.DataFrame Pivoted grid of shape ``(ny, nx)``, with sorted ``y`` as rows and sorted ``x`` as columns. xs : numpy.ndarray Sorted unique ``x`` coordinates, shape ``(nx,)``. ys : numpy.ndarray Sorted unique ``y`` coordinates, shape ``(ny,)``. x_2d : numpy.ndarray 2D meshgrid of ``x`` coordinates, shape ``(ny, nx)``. y_2d : numpy.ndarray 2D meshgrid of ``y`` coordinates, shape ``(ny, nx)``. Notes ----- - This function assumes a rectangular lattice is desired, and therefore sorts x and y independently. - Unobserved (x, y) pairs appear as NaN in the returned grid. """ # Sorted coordinate axes xs = np.sort(df["x"].unique()) ys = np.sort(df["y"].unique()) # Pivot into a grid and ensure sorted indexing grid = df.pivot_table(index="y", columns="x", values="value") grid = grid.reindex(index=ys, columns=xs) # Construct 2D meshgrid arrays explicitly x_2d = np.tile(xs, (len(ys), 1)) y_2d = np.tile(ys.reshape(-1, 1), (1, len(xs))) return grid, xs, ys, x_2d, y_2d
[docs] def standardize_xy( X_train: np.ndarray, X_full: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Standardize coordinate arrays using the mean and standard deviation computed from the training coordinates. Parameters ---------- X_train : numpy.ndarray Training coordinate array, shape ``(n_train, 2)``. X_full : numpy.ndarray Full coordinate array to be standardized using training statistics, shape ``(n_full, 2)``. Returns ------- X_train_std : numpy.ndarray Standardized training coordinates, shape ``(n_train, 2)``. X_full_std : numpy.ndarray Standardized full-grid coordinates, shape ``(n_full, 2)``. mean : numpy.ndarray Per-dimension mean of the training data, shape ``(2,)``. std : numpy.ndarray Per-dimension standard deviation of the training data, shape ``(2,)``. Notes ----- - This function computes statistics (mean, std) **only** from the training array `X_train`, which ensures proper scaling when evaluating generalization. """ # Compute scaling statistics from training data only mean = X_train.mean(axis=0) std = X_train.std(axis=0) # Standardize training and full-grid coordinates X_train_std = (X_train - mean) / std X_full_std = (X_full - mean) / std return X_train_std, X_full_std, mean, std
[docs] def load_2d_data( # noqa PLR0913, PLR0914 gdf, *, value_col, z_value=None, z_tol=1e-6, test_size=0.2, seed=42, ): """ Prepare all arrays needed for 2D Gaussian process interpolation. Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame containing coordinates and values. value_col : str Column name containing the target variable. z_value : float, optional Z slice to filter. If ``None``, no Z filtering is applied. z_tol : float, optional Tolerance for selecting points around ``z_value``. test_size : float, optional Fraction of non-missing data allocated to validation. seed : int, optional Random seed for the train/validation split. Returns ------- dict Dictionary with the same keys as the original implementation: includes standardized XY, grids, masks, pivoted arrays, and stats. """ # --------------------------------------------------------- # Slice preparation # --------------------------------------------------------- df_slice = prepare_slice( gdf, value_col=value_col, z_value=z_value, z_tol=z_tol, ) # --------------------------------------------------------- # Pivot tidy data into rectangular grid # --------------------------------------------------------- df_pivot, _xs, _ys, x_2d, y_2d = grid_from_tidy(df_slice) Y_2d = df_pivot.to_numpy() nan_mask = np.isnan(Y_2d) # --------------------------------------------------------- # Extract non-missing training samples # --------------------------------------------------------- df_no_nan = df_slice.dropna(subset=["value"]) sample_x = df_no_nan["x"].to_numpy() sample_y = df_no_nan["y"].to_numpy() sample_Y = df_no_nan["value"].to_numpy() X = np.column_stack([sample_x, sample_y]) Y = sample_Y.reshape(-1, 1) X_train, X_val, Y_train, Y_val = train_test_split( X, Y, test_size=test_size, random_state=seed ) # --------------------------------------------------------- # Standardize coordinates # --------------------------------------------------------- X_full = np.column_stack([x_2d.ravel(), y_2d.ravel()]) X_train_stdized, X_full_stdized, X_mean, X_std = standardize_xy( X_train, X_full, ) X_val_stdized = (X_val - X_mean) / X_std # Full grid standardized (reshaped) X_full_stdized_2d = X_full_stdized.reshape(*x_2d.shape, 2) # Missing positions / standardized X_missing = np.column_stack([x_2d[nan_mask], y_2d[nan_mask]]) X_missing_stdized = X_full_stdized[nan_mask.ravel()] # --------------------------------------------------------- # Standardize Y # --------------------------------------------------------- Y_train_mean = Y_train.mean(axis=0) Y_train_std = Y_train.std(axis=0) Y_train_stdized = (Y_train - Y_train_mean) / Y_train_std Y_val_stdized = (Y_val - Y_train_mean) / Y_train_std # Standardized grid values Y_2d_stdized = (Y_2d - Y_train_mean) / Y_train_std # Also standardize original 1D x-y lists x = df_slice["x"].to_numpy() y = df_slice["y"].to_numpy() x_stdized = (x - X_mean[0]) / X_std[0] y_stdized = (y - X_mean[1]) / X_std[1] # --------------------------------------------------------- # Return dictionary in original structure # --------------------------------------------------------- return { "x_2d": x_2d, "y_2d": y_2d, "nan_mask": nan_mask, "X_missing_stdized": X_missing_stdized, "x": x, "y": y, "x_stdized": x_stdized, "y_stdized": y_stdized, "Y_2d": Y_2d, "X_train": X_train, "Y_train": Y_train, "X_val": X_val, "Y_val": Y_val, "X_train_stdized": X_train_stdized, "Y_train_stdized": Y_train_stdized, "X_val_stdized": X_val_stdized, "Y_val_stdized": Y_val_stdized, "X_train_mean": X_mean, "X_train_std": X_std, "Y_train_mean": Y_train_mean, "Y_train_std": Y_train_std, "df_pivot": df_pivot, "df_slice": df_slice, "X_missing": X_missing, }
[docs] def drop_z_from_geometry( gdf: gpd.GeoDataFrame, geom_col: str = "geometry", ) -> gpd.GeoDataFrame: """ Remove Z coordinates from 3D Point geometries in a GeoDataFrame, converting them into standard 2D Points. Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame whose geometry column may contain 3D Points. geom_col : str, optional Name of the geometry column containing shapely Point geometries. Returns ------- geopandas.GeoDataFrame A new GeoDataFrame where all Point geometries are guaranteed to be 2D. Notes ----- - Rows with geometries that are already 2D are left unchanged. - Only Point geometries are supported; behavior for LineString/Polygon geometries is not modified from the original implementation. """ def _drop_z(geom: BaseGeometry) -> BaseGeometry: if hasattr(geom, "has_z") and geom.has_z: return Point(geom.x, geom.y) return geom gdf_out = gdf.copy() gdf_out[geom_col] = gdf_out[geom_col].apply(_drop_z) return gdf_out
[docs] def estimate_variance(Y: ArrayLike) -> float: """ Estimate the unbiased sample variance of an array. Parameters ---------- Y : numpy.ndarray Observed values (1-D or 2-D array-like). If 2-D, the array is flattened before variance computation. Returns ------- float Unbiased sample variance (using ``ddof=1``). """ Y_arr = np.asarray(Y).flatten() return float(np.var(Y_arr, ddof=1))
[docs] def recommend_likelihood_params( Y: np.ndarray, Y_var: float | None = None, ) -> dict: """ Recommend initialization parameters and a prior for the Gaussian likelihood (observation-noise variance) used in GPy models. Parameters ---------- Y : numpy.ndarray Target values of shape (N, 1) or (N,). Flattening is handled internally. Y_var : float or None, optional Variance of the target values. If None, it is estimated using :func:`estimate_variance`. Returns ------- dict A dictionary containing: - ``'init'`` : float Initial noise variance estimate. - ``'prior'`` : GPy.priors.Gamma Gamma prior placed on the noise variance. - ``'bounds'`` : tuple(float, float) Lower and upper bounds for the noise variance parameter. The dictionary is packaged under the key ``'variance'`` so that callers can directly configure ``model.Gaussian_noise.variance``. Notes ----- - The initial noise variance is set to approximately 1% of the variance of Y. - A Gamma prior is used to bias the optimization toward small but positive values, reflecting typical assumptions about measurement noise. """ # Estimate variance if not provided if Y_var is None: Y_var = estimate_variance(Y) # 1% of data variance as initial noise level noise_level = Y_var * 0.01 # Gamma prior for noise variance variance_prior = GPy.priors.Gamma(2.0, noise_level / 2.0) return { "variance": { "init": noise_level, "prior": variance_prior, "bounds": (1e-5, Y_var * 0.1), } }
[docs] def compute_global_radius(X_std: np.ndarray) -> float: """ Compute a global spatial radius for standardized 2D coordinates. This is used to construct stable, dataset-independent kernel lengthscale bounds by estimating the largest half-extent of the standardized domain. Parameters ---------- X_std : numpy.ndarray Standardized training coordinates, shape ``(N, 2)``. Each row is ``[x, y]``. Returns ------- float The global radius of the dataset in standardized space, defined as half of the larger of the x-range or y-range. Notes ----- - The input must already be standardized (e.g., via ``standardize_xy``). - This radius is used to derive lower and upper bounds for ARD lengthscales. """ x_min, x_max = X_std[:, 0].min(), X_std[:, 0].max() y_min, y_max = X_std[:, 1].min(), X_std[:, 1].max() R = 0.5 * max(x_max - x_min, y_max - y_min) return float(R)
[docs] def compute_lengthscale_bounds_from_global_radius( R: float, lower_frac: float = 0.02, upper_frac: float = 0.20, ) -> tuple[float, float]: """ Compute dataset-agnostic kernel lengthscale bounds based on the global spatial radius of the standardized coordinate domain. Parameters ---------- R : float Global radius computed from :func:`compute_global_radius`. lower_frac : float, optional Fraction of ``R`` used to set the minimum allowed lengthscale. upper_frac : float, optional Fraction of ``R`` used to set the maximum allowed lengthscale. Returns ------- (float, float) A tuple ``(lower_bound, upper_bound)`` specifying the recommended bounds for ARD lengthscales in the GP kernel. Notes ----- - These scaled bounds help stabilize GP optimization across different datasets by relating lengthscale limits to domain size. """ lower = lower_frac * R upper = upper_frac * R return lower, upper
[docs] def build_rbf_kernel_global( X_stdized: np.ndarray, Y_stdized: np.ndarray, input_dim: int = 2, lower_frac: float = 0.02, upper_frac: float = 0.20, ) -> tuple[GPy.kern.RBF, dict]: """ Construct an RBF kernel whose ARD lengthscale bounds are derived from the global radius of the standardized training coordinates. Parameters ---------- X_stdized : numpy.ndarray Standardized training coordinates, shape ``(N, 2)``. Y_stdized : numpy.ndarray Standardized training targets, shape ``(N, 1)``. input_dim : int, optional Dimensionality of the input space. Default is 2. lower_frac : float, optional Fraction of the global radius used as the lower lengthscale bound. upper_frac : float, optional Fraction of the global radius used as the upper lengthscale bound. Returns ------- (GPy.kern.RBF, dict) The constructed RBF kernel and a dictionary describing the applied constraints and initialization values. Notes ----- - The global radius stabilizes ARD lengthscale bounds across datasets with different spatial extents. - Variance is initialized based on the variance of `Y_stdized` and constrained with a Gamma prior. """ # ------------------------------------------------------------------ # Compute global-radius-based lengthscale bounds # ------------------------------------------------------------------ R = compute_global_radius(X_stdized) lower, upper = compute_lengthscale_bounds_from_global_radius( R, lower_frac, upper_frac ) # ------------------------------------------------------------------ # Create RBF kernel with ARD # ------------------------------------------------------------------ kern = GPy.kern.RBF(input_dim=input_dim, ARD=True) # ------------------------------------------------------------------ # Kernel variance setup # ------------------------------------------------------------------ Y_var = float(np.var(Y_stdized)) kern.variance[:] = max(Y_var, 1e-6) kern.variance.set_prior(GPy.priors.Gamma(2.0, Y_var / 2.0)) kern.variance.constrain_bounded(Y_var * 0.01, Y_var * 10.0) # ------------------------------------------------------------------ # Lengthscale initialization # ------------------------------------------------------------------ init_ls = 0.5 * (lower + upper) kern.lengthscale[:] = np.full(input_dim, init_ls) kern.lengthscale.constrain_bounded(lower, upper) # ------------------------------------------------------------------ # Record applied constraints # ------------------------------------------------------------------ constraints = { "R": R, "ls_lower": lower, "ls_upper": upper, "ls_init": init_ls, "var": kern.variance.values.copy(), # noqa: PD011 } return kern, constraints
[docs] def build_matern32_kernel_global( X_stdized: np.ndarray, Y_stdized: np.ndarray, input_dim: int = 2, lower_frac: float = 0.02, upper_frac: float = 0.20, ) -> tuple[GPy.kern.Matern32, dict]: """ Construct a Matern 3/2 kernel with ARD lengthscales constrained by the global radius of the dataset in standardized space. Parameters ---------- X_stdized : numpy.ndarray Standardized training coordinates, shape ``(N, 2)``. Y_stdized : numpy.ndarray Standardized training targets, shape ``(N, 1)``. input_dim : int, optional Dimensionality of the input space. Default is 2. lower_frac : float, optional Fraction of the global radius used for the lower lengthscale bound. upper_frac : float, optional Fraction of the global radius used for the upper lengthscale bound. Returns ------- (GPy.kern.Matern32, dict) The Matern 3/2 kernel and a dictionary of initialization values and bound settings. Notes ----- - Variance is initialized to half the variance of Y and given a Gamma prior, matching the original implementation. - Lengthscales are constrained to dataset-scaled bounds. """ # ------------------------------------------------------------------ # Global-radius lengthscale bounds # ------------------------------------------------------------------ R = compute_global_radius(X_stdized) lower, upper = compute_lengthscale_bounds_from_global_radius( R, lower_frac, upper_frac ) # ------------------------------------------------------------------ # Base Matern32 kernel # ------------------------------------------------------------------ kern = GPy.kern.Matern32(input_dim=input_dim, ARD=True) # ------------------------------------------------------------------ # Variance initialization and prior # ------------------------------------------------------------------ Y_var = float(np.var(Y_stdized)) kern.variance[:] = max(0.5 * Y_var, 1e-6) kern.variance.set_prior(GPy.priors.Gamma(2.0, (0.5 * Y_var) / 2.0)) kern.variance.constrain_bounded(Y_var * 0.01, Y_var * 10.0) # ------------------------------------------------------------------ # Lengthscale initialization from midpoint of bounds # ------------------------------------------------------------------ init_ls = 0.5 * (lower + upper) kern.lengthscale[:] = np.full(input_dim, init_ls) kern.lengthscale.constrain_bounded(lower, upper) # ------------------------------------------------------------------ # Export constraint info # ------------------------------------------------------------------ constraints = { "R": R, "ls_lower": lower, "ls_upper": upper, "ls_init": init_ls, "var": kern.variance.values.copy(), # noqa: PD011 } return kern, constraints
[docs] def build_combined_kernel( # noqa: PLR0913, PLR0917 X_stdized: np.ndarray, Y_stdized: np.ndarray, use_matern: bool = True, use_rbf: bool = True, bias: bool = True, white: bool = True, longscale: bool = True, lower_frac: float = 0.02, upper_frac: float = 0.20, ) -> tuple[GPy.kern.Kern, dict]: """ Construct a composite kernel consisting of optional components: RBF, Matern 3/2, long-scale RBF, Bias, and White noise kernels. Parameters ---------- X_stdized : numpy.ndarray Standardized training coordinates. Y_stdized : numpy.ndarray Standardized training targets. use_matern : bool, optional Whether to include a Matern 3/2 component. use_rbf : bool, optional Whether to include a short/medium-scale RBF component. bias : bool, optional Whether to include a constant Bias kernel. white : bool, optional Whether to include a White noise kernel. longscale : bool, optional Whether to include a very smooth long-scale RBF background kernel. lower_frac : float, optional Lower bound fraction (of global radius) for ARD lengthscales. upper_frac : float, optional Upper bound fraction (of global radius) for ARD lengthscales. Returns ------- kernel : GPy.kern.Kern Combined kernel created by summing enabled components. info : dict Diagnostic information for each component, including learned bounds and initialization values. Notes ----- - Behavior is identical to the original implementation. - The long-scale RBF kernel is fixed to very smooth lengthscales and tiny variance, acting as a gentle background component. """ parts: list[GPy.kern.Kern] = [] info: dict = {} # ------------------------------------------------------------------ # Global radius (useful for long-scale kernel and subkernels) # ------------------------------------------------------------------ R = compute_global_radius(X_stdized) info["global_radius"] = R # ------------------------------------------------------------------ # RBF kernel # ------------------------------------------------------------------ if use_rbf: rbf, rbf_info = build_rbf_kernel_global( X_stdized, Y_stdized, lower_frac=lower_frac, upper_frac=upper_frac, ) parts.append(rbf) info["rbf"] = rbf_info # ------------------------------------------------------------------ # Matern 3/2 kernel # ------------------------------------------------------------------ if use_matern: m32, m32_info = build_matern32_kernel_global( X_stdized, Y_stdized, lower_frac=lower_frac, upper_frac=upper_frac, ) parts.append(m32) info["matern32"] = m32_info # ------------------------------------------------------------------ # Long-scale RBF (fixed ultra-smooth component) # ------------------------------------------------------------------ if longscale: K_long = GPy.kern.RBF(input_dim=2, ARD=True) # Very small fixed variance K_long.variance[:] = 0.05 K_long.variance.fix() # Very large fixed lengthscale long_ls = 2.0 * R K_long.lengthscale[:] = np.full(2, long_ls) K_long.lengthscale.fix() parts.append(K_long) info["longscale"] = {"variance": 0.05, "lengthscale": long_ls} # ------------------------------------------------------------------ # Bias kernel: constant offset # ------------------------------------------------------------------ if bias: b = GPy.kern.Bias(input_dim=2) b.variance.fix(1.0) parts.append(b) info["bias"] = {"variance": 1.0} # ------------------------------------------------------------------ # White noise kernel # ------------------------------------------------------------------ if white: w = GPy.kern.White(input_dim=2, variance=1e-5) w.variance.fix() parts.append(w) info["white"] = {"variance": 1e-5} # ------------------------------------------------------------------ # Combine components # ------------------------------------------------------------------ if not parts: raise ValueError("No kernel components were enabled.") if len(parts) == 1: return parts[0], info kernel = reduce(operator.add, parts) return kernel, info
[docs] def get_gaussian_noise_bounds( lik_params: dict, ) -> dict: """ Extract Gaussian noise variance bounds from a likelihood-parameter dictionary. Parameters ---------- lik_params : dict Dictionary returned by :func:`recommend_likelihood_params`, containing keys such as ``'variance' : {'init', 'prior', 'bounds'}``. Returns ------- dict Dictionary containing: - ``'value'`` : float Initial noise variance. - ``'lower'`` : float Lower bound on the noise variance. - ``'upper'`` : float Upper bound on the noise variance. The returned structure mirrors the shape used for kernel constraint introspection throughout the codebase. """ return { "variance": { "value": lik_params["variance"]["init"], "lower": lik_params["variance"]["bounds"][0], "upper": lik_params["variance"]["bounds"][1], } }
[docs] def build_and_fit_gp( # noqa: PLR0913, PLR0917 X_train_stdized: np.ndarray, Y_train_stdized: np.ndarray, optimize_restarts: int = 0, verbose: bool = False, save_path: str | None = None, n_inducing: int = 300, lower_frac: float = 0.02, upper_frac: float = 0.20, ) -> tuple[GPy.models.SparseGPRegression, dict]: """ Build and train a Sparse Gaussian Process regression model using a globally stabilized multi-kernel combination (RBF + Matern32 + optional Bias/White). This method improves extrapolation stability by: - Constraining ARD lengthscales based on global radius, - Distributing inducing points across the domain via KMeans, - Anchoring the prior with a constant-mean mapping, - Adding bias and white kernels to reduce drift, - Using a Gamma prior on the Gaussian likelihood noise variance. Parameters ---------- X_train_stdized : numpy.ndarray Standardized training coordinates, shape ``(N, D)``. Y_train_stdized : numpy.ndarray Standardized training targets, shape ``(N, 1)``. optimize_restarts : int, optional Number of multi-start optimization attempts. Default is 0. verbose : bool, optional Whether to print optimization diagnostics. save_path : str or None, optional If provided, the model is saved to this path via joblib. n_inducing : int, optional Target number of inducing points for SparseGPRegression. lower_frac : float, optional Fraction of global radius used for lower lengthscale bounds. upper_frac : float, optional Fraction of global radius used for upper lengthscale bounds. Returns ------- model : GPy.models.SparseGPRegression Trained sparse Gaussian Process model. constraint_info : dict Dictionary containing kernel and likelihood constraint diagnostics. """ # ---------------------------------------------------------------------- # 1. Build combined kernel using global-radius constraints # ---------------------------------------------------------------------- kernel, kernel_info = build_combined_kernel( X_train_stdized, Y_train_stdized, use_matern=True, use_rbf=True, bias=True, white=True, lower_frac=lower_frac, upper_frac=upper_frac, ) # ---------------------------------------------------------------------- # 2. Constant mean function (frozen) # ---------------------------------------------------------------------- mean_func = GPy.mappings.Constant( input_dim=X_train_stdized.shape[1], output_dim=1, ) mean_func.C[:] = float(Y_train_stdized.mean()) mean_func.C.fix() # ---------------------------------------------------------------------- # 3. Inducing point initialization via KMeans # ---------------------------------------------------------------------- N = X_train_stdized.shape[0] M = min(n_inducing, max(20, N // 10)) km = KMeans(n_clusters=M, n_init="auto") Z = km.fit(X_train_stdized).cluster_centers_ # ---------------------------------------------------------------------- # 4. Construct sparse GP model # ---------------------------------------------------------------------- model = GPy.models.SparseGPRegression( X=X_train_stdized, Y=Y_train_stdized, kernel=kernel, mean_function=mean_func, Z=Z, normalizer=False, ) # ---------------------------------------------------------------------- # 5. Gaussian likelihood setup # ---------------------------------------------------------------------- Y_var = float(np.var(Y_train_stdized)) lik_params = recommend_likelihood_params(Y_train_stdized, Y_var=Y_var) model.Gaussian_noise.variance = lik_params["variance"]["init"] model.Gaussian_noise.variance.constrain_bounded( *lik_params["variance"]["bounds"] ) model.Gaussian_noise.variance.set_prior(lik_params["variance"]["prior"]) constraint_info = { "kernel": kernel_info, "Gaussian_noise": get_gaussian_noise_bounds(lik_params), } # ---------------------------------------------------------------------- # 6. Optimize # ---------------------------------------------------------------------- model.optimize(messages=verbose, max_iters=2000) if optimize_restarts > 0: model.optimize_restarts( num_restarts=optimize_restarts, verbose=verbose, robust=True, parallel=False, ) # ---------------------------------------------------------------------- # 7. Optional save to disk # ---------------------------------------------------------------------- if save_path: save_gpy_model(model, save_path) # ---------------------------------------------------------------------- # 8. Diagnostics # ---------------------------------------------------------------------- if verbose: print("\n=== Kernel Lengthscale Diagnostics ===") for part in model.kern.parts: if hasattr(part, "lengthscale"): print( f"{part.name}: lengthscales = {np.array(part.lengthscale.values)}" ) print("\n=== Inducing Point Ranges ===") print( "X range:", model.Z[:, 0].min(), "→", model.Z[:, 0].max(), "| Y range:", model.Z[:, 1].min(), "→", model.Z[:, 1].max(), ) return model, constraint_info
[docs] def get_predictions( model: GPy.models.GPRegression, X: np.ndarray, kvals_df: pd.DataFrame | dict | None = None, Y_mean: float | None = None, Y_std: float | None = None, ): """ Generate GP predictions, optionally converting back to original Y-units and optionally reshaping predictions into a 2D grid defined by x/y coordinates. Parameters ---------- model : GPy.models.GPRegression Trained GP regression model. X : numpy.ndarray Input features in the same standardized space used during training, shape ``(N, D)``. kvals_df : pandas.DataFrame or dict, optional Must contain ``'x'`` and ``'y'``. If provided, predictions are reshaped into a pivoted 2-D grid of shape (len(y_unique), len(x_unique)) Y_mean : float or None, optional Mean of Y from the training dataset (for de-standardizing predictions). Y_std : float or None, optional Standard deviation of Y from the training dataset. Returns ------- numpy.ndarray or tuple If ``kvals_df`` is None, returns ``(Y_pred_flat, Y_std_flat)`` where both arrays have shape ``(N,)``. If ``kvals_df`` is provided, returns ``(Y_pred_grid, Y_std_grid, x_unique, y_unique)`` where the grids have shape ``(ny, nx)`` and the coordinate arrays contain the sorted unique values from the pivot. Notes ----- - This function reproduces the original behavior precisely: * No additional sorting is applied beyond pandas pivot ordering. * De-standardization (mean/std) is optional and only applied if both parameters are provided. """ # ------------------------------------------------------------------ # 1. Predict in standardized space # ------------------------------------------------------------------ Y_pred, Y_var = model.predict(X) # shapes: (N, 1) # ------------------------------------------------------------------ # 2. Optional de-standardization # ------------------------------------------------------------------ if Y_mean is not None and Y_std is not None: Y_pred = Y_pred * Y_std + Y_mean Y_var = (Y_std**2) * Y_var # Flatten outputs Y_pred_flat = Y_pred.ravel() Y_std_flat = np.sqrt(Y_var).ravel() # ------------------------------------------------------------------ # 3. If no coordinate grid provided, return flat arrays # ------------------------------------------------------------------ if kvals_df is None: return Y_pred_flat, Y_std_flat # ------------------------------------------------------------------ # 4. Grid reshape using x/y coordinates supplied by user # ------------------------------------------------------------------ coords = pd.DataFrame({"x": kvals_df["x"], "y": kvals_df["y"]}) coords["pred"] = Y_pred_flat coords["std"] = Y_std_flat grid_pred = coords.pivot_table(index="y", columns="x", values="pred") grid_std = coords.pivot_table(index="y", columns="x", values="std") Y_pred_grid = grid_pred.to_numpy() Y_std_grid = grid_std.to_numpy() x_unique = grid_pred.columns.to_numpy() y_unique = grid_pred.index.to_numpy() return Y_pred_grid, Y_std_grid, x_unique, y_unique
[docs] def check_param_limits_hit_from_constraints( model: GPy.core.GP, constraints: dict, ) -> list[tuple]: r""" Identify kernel parameters whose optimized values lie at or extremely near their constrained lower or upper bounds. Parameters ---------- model : GPy.core.GP Trained GPy model. Must expose ``parameter_names()`` and allow indexing parameters via ``model[name]``. constraints : dict Dictionary describing parameter bounds (e.g., from ``build_and_fit_gp`` or ``build_combined_kernel``). Expected format:: { "rbf": { "variance": {"value": ..., "lower": ..., "upper": ...}, "lengthscale": {...}, }, "matern32": { ... }, "Gaussian_noise": { "variance": {"value": ..., "lower": ..., "upper": ...} }, ... } Returns ------- list of tuple A list of entries like ``(parameter_name_in_model, current_value_array, (lower, upper))`` Each item corresponds to a parameter whose value is at/near its bounds (:math:`|value - bound| \leq 10^{-6}`). Returns an empty list if none. Notes ----- - Uses substring-prefix mapping (e.g., "rbf" → "sum.rbf") to resolve to GPy's internal parameter names. - Bounds must be present in ``constraints`` for comparison. """ param_limits_hit: list[tuple] = [] # ------------------------------------------------------------------ # Map short-names to the kernel prefixes used in GPy parameter names # ------------------------------------------------------------------ kernel_prefix_map = { "rbf": "sum.rbf", "Mat32": "sum.Mat32", "white": "sum.white", "Gaussian_noise": "Gaussian_noise", } # ------------------------------------------------------------------ # Make lookup table of model parameter names (case-insensitive) # ------------------------------------------------------------------ model_params = {p.lower(): p for p in model.parameter_names()} # ------------------------------------------------------------------ # Iterate over constraint groups # ------------------------------------------------------------------ for short_name, params in constraints.items(): prefix = kernel_prefix_map.get(short_name) if prefix is None: continue # Each param inside the constraint entry (e.g., variance, lengthscale) for param_name, info in params.items(): full_name_key = f"{prefix}.{param_name}".lower() if full_name_key not in model_params: continue # Extract parameter from model param_value = model[model_params[full_name_key]].values # noqa: PD011 lower = info.get("lower") upper = info.get("upper") if lower is None or upper is None: continue # ------------------------------------------------------------------ # Check if any element lies extremely close to a bound # ------------------------------------------------------------------ at_lower = np.any(np.isclose(param_value, lower, atol=1e-6)) at_upper = np.any(np.isclose(param_value, upper, atol=1e-6)) if at_lower or at_upper: param_limits_hit.append( ( model_params[full_name_key], param_value.copy(), (lower, upper), ) ) return param_limits_hit
[docs] def assess_gp_model_fit( model: GPy.models.GPRegression, Y_true: np.ndarray, Y_pred: np.ndarray, Y_pred_std: np.ndarray, constraints: dict, ) -> dict: """ Compute regression performance metrics and model diagnostics for a trained Gaussian Process model. Parameters ---------- model : GPy.models.GPRegression Trained GP regression model. Y_true : numpy.ndarray True observed target values. Flattening is handled internally. Y_pred : numpy.ndarray Predicted mean values from the GP model (same shape as Y_true). Y_pred_std : numpy.ndarray Predicted standard deviations for the GP predictions. constraints : dict Constraint information returned during GP construction, used to check whether any hyperparameters lie on their bounds. Returns ------- dict A dictionary with the following entries: - ``RMSE`` : float Root Mean Square Error. - ``R2`` : float Coefficient of determination. - ``MAE`` : float Mean Absolute Error. - ``Coverage_95`` : float Fraction of observations lying inside ±2 sigma predictive intervals. - ``LogLikelihood`` : float GP model log marginal likelihood. - ``AIC`` : float Akaike Information Criterion ``2*k - 2*logL``. - ``BIC`` : float Bayesian Information Criterion ``k*ln(n) - 2*logL``. - ``Params_at_bounds`` : list of tuple Parameters effectively pinned at their allowable bounds. Notes ----- - No transformations or standardization adjustments are performed here. Inputs should already be in the desired scale. - The ±2 sigma coverage statistic provides a simple approximate 95% interval assessment for GP predictive uncertainty. """ # Flatten inputs (behavior preserved) Y_true = np.asarray(Y_true).ravel() Y_pred = np.asarray(Y_pred).ravel() Y_pred_std = np.asarray(Y_pred_std).ravel() # ------------------------------------------------------------------ # Basic regression metrics # ------------------------------------------------------------------ rmse = float(np.sqrt(mean_squared_error(Y_true, Y_pred))) r2 = float(r2_score(Y_true, Y_pred)) mae = float(mean_absolute_error(Y_true, Y_pred)) # ------------------------------------------------------------------ # Likelihood and information criteria # ------------------------------------------------------------------ logL = float(model.log_likelihood()) k = model.num_params n = len(Y_true) aic = float(2 * k - 2 * logL) bic = float(k * np.log(n) - 2 * logL) # ------------------------------------------------------------------ # Predictive interval coverage # ------------------------------------------------------------------ lower = Y_pred - 2 * Y_pred_std upper = Y_pred + 2 * Y_pred_std coverage = float(np.mean((Y_true >= lower) & (Y_true <= upper))) # ------------------------------------------------------------------ # Bound-check diagnostics # ------------------------------------------------------------------ param_limits = check_param_limits_hit_from_constraints(model, constraints) return { "RMSE": rmse, "R2": r2, "MAE": mae, "Coverage_95": coverage, "LogLikelihood": logL, "AIC": aic, "BIC": bic, "Params_at_bounds": param_limits, }
[docs] def bootstrap_assess_residuals_stats( # noqa: PLR0913, PLR0914 Y_true: ArrayLike, Y_pred: ArrayLike, *, alpha: float = 0.05, n_boot: int = 100, sample_size: int = 500, random_state: int | np.random.Generator | None = None, ) -> pd.DataFrame: """ Perform bootstrap-based residual diagnostic tests for Gaussian Process residuals. This function repeatedly draws bootstrap samples of residuals and computes standard diagnostic tests: - **Shapiro-Wilk** (normality, small/medium N) - **D'Agostino K²** (normality via skewness & kurtosis) - **Jarque-Bera** (normality via joint skew/kurtosis) - **Levene** (homoscedasticity / equal variance) - **Ljung-Box** (no autocorrelation at lag 10) Per-test p-values are collected across bootstrap samples, and summary statistics (mean/median p-value, rejection rate) are returned. Parameters ---------- Y_true : numpy.ndarray True target observations. Y_pred : numpy.ndarray Predicted values from the GP. alpha : float, optional Significance threshold used for computing rejection rates. n_boot : int, optional Number of bootstrap iterations. sample_size : int, optional Size of each bootstrap sample. If larger than the dataset, the full dataset is used. random_state : int, numpy.random.Generator, or None, optional Seed or initialized generator for reproducibility. Returns ------- pandas.DataFrame A dataframe with rows for each test: ['Shapiro-Wilk', "D'Agostino", 'Jarque-Bera', 'Levene', 'Ljung-Box'] and columns: - ``Test`` - ``Purpose`` - ``Mean p-value`` - ``Median p-value`` - ``Rejection Rate`` (fraction of p < alpha) - ``Warn`` (True if rejection rate > 0.05) Notes ----- - All exception handling is preserved exactly as in the original version. - Tests that fail or produce NaN p-values are handled identically. - The logic for the “Warn” column is unchanged. """ # ------------------------------------------------------------------ # Prepare residuals # ------------------------------------------------------------------ Y_true = np.ravel(Y_true) Y_pred = np.ravel(Y_pred) residuals = Y_true - Y_pred n = len(residuals) sample_size = min(sample_size, n) rng = np.random.default_rng(random_state) # ------------------------------------------------------------------ # Test definitions # ------------------------------------------------------------------ tests = [ "Shapiro-Wilk", "D'Agostino", "Jarque-Bera", "Levene", "Ljung-Box", ] purposes = { "Shapiro-Wilk": "Normality (N≤5000)", "D'Agostino": "Normality (skew & kurtosis)", "Jarque-Bera": "Normality (joint skew/kurt)", "Levene": "Equal Variance (homoscedasticity)", "Ljung-Box": "No autocorrelation (lag 10)", } # p-value storage results = {t: [] for t in tests} # ------------------------------------------------------------------ # Constants preserved from original code # ------------------------------------------------------------------ VARIANCE_TOL = 1e-12 REJECTION_RATE = 0.05 # ------------------------------------------------------------------ # Bootstrap loop # ------------------------------------------------------------------ for _ in trange( n_boot, desc="Bootstrapping", disable=not sys.stdout.isatty() ): idx = rng.choice(n, size=sample_size, replace=False) sample_resid = residuals[idx] sample_y = Y_true[idx] # for Levene's homoscedasticity check # Shapiro-Wilk try: _, p_sw = shapiro(sample_resid) except Exception: p_sw = np.nan results["Shapiro-Wilk"].append(p_sw) # D'Agostino K² try: _, p_dag = normaltest(sample_resid) except Exception: p_dag = np.nan results["D'Agostino"].append(p_dag) # Jarque-Bera try: _, p_jb = jarque_bera(sample_resid) except Exception: p_jb = np.nan results["Jarque-Bera"].append(p_jb) # Levene (homoscedasticity: sample_y vs residuals) try: _, p_levene = levene(sample_y, sample_resid) except Exception: p_levene = np.nan results["Levene"].append(p_levene) # Ljung-Box (autocorrelation) if np.var(sample_resid) < VARIANCE_TOL: p_lb = np.nan else: try: lb_result = acorr_ljungbox( sample_resid, lags=[10], return_df=True ) p_lb = lb_result["lb_pvalue"].iloc[0] except Exception: p_lb = np.nan results["Ljung-Box"].append(p_lb) # ------------------------------------------------------------------ # Summaries # ------------------------------------------------------------------ summary = [] for test in tests: pvals = np.array(results[test], dtype=float) pvals = pvals[~np.isnan(pvals)] rejection_rate = np.mean(pvals < alpha) if pvals.size else np.nan summary.append( { "Test": test, "Purpose": purposes[test], "Mean p-value": np.nanmean(pvals) if pvals.size else np.nan, "Median p-value": np.nanmedian(pvals) if pvals.size else np.nan, "Rejection Rate": rejection_rate, "Warn": bool(rejection_rate > REJECTION_RATE) if np.isfinite(rejection_rate) else False, } ) return pd.DataFrame(summary)
[docs] def report_model_fit( model_assessment: dict, report_metrics: bool = True, ) -> None: """ Report (or warn about) model performance diagnostics. This function interprets the dictionary produced by :func:`assess_gp_model_fit` and prints metrics that fall within acceptable ranges. If metrics exceed pre-defined thresholds for poor fit, warnings are raised instead. Parameters ---------- model_assessment : dict Dictionary of GP regression metrics as returned by :func:`assess_gp_model_fit`. Expected keys include: ``'RMSE'``, ``'R2'``, ``'Coverage_95'``, and ``'Params_at_bounds'``. report_metrics : bool, optional If True (default), metrics that do *not* trigger warnings are printed. If False, only warnings are emitted. Notes ----- Thresholds used: - ``HIGH_RMSE = 0.1`` - ``LOW_R2 = 0.8`` - ``LOW_COVERAGE = 0.90`` - ``HIGH_COVERAGE = 0.98`` If any parameter lies at (or extremely near) its constraint bounds, a warning is raised regardless of ``report_metrics``. """ # Thresholds HIGH_RMSE = 0.1 LOW_R2 = 0.8 LOW_COVERAGE = 0.90 HIGH_COVERAGE = 0.98 rmse = model_assessment.get("RMSE") r2 = model_assessment.get("R2") coverage = model_assessment.get("Coverage_95") param_limits_hit = model_assessment.get("Params_at_bounds", []) # ---------------------------- # RMSE # ---------------------------- if rmse is not None: if rmse > HIGH_RMSE: warnings.warn(f"High RMSE: {rmse:.4f}", stacklevel=2) elif report_metrics: print(f"RMSE: {rmse:.4f}") # ---------------------------- # R² # ---------------------------- if r2 is not None: if r2 < LOW_R2: warnings.warn(f"Low R²: {r2:.4f}", stacklevel=2) elif report_metrics: print(f"R²: {r2:.4f}") # ---------------------------- # Predictive interval coverage # ---------------------------- if coverage is not None: if coverage < LOW_COVERAGE or coverage > HIGH_COVERAGE: warnings.warn( f"Unusual 95% coverage: {coverage:.3f}", stacklevel=2 ) elif report_metrics: print(f"95% Coverage: {coverage:.3f}") # ---------------------------- # Parameter bound warnings # ---------------------------- if param_limits_hit: warnings.warn( f"Some parameters at constraint bounds: {param_limits_hit}", stacklevel=2, )
[docs] def plot_residuals( Y_true: ArrayLike, Y_pred: ArrayLike, ) -> None: """ Plot residual diagnostics: a histogram of residuals and a scatter plot of residuals versus predicted values. Parameters ---------- Y_true : numpy.ndarray True observed target values. Will be reshaped to 1-D. Y_pred : numpy.ndarray Predicted target values from the model (same shape as ``Y_true``). Notes ----- - This visualization helps identify skew, outliers, and heteroscedasticity. - Behavior, figure layout, and plotting style are identical to the original version of this function. """ # Flatten for consistency Y_true = np.ravel(Y_true) Y_pred = np.ravel(Y_pred) residuals = Y_true - Y_pred # -------------------------------------------------------------- # Create figure with two subplots: histogram + residual scatter # -------------------------------------------------------------- plt.figure(figsize=(10, 4)) # Histogram plt.subplot(1, 2, 1) plt.hist(residuals, bins=30, color="gray", edgecolor="black") plt.title("Histogram of Residuals") plt.xlabel("Residual") plt.ylabel("Frequency") # Residuals vs predictions plt.subplot(1, 2, 2) plt.scatter(Y_pred, residuals, alpha=0.5) plt.axhline(0, color="red", linestyle="--") plt.title("Residuals vs Predictions") plt.xlabel("Predicted Value") plt.ylabel("Residual") plt.tight_layout() plt.show()
[docs] def make_prediction_comparison_plot_2d( X_grid: np.ndarray, Y_grid: np.ndarray, Z_true: np.ndarray, Z_pred: np.ndarray, title: str | None = None, ) -> None: """ Plot side-by-side heatmaps comparing ground truth and predicted values on a shared color scale. Parameters ---------- X_grid : numpy.ndarray 2-D meshgrid array of X coordinates, same shape as ``Z_true``. Y_grid : numpy.ndarray 2-D meshgrid array of Y coordinates, same shape as ``Z_true``. Z_true : numpy.ndarray 2-D array of ground truth values on the grid. Z_pred : numpy.ndarray 2-D array of predicted values on the same grid. title : str or None, optional Optional title prefix applied above both subplots. Notes ----- - Global ``vmin`` and ``vmax`` are computed jointly to ensure the two heatmaps use an identical color scale. - Plotting behavior matches the original function exactly. """ # ------------------------------------------------------------------ # Shared color scale for comparability # ------------------------------------------------------------------ vmin = float(np.nanmin([np.nanmin(Z_true), np.nanmin(Z_pred)])) vmax = float(np.nanmax([np.nanmax(Z_true), np.nanmax(Z_pred)])) norm = plt.Normalize(vmin=vmin, vmax=vmax) fig, axs = plt.subplots(1, 2, figsize=(14, 6), constrained_layout=True) # Ground truth ------------------------------------------------------ pcm_true = axs[0].pcolormesh( X_grid, Y_grid, Z_true, cmap="viridis", norm=norm, shading="auto", ) axs[0].set_xlabel("X") axs[0].set_ylabel("Y") axs[0].set_title(f"{title}\nGround Truth" if title else "Ground Truth") # Predictions ------------------------------------------------------- pcm_pred = axs[1].pcolormesh( X_grid, Y_grid, Z_pred, cmap="viridis", norm=norm, shading="auto", ) axs[1].set_xlabel("X") axs[1].set_ylabel("Y") axs[1].set_title(f"{title}\nPredictions" if title else "Predictions") # Shared colorbar --------------------------------------------------- cbar = fig.colorbar( pcm_pred, ax=axs, location="right", fraction=0.046, pad=0.04 ) cbar.set_label("Value") plt.show()
[docs] def plot_array_with_coords( # noqa: PLR0913, PLR0917 x, y, Z, title=None, cmap="viridis", vmin=None, vmax=None, aspect="equal", cbar_size="4%", cbar_pad=0.04, cbar_label="Value", figsize=(7, 6), ): """ Plot a 2D array `Z` using provided X-Y coordinates and a colorbar. Parameters ---------- x : numpy.ndarray X coordinates, either 1-D (length ``Z.shape[1]``) or 2-D (same shape as ``Z``). y : numpy.ndarray Y coordinates, same rules as ``x``. Z : numpy.ndarray 2-D array of values to plot. title : str, optional Plot title. cmap : str, optional Colormap name. vmin : float, optional Lower bound of colormap (default: min of Z). vmax : float, optional Upper bound of colormap (default: max of Z). aspect : str, optional Axis aspect ratio, default ``"equal"``. cbar_size : str, optional Width of colorbar relative to plot. cbar_pad : float, optional Padding between plot and colorbar. cbar_label : str, optional Colorbar label text. figsize : tuple, optional Figure size. Returns ------- (Figure, Axes, Colorbar) The created matplotlib figure, axes, and colorbar. """ Z = np.asarray(Z) maskZ = np.ma.masked_invalid(Z) x = np.asarray(x) y = np.asarray(y) # --------------------------------------------------------- # Coordinate grids # --------------------------------------------------------- if x.ndim == 1 and y.ndim == 1: if maskZ.shape != (y.size, x.size): raise ValueError( f"Z shape {maskZ.shape} does not match len(y) x len(x)." ) X_grid, Y_grid = np.meshgrid(x, y) elif x.ndim == 2 and y.ndim == 2: # noqa: PLR2004 if x.shape != y.shape or x.shape != maskZ.shape: raise ValueError("If x and y are 2D, both must match shape of Z.") X_grid, Y_grid = x, y else: raise ValueError("x and y must be both 1D or both 2D arrays.") # --------------------------------------------------------- # Color normalization # --------------------------------------------------------- if vmin is None: vmin = float(np.nanmin(Z)) if vmax is None: vmax = float(np.nanmax(Z)) norm = plt.Normalize(vmin=vmin, vmax=vmax) # --------------------------------------------------------- # Plot # --------------------------------------------------------- fig, ax = plt.subplots(figsize=figsize) pc = ax.pcolormesh( X_grid, Y_grid, maskZ, cmap=cmap, norm=norm, shading="auto", ) ax.set_xlabel("X") ax.set_ylabel("Y") ax.set_aspect(aspect) if title: ax.set_title(title) divider = make_axes_locatable(ax) cax = divider.append_axes("right", size=cbar_size, pad=cbar_pad) cbar = fig.colorbar(pc, cax=cax) cbar.set_label(cbar_label) fig.tight_layout() return fig, ax, cbar
[docs] def update_gdf_with_predictions( gdf: gpd.GeoDataFrame, Y_grid: np.ndarray, x_grid: np.ndarray, y_grid: np.ndarray, ) -> gpd.GeoDataFrame: """ Update a GeoDataFrame with prediction values taken from a 2D grid. The function extracts each point's ``(x, y)`` coordinates, rounds them slightly to reduce floating-point mismatch, and looks up the corresponding predicted value from a flattened 2D prediction grid. Points whose rounded coordinates do not appear in the grid receive ``NaN``. Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame containing point geometries. Y_grid : numpy.ndarray Predicted values arranged on a 2-D grid of shape ``(n_y, n_x)``. x_grid : numpy.ndarray 2-D X-coordinates for each grid cell; must match ``Y_grid.shape``. y_grid : numpy.ndarray 2-D Y-coordinates for each grid cell; must match ``Y_grid.shape``. Returns ------- geopandas.GeoDataFrame Copy of the input GeoDataFrame with an added column ``'value_extrapolated'`` containing predictions aligned by coordinate. Notes ----- - Coordinates are matched *exactly* after rounding to 6 decimal places. Consider adjusting this value if coordinates are stored at a different precision. - Uses ``DataFrame.apply`` for readability; preserves your original behavior. - Temporary ``x`` and ``y`` columns are removed before return. Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> from shapely.geometry import Point >>> gdf = gpd.GeoDataFrame(geometry=[Point(0, 0), Point(1, 1)]) >>> x, y = np.meshgrid([0, 1], [0, 1]) >>> Y = np.array([[10, 20], [30, 40]]) >>> update_gdf_with_predictions(gdf, Y, x, y) geometry value_extrapolated 0 POINT (0 0) 10.0 1 POINT (1 1) 40.0 """ # ------------------------------------------------------------------ # Extract coordinate columns # ------------------------------------------------------------------ gdf = gdf.copy() gdf["x"] = gdf.geometry.x gdf["y"] = gdf.geometry.y # Flatten the input grids x_flat = x_grid.ravel() y_flat = y_grid.ravel() Y_flat = Y_grid.ravel() coords = np.column_stack((x_flat, y_flat)) # ------------------------------------------------------------------ # Rounding to mitigate floating-point mismatches # ------------------------------------------------------------------ def round_coord(x: float, y: float, nd: int = 6) -> tuple[float, float]: return (round(float(x), nd), round(float(y), nd)) pred_dict = { round_coord(cx, cy): val for (cx, cy), val in zip(coords, Y_flat) } # ------------------------------------------------------------------ # Map predictions row-by-row # ------------------------------------------------------------------ gdf["value_extrapolated"] = gdf.apply( lambda row: pred_dict.get( round_coord(row.x, row.y), np.nan, ), axis=1, ) # Remove temporary coordinate columns gdf = gdf.drop(columns=["x", "y"]) return gdf
[docs] def backfill_gdf_at_height( # noqa: PLR0913 gdf: gpd.GeoDataFrame, *, value_col: str, z_value: float, z_tol: float, X_grid: np.ndarray, Y_grid: np.ndarray, backfilled_array: np.ndarray, ) -> gpd.GeoDataFrame: """ Backfill missing values in a GeoDataFrame at a specific Z slice using a precomputed 2D grid of fill-in predictions. Only rows whose ``z`` coordinate is within ``z_value ± z_tol`` and whose ``value_col`` is NaN will be filled. The fill values are obtained via nearest-neighbor lookup in the X-Y plane from the provided grid. Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame containing numeric columns ``'x'``, ``'y'``, ``'z'``, and the target ``value_col``. value_col : str Column name of the variable to fill. z_value : float The Z slice to target. z_tol : float Allowed absolute difference between a point's ``z`` and ``z_value``. X_grid : numpy.ndarray 2-D array of X coordinates defining the grid. Y_grid : numpy.ndarray 2-D array of Y coordinates defining the grid. backfilled_array : numpy.ndarray 2-D array of predicted values corresponding to ``X_grid`` / ``Y_grid``. Returns ------- geopandas.GeoDataFrame A *copy* of the input GeoDataFrame where NaNs in ``value_col`` at the specified Z slice have been replaced with nearest-neighbor grid values. Notes ----- - Points outside the Z tolerance range are not modified. - Only NaN entries are filled; existing values are left unchanged. - Nearest-neighbor search is performed using ``scipy.spatial.cKDTree``. - Grid shapes must match: ``X_grid.shape == Y_grid.shape == backfilled_array.shape``. """ # ------------------------------------------------------------------ # Validate grid shapes # ------------------------------------------------------------------ if not (X_grid.shape == Y_grid.shape == backfilled_array.shape): raise ValueError( "X_grid, Y_grid, and backfilled_array must have identical shapes." ) # Work on a copy gdf_out = gdf.copy() # ------------------------------------------------------------------ # Identify slice by z-value within tolerance # ------------------------------------------------------------------ if z_value is not None: z_mask = (gdf_out["z"] >= z_value - z_tol) & ( gdf_out["z"] <= z_value + z_tol ) slice_df = gdf_out.loc[z_mask].copy() null_mask = slice_df[value_col].isna() # Nothing to fill ─ return early if not null_mask.any(): return gdf_out else: slice_df = gdf.copy() null_mask = slice_df[value_col].isna() # ------------------------------------------------------------------ # Build KD-tree for nearest-neighbor mapping # ------------------------------------------------------------------ x_flat = X_grid.ravel() y_flat = Y_grid.ravel() vals_flat = backfilled_array.ravel() # KD-tree over grid coordinates grid_tree = cKDTree(np.column_stack((x_flat, y_flat))) # Coordinates requiring fill missing_coords = slice_df.loc[null_mask, ["x", "y"]].to_numpy() # Query nearest grid point indices _, nn_idx = grid_tree.query(missing_coords) fill_values = vals_flat[nn_idx] # ------------------------------------------------------------------ # Assign values back into the original gdf # ------------------------------------------------------------------ gdf_out.loc[slice_df.index[null_mask], value_col] = fill_values return gdf_out
[docs] def backfill_gdf( # noqa: PLR0913, PLR0914 gdf: gpd.GeoDataFrame, value_col: str, *, z_value: float | None = None, z_tol: float = 1e-6, test_size: float = 0.2, seed: int = 42, verbose: bool = True, ) -> gpd.GeoDataFrame: """ Perform Gaussian Process-based extrapolation (or interpolation) to fill missing values in a GeoDataFrame at a specified Z slice. This high-level routine orchestrates a complete 2D GP modeling pipeline: 1. **Slice preparation** via :func:`load_2d_data` 2. **Sparse GP training** via :func:`build_and_fit_gp` 3. **Model evaluation** (regression metrics + optional bootstrap diagnostics) 4. **Prediction** on missing grid cells 5. **Reconstruction** of a full 2D backfilled prediction array 6. **GeoDataFrame merge** using either: - :func:`backfill_gdf_at_height` (if ``z_value`` provided), or - :func:`update_gdf_with_predictions` (if 2D / no Z slice) Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame containing 3D point geometries and the target column. Must include columns ``'x'``, ``'y'``, ``'z'`` if Z slicing is used. value_col : str Name of the value column to fill. z_value : float or None, optional Z slice value to target. If ``None``, the method treats the data as 2D. z_tol : float, optional Allowed tolerance for selecting points with z ≈ ``z_value``. test_size : float, optional Fraction of known points reserved for model validation. seed : int, optional Random seed for train/validation splitting. verbose : bool, optional Whether to print progress, diagnostics, and plots. Returns ------- geopandas.GeoDataFrame A copy of the input GeoDataFrame with missing values at the selected slice filled using GP predictions. Notes ----- - All behavior from the original implementation is preserved. - No modifications are made to the original ``gdf``; a filled copy is returned. - Plotting and diagnostics occur only if ``verbose=True``. """ # ------------------------------------------------------------------ # 1. Slice + data preparation # ------------------------------------------------------------------ data = load_2d_data( gdf, value_col=value_col, z_value=z_value, z_tol=z_tol, test_size=test_size, seed=seed, ) nan_mask = data["nan_mask"] X_missing_stdized = data["X_missing_stdized"] X_train = data["X_train_stdized"] Y_train = data["Y_train_stdized"] X_val = data["X_val_stdized"] Y_val = data["Y_val"].ravel() Y_train_mean = data["Y_train_mean"] Y_train_std = data["Y_train_std"] # Standardized full grid (column-stacked version) X_full_stdized = np.column_stack([data["x_stdized"], data["y_stdized"]]) X_grid = data["x_2d"] Y_grid_coords = data["y_2d"] Y_full = data["Y_2d"] # ------------------------------------------------------------------ # 2. Train GP model # ------------------------------------------------------------------ model, constraints = build_and_fit_gp( X_train, Y_train, optimize_restarts=0, verbose=verbose, ) # ------------------------------------------------------------------ # 3. Predict on validation set # ------------------------------------------------------------------ Y_pred_val, Y_pred_val_std = get_predictions( model, X_val, Y_mean=Y_train_mean, Y_std=Y_train_std, ) # ------------------------------------------------------------------ # 4. Fit assessment # ------------------------------------------------------------------ if verbose: assessment = assess_gp_model_fit( model, Y_true=Y_val, Y_pred=Y_pred_val, Y_pred_std=Y_pred_val_std, constraints=constraints, ) pprint(assessment) # ------------------------------------------------------------------ # 5. Residual diagnostics (optional) # ------------------------------------------------------------------ if verbose: residual_stats = bootstrap_assess_residuals_stats( Y_true=Y_val, Y_pred=Y_pred_val, alpha=0.05, n_boot=100, sample_size=500, ) pprint(residual_stats) plot_residuals(Y_true=Y_val, Y_pred=Y_pred_val) # ------------------------------------------------------------------ # 6. Predict at missing grid locations # ------------------------------------------------------------------ Y_missing_pred, _Y_missing_pred_stddev = get_predictions( model, X_missing_stdized, Y_mean=Y_train_mean, Y_std=Y_train_std, ) # ------------------------------------------------------------------ # 7. Reconstruct full 2D backfilled grid # ------------------------------------------------------------------ Y_full_pred = np.array(Y_full, copy=True) Y_full_pred[nan_mask] = Y_missing_pred if verbose: make_prediction_comparison_plot_2d( X_grid, Y_grid_coords, Z_true=Y_full, Z_pred=Y_full_pred, title=f"Z={z_value}", ) # ------------------------------------------------------------------ # 8. Merge back into GeoDataFrame # ------------------------------------------------------------------ if z_value: gdf_filled = backfill_gdf_at_height( gdf, value_col=value_col, z_value=z_value, z_tol=z_tol, X_grid=X_grid, Y_grid=Y_grid_coords, backfilled_array=Y_full_pred, ) gdf_filled = gdf_filled.rename( columns={value_col: "value_extrapolated"} ) else: gdf_filled = update_gdf_with_predictions( gdf=gdf.copy(), y_grid=Y_grid_coords, x_grid=X_grid, Y_grid=Y_full_pred, ) return gdf_filled