# Source code for compass.extraction.water.plugin
"""COMPASS water rights extraction plugin"""
import logging
import importlib.resources
from pathlib import Path
import pandas as pd
from elm import EnergyWizard
from elm.embed import ChunkAndEmbed
from compass.extraction import extract_date
from compass.plugin import BaseExtractionPlugin, register_plugin
from compass.utilities.enums import LLMTasks
from compass.utilities.parsing import extract_year_from_doc_attrs
from compass.exceptions import COMPASSRuntimeError
from compass.extraction.water.parse import StructuredWaterParser
logger = logging.getLogger(__name__)
# Search-engine query templates used to locate candidate water rights
# documents. The "{jurisdiction}" placeholder is filled in with the full
# jurisdiction name at query time (see ``get_query_templates`` below).
WATER_RIGHTS_QUERY_TEMPLATES = [
    "{jurisdiction} rules",
    "{jurisdiction} management plan",
    "{jurisdiction} well permits",
    "{jurisdiction} well permit requirements",
    "requirements to drill a water well in {jurisdiction}",
]
# Keyword-to-score mapping used to rank links found while scraping a
# website for documents; higher scores mark higher-priority links (see
# ``get_website_keywords`` below). Multi-word phrases appear in three
# forms — "%20"-encoded, "+"-joined, and plain — so that any URL
# encoding of the phrase matches.
BEST_WATER_RIGHTS_ORDINANCE_WEBSITE_URL_KEYWORDS = {
    "pdf": 92160,
    "water": 46080,
    "rights": 23040,
    "zoning": 11520,
    "ordinance": 5760,
    r"renewable%20energy": 1440,
    r"renewable+energy": 1440,
    "renewable energy": 1440,
    "planning": 720,
    "plan": 360,
    "government": 180,
    "code": 60,
    "area": 60,
    r"land%20development": 15,
    r"land+development": 15,
    "land development": 15,
    "land": 3,
    "environment": 3,
    "energy": 3,
    "renewable": 3,
    "municipal": 1,
    "department": 1,
}
class WaterRightsHeuristic:
    """No-op heuristic check for water rights documents.

    Satisfies the ``BaseHeuristic`` interface expected by the plugin
    (see ``get_heuristic``) without filtering anything out.
    """

    def check(self, *__, **___):  # noqa: PLR6301
        """Always return ``True`` for water rights documents.

        All positional and keyword arguments are accepted and ignored.
        """
        return True
class TexasWaterRightsExtractor(BaseExtractionPlugin):
    """COMPASS Texas water rights extraction plugin"""

    IDENTIFIER = "tx water rights"
    """str: Identifier for extraction task """

    JURISDICTION_DATA_FP = (
        importlib.resources.files("compass")
        / "data"
        / "tx_water_districts.csv"
    )
    """:term:`path-like <path-like object>`: Path to Texas GCW names"""

    async def get_query_templates(self):  # noqa: PLR6301
        """Get a list of search engine query templates for extraction

        Query templates can contain the placeholder ``{jurisdiction}``
        which will be replaced with the full jurisdiction name during
        the search engine query.
        """
        return WATER_RIGHTS_QUERY_TEMPLATES

    async def get_website_keywords(self):  # noqa: PLR6301
        """Get a dict of website search keyword scores

        Dictionary mapping keywords to scores that indicate links which
        should be prioritized when performing a website scrape for a
        document.
        """
        return BEST_WATER_RIGHTS_ORDINANCE_WEBSITE_URL_KEYWORDS

    async def get_heuristic(self):  # noqa: PLR6301
        """Get a `BaseHeuristic` instance with a `check()` method

        The ``check()`` method should accept a string of text and return
        ``True`` if the text passes the heuristic check and ``False``
        otherwise.
        """
        return WaterRightsHeuristic()

    async def filter_docs(
        self,
        extraction_context,
        need_jurisdiction_verification=True,  # noqa: ARG002
    ):
        """Filter down candidate documents before parsing

        Parameters
        ----------
        extraction_context : ExtractionContext
            Context containing candidate documents to be filtered.
            Set the ``.documents`` attribute of this object to be the
            iterable of documents that should be kept for parsing.
        need_jurisdiction_verification : bool, optional
            Whether to verify that documents pertain to the correct
            jurisdiction. By default, ``True``.

        Returns
        -------
        ExtractionContext or None
            Context with filtered down documents, or ``None`` if no
            document could be embedded.
        """
        model_config = self.model_configs.get(
            LLMTasks.EMBEDDING, self.model_configs[LLMTasks.DEFAULT]
        )
        _setup_endpoints(model_config)
        # Config lookup is loop-invariant, so perform it once up front
        date_model_config = self.model_configs.get(
            LLMTasks.DATE_EXTRACTION, self.model_configs[LLMTasks.DEFAULT]
        )
        corpus = []
        for ind, doc in enumerate(extraction_context, start=1):
            url = doc.attrs.get("source", "unknown source")
            logger.info("Embedding %r", url)
            obj = ChunkAndEmbed(
                doc.text,
                model=model_config.name,
                tokens_per_chunk=model_config.text_splitter_chunk_size,
                overlap=model_config.text_splitter_chunk_overlap,
                split_on="\n",
            )
            try:
                embeddings = await obj.run_async(
                    rate_limit=model_config.llm_service_rate_limit
                )
                if any(e is None for e in embeddings):
                    msg = (
                        "Embeddings are ``None`` when building corpus for "
                        "water rights extraction!"
                    )
                    raise COMPASSRuntimeError(msg)  # noqa: TRY301
                corpus.append(
                    pd.DataFrame(
                        {
                            "text": obj.text_chunks.chunks,
                            "embedding": embeddings,
                        }
                    )
                )
            except Exception as e:  # noqa: BLE001
                # Best-effort embedding: a document that fails to embed
                # is skipped rather than aborting the whole extraction
                logger.info("could not embed %r with error: %s", url, e)
                continue

            await extract_date(doc, date_model_config, self.usage_tracker)
            await extraction_context.mark_doc_as_data_source(
                doc, out_fn_stem=f"{self.jurisdiction.full_name} {ind}"
            )

        if not corpus:
            logger.info(
                "No documents returned for %s, skipping",
                self.jurisdiction.full_name,
            )
            return None

        extraction_context.attrs["corpus"] = pd.concat(corpus)
        return extraction_context

    async def parse_docs_for_structured_data(self, extraction_context):
        """Parse documents to extract structured data/information

        Parameters
        ----------
        extraction_context : ExtractionContext
            Context containing candidate documents to parse.

        Returns
        -------
        ExtractionContext or None
            Context with extracted data/information stored in the
            ``.attrs`` dictionary, or ``None`` if no data was extracted.
        """
        model_config = self.model_configs.get(
            LLMTasks.DATA_EXTRACTION, self.model_configs[LLMTasks.DEFAULT]
        )
        logger.debug("Building energy wizard")
        wizard = EnergyWizard(
            extraction_context.attrs["corpus"],
            model=model_config.name,
        )
        logger.debug("Calling parser class")
        parser = StructuredWaterParser(
            wizard=wizard,
            location=self.jurisdiction.full_name,
            llm_service=model_config.llm_service,
            usage_tracker=self.usage_tracker,
            **model_config.llm_call_kwargs,
        )
        data_df = await parser.parse()
        data_df = _set_data_year(data_df, extraction_context)
        data_df = _set_data_sources(data_df, extraction_context)
        extraction_context.attrs["structured_data"] = data_df
        extraction_context.attrs["out_data_fn"] = (
            f"{self.jurisdiction.full_name} Water Rights.csv"
        )
        return extraction_context

    @classmethod
    def save_structured_data(cls, doc_infos, out_dir):
        """Write extracted water rights data to disk

        Parameters
        ----------
        doc_infos : list of dict
            List of dictionaries containing the following keys:

            - "jurisdiction": An initialized Jurisdiction object
              representing the jurisdiction that was extracted.
            - "ord_db_fp": A path to the extracted structured data
              stored on disk, or ``None`` if no data was extracted.

        out_dir : path-like
            Path to the output directory for the data.

        Returns
        -------
        int
            Number of unique water rights districts that information was
            found/written for.
        """
        db = []
        for doc_info in doc_infos:
            ord_db_fp = doc_info["ord_db_fp"]
            if ord_db_fp is None:
                # No data was extracted for this jurisdiction
                continue
            ord_db = pd.read_csv(ord_db_fp)
            if len(ord_db) == 0:
                continue
            jurisdiction = doc_info["jurisdiction"]
            ord_db["WCD_ID"] = jurisdiction.code
            ord_db["county"] = jurisdiction.county
            ord_db["state"] = jurisdiction.state
            ord_db["subdivision"] = jurisdiction.subdivision_name
            ord_db["jurisdiction_type"] = jurisdiction.type
            db.append(ord_db)
        if not db:
            return 0
        # Drop all-NaN columns per-frame before concatenating so unused
        # ordinance fields do not bloat the combined output
        db = pd.concat([df.dropna(axis=1, how="all") for df in db], axis=0)
        db.to_csv(Path(out_dir) / "water_rights.csv", index=False)
        return len(db["WCD_ID"].unique())
def _set_data_year(data_df, extraction_context):
"""Set the ordinance year column in the data DataFrame"""
years = list(
filter(
None,
[
extract_year_from_doc_attrs(doc.attrs)
for doc in extraction_context
],
)
)
# TODO: is `max` the right one to use here?
data_df["year"] = max(years) if years else None
return data_df
def _set_data_sources(data_df, extraction_context):
"""Set the source column in the data DataFrame"""
sources = filter(
None, [doc.attrs.get("source") for doc in extraction_context]
)
data_df["source"] = " ;\n".join(sources) or None
return data_df
def _setup_endpoints(embedding_model_config):
    """Set proper URLS for elm classes"""
    endpoint = embedding_model_config.client_kwargs["azure_endpoint"]
    model_name = embedding_model_config.name
    # Both elm classes share the same client-side embedding settings
    for elm_cls in (ChunkAndEmbed, EnergyWizard):
        elm_cls.USE_CLIENT_EMBEDDINGS = True
        elm_cls.EMBEDDING_MODEL = model_name
        elm_cls.EMBEDDING_URL = endpoint
    ChunkAndEmbed.URL = endpoint
    EnergyWizard.URL = "openai.azure.com"  # need to trigger Azure setup
# Make the extractor discoverable by the COMPASS plugin registry when
# this module is imported
register_plugin(TexasWaterRightsExtractor)