Source code for geopfa.data_readers

"""
Set of methods to read in data in various formats.
"""

import os
from pathlib import Path
from contextlib import suppress

import geopandas as gpd
import pandas as pd
import numpy as np
from shapely import wkt
import shapely
import pyproj
import rasterio
import re
from itertools import starmap
from geopfa.processing import Processing


[docs] class GeospatialDataReaders: """Read geospatial data in various formats""" @staticmethod def read_shapefile(path): """Reads in a shapefile and returns a geopandas dataframe. Parameters ---------- path : str Path to shapefile Returns ------- data : geopandas.GeoDataFrame GeoDataFrame containing contents of shapefile """ data = gpd.read_file(path) return data @staticmethod def read_csv( # noqa: PLR0913, PLR0917 path, crs, x_col=None, y_col=None, z_col=None, geometry_column_name="geometry", ): """Reads in a CSV file and returns a geopandas dataframe. Parameters ---------- path : str Path to csv file crs : str or int String or integer version of coordinate reference system associated with the CSV file. x_col : str Name of x geometry column if no combined geometry column is provided. y_col : str Name of y geometry column if no combined geometry column is provided. z_col : str Name of z geometry column if 3D, and if no combined geometry column is provided. geometry_column_name : str Name of column containing the geometry information. Defaults to 'geometry.' Returns ------- gdf : geopandas.GeoDataFrame GeoDataFrame containing contents of CSV file """ # Read the CSV file df = pd.read_csv(path) # noqa: PD901 # Validate input geometry columns if sum([(x_col is None), (y_col is None)]) == 1: raise ValueError( "Must specify both x_col and y_col, or a combined geometry column." ) if (z_col is not None) and (x_col is None or y_col is None): raise ValueError( "Cannot specify z_col without also specifying x_col and y_col." ) # Validate CRS try: crs = pyproj.CRS.from_user_input(crs) if crs else None except pyproj.exceptions.CRSError as e: raise ValueError(f"Invalid CRS provided: {crs}. Error: {e}") # Create geometry from a combined geometry column if x_col is None and y_col is None and z_col is None: df = df.rename(columns={geometry_column_name: "geometry"}) # noqa: PD901 df["geometry"] = df["geometry"].apply(wkt.loads) gdf = gpd.GeoDataFrame(df, crs=crs) # Create 2D geometry from x and y columns elif z_col is None: gdf = gpd.GeoDataFrame( df, geometry=gpd.points_from_xy(df[x_col], df[y_col]) ) gdf.set_crs(crs, inplace=True) # Create 3D geometry from x, y, and z columns else: gdf = gpd.GeoDataFrame( df, geometry=list( map( shapely.geometry.Point, df[x_col], df[y_col], df[z_col] ) ), ) gdf.set_crs(crs, inplace=True) return gdf @staticmethod def read_raster(path): """Reads in a raster file and returns a rasterio dataset object. Parameters ---------- path : str Path to raster file Returns ------- data : geopandas.GeoDataFrame GeoDataFrame containing contents of raster file """ # TODO: Return geopandas dataframe instead of rasterio dataset object data = rasterio.open(path) return data @staticmethod def read_tif(tif_path): """Read a TIFF file and convert it to a GeoDataFrame with points. Parameters ---------- tif_path : str Path to the TIFF file. Returns ------- GeoDataFrame GeoDataFrame with points representing the raster data. """ # Open the TIFF file with rasterio.open(tif_path) as src: # Read the raster data data = src.read(1) # Read the first band transform = src.transform crs = src.crs nodata = src.nodata # Get all indices where data is not equal to nodata rows, cols = np.where(data != nodata) # Convert raster indices to spatial coordinates x_coords, y_coords = rasterio.transform.xy(transform, rows, cols) # Create points and values arrays points = list(map(shapely.geometry.Point, x_coords, y_coords)) values = data[rows, cols] # Create a GeoDataFrame gdf = gpd.GeoDataFrame({"geometry": points, "value": values}) gdf.set_crs(crs, inplace=True) return gdf @staticmethod def read_tec( # noqa: PLR0917 path, crs, x_col=None, y_col=None, z_col=None, geometry_column_name="geometry", ): """Reads in a double-space-delimited TEC file and returns a geopandas dataframe. Assumes: - First row: title (ignored) - Second row: Tecplot-style VARIABLES declaration, e.g. ``VARIABLES = "X" "Y" "Z" "P(Pa)" ...`` - Third row: assumptions / zone line (ignored) - Data begins on row 4 Parameters ---------- path : str Path to TEC file. crs : str or int String or integer version of coordinate reference system associated with the TEC file. x_col : str Name of x geometry column if no combined geometry column is provided. y_col : str Name of y geometry column if no combined geometry column is provided. z_col : str Name of z geometry column if 3D, and if no combined geometry column is provided. geometry_column_name : str Name of column containing the geometry information. Defaults to 'geometry.' Returns ------- gdf : geopandas.GeoDataFrame GeoDataFrame containing contents of TEC file """ # Read all lines path = Path(path) with path.open(encoding="utf-8") as f: lines = f.readlines() if len(lines) < 4: raise ValueError( "File must contain at least title, header, assumptions, and data rows." ) # ------------------------------------------------------------------ # Parse column names from Tecplot-style header: # VARIABLES = "X" "Y" "Z" "P(Pa)" ... # ------------------------------------------------------------------ header_line = lines[1].strip() # Get everything inside quotes: ["X", "Y", "Z", "P(Pa)", ...] columns = re.findall(r'"([^"]+)"', header_line) # Fallback if we didn't find quoted names (more generic TEC) if not columns: tokens = header_line.split() # Drop leading VARIABLES token if present if tokens and tokens[0].lower().startswith("variables"): tokens = tokens[1:] columns = [t.strip('"') for t in tokens] data = [] for line in lines[3:]: if not line.strip(): continue values = line.split() # If line is short/long, pad or truncate to match number of columns if len(values) < len(columns): values.extend([None] * (len(columns) - len(values))) elif len(values) > len(columns): values = values[: len(columns)] data.append(values) # Create DataFrame df = pd.DataFrame(data, columns=columns) # Clean column names: strip surrounding whitespace and quotes df = df.rename(columns=lambda c: c.strip().strip('"')) # Convert numeric columns where possible for col in df.columns: with suppress(ValueError, TypeError): df[col] = pd.to_numeric(df[col]) # Validate geometry input if sum([(x_col is None), (y_col is None)]) == 1: raise ValueError( "Must specify both x_col and y_col, or a combined geometry column." ) if (z_col is not None) and (x_col is None or y_col is None): raise ValueError( "Cannot specify z_col without also specifying x_col and y_col." ) # Validate CRS try: crs = pyproj.CRS.from_user_input(crs) if crs else None except pyproj.exceptions.CRSError as e: raise ValueError(f"Invalid CRS provided: {crs}. Error: {e}") # Create geometry from a combined geometry column (rare for TEC, but supported) if x_col is None and y_col is None and z_col is None: df = df.rename(columns={geometry_column_name: "geometry"}) # noqa: PD901 df["geometry"] = df["geometry"].apply(wkt.loads) gdf = gpd.GeoDataFrame(df, crs=crs) # Create 2D geometry from x and y columns elif z_col is None: gdf = gpd.GeoDataFrame( df, geometry=gpd.points_from_xy(df[x_col], df[y_col]), ) gdf.set_crs(crs, inplace=True) # Create 3D geometry from x, y, and z columns else: gdf = gpd.GeoDataFrame( df, geometry=list( map( shapely.geometry.Point, df[x_col], df[y_col], df[z_col], ) ), ) gdf.set_crs(crs, inplace=True) return gdf @staticmethod def read_well_path_csv( csv_path, x_col, y_col, z_col=None, *, well_name_col=None, # optional value_col=None, # optional per-vertex values (temp/GR/etc.) source_crs=None, to_crs=None, sort_by=None, md_col=None, dropna_any=True, deduplicate_consecutive=True, z_meas=None, target_z_meas=None, convert_z_after=True, ): """Reads a well-path CSV and returns a point GeoDataFrame (ordered vertices) and optional values. Parameters ---------- csv_path : str Path to the CSV file containing well path vertices. x_col : str Column name for X (or longitude). y_col : str Column name for Y (or latitude). z_col : str, optional Column name for Z (depth or elevation). If omitted, Z=0. well_name_col : str, optional Column name for well name; stored in output. value_col : str, optional Column name for per-vertex values (e.g., Temperature, GR). source_crs : 'int or str', optional Input CRS for X/Y. Example: 32613 or 4326. to_crs : int or str, optional Output CRS for reprojection. sort_by : str, optional Column to sort by before building points (e.g., 'MD_m'). md_col : str, optional If provided and sort_by is None, rows are sorted by this column. dropna_any : bool If True, drop rows with any NaN in X/Y/Z; else only drop rows with all NaNs. deduplicate_consecutive : bool If True, drop consecutive duplicate XY Z vertices. z_meas : str, optional Vertical reference of Z in the CSV (e.g., 'depth', 'm-msl', 'epsg:5703'). target_z_meas : str, optional Desired vertical reference (e.g., 'm-msl', 'ft-msl', 'epsg:6360'). convert_z_after : bool If True and z_meas/target_z_meas provided, converts Z via convert_z_measurements(). Returns ------- well_gdf : geopandas.GeoDataFrame Ordered vertices as 3D Point geometries (one row per vertex). values : numpy.ndarray or None Per-vertex values aligned with rows (or None if value_col not provided). """ df = pd.read_csv(csv_path) # Pull coordinates x = df[x_col].astype(float).to_numpy() y = df[y_col].astype(float).to_numpy() if z_col is not None: z = df[z_col].astype(float).to_numpy() else: z = np.zeros_like(x, dtype=float) coords = np.column_stack([x, y, z]) # Drop NaNs if dropna_any: mask = ~np.isnan(coords).any(axis=1) else: mask = ~(np.isnan(coords).all(axis=1)) df = df.loc[mask].reset_index(drop=True) coords = coords[mask] if coords.shape[0] < 2: raise ValueError( "Not enough valid vertices to define a well path." ) # Sorting if sort_by is None and md_col is not None and md_col in df.columns: sort_by = md_col if sort_by is not None and sort_by in df.columns: order = np.argsort(df[sort_by].to_numpy()) df = df.iloc[order].reset_index(drop=True) coords = coords[order] # De-duplicate consecutive vertices if deduplicate_consecutive and len(coords) > 1: keep = np.ones(len(coords), dtype=bool) keep[1:] = (np.abs(np.diff(coords, axis=0)) > 0).any(axis=1) df = df.loc[keep].reset_index(drop=True) coords = coords[keep] if len(coords) < 2: raise ValueError( "Well path collapsed to one vertex after de-duplication." ) # Build point GeoDataFrame geometries = [ shapely.geometry.Point(float(X), float(Y), float(Z)) for X, Y, Z in coords ] well_name = ( df[well_name_col].iloc[0] if (well_name_col and well_name_col in df.columns) else None ) well_gdf = gpd.GeoDataFrame( {"name": [well_name] * len(geometries)}, geometry=geometries, crs=source_crs, ) # Reproject horizontal CRS if requested if to_crs is not None: well_gdf = well_gdf.to_crs(to_crs) # Optional per-vertex values values = None if value_col is not None: if value_col not in df.columns: raise ValueError(f"Column '{value_col}' not found for values.") values = df[value_col].to_numpy() # Align to vertex count if len(values) > len(well_gdf): values = values[: len(well_gdf)] elif len(values) < len(well_gdf): pad = np.full(len(well_gdf) - len(values), np.nan) values = np.concatenate([values, pad]) # Optional vertical conversion (expects point Zs in geometry) if ( convert_z_after and z_meas is not None and target_z_meas is not None ): well_gdf = Processing.convert_z_measurements( well_gdf, z_meas, target_z_meas ) return well_gdf, values @classmethod def gather_data(cls, data_dir, pfa, file_types): # noqa: PLR0912 """Function to read in data layers associated with each component of each criteria. Note that data must be stored in a directory with the following structure which matches the config: criteria/component/layers. Criteria directory, component directory, and data file names must match the critera, components, and layers specified in the pfa, and file extensions must match those specified in file_types. Parameters ---------- data_dir : str Path to directory where data is stored pfa : dict Config specifying criteria, components, and data layers' relationship to one another. Read in from json file. file_types : list List of file types to look for when gathering data. File types excluded from list will be ignored. Returns ------- pfa : dict Updated pfa config which includes data """ data_dir = Path(data_dir) for criteria in pfa["criteria"]: # noqa: PLR1702 print("criteria: " + criteria) for component in pfa["criteria"][criteria]["components"]: print("\t component: " + component) COMPONENT_DIR = data_dir / criteria / component file_names = sorted(COMPONENT_DIR.iterdir()) # --- Shapefiles ------------------------------------------------- if ".shp" in file_types: shapefile_names = [ x.name for x in file_names if (x.suffix == ".shp") ] for layer in pfa["criteria"][criteria]["components"][ component ]["layers"]: if f"{layer}.shp" in shapefile_names: print("\t\t reading layer: " + layer) pfa["criteria"][criteria]["components"][component][ "layers" ][layer][ "data" ] = GeospatialDataReaders.read_shapefile( COMPONENT_DIR / f"{layer}.shp" ) # --- CSV files --------------------------------------------------- if ".csv" in file_types: csv_file_names = [ x.name for x in file_names if (x.suffix == ".csv") ] for layer in pfa["criteria"][criteria]["components"][ component ]["layers"]: if f"{layer}.csv" in csv_file_names: print("\t\t reading layer: " + layer) layer_config = pfa["criteria"][criteria][ "components" ][component]["layers"][layer] csv_crs = layer_config["crs"] if ( "x_col" in layer_config and "y_col" in layer_config ): x_col = layer_config["x_col"] y_col = layer_config["y_col"] if "z_col" in layer_config: z_col = layer_config["z_col"] data = GeospatialDataReaders.read_csv( COMPONENT_DIR / f"{layer}.csv", csv_crs, x_col, y_col, z_col, ) else: data = GeospatialDataReaders.read_csv( COMPONENT_DIR / f"{layer}.csv", csv_crs, x_col, y_col, ) else: data = GeospatialDataReaders.read_csv( COMPONENT_DIR / f"{layer}.csv", csv_crs, ) pfa["criteria"][criteria]["components"][component][ "layers" ][layer]["data"] = data # --- TEC files --------------------------------------------------- if ".tec" in file_types: tec_file_names = [ x.name for x in file_names if (x.suffix == ".tec") ] for layer in pfa["criteria"][criteria]["components"][ component ]["layers"]: if f"{layer}.tec" in tec_file_names: print("\t\t reading layer: " + layer) layer_config = pfa["criteria"][criteria][ "components" ][component]["layers"][layer] tec_crs = layer_config["crs"] if ( "x_col" in layer_config and "y_col" in layer_config ): x_col = layer_config["x_col"] y_col = layer_config["y_col"] if "z_col" in layer_config: z_col = layer_config["z_col"] data = GeospatialDataReaders.read_tec( COMPONENT_DIR / f"{layer}.tec", tec_crs, x_col, y_col, z_col, ) else: data = GeospatialDataReaders.read_tec( COMPONENT_DIR / f"{layer}.tec", tec_crs, x_col, y_col, ) else: # Fall back to read_tec with only path + CRS data = GeospatialDataReaders.read_tec( COMPONENT_DIR / f"{layer}.tec", tec_crs, ) pfa["criteria"][criteria]["components"][component][ "layers" ][layer]["data"] = data # --- Unknown file types ----------------------------------------- for file_type in file_types: if file_type not in {".shp", ".csv", ".tec"}: print( f"Warning: file type: {file_type} " " not currently compatible with geoPFA." ) return pfa @classmethod def gather_processed_data(cls, data_dir, pfa, crs): """Function to read in processed data layers associated with each component of each criteria. Note that data must be stored in a directory with the following structure which matches the config: criteria/component/layers. Criteria directory, component directory, and data file names must match the critera, components, and layers specified in the pfa, and file extensions must match those specified in file_types. Parameters ---------- data_dir : str Path to directory where data is stored pfa : dict Config specifying criteria, components, and data layers' relationship to one another. Read in from json file. Returns ------- pfa : dict Updated pfa config which includes data """ data_dir = Path(data_dir) for criteria in pfa["criteria"]: print("criteria: " + criteria) for component in pfa["criteria"][criteria]["components"]: print("\t component: " + component) COMPONENT_DIR = data_dir / criteria / component file_names = sorted(COMPONENT_DIR.iterdir()) csv_file_names = [ x.name for x in file_names if x.name.endswith("_processed.csv") ] for layer in pfa["criteria"][criteria]["components"][ component ]["layers"]: if layer + "_processed.csv" in csv_file_names: print("\t\t reading layer: " + layer) pfa["criteria"][criteria]["components"][component][ "layers" ][layer]["model"] = GeospatialDataReaders.read_csv( COMPONENT_DIR / f"{layer}_processed.csv", crs, ) return pfa @classmethod def gather_exclusion_areas(cls, data_dir, pfa, target_crs): """Gathers/reads in exclusion area shapefiles for a given set of exclusion components and layers, transforming them into the target coordinate reference system (CRS) and storing them in the `pfa` dictionary. This function iterates over the exclusion components and their associated layers in the `pfa` dictionary, reads the corresponding shapefiles from the specified directory, filters the shapefile data based on the `DN` field (keeping only entries where `DN > 0`), and reprojects the geometries to the target CRS. The processed shapefiles are stored back into the `pfa` dictionary under the relevant component and layer. Parameters ---------- cls : type The class that the method belongs to. This is typically passed automatically in class methods. data_dir : str The directory path where the exclusion shapefiles are stored. The shapefiles are expected to be located under a subdirectory named 'exclusion' within this directory. pfa : dict A dictionary containing spatial data and exclusion components. The function reads exclusion areas for each component and layer and updates the dictionary with the processed shapefiles target_crs : str or dict The target Coordinate Reference System (CRS) to which the exclusion shapefiles will be reprojected. This can be a CRS string (e.g., 'EPSG:4326') or a CRS dictionary format. Returns ------- dict The updated `pfa` dictionary, where the processed exclusion areas are stored under `pfa['exclusions']['components'][exclusion_component]['layers'][layer]['model']`. Notes ------ - The function assumes that the exclusion shapefiles are stored in the `data_dir` under a subdirectory named 'exclusion' and that the filenames match the layer names. - Only shapefile records where the `DN` field has a value greater than 0 are retained for further processing. - The shapefile geometries are reprojected to the specified `target_crs` to ensure consistent spatial reference. - The processed shapefiles are stored in the `pfa` dictionary under their respective exclusion components and layers. """ data_dir = Path(data_dir) for exclusion_component in pfa["exclusions"]["components"]: for layer in pfa["exclusions"]["components"][exclusion_component][ "layers" ]: print("reading " + layer) path = data_dir / "exclusion" / f"{layer}.shp" shp = GeospatialDataReaders.read_shapefile(path) shp = shp[shp.DN > 0] shp = shp.to_crs(target_crs) pfa["exclusions"]["components"][exclusion_component]["layers"][ layer ]["model"] = shp return pfa