Source code for compass.utilities.parsing
"""COMPASS ordinance parsing utilities"""
import json
import logging
from pathlib import Path
import pyjson5
import numpy as np
from compass.exceptions import COMPASSValueError
logger = logging.getLogger(__name__)
_ORD_CHECK_COLS = ["value", "summary"]
[docs]
def clean_backticks_from_llm_response(content):
"""Remove markdown-style backticks from an LLM response
Parameters
----------
content : str
LLM response that may contain markdown triple backticks.
Returns
-------
str
Response stripped of all leading and trailing backtick markers.
"""
content = content.lstrip().rstrip()
return content.removeprefix("```").lstrip("\n").removesuffix("```")
[docs]
def llm_response_as_json(content):
"""Parse a raw LLM response into JSON-compatible data
Parameters
----------
content : str
Response text expected to contain a JSON object, possibly with
Markdown fences or Python boolean literals.
Returns
-------
dict
Parsed JSON structure. When parsing fails, the function returns
an empty dictionary.
Notes
-----
The parser strips Markdown code fences, coerces Python-style
booleans to lowercase JSON literals, and logs the raw response on
decode failure. The logging includes guidance for increasing token
limits or updating prompts.
"""
content = clean_backticks_from_llm_response(content)
content = content.removeprefix("json").lstrip("\n")
content = content.replace("True", "true").replace("False", "false")
try:
content = json.loads(content)
except json.decoder.JSONDecodeError:
logger.exception(
"LLM returned improperly formatted JSON. "
"This is likely due to the completion running out of tokens. "
"Setting a higher token limit may fix this error. "
"Also ensure you are requesting JSON output in your prompt. "
"JSON returned:\n%s",
content,
)
content = {}
return content
[docs]
def merge_overlapping_texts(text_chunks, n=300):
"""Merge text chunks while trimming overlapping boundaries
Overlap detection compares at most ``n`` characters at each
boundary but never more than half the length of the accumulated
output. Chunks that do not overlap are concatenated with a newline
separator.
Parameters
----------
text_chunks : iterable of str
Iterable containing text chunks which may or may not contain
consecutive overlapping portions.
n : int, optional
Number of characters to check at the beginning of each message
for overlap with the previous message. Will always be reduced to
be less than or equal to half of the length of the previous
chunk. By default, ``300``.
Returns
-------
str
Merged text assembled from the non-overlapping portions.
"""
text_chunks = list(filter(None, text_chunks))
if not text_chunks:
return ""
out_text = text_chunks[0]
for next_text in text_chunks[1:]:
half_chunk_len = len(out_text) // 2
check_len = min(n, half_chunk_len)
next_chunks_start_ind = out_text[half_chunk_len:].find(
next_text[:check_len]
)
if next_chunks_start_ind == -1:
out_text = f"{out_text}\n{next_text}"
continue
next_chunks_start_ind += half_chunk_len
out_text = "".join([out_text[:next_chunks_start_ind], next_text])
return out_text
[docs]
def num_ordinances_in_doc(doc, exclude_features=None):
"""Count the number of ordinance entries on a document
Parameters
----------
doc : elm.web.document.BaseDocument
Document potentially containing ordinances for a jurisdiction.
If no ordinance values are found, this function returns ``0``.
exclude_features : iterable of str, optional
Optional features to exclude from ordinance count.
By default, ``None``.
Returns
-------
int
Number of ordinance rows represented in ``doc``.
"""
if doc is None or doc.attrs.get("ordinance_values") is None:
return 0
return num_ordinances_dataframe(
doc.attrs["ordinance_values"], exclude_features=exclude_features
)
[docs]
def num_ordinances_dataframe(data, exclude_features=None):
"""Count ordinance rows contained in a DataFrame
Parameters
----------
data : pandas.DataFrame
DataFrame potentially containing ordinances for a jurisdiction.
If no ordinance values are found, this function returns ``0``.
exclude_features : iterable of str, optional
Optional features to exclude from ordinance count.
By default, ``None``.
Returns
-------
int
Count of rows meeting the ordinance criteria.
Raises
------
KeyError
If the input DataFrame lacks the ``feature`` column when
``exclude_features`` is provided.
"""
if exclude_features:
mask = ~data["feature"].str.casefold().isin(exclude_features)
data = data[mask].copy()
return ordinances_bool_index(data).sum()
[docs]
def ordinances_bool_index(data):
"""Compute a boolean mask indicating ordinance rows
Parameters
----------
data : pandas.DataFrame
DataFrame potentially containing ordinances for a jurisdiction.
If no ordinance values are found, this function returns ``0``.
Returns
-------
numpy.ndarray
Boolean mask identifying rows that contain ordinance values.
"""
if data is None or data.empty:
return np.array([], dtype=bool)
check_cols = [col for col in _ORD_CHECK_COLS if col in data]
if not check_cols:
return np.array([], dtype=bool)
found_features = (~data[check_cols].isna()).to_numpy().sum(axis=1)
return found_features > 0
[docs]
def load_config(config_fp):
"""Load configuration data from JSON or JSON5 sources
Parameters
----------
config_fp : path-like
Path to config file to open and load.
Returns
-------
dict
Parsed configuration object.
Raises
------
COMPASSValueError
If the file path does not exist or the extension is not
``.json`` or ``.json5``.
Notes
-----
JSON5 enables comments and trailing commas, among other
quality-of-life improvements over vanilla JSON.
"""
config_fp = Path(config_fp)
if not config_fp.exists():
msg = f"Config file does not exist: {config_fp}"
raise COMPASSValueError(msg)
if config_fp.suffix == ".json5":
with config_fp.open(encoding="utf-8") as fh:
return pyjson5.decode_io(fh)
if config_fp.suffix == ".json":
with config_fp.open(encoding="utf-8") as fh:
return json.load(fh)
msg = (
"Got unknown config file extension: "
f"{config_fp.suffix}. Supported extensions are .json5 and .json."
)
raise COMPASSValueError(msg)
def convert_paths_to_strings(obj):
"""[NOT PUBLIC API] Convert all Path instances to strings"""
logger.trace("Converting paths to strings in object: %s", obj)
if isinstance(obj, Path):
return str(obj)
if isinstance(obj, dict):
return {
convert_paths_to_strings(key): convert_paths_to_strings(value)
for key, value in obj.items()
}
if isinstance(obj, list):
return [convert_paths_to_strings(item) for item in obj]
if isinstance(obj, tuple):
return tuple(convert_paths_to_strings(item) for item in obj)
if isinstance(obj, set):
return {convert_paths_to_strings(item) for item in obj}
return obj