"""reVRt routing from a point to many points"""
import json
import time
import logging
from pathlib import Path
from warnings import warn
from functools import cached_property
import rasterio
import numpy as np
import xarray as xr
import pandas as pd
import dask.array as da
import geopandas as gpd
from shapely.geometry import Point
from shapely.geometry.linestring import LineString
from revrt import RouteFinder, simplify_using_slopes
from revrt.utilities.handlers import IncrementalWriter
from revrt.exceptions import (
revrtKeyError,
revrtLeastCostPathNotFoundError,
revrtRustError,
)
from revrt.warn import revrtWarning, revrtDeprecationWarning
logger = logging.getLogger(__name__)
LCP_AGG_COST_LAYER_NAME = "lcp_agg_costs"
"""Special name reserved for internally-built cost layer"""
[docs]
class RoutingScenario:
"""Container for routing scenario configuration"""
def __init__(
self,
cost_fpath,
cost_layers,
friction_layers=None,
tracked_layers=None,
cost_multiplier_layer=None,
cost_multiplier_scalar=1,
ignore_invalid_costs=True,
):
"""
Parameters
----------
cost_fpath : path-like
Path to the cost layer Zarr store used for routing.
cost_layers : list
List of dictionaries containing layer definitions
contributing to the summed routing cost.
friction_layers : list, optional
List of dictionaries defining layers that influence routing
but are excluded from reports.
tracked_layers : dict, optional
Layers to summarize along the path, mapped to aggregation
names.
cost_multiplier_layer : str, optional
Layer name providing spatial multipliers for total cost.
cost_multiplier_scalar : int or float, optional
Scalar multiplier applied to the final cost surface.
ignore_invalid_costs : bool, optional
Flag indicating whether non-positive costs block traversal.
"""
self.cost_fpath = cost_fpath
self.cost_layers = cost_layers
self.friction_layers = friction_layers or []
self.tracked_layers = tracked_layers or {}
self.cost_multiplier_layer = cost_multiplier_layer
self.cost_multiplier_scalar = cost_multiplier_scalar
self.ignore_invalid_costs = ignore_invalid_costs
def __repr__(self):
return (
"RoutingScenario:"
f"\n\t- cost_layers: {self.cost_layers}"
f"\n\t- friction_layers: {self.friction_layers}"
f"\n\t- cost_multiplier_layer: {self.cost_multiplier_layer}"
f"\n\t- cost_multiplier_scalar: {self.cost_multiplier_scalar}"
)
[docs]
@cached_property
def cost_function_json(self):
"""str: JSON string describing configured cost layers"""
return json.dumps(
{
"cost_layers": list(self._cost_layers_for_rust()),
"friction_layers": list(self._friction_layers_for_rust()),
"ignore_invalid_costs": self.ignore_invalid_costs,
}
)
def _cost_layers_for_rust(self):
"""Cost layers formatted for Rust ingestion"""
for layer in self.cost_layers:
out_layer = layer.copy()
out_layer.pop("include_in_report", None)
out_layer.pop("include_in_final_cost", None)
yield out_layer
def _friction_layers_for_rust(self):
"""Friction layers formatted for Rust ingestion"""
for layer in self.friction_layers:
out_layer = layer.copy()
if "layer_name" in out_layer:
msg = (
"Specifying `layer_name` for a friction layer is "
"deprecated! The default behavior of friction layers is "
"to be multiplied to the aggregated cost layer. Please "
"remove this key in order to silence this warning."
)
warn(msg, revrtDeprecationWarning)
out_layer.pop("layer_name")
if "mask" in out_layer:
out_layer["multiplier_layer"] = out_layer.pop("mask")
out_layer.pop("include_in_report", None)
yield out_layer
[docs]
class RoutingLayerManager:
"""Class to build routing layers from user input"""
def __init__(self, routing_scenario, chunks="auto"):
"""
Parameters
----------
routing_scenario : RoutingScenario
Scenario containing cost, friction, and tracking metadata.
chunks : str or mapping, default="auto"
Chunk specification forwarded to ``xarray.open_dataset``.
By default, ``"auto"``.
"""
self.routing_scenario = routing_scenario
self._layer_fh = xr.open_dataset(
self.routing_scenario.cost_fpath,
chunks=chunks,
consolidated=False,
engine="zarr",
)
self.tracked_layers = []
self.transform = self._layer_fh.rio.transform()
self._full_shape = self._layer_fh.rio.shape
self.cost_crs = self._layer_fh.rio.crs
self._layers = set(self._layer_fh.variables)
self.cost = None
self.li_cost = None
self.untracked_cost = None
self.final_routing_layer = None
def __repr__(self):
return f"RoutingLayerManager for {self.routing_scenario!r}"
@property
def latitudes(self):
"""xarray.DataArray: Latitude coordinates for the cost grid"""
return self._layer_fh["latitude"]
@property
def longitudes(self):
"""xarray.DataArray: Longitude coordinates for the cost grid"""
return self._layer_fh["longitude"]
def _verify_layer_exists(self, layer_name):
"""Verify that layer exists in cost file"""
if layer_name not in self._layers:
msg = (
f"Did not find layer {layer_name!r} in cost "
f"file {str(self.routing_scenario.cost_fpath)!r}"
)
raise revrtKeyError(msg)
[docs]
def close(self):
"""Close the underlying xarray file handle"""
self._layer_fh.close()
[docs]
def build(self):
"""Build lazy routing arrays from layered file"""
logger.debug("Building %r", self)
self._build_cost_layer()
self._build_final_routing_layer()
self._build_tracked_layers()
return self
def _build_cost_layer(self):
"""Build out the main cost layer"""
self.cost = da.zeros(self._full_shape, dtype=np.float32)
self.li_cost = da.zeros(self._full_shape, dtype=np.float32)
self.untracked_cost = da.zeros(self._full_shape, dtype=np.float32)
for layer_info in self.routing_scenario.cost_layers:
layer_name = layer_info["layer_name"]
is_li = layer_info.get("is_invariant", False)
cost = self._extract_and_scale_cost_layer(layer_info)
cost.values = da.where(cost > 0, cost, 0)
if layer_info.get("include_in_final_cost", True):
if is_li:
self.li_cost += cost
else:
self.cost += cost
else:
self.untracked_cost += cost
if layer_info.get("include_in_report", True):
self.tracked_layers.append(
CharacterizedLayer(
layer_name, cost, is_length_invariant=is_li
)
)
if mll := self.routing_scenario.cost_multiplier_layer:
self._verify_layer_exists(mll)
self.cost *= self._layer_fh[mll].isel(band=0).astype(np.float32)
self.cost *= self.routing_scenario.cost_multiplier_scalar
self.li_cost += self.cost * 0 # make sure arrays are all the same type
self.cost += self.li_cost * 0 # make sure arrays are all the same type
def _build_final_routing_layer(self):
"""Build out the routing array"""
self.final_routing_layer = self.cost.copy()
self.final_routing_layer += self.untracked_cost
frictions = da.zeros(self._full_shape, dtype=np.float32)
for layer_info in self.routing_scenario.friction_layers:
layer_name = (
layer_info["mask"]
if "mask" in layer_info
else layer_info.get("multiplier_layer")
)
friction_layer = self._extract_and_scale_friction_layer(
layer_name, layer_info
)
if layer_info.get("include_in_report", False):
self.tracked_layers.append(
CharacterizedLayer(layer_name, friction_layer)
)
frictions += friction_layer
frictions = da.where(frictions <= -1, -1.0 + 1e-7, frictions)
self.final_routing_layer *= 1 + frictions
self.final_routing_layer += self.li_cost
self.final_routing_layer.values = da.where(
self.final_routing_layer <= 0,
-1 if self.routing_scenario.ignore_invalid_costs else 1e10,
self.final_routing_layer,
)
def _extract_and_scale_cost_layer(self, layer_info):
"""Extract layer based on name and scale according to input"""
cost = self._extract_layer(layer_info["layer_name"])
multiplier_layer_name = layer_info.get(
"mask", layer_info.get("multiplier_layer")
)
if multiplier_layer_name:
cost *= self._extract_layer(multiplier_layer_name)
cost *= layer_info.get("multiplier_scalar", 1)
return cost
def _extract_and_scale_friction_layer(self, mask_layer_name, layer_info):
"""Extract layer based on name and scale according to input"""
if not mask_layer_name:
msg = (
"Friction layers must specify a 'mask' or "
"'multiplier_layer' key!"
)
raise revrtKeyError(msg)
cost = self._extract_layer(mask_layer_name)
cost *= layer_info.get("multiplier_scalar", 1)
return cost
def _extract_layer(self, layer_name):
"""Extract layer based on name"""
self._verify_layer_exists(layer_name)
return self._layer_fh[layer_name].isel(band=0).astype(np.float32)
def _build_tracked_layers(self):
"""Build out a dictionary of tracked layers"""
for (
tracked_layer,
method,
) in self.routing_scenario.tracked_layers.items():
if getattr(da, method, None) is None:
msg = (
f"Did not find method {method!r} in dask.array! "
f"Skipping tracked layer {tracked_layer!r}"
)
warn(msg, revrtWarning)
continue
if tracked_layer not in self._layers:
msg = (
f"Did not find layer {tracked_layer!r} in cost file "
f"{str(self.routing_scenario.cost_fpath)!r}. "
"Skipping..."
)
warn(msg, revrtWarning)
continue
layer = (
self._layer_fh[tracked_layer].isel(band=0).astype(np.float32)
)
self.tracked_layers.append(
CharacterizedLayer(tracked_layer, layer, agg_method=method)
)
[docs]
class CharacterizedLayer:
"""Encapsulate tracked routing layer metadata"""
def __init__(
self, name, layer, is_length_invariant=False, agg_method=None
):
"""
Parameters
----------
name : str
Identifier used when reporting layer-derived metrics.
layer : xarray.DataArray or dask.array.Array
Data values sampled from the routing stack.
is_length_invariant : bool, default=False
Flag signaling cost values should ignore segment length.
By default, ``False``.
agg_method : str, optional
Name of dask aggregation used when tracking statistics.
By default, ``None``.
"""
self.name = name
self.layer = layer
self.is_length_invariant = is_length_invariant
self.agg_method = agg_method
def __repr__(self):
return (
f"CharacterizedLayer(name={self.name!r}, "
f"is_length_invariant={self.is_length_invariant}, "
f"agg_method={self.agg_method!r})"
)
[docs]
def compute(self, route, cell_size):
"""Compute layer metrics along a route
Parameters
----------
route : sequence
Ordered ``(row, col)`` indices describing the path.
cell_size : float
Raster cell size in meters for distance calculations.
Returns
-------
dict
Mapping of metric names to aggregated layer values.
"""
rows, cols = np.array(route).T
layer_values = self.layer.isel(
y=xr.DataArray(rows, dims="points"),
x=xr.DataArray(cols, dims="points"),
)
if self.agg_method is None:
return self._compute_total_and_length(
layer_values, route, cell_size
)
return self._compute_agg(layer_values)
def _compute_total_and_length(self, layer_values, route, cell_size):
"""Compute total cost and length metrics for the layer"""
if len(route) == 1:
return {
f"{self.name}_cost": 0,
f"{self.name}_length_km": 0,
}
lens, __ = _compute_lens(route, cell_size)
layer_data = getattr(layer_values, "data", layer_values)
if not isinstance(layer_data, da.Array): # pragma: no cover
layer_data = da.asarray(layer_data)
if self.is_length_invariant:
layer_cost = da.sum(layer_data[1:])
else:
layer_cost = da.sum(layer_data * lens)
layer_length = da.sum(lens[layer_data > 0]) * cell_size / 1000
return {
f"{self.name}_cost": layer_cost.astype(np.float32).compute(),
f"{self.name}_length_km": (
layer_length.astype(np.float32).compute()
),
}
def _compute_agg(self, layer_values):
"""Compute aggregated statistic for tracked layer"""
aggregate = getattr(da, self.agg_method)(layer_values).astype(
np.float32
)
return {f"{self.name}_{self.agg_method}": aggregate.compute()}
[docs]
class RouteMetrics:
"""Class to compute route characteristics given layer cost maps"""
def __init__(
self,
routing_layers,
route,
optimized_objective,
add_geom=False,
attrs=None,
):
"""
Parameters
----------
routing_layers : RoutingLayerManager
Routing layer manager containing cost and tracker arrays.
route : list
Ordered row and column indices defining the path.
optimized_objective : float
Objective value returned by the routing solver.
add_geom : bool, default=False
Include shapely geometry in the output when ``True``.
By default, ``False``.
attrs : dict, optional
Additional metadata merged into the result dictionary.
By default, ``None``.
"""
self._routing_layers = routing_layers
self._route = route
self._optimized_objective = optimized_objective
self.__lens = self._total_path_length = None
self._by_layer_results = {}
self._add_geom = add_geom
self._attrs = attrs or {}
@property
def cell_size(self):
"""float: Raster cell size in meters"""
return abs(self._routing_layers.transform.a)
@property
def _lens(self):
"""array-like: Cached per-cell travel distances"""
if self.__lens is None:
self._compute_path_length()
return self.__lens
@property
def total_path_length(self):
"""float: Total path length in kilometers"""
if self._total_path_length is None:
self._compute_path_length()
return self._total_path_length
@property
def cost(self):
"""float: Optimized objective evaluated over the route"""
rows, cols = np.array(self._route).T
cell_costs = self._routing_layers.cost.isel(
y=xr.DataArray(rows, dims="points"),
x=xr.DataArray(cols, dims="points"),
)
cost = da.sum(cell_costs * self._lens)
inv_cell_costs = self._routing_layers.li_cost.isel(
y=xr.DataArray(rows, dims="points"),
x=xr.DataArray(cols, dims="points"),
)
invariant_cost = da.sum(inv_cell_costs[1:])
# Multiple distance travel through cell by cost and sum it!
return (cost + invariant_cost).compute()
@property
def end_lat(self):
"""float: Latitude of the terminal path cell"""
row, col = self._route[-1]
return (
self._routing_layers.latitudes.isel(y=row, x=col)
.astype(np.float32)
.compute()
.item()
)
@property
def end_lon(self):
"""float: Longitude of the terminal path cell"""
row, col = self._route[-1]
return (
self._routing_layers.longitudes.isel(y=row, x=col)
.astype(np.float32)
.compute()
.item()
)
@property
def geom(self):
"""shapely.GeometryType: Geometry for the computed path"""
rows, cols = np.array(self._route).T
x, y = rasterio.transform.xy(
self._routing_layers.transform, rows, cols
)
if len(self._route) == 1:
return Point(x, y)
return LineString(simplify_using_slopes(list(zip(x, y, strict=True))))
[docs]
def compute(self):
"""Assemble route metrics and optional geometry payload"""
results = {
"length_km": self.total_path_length,
"cost": self.cost,
"poi_lat": self.end_lat,
"poi_lon": self.end_lon,
"start_row": self._route[0][0],
"start_col": self._route[0][1],
"end_row": self._route[-1][0],
"end_col": self._route[-1][1],
"optimized_objective": self._optimized_objective,
}
results.update(self._attrs)
for layer in self._routing_layers.tracked_layers:
layer_result = layer.compute(self._route, self.cell_size)
results.update(layer_result)
if self._add_geom:
results["geometry"] = self.geom
return results
def _compute_path_length(self):
"""Compute the total length and cell by cell length of LCP"""
self.__lens, self._total_path_length = _compute_lens(
self._route, self.cell_size
)
[docs]
class IncrementalRouteWriter(IncrementalWriter):
"""Stream results to disk by appending each new result to a file
A new file is created if one does not exist.
"""
def __init__(self, out_fp, crs=None):
"""
Parameters
----------
out_fp : path-like
Path to output file.
crs : rasterio.crs.CRS or dict, optional
Coordinate reference system for geometries when saving to
GeoPackage. By default, ``None``.
"""
super().__init__(out_fp)
self.crs = crs
[docs]
def preprocess_chunk(self, result):
"""Turn result into a dataframe chunk
Parameters
----------
result : dict
Route result dictionary as built by
``RouteMetrics.compute()``.
Returns
-------
pandas.DataFrame or geopandas.GeoDataFrame
A dataframe holding the route result.
"""
if "geometry" in result:
return gpd.GeoDataFrame(
[result], geometry="geometry", crs=self.crs
)
return pd.DataFrame([result])
[docs]
class BatchRouteProcessor:
"""Class to manage batches of route computations"""
def __init__(self, routing_scenario, route_definitions, route_attrs=None):
"""
Parameters
----------
routing_scenario : RoutingScenario
Scenario describing the cost layers and routing options.
route_definitions : Iterable
Sequence of ``(start_points, end_points)`` tuples defining
which points to route between. Each of ``start_points`` and
``end_points`` should be a list of ``(row, col)`` index
tuples.
route_attrs : dict, optional
Mapping of tuples of the form (int, (int, int)) where the
first integer represents the route ID and the tuple of
integers represents the starting index to additional
attributes to include in the output for that route.
By default, ``None``.
"""
self.routing_scenario = routing_scenario
self._route_definitions = route_definitions
self._route_attrs = route_attrs or {}
[docs]
@cached_property
def default_attrs(self):
"""dict: Default attributes for all routes"""
keys = set().union(*[set(x) for x in self._route_attrs.values()])
return dict.fromkeys(keys)
[docs]
@cached_property
def route_attrs(self):
"""dict: Mapping of frozen route node pair sets to attributes"""
return {
k: {**self.default_attrs, **v}
for k, v in self._route_attrs.items()
}
[docs]
@cached_property
def route_definitions(self):
"""list: Validated route definitions for computation"""
return self._compile_valid_route_definitions()
[docs]
@cached_property
def routing_layers(self):
"""RoutingLayerManager: Built routing layers for the scenario"""
return RoutingLayerManager(self.routing_scenario).build()
[docs]
def process(self, out_fp, save_paths=False):
"""Compute all routes and save to disk
Parameters
----------
out_fp : path-like
Path to output file. If ``save_paths=True``, a GeoPackage
will be created (recommend to pass in a filepath ending in
".gpkg"). Otherwise, a CSV file will be created (recommend
to pass in a filepath ending in ".csv").
save_paths : bool, default=False
Include shapely geometries in the output when ``True``.
By default, ``False``.
"""
if not self.route_definitions:
return
ts = time.monotonic()
try:
self._compute_routes(out_fp, save_paths=save_paths)
finally:
self._reset_routing_layers()
time_elapsed = f"{(time.monotonic() - ts) / 60:.4f} min"
logger.debug(
"Routing for %d route definitions computed in %s",
len(self.route_definitions),
time_elapsed,
)
def _compute_routes(self, out_fp, save_paths):
"""Evaluate route definitions and build result records"""
out_fp = _validate_out_fp(out_fp, save_paths)
writer = IncrementalRouteWriter(
out_fp, crs=self.routing_layers.cost_crs
)
for indices, optimized_objective, attrs in self._route_results():
metrics = RouteMetrics(
self.routing_layers,
indices,
optimized_objective,
add_geom=save_paths,
attrs=attrs,
)
route_result = metrics.compute()
writer.save(route_result)
def _route_results(self):
"""Generator yielding route results from Rust computations"""
route_results = RouteFinder(
zarr_fp=self.routing_scenario.cost_fpath,
cost_function=self.routing_scenario.cost_function_json,
route_definitions=[
(rid, sp, ep)
for rid, (sp, ep) in self.route_definitions.items()
],
cache_size=2_000_000_000,
log_level=logging.getLogger("revrt").level or None,
)
yield from self._skip_failed_routes(route_results)
def _compile_valid_route_definitions(self):
"""Filter route definitions to those with valid route nodes"""
if not self._route_definitions:
return {}
sample_definition = self._route_definitions[0]
if len(sample_definition) == 2: # noqa: PLR2004
self._route_definitions = _add_route_ids(self._route_definitions)
routes_to_compute = {}
for route_id, start_points, end_points in self._route_definitions:
filtered_start_points = self._validate_start_points(start_points)
if not filtered_start_points:
msg = (
f"All start points are invalid for route with ID "
f"{route_id}: {start_points}\nSkipping..."
)
warn(msg, revrtWarning)
continue
try:
filtered_end_points = self._validate_end_points(end_points)
except revrtLeastCostPathNotFoundError:
continue
if not filtered_end_points:
msg = (
f"All end points are invalid for route with ID "
f"{route_id}: {end_points}\nSkipping..."
)
warn(msg, revrtWarning)
continue
routes_to_compute[route_id] = (
filtered_start_points,
filtered_end_points,
)
return routes_to_compute
def _skip_failed_routes(self, routing_results):
"""Yield only successfully computed routes from Rust results"""
results_iter = iter(routing_results)
while True:
try:
route_id, solutions = next(results_iter)
start_points, end_points = self.route_definitions[route_id]
if not solutions:
msg = (
f"Unable to find route from {start_points} to any of "
f"{end_points} (route ID: {route_id}). Please verify "
"that the start and end points are not separated by "
"hard barriers or invalid cost cells."
)
logger.error(msg)
continue
logger.debug(
"Got result from Rust for route_id %d. Processing..."
"\n\t- Start points: %r\n\t- End points: %r",
route_id,
start_points,
end_points,
)
for indices, optimized_objective in solutions:
attrs_key = (route_id, indices[0])
attrs = self.route_attrs.get(attrs_key, self.default_attrs)
yield indices, optimized_objective, attrs
except revrtRustError: # pragma: no cover
logger.exception("Rust error when computing route")
continue
except StopIteration:
logger.debug("Routing complete")
break
def _validate_start_points(self, points):
"""Validate start points by removing cells invalid cost"""
points = _get_valid_points(
points, self.routing_layers.cost.shape, point_type="start"
)
if not points or not self.routing_scenario.ignore_invalid_costs:
return points
rows, cols = np.array(points).T
costs = self.routing_layers.cost.isel(
y=xr.DataArray(rows, dims="points"),
x=xr.DataArray(cols, dims="points"),
)
cost_values = costs.compute()
bad_point_inds = np.where(np.isnan(cost_values) | (cost_values <= 0))[
0
]
if not bad_point_inds.size:
return points
invalid_points = {points[i] for i in bad_point_inds}
msg = (
f"One or more of the start points have an invalid cost "
f"(must be > 0): {invalid_points}\n"
"Dropping these from consideration..."
)
warn(msg, revrtWarning)
return [p for p in points if p not in invalid_points]
def _validate_end_points(self, points):
"""Filter out invalid endpoints; raise if all are invalid"""
points = _get_valid_points(
points, self.routing_layers.cost.shape, point_type="end"
)
if not points or not self.routing_scenario.ignore_invalid_costs:
return points
rows, cols = np.array(points).T
costs = self.routing_layers.cost.isel(
y=xr.DataArray(rows, dims="points"),
x=xr.DataArray(cols, dims="points"),
)
if not np.any(costs.compute() > 0):
msg = (
f"None of the end points have a valid cost (must be > 0): "
f"{points}"
)
raise revrtLeastCostPathNotFoundError(msg)
return points
def _reset_routing_layers(self):
"""Close handler and remove built routing layers from memory"""
self.routing_layers.close()
del self.routing_layers
def _validate_out_fp(out_fp, save_paths):
"""Validate output filepath extension"""
out_fp = Path(out_fp)
if save_paths and out_fp.suffix.lower() != ".gpkg":
msg = (
"When saving paths, the output file should have a '.gpkg' "
f"extension to ensure proper format! Got input file: '{out_fp}'. "
"Adding '.gpkg' extension... "
)
warn(msg, revrtWarning)
out_fp = out_fp.with_suffix(".gpkg")
elif not save_paths and out_fp.suffix.lower() != ".csv":
msg = (
"When not saving paths, the output file should have a '.csv' "
f"extension to ensure proper format! Got input file: '{out_fp}'. "
"Adding '.csv' extension... "
)
warn(msg, revrtWarning)
out_fp = out_fp.with_suffix(".csv")
logger.debug("Validated output filepath: %s", out_fp)
return out_fp
def _get_valid_points(points, arr_shape, point_type):
"""Get only points that are within array bounds"""
valid_points = []
invalid_points = []
for point in points:
if _is_valid_point(point, arr_shape):
valid_points.append(point)
else:
invalid_points.append(point)
if invalid_points:
msg = (
f"One or more of the {point_type} points are out of bounds for an "
f"array of shape {arr_shape}: {invalid_points}\n"
"Dropping these from consideration..."
)
warn(msg, revrtWarning)
return valid_points
def _is_valid_point(point, arr_shape):
"""Check if point is within array bounds"""
row, col = point
return 0 <= row < arr_shape[0] and 0 <= col < arr_shape[1]
def _add_route_ids(route_definitions):
"""Add route IDs to route definitions missing them"""
logger.info(
"Route ID's missing from route definitions - adding definition "
"index as route ID..."
)
return [
(ind, start_points, end_points)
for ind, (start_points, end_points) in enumerate(route_definitions)
]
def _compute_lens(route, cell_size):
"""Compute the total length and cell by cell length of LCP"""
# Use Pythagorean theorem to calculate length between cells (km)
# Use c**2 = a**2 + b**2 to determine length of individual paths
lens = np.sqrt(np.sum(np.diff(route, axis=0) ** 2, axis=1))
total_path_length = np.sum(lens) * cell_size / 1000
# Need to determine distance coming into and out of any cell.
# Assume paths start and end at the center of a cell. Therefore,
# distance traveled in the cell is half the distance entering it
# and half the distance exiting it. Duplicate all lengths,
# pad 0s on ends for start and end cells, and divide all
# distance by half.
lens = np.repeat(lens, 2)
lens = np.insert(np.append(lens, 0), 0, 0)
lens /= 2
# Group entrance and exits distance together, and add them
lens = lens.reshape((int(lens.shape[0] / 2), 2))
lens = np.sum(lens, axis=1)
return lens, total_path_length