"""reVRt monitoring utilities"""
import time
import psutil
import logging
from pathlib import Path
from contextlib import nullcontext, contextmanager
from uuid import uuid4
import numpy as np
import xarray as xr
import dask.array as da
from dask.distributed import performance_report
logger = logging.getLogger(__name__)
[docs]
def log_mem(log_level="DEBUG"):
    """Log the memory usage to the module logger

    Parameters
    ----------
    log_level : str or int, default="DEBUG"
        Logging level to use. Can be any valid log level name (case
        insensitive), such as ``"DEBUG"`` or ``"INFO"``, or an integer
        level such as ``logging.INFO``. Unrecognized level names fall
        back to ``logging.DEBUG``. By default, ``"DEBUG"``.

    Returns
    -------
    msg : str
        Memory utilization log message string.
    """
    mem = psutil.virtual_memory()
    bytes_per_gb = 1024.0**3
    msg = (
        f"Memory utilization is {mem.used / bytes_per_gb:.3f} GB "
        f"out of {mem.total / bytes_per_gb:.3f} GB total "
        f"({mem.used / mem.total:.1%} used)"
    )
    if isinstance(log_level, str):
        # The fallback must be the *integer* DEBUG level: Logger.log
        # rejects non-int levels with a TypeError, so falling back to the
        # string "DEBUG" would crash on any unrecognized level name.
        log_level = logging.getLevelNamesMapping().get(
            log_level.upper(), logging.DEBUG
        )
    # Integer levels (e.g. logging.INFO) are passed through unchanged.
    logger.log(log_level, msg)
    return msg
[docs]
def log_array_backend(fname, data, kind):
    """Log which array library backs a layer's data

    Parameters
    ----------
    fname : str or path-like
        Layer name or source identifier included in the log message.
    data : xarray.DataArray, dask.array.Array, numpy.ndarray, or object
        Object to inspect. An ``xarray.DataArray`` is unwrapped to its
        ``.data`` attribute before the backend is classified. A Dask
        array is reported as ``"Dask"``, a NumPy array as ``"NumPy"``,
        and anything else by its concrete type name. ``dtype`` and
        ``shape`` are read from ``data`` itself when present, and are
        logged as ``None`` otherwise.
    kind : str
        Short label describing the kind of the layer, such as
        ``"processed"`` or ``"lazy reload"``.
    """
    # Look through an xarray wrapper to the object that holds the values;
    # any other input is classified as-is.
    wrapped = data.data if isinstance(data, xr.DataArray) else data
    type_name = type(wrapped).__name__

    if isinstance(wrapped, da.Array):
        label = "Dask"
    elif isinstance(wrapped, np.ndarray):
        label = "NumPy"
    else:
        # Unknown backends are described by their own type name.
        label = type_name

    logger.debug(
        "%s layer %s is %s-backed (%s) with dtype %s, and shape %s",
        kind,
        fname,
        label,
        type_name,
        getattr(data, "dtype", None),
        getattr(data, "shape", None),
    )
[docs]
@contextmanager
def log_runtime(message, log_level=logging.INFO):
    """Log the time taken to run a block of code

    Use as a context manager. The elapsed wall-clock time is logged when
    the ``with`` block exits, whether it finishes normally or raises.

    Parameters
    ----------
    message : str
        Message to log with the time taken. The elapsed time (formatted
        by ``elapsed_time_as_str``) is appended to this message as
        ``"<message> took <elapsed> to run"``.
    log_level : int or str, default=logging.INFO
        Logging level to use for the message. May be an integer level
        (e.g. ``logging.DEBUG``) or a case-insensitive level name
        (e.g. ``"DEBUG"``). Unrecognized names fall back to
        ``logging.INFO``. By default, ``logging.INFO``.
    """
    if isinstance(log_level, str):
        # Logger.log only accepts integer levels, so resolve names first.
        log_level = logging.getLevelNamesMapping().get(
            log_level.upper(), logging.INFO
        )
    # monotonic() is unaffected by system clock adjustments.
    start_time = time.monotonic()
    try:
        yield
    finally:
        # Runs even if the managed block raises, so the timing is always logged.
        elapsed_time = elapsed_time_as_str(time.monotonic() - start_time)
        logger.log(log_level, f"{message} took {elapsed_time} to run")
[docs]
def elapsed_time_as_str(seconds_elapsed):
    """Format elapsed time into human readable string

    The value is truncated to whole seconds and rendered as
    ``H:MM:SS``. Any whole days are prefixed as
    ``"<N> day(s), "``, with thousands separators in the day count.

    Parameters
    ----------
    seconds_elapsed : int
        Number of seconds that should be represented in string form.

    Returns
    -------
    str
        Human-readable string representing the number of elapsed
        seconds.
    """
    seconds_per_day = 24 * 3600
    n_days, remainder = divmod(int(seconds_elapsed), seconds_per_day)
    n_hours, remainder = divmod(remainder, 3600)
    n_minutes, n_seconds = divmod(remainder, 60)
    clock = f"{n_hours:d}:{n_minutes:02d}:{n_seconds:02d}"

    if not n_days:
        return clock

    # Singular only for exactly one day (in either direction).
    plural = "" if abs(n_days) == 1 else "s"
    return f"{n_days:,d} day{plural}, {clock}"
[docs]
def close_dask_client(client, timeout=600):
    """Shut down a Dask client, tolerating a timeout during shutdown

    Parameters
    ----------
    client : distributed.Client or None
        Dask client to close. Passing ``None`` is a no-op.
    timeout : int, optional
        Seconds to wait for ``client.close`` before giving up. On a
        ``TimeoutError`` a warning (with traceback) is logged and the
        client is forcibly marked as closed so cleanup can continue
        instead of hanging or failing. By default, ``600``.
    """
    if client is not None:
        try:
            client.close(timeout=timeout)
        except TimeoutError:
            logger.warning(
                "Timed out closing Dask client after %s seconds; "
                "continuing cleanup",
                timeout,
                exc_info=True,
            )
            # Force the client to report itself as closed even though the
            # graceful shutdown did not complete.
            client.status = "closed"
            # Clear the name-mangled private ``Client.__loop`` attribute by
            # writing straight into the instance dict. This relies on
            # distributed.Client internals; presumably it stops later
            # teardown from reusing the stalled event loop (verify against
            # the distributed version in use).
            vars(client)["_Client__loop"] = None