Source code for revrt.costs.cli

"""Code to build cost layer file"""

import json
import logging
from pathlib import Path
from warnings import warn

import rioxarray
import dask.config
import xarray as xr
import dask.distributed
from gaps.cli import CLICommandFromFunction

from revrt.models.cost_layers import ALL, TransmissionLayerCreationConfig
from revrt.costs.layer_creator import LayerCreator
from revrt.costs.dry_costs_creator import DryCostsCreator
from revrt.costs.masks import Masks
from revrt.utilities import (
    LayeredFile,
    load_data_using_layer_file_profile,
    save_data_using_layer_file_profile,
)
from revrt.exceptions import revrtAttributeError, revrtConfigurationError
from revrt.warn import revrtWarning


logger = logging.getLogger(__name__)
CONFIG_ACTIONS = ["layers", "dry_costs", "merge_friction_and_barriers"]


[docs] def build_masks( land_mask_shp_fp, template_file, masks_dir, reproject_vector=True, chunks="auto", ): """Build masks from land vector file Parameters ---------- land_mask_shp_fp : path-like Path to land polygon GPKG or shp file. template_file : path-like Path to template GeoTIFF (``*.tif`` or ``*.tiff``) or Zarr (``*.zarr``) file containing the profile and transform to be used for the masks. masks_dir : path-like Directory to output mask GeoTIFFs. reproject_vector : bool, default=True Option to reproject CRS of input land vector file to match CRS of template file. By default, ``True``. chunks : str, optional Chunk size to use when reading the template file. This will be passed down as the ``chunks`` argument to :func:`rioxarray.open_rasterio` or :func:`xarray.open_dataset`. By default, ``"auto"``. """ land_mask_shp_fp = Path(land_mask_shp_fp) template_file = Path(template_file) masks_dir = Path(masks_dir) if template_file.suffix == ".zarr": open_func = xr.open_dataset kwargs = {"consolidated": False, "engine": "zarr"} else: open_func = rioxarray.open_rasterio kwargs = {} with open_func(template_file, chunks=chunks, **kwargs) as fh: masks = Masks( shape=fh.rio.shape, crs=fh.rio.crs, transform=fh.rio.transform(), masks_dir=masks_dir, ) masks.create( land_mask_shp_fp=land_mask_shp_fp, save_tiff=True, reproject_vector=reproject_vector, )
[docs] def build_routing_layers( # noqa: PLR0917, PLR0913 routing_file, template_file=None, input_layer_dir=".", output_tiff_dir=".", masks_dir=".", layers=None, dry_costs=None, merge_friction_and_barriers=None, max_workers=1, memory_limit_per_worker="auto", create_kwargs=None, ): """Create costs, barriers, and frictions from a config file You can re-run this function on an existing file to add new layers without overwriting existing layers or needing to change your original config. Parameters ---------- routing_file : path-like Path to GeoTIFF/Zarr file to store cost layers in. If the file does not exist, it will be created based on the `template_file` input. template_file : path-like, optional Path to template GeoTIFF (``*.tif`` or ``*.tiff``) or Zarr (``*.zarr``) file containing the profile and transform to be used for the layered costs file. If ``None``, then the `routing_file` is assumed to exist on disk already. By default, ``None``. input_layer_dir : path-like, optional Directory to search for input layers in, if not found in current directory. By default, ``'.'``. output_tiff_dir : path-like, optional Directory where cost layers should be saved as GeoTIFF. By default, ``"."``. masks_dir : path-like, optional Directory for storing/finding mask GeoTIFFs (wet, dry, landfall, wet+, dry+). By default, ``"."``. layers : list of LayerConfig, optional Configuration for layers to be built and added to the file. At least one of `layers`, `dry_costs`, or `merge_friction_and_barriers` must be defined. By default, ``None``. dry_costs : DryCosts, optional Configuration for dry cost layers to be built and added to the file. At least one of `layers`, `dry_costs`, or `merge_friction_and_barriers` must be defined. By default, ``None``. merge_friction_and_barriers : MergeFrictionBarriers, optional Configuration for merging friction and barriers and adding to the layered costs file. At least one of `layers`, `dry_costs`, or `merge_friction_and_barriers` must be defined. By default, ``None`` max_workers : int, optional Number of parallel workers to use for file creation. If ``None`` or >1, processing is performed in parallel using Dask. By default, ``1``. memory_limit_per_worker : str, float, int, or None, default="auto" Sets the memory limit *per worker*. This only applies if ``max_workers != 1``. If ``None`` or ``0``, no limit is applied. If ``"auto"``, the total system memory is split evenly between the workers. If a float, that fraction of the system memory is used *per worker*. If a string giving a number of bytes (like "1GiB"), that amount is used *per worker*. If an int, that number of bytes is used *per worker*. By default, ``"auto"`` create_kwargs : dict, optional Additional keyword arguments to pass to :meth:`~revrt.utilities.handlers.LayeredFile.create_new` when creating a new layered file. Do not include ``template_file``; it will be ignored. By default, ``None``. """ config = _validated_config( routing_file=routing_file, template_file=template_file or routing_file, input_layer_dir=input_layer_dir, output_tiff_dir=output_tiff_dir, masks_dir=masks_dir, layers=layers, dry_costs=dry_costs, merge_friction_and_barriers=merge_friction_and_barriers, ) logger.debug( "Using dask config:\n%s", json.dumps(dask.config.config, indent=4) ) lock = None if max_workers != 1: client = dask.distributed.Client( n_workers=max_workers, memory_limit=memory_limit_per_worker ) logger.info( "Dask client created with %s workers and %r memory limit per " "worker", max_workers, memory_limit_per_worker, ) logger.info("Dashboard link: %s", client.dashboard_link) lock = dask.distributed.Lock("rioxarray-write", client=client) lf_handler = LayeredFile(fp=config.routing_file) if not lf_handler.fp.exists(): create_kwargs = create_kwargs or {} create_kwargs.pop("template_file", None) logger.info( "%s not found. Creating new layered file with kwargs:\n%r", lf_handler.fp, create_kwargs, ) lf_handler.create_new( template_file=config.template_file, **create_kwargs ) masks = _load_masks(config, lf_handler) builder = LayerCreator( lf_handler, masks, input_layer_dir=config.input_layer_dir, output_tiff_dir=config.output_tiff_dir, ) _build_layers(config, builder, lf_handler, lock=lock) if config.dry_costs is not None: _build_dry_costs(config, masks, lf_handler, lock=lock) if config.merge_friction_and_barriers is not None: _combine_friction_and_barriers(config, lf_handler, lock=lock)
def _validated_config(**config_dict): """Validate use config inputs""" config = TransmissionLayerCreationConfig.model_validate(config_dict) if not any(config.model_dump()[key] is not None for key in CONFIG_ACTIONS): msg = f"At least one of {CONFIG_ACTIONS!r} must be in the config file" raise revrtConfigurationError(msg) return config def _load_masks(config, lf_handler): """Load masks based on config file""" masks = Masks( shape=lf_handler.shape, crs=lf_handler.profile["crs"], transform=lf_handler.profile["transform"], masks_dir=config.masks_dir, ) if not config.layers: return masks build_configs = [lc.build for lc in config.layers] need_masks = any( lc.extent != ALL for bc in build_configs for lc in bc.values() ) if need_masks: masks.load(lf_handler.fp) return masks def _build_layers(config, builder, lf_handler, lock): """Build layers from config file""" existing_layers = set(lf_handler.data_layers) for lc in config.layers or []: if lc.layer_name in existing_layers: logger.info( "Layer %r already exists in %s! Skipping...", lc.layer_name, lf_handler.fp, ) continue builder.build( lc.layer_name, lc.build, values_are_costs_per_mile=lc.values_are_costs_per_mile, write_to_file=lc.include_in_file, description=lc.description, lock=lock, ) def _build_dry_costs(config, masks, lf_handler, lock): """Build dry costs from config file""" dc = config.dry_costs dry_mask = None try: dry_mask = masks.dry_mask except revrtAttributeError: msg = "Dry mask not found! Computing dry costs for full extent!" warn(msg, revrtWarning) dcc = DryCostsCreator( lf_handler, input_layer_dir=config.input_layer_dir, output_tiff_dir=config.output_tiff_dir, ) cost_configs = None if not dc.cost_configs else str(dc.cost_configs) dcc.build( iso_region_tiff=dc.iso_region_tiff, nlcd_tiff=dc.nlcd_tiff, slope_tiff=dc.slope_tiff, transmission_config=cost_configs, mask=dry_mask, default_mults=dc.default_mults, extra_tiffs=dc.extra_tiffs, lock=lock, ) def _combine_friction_and_barriers(config, io_handler, lock): """Combine friction and barriers and save to layered file""" logger.info("Loading friction and raw barriers") merge_config = config.merge_friction_and_barriers friction = load_data_using_layer_file_profile( io_handler.fp, f"{merge_config.friction_layer}.tif", layer_dirs=[config.output_tiff_dir, config.input_layer_dir], ) barriers = load_data_using_layer_file_profile( io_handler.fp, f"{merge_config.barrier_layer}.tif", layer_dirs=[config.output_tiff_dir, config.input_layer_dir], ) combined = friction + barriers * merge_config.barrier_multiplier out_fp = config.output_tiff_dir / f"{merge_config.output_layer_name}.tif" logger.debug("Saving combined barriers to %s", out_fp) save_data_using_layer_file_profile( layer_fp=io_handler.fp, data=combined, geotiff=out_fp, lock=lock ) logger.info("Writing combined barriers to H5") io_handler.write_layer(combined, merge_config.output_layer_name) build_masks_command = CLICommandFromFunction( build_masks, name="build-masks", add_collect=False, split_keys=None, ) build_routing_layers_command = CLICommandFromFunction( build_routing_layers, name="build-routing-layers", add_collect=False, split_keys=None, )