Source code for compass.extraction.water.parse

"""Water ordinance structured parsing class"""

import asyncio
import logging

import pandas as pd

from compass.llm.calling import BaseLLMCaller, ChatLLMCaller
from compass.common import run_async_tree, setup_async_decision_tree
from compass.extraction.water.graphs import (
    setup_graph_permits,
    setup_graph_extraction,
    setup_graph_geothermal,
    setup_graph_oil_and_gas,
    setup_graph_limits,
    setup_graph_well_spacing,
    setup_graph_time,
    setup_graph_metering_device,
    setup_graph_drought,
    setup_graph_contingency,
    setup_graph_plugging_reqs,
    setup_graph_external_transfer,
    setup_graph_production_reporting,
    setup_graph_production_cost,
    setup_graph_setback_features,
    setup_graph_redrilling,
)

logger = logging.getLogger(__name__)


DEFAULT_SYSTEM_MESSAGE = (
    "You are a legal expert explaining water rights ordinances to a "
    "geothermal energy developer."
)


[docs] class StructuredWaterParser(BaseLLMCaller): """LLM ordinance document structured data scraping utility""" def __init__(self, wizard, location, **kwargs): """ Parameters ---------- wizard : elm.wizard.EnergyWizard Instance of the EnergyWizard class used for RAG. location : str Name of the groundwater conservation district or county. """ super().__init__(**kwargs) self.location = location self.wizard = wizard def _init_chat_llm_caller(self, system_message): """Initialize a ChatLLMCaller instance for the DecisionTree""" return ChatLLMCaller( self.llm_service, system_message=system_message, usage_tracker=self.usage_tracker, **self.kwargs, )
[docs] async def parse(self): """Parse text and extract structured ordinance data""" values = {"location": self.location} check_map = { "requirements": self._check_reqs, "extraction_requirements": self._check_extraction, "well_spacing": self._check_spacing, "drilling_window": self._check_time, "metering_device": self._check_metering_device, "district_drought_mgmt_plan": self._check_district_drought, "well_drought_mgmt_plan": self._check_well_drought, "plugging_requirements": self._check_plugging, "transfer_requirements": self._check_transfer, "production_reporting": self._check_production_reporting, "production_cost": self._check_production_cost, "setbacks": self._check_setbacks, "redrilling": self._check_redrilling, } tasks = { name: asyncio.create_task(func()) for name, func in check_map.items() } limit_intervals = ["daily", "monthly", "annual"] for interval in limit_intervals: task_name = f"{interval}_limits" tasks[task_name] = asyncio.create_task( self._check_limits(interval) ) logger.debug("Starting value extraction with %d tasks.", len(tasks)) results = await asyncio.gather(*tasks.values(), return_exceptions=True) for key, result in zip(tasks.keys(), results, strict=True): if isinstance(result, Exception): logger.warning("Task %s failed: %s", key, result) values[key] = None else: values[key] = result logger.debug("Value extraction complete.") return pd.DataFrame(values)
async def _check_with_graph( self, graph_setup_func, limit=50, **format_kwargs ): """Generic method to check requirements using a graph setup func Parameters ---------- graph_setup_func : callable Function that returns a graph for the decision tree limit : int, optional Limit for vector DB query, by default 50 **format_kwargs Additional keyword arguments for prompt formatting Returns ------- dict Extracted data as JSON dict """ graph = graph_setup_func() prompt = graph.nodes["init"].get("db_query") format_dict = {"DISTRICT_NAME": self.location} format_dict.update(format_kwargs) prompt = prompt.format(**format_dict) response, _, _ = self.wizard.query_vector_db(prompt, limit=limit) text = response.tolist() all_text = "\n".join(text) tree = setup_async_decision_tree( graph_setup_func, text=all_text, chat_llm_caller=self._init_chat_llm_caller(DEFAULT_SYSTEM_MESSAGE), **(format_kwargs or {}), ) return await run_async_tree(tree) async def _check_reqs(self): """Get the requirements mentioned in the text""" return await self._check_with_graph(setup_graph_permits) async def _check_extraction(self): """Get the extraction requirements mentioned in the text""" return await self._check_with_graph(setup_graph_extraction) async def _check_geothermal(self): """Get the geothermal requirements mentioned in the text""" return await self._check_with_graph(setup_graph_geothermal) async def _check_oil_and_gas(self): """Get the oil and gas requirements mentioned in the text""" return await self._check_with_graph(setup_graph_oil_and_gas) async def _check_limits(self, interval): """Get the extraction limits mentioned in the text""" return await self._check_with_graph( setup_graph_limits, interval=interval ) async def _check_spacing(self): """Get the spacing requirements mentioned in the text""" return await self._check_with_graph(setup_graph_well_spacing) async def _check_time(self): """Get the time requirements mentioned in the text""" return await self._check_with_graph(setup_graph_time) async def _check_metering_device(self): """Get the metering device mentioned in the text""" return await self._check_with_graph(setup_graph_metering_device) async def _check_district_drought(self): """Get the drought management plan mentioned in the text""" return await self._check_with_graph(setup_graph_drought) async def _check_well_drought(self): """Get the well drought management plan mentioned in the text""" return await self._check_with_graph(setup_graph_contingency) async def _check_plugging(self): """Get the plugging requirements mentioned in the text""" return await self._check_with_graph(setup_graph_plugging_reqs) async def _check_transfer(self): """Get the transfer requirements mentioned in the text""" return await self._check_with_graph(setup_graph_external_transfer) async def _check_production_reporting(self): """Get the reporting requirements mentioned in the text""" return await self._check_with_graph(setup_graph_production_reporting) async def _check_production_cost(self): """Get the production cost requirements mentioned in the text""" return await self._check_with_graph(setup_graph_production_cost) async def _check_setbacks(self): """Get the setback requirements mentioned in the text""" return await self._check_with_graph(setup_graph_setback_features) async def _check_redrilling(self): """Get the redrilling requirements mentioned in the text""" return await self._check_with_graph(setup_graph_redrilling)