"""Ordinance full processing logic"""
import time
import json
import asyncio
import logging
from copy import deepcopy
from functools import cached_property
from contextlib import AsyncExitStack, contextmanager
from datetime import datetime, UTC
from elm.web.utilities import get_redirected_url
from compass.plugin import PLUGIN_REGISTRY
from compass.extraction.context import ExtractionContext
from compass.scripts.download import (
find_jurisdiction_website,
download_known_urls,
load_known_docs,
download_jurisdiction_ordinance_using_search_engine,
download_jurisdiction_ordinances_from_website,
download_jurisdiction_ordinances_from_website_compass_crawl,
)
from compass.exceptions import COMPASSValueError, COMPASSError
from compass.validation.location import JurisdictionWebsiteValidator
from compass.llm import OpenAIConfig
from compass.services.cpu import (
PDFLoader,
OCRPDFLoader,
read_pdf_doc,
read_pdf_doc_ocr,
read_pdf_file,
read_pdf_file_ocr,
)
from compass.services.usage import UsageTracker
from compass.services.openai import usage_from_response
from compass.services.provider import RunningAsyncServices
from compass.services.threaded import (
TempFileCachePB,
TempFileCache,
FileMover,
CleanedFileWriter,
OrdDBFileWriter,
UsageUpdater,
JurisdictionUpdater,
HTMLFileLoader,
read_html_file,
)
from compass.utilities import (
LLM_COST_REGISTRY,
compile_run_summary_message,
load_all_jurisdiction_info,
load_jurisdictions_from_fp,
save_run_meta,
Directories,
ProcessKwargs,
compute_total_cost_from_usage,
)
from compass.utilities.enums import LLMTasks
from compass.utilities.jurisdictions import jurisdictions_from_df
from compass.utilities.logs import (
LocationFileLog,
LogListener,
NoLocationFilter,
log_versions,
)
from compass.utilities.base import WebSearchParams
from compass.utilities.io import load_config
from compass.utilities.parsing import convert_paths_to_strings
from compass.pb import COMPASS_PB
logger = logging.getLogger(__name__)
MAX_CONCURRENT_SEARCH_ENGINE_QUERIES = 10
[docs]
async def process_jurisdictions_with_openai( # noqa: PLR0917, PLR0913
out_dir,
tech,
jurisdiction_fp,
model="gpt-4o-mini",
num_urls_to_check_per_jurisdiction=5,
max_num_concurrent_browsers=10,
max_num_concurrent_website_searches=10,
max_num_concurrent_jurisdictions=25,
url_ignore_substrings=None,
known_local_docs=None,
known_doc_urls=None,
file_loader_kwargs=None,
search_engines=None,
pytesseract_exe_fp=None,
td_kwargs=None,
tpe_kwargs=None,
ppe_kwargs=None,
log_dir=None,
clean_dir=None,
ordinance_file_dir=None,
jurisdiction_dbs_dir=None,
perform_se_search=True,
perform_website_search=True,
llm_costs=None,
log_level="INFO",
keep_async_logs=False,
):
"""Extract ordinances for one or more jurisdiction(s)
This function scrapes ordinance documents (PDFs or HTML text) for a
given set of jurisdictions and processes them using one or more
LLM models. Output files, logs, and intermediate artifacts are
stored in configurable directories.
The processing has a well-defined order:
1. Process any/all known local documents
2. Process any/all known document URLs
3. Search engine-based search for ordinance documents
4. Jurisdiction website crawl-based search for ordinance
documents
Users can disable any of these steps via inputs to this function. If
any step returns a document with extractable ordinance information,
subsequent steps are skipped for that jurisdiction.
Parameters
----------
out_dir : path-like
Path to the output directory. If it does not exist, it will be
created. This directory will contain the structured ordinance
CSV file, all downloaded ordinance documents (PDFs and HTML),
usage metadata, and default subdirectories for logs and
intermediate outputs (unless otherwise specified).
tech : str
Label indicating which technology type is being processed. Must
be one of the keys of
:obj:`~compass.plugin.registry.PLUGIN_REGISTRY`.
jurisdiction_fp : path-like
Path to a CSV file specifying the jurisdictions to process.
The CSV must contain at least two columns: "County" and "State",
which specify the county and state names, respectively. If you
would like to process a subdivision with a county, you must also
include "Subdivision" and "Jurisdiction Type" columns. The
"Subdivision" should be the name of the subdivision, and the
"Jurisdiction Type" should be a string identifying the type of
subdivision (e.g., "City", "Township", etc.)
model : str or list of dict, optional
LLM model(s) to use for scraping and parsing ordinance
documents. If a string is provided, it is assumed to be the name
of the default model (e.g., "gpt-4o"), and environment variables
are used for authentication.
If a list is provided, it should contain dictionaries of
arguments that can initialize instances of
:class:`~compass.llm.config.OpenAIConfig`. Each dictionary can
specify the model name, client type, and initialization
arguments.
Each dictionary must also include a ``tasks`` key, which maps to
a string or list of strings indicating the tasks that instance
should handle. Exactly one of the instances **must** include
"default" as a task, which will be used when no specific task is
matched. For example::
"model": [
{
"model": "gpt-4o-mini",
"llm_call_kwargs": {
"temperature": 0,
"timeout": 300,
},
"client_kwargs": {
"api_key": "<your_api_key>",
"api_version": "<your_api_version>",
"azure_endpoint": "<your_azure_endpoint>",
},
"tasks": ["default", "date_extraction"],
},
{
"model": "gpt-4o",
"client_type": "openai",
"tasks": ["ordinance_text_extraction"],
}
]
.. IMPORTANT::
You will need to ensure that the model name used here
matches your deployment if you are using Azure OpenAI. For
example, if you deployed the GPT-4o-mini model under the
name ``"gpt-4o-mini-2025-04-11"``, you would want to set
``"model": "gpt-4o-mini-2025-04-11"``.
By default, ``"gpt-4o"``.
num_urls_to_check_per_jurisdiction : int, optional
Number of unique Google search result URLs to check for each
jurisdiction when attempting to locate ordinance documents.
By default, ``5``.
max_num_concurrent_browsers : int, optional
Maximum number of browser instances to launch concurrently for
retrieving information from the web. Increasing this value too
much may lead to timeouts or performance issues on machines with
limited resources. By default, ``10``.
max_num_concurrent_website_searches : int, optional
Maximum number of website searches allowed to run
simultaneously. Increasing this value can speed up searches, but
may lead to timeouts or performance issues on machines with
limited resources. By default, ``10``.
max_num_concurrent_jurisdictions : int, default=25
Maximum number of jurisdictions to process concurrently.
Limiting this can help manage memory usage when dealing with a
large number of documents. By default ``25``.
url_ignore_substrings : list of str, optional
A list of substrings that, if found in any URL, will cause the
URL to be excluded from consideration. This can be used to
specify particular websites or entire domains to ignore. For
example::
url_ignore_substrings = [
"wikipedia",
"nlr.gov",
"www.co.delaware.in.us/documents/1649699794_0382.pdf",
]
The above configuration would ignore all `wikipedia` articles,
all websites on the NLR domain, and the specific file located
at `www.co.delaware.in.us/documents/1649699794_0382.pdf`.
By default, ``None``.
known_local_docs : dict or path-like, optional
A dictionary where keys are the jurisdiction codes (as strings)
and values are lists of dictionaries containing information
about each document. The latter dictionaries should contain at
least the key ``"source_fp"`` pointing to the **full** path of
the local document file. All other keys will be added as
attributes to the loaded document instance. You can include the
key ``"check_if_legal_doc"`` to manually enable/disable the
legal document check for known documents. Similarly, you can
provide the ``"date"`` key, which is a list of
``[year, month, day]``, some or all of which can be null, to
skip the date extraction step of the processing pipeline. If
this input is provided, local documents will be checked first.
See the top-level documentation of this function for the full
processing of the pipeline. This input can also be a path to a
JSON file containing the dictionary of code-to-document-info
mappings. By default, ``None``.
known_doc_urls : dict or path-like, optional
A dictionary where keys are the jurisdiction codes (as strings)
and values are lists of dictionaries containing information
about each document. The latter dictionaries should contain at
least the key ``"source"`` representing the known URL to check
for that document. All other keys will be added as attributes
to the loaded document instance. You can include the key
``"check_if_legal_doc"`` to manually enable/disable the legal
document check for documents at known URLs. Similarly, you can
provide the ``"date"`` key, which is a list of
``[year, month, day]``, some or all of which can be null, to
skip the date extraction step of the processing pipeline. If
this input is provided, the known URLs will be checked before
applying the search engine search. See the top-level
documentation of this function for the full processing order of
the pipeline. This input can also be a path to a JSON file
containing the dictionary of code-to-document-info mappings.
.. Note:: The same input can be used for both `known_local_docs`
and `known_doc_urls` as long as both ``"source_fp"``
and ``"source"`` keys are provided in each document
info dictionary.
By default, ``None``.
file_loader_kwargs : dict, optional
Dictionary of keyword arguments pairs to initialize
:class:`elm.web.file_loader.AsyncWebFileLoader`. If found, the
"pw_launch_kwargs" key in these will also be used to initialize
the :class:`elm.web.search.google.PlaywrightGoogleLinkSearch`
used for the google URL search. By default, ``None``.
search_engines : list, optional
A list of dictionaries, where each dictionary contains
information about a search engine class that should be used for
the document retrieval process. Each dictionary should contain
at least the key ``"se_name"``, which should correspond to one
of the search engine class names from
:obj:`elm.web.search.run.SEARCH_ENGINE_OPTIONS`. The rest of the
keys in the dictionary should contain keyword-value pairs to be
used as parameters to initialize the search engine class (things
like API keys and configuration options; see the ELM
documentation for details on search engine class parameters).
The list should be ordered by search engine preference - the
first search engine parameters will be used to submit the
queries initially, then any subsequent search engine listings
will be used as fallback (in order that they appear). Do not
repeat search engines - only the last config dictionary will be
used to initialize the search engine if you do. If ``None``,
then all default configurations for the search engines
(along with the fallback order) are used. By default, ``None``.
pytesseract_exe_fp : path-like, optional
Path to the `pytesseract` executable. If specified, OCR will be
used to extract text from scanned PDFs using Google's Tesseract.
By default ``None``.
td_kwargs : dict, optional
Additional keyword arguments to pass to
:class:`tempfile.TemporaryDirectory`. The temporary directory is
used to store documents which have not yet been confirmed to
contain relevant information. By default, ``None``.
tpe_kwargs : dict, optional
Additional keyword arguments to pass to
:class:`concurrent.futures.ThreadPoolExecutor`, used for
I/O-bound tasks such as logging. By default, ``None``.
ppe_kwargs : dict, optional
Additional keyword arguments to pass to
:class:`concurrent.futures.ProcessPoolExecutor`, used for
CPU-bound tasks such as PDF loading and parsing.
By default, ``None``.
log_dir : path-like, optional
Path to the directory for storing log files. If not provided, a
``logs`` subdirectory will be created inside `out_dir`.
By default, ``None``.
clean_dir : path-like, optional
Path to the directory for storing cleaned ordinance text output.
If not provided, a ``cleaned_text`` subdirectory will be created
inside `out_dir`. By default, ``None``.
ordinance_file_dir : path-like, optional
Path to the directory where downloaded ordinance files (PDFs or
HTML) for each jurisdiction are stored. If not provided, a
``ordinance_files`` subdirectory will be created inside
`out_dir`. By default, ``None``.
jurisdiction_dbs_dir : path-like, optional
Path to the directory where parsed ordinance database files are
stored for each jurisdiction. If not provided, a
``jurisdiction_dbs`` subdirectory will be created inside
`out_dir`. By default, ``None``.
perform_se_search : bool, default=True
Option to perform a search engine-based search for ordinance
documents. This is the standard way to collect ordinance
documents, and it is recommended to leave this set to ``True``
unless you are re-processing local documents. If ``True``, the
search engine approach is used to locate ordinance documents
before falling back to a website crawl-based search (if that has
been selected). By default, ``True``.
perform_website_search : bool, default=True
Option to fallback to a jurisdiction website crawl-based search
for ordinance documents if the search engine approach fails to
recover any relevant documents. By default, ``True``.
llm_costs : dict, optional
Dictionary mapping model names to their token costs, used to
track the estimated total cost of LLM usage during the run. The
structure should be::
{"model_name": {"prompt": float, "response": float}}
Costs are specified in dollars per million tokens. For example::
"llm_costs": {"my_gpt": {"prompt": 1.5, "response": 3.7}}
registers a model named `"my_gpt"` with a cost of $1.5 per
million input (prompt) tokens and $3.7 per million output
(response) tokens for the current processing run.
.. NOTE::
The displayed total cost does not track cached tokens, so
treat it like an estimate. Your final API costs may vary.
If set to ``None``, no custom model costs are recorded, and
cost tracking may be unavailable in the progress bar.
By default, ``None``.
log_level : str, optional
Logging level for ordinance scraping and parsing (e.g., "TRACE",
"DEBUG", "INFO", "WARNING", or "ERROR"). By default, ``"INFO"``.
keep_async_logs : bool, default=False
Option to store the full asynchronous log record to a file. This
is only useful if you intend to monitor overall processing
progress from a file instead of from the terminal. If ``True``,
all of the unordered records are written to a "all.log" file in
the `log_dir` directory. By default, ``False``.
Returns
-------
str
Message summarizing run results, including total processing
time, total cost, output directory, and number of documents
found. The message is formatted for easy reading in the terminal
and may include color-coded cost information if the terminal
supports it.
"""
called_args = locals()
if log_level == "DEBUG":
log_level = "DEBUG_TO_FILE"
log_listener = LogListener(["compass", "elm"], level=log_level)
LLM_COST_REGISTRY.update(llm_costs or {})
dirs = _setup_folders(
out_dir,
log_dir=log_dir,
clean_dir=clean_dir,
ofd=ordinance_file_dir,
jdd=jurisdiction_dbs_dir,
)
async with log_listener as ll:
_setup_main_logging(dirs.logs, log_level, ll, keep_async_logs)
steps = _check_enabled_steps(
known_local_docs=known_local_docs,
known_doc_urls=known_doc_urls,
perform_se_search=perform_se_search,
perform_website_search=perform_website_search,
)
_log_exec_info(called_args, steps)
try:
pk = ProcessKwargs(
known_local_docs,
known_doc_urls,
file_loader_kwargs,
td_kwargs,
tpe_kwargs,
ppe_kwargs,
max_num_concurrent_jurisdictions,
)
wsp = WebSearchParams(
num_urls_to_check_per_jurisdiction,
max_num_concurrent_browsers,
max_num_concurrent_website_searches,
url_ignore_substrings,
pytesseract_exe_fp,
search_engines,
)
models = _initialize_model_params(model)
runner = _COMPASSRunner(
dirs=dirs,
log_listener=log_listener,
tech=tech,
models=models,
web_search_params=wsp,
process_kwargs=pk,
perform_se_search=perform_se_search,
perform_website_search=perform_website_search,
log_level=log_level,
)
return await runner.run(jurisdiction_fp)
except COMPASSError:
raise
except Exception:
logger.exception("Fatal error during processing")
raise
class _COMPASSRunner:
"""Helper class to run COMPASS"""
def __init__(
self,
dirs,
log_listener,
tech,
models,
web_search_params=None,
process_kwargs=None,
perform_se_search=True,
perform_website_search=True,
log_level="INFO",
):
self.dirs = dirs
self.log_listener = log_listener
self.tech = tech
self.models = models
self.web_search_params = web_search_params or WebSearchParams()
self.process_kwargs = process_kwargs or ProcessKwargs()
self.perform_se_search = perform_se_search
self.perform_website_search = perform_website_search
self.log_level = log_level
@cached_property
def browser_semaphore(self):
"""asyncio.Semaphore or None: Browser concurrency limiter"""
return (
asyncio.Semaphore(
self.web_search_params.max_num_concurrent_browsers
)
if self.web_search_params.max_num_concurrent_browsers
else None
)
@cached_property
def crawl_semaphore(self):
"""asyncio.Semaphore or None: Concurrency limiter for crawls"""
return (
asyncio.Semaphore(
self.web_search_params.max_num_concurrent_website_searches
)
if self.web_search_params.max_num_concurrent_website_searches
else None
)
@cached_property
def search_engine_semaphore(self):
"""asyncio.Semaphore: Concurrency limiter for search queries"""
return asyncio.Semaphore(MAX_CONCURRENT_SEARCH_ENGINE_QUERIES)
@cached_property
def _jurisdiction_semaphore(self):
"""asyncio.Semaphore or None: Sem to limit # of processes"""
return (
asyncio.Semaphore(
self.process_kwargs.max_num_concurrent_jurisdictions
)
if self.process_kwargs.max_num_concurrent_jurisdictions
else None
)
@property
def jurisdiction_semaphore(self):
"""asyncio.Semaphore or AsyncExitStack: Jurisdiction context"""
if self._jurisdiction_semaphore is None:
return AsyncExitStack()
return self._jurisdiction_semaphore
@cached_property
def file_loader_kwargs(self):
"""dict: Keyword arguments for ``AsyncWebFileLoader``"""
file_loader_kwargs = _configure_file_loader_kwargs(
self.process_kwargs.file_loader_kwargs
)
if self.web_search_params.pytesseract_exe_fp is not None:
_setup_pytesseract(self.web_search_params.pytesseract_exe_fp)
file_loader_kwargs.update(
{"pdf_ocr_read_coroutine": read_pdf_doc_ocr}
)
return file_loader_kwargs
@cached_property
def local_file_loader_kwargs(self):
"""dict: Keyword arguments for ``AsyncLocalFileLoader``"""
file_loader_kwargs = {
"pdf_read_coroutine": read_pdf_file,
"html_read_coroutine": read_html_file,
"pdf_read_kwargs": (
self.file_loader_kwargs.get("pdf_read_kwargs")
),
"html_read_kwargs": (
self.file_loader_kwargs.get("html_read_kwargs")
),
}
if self.web_search_params.pytesseract_exe_fp is not None:
_setup_pytesseract(self.web_search_params.pytesseract_exe_fp)
file_loader_kwargs.update(
{"pdf_ocr_read_coroutine": read_pdf_file_ocr}
)
return file_loader_kwargs
@cached_property
def known_local_docs(self):
"""dict: Known filepaths keyed by jurisdiction code"""
known_local_docs = self.process_kwargs.known_local_docs or {}
if isinstance(known_local_docs, str):
known_local_docs = load_config(known_local_docs)
return {int(key): val for key, val in known_local_docs.items()}
@cached_property
def known_doc_urls(self):
"""dict: Known URLs keyed by jurisdiction code"""
known_doc_urls = self.process_kwargs.known_doc_urls or {}
if isinstance(known_doc_urls, str):
known_doc_urls = load_config(known_doc_urls)
return {int(key): val for key, val in known_doc_urls.items()}
@cached_property
def tpe_kwargs(self):
"""dict: Keyword arguments for ``ThreadPoolExecutor``"""
return _configure_thread_pool_kwargs(self.process_kwargs.tpe_kwargs)
@cached_property
def extractor_class(self):
"""obj: Extractor class for the specified technology"""
if self.tech.casefold() not in PLUGIN_REGISTRY:
msg = f"Unknown tech input: {self.tech}"
raise COMPASSValueError(msg)
return PLUGIN_REGISTRY[self.tech.casefold()]
@cached_property
def _base_services(self):
"""list: Services required to support jurisdiction processing"""
base_services = [
TempFileCachePB(
td_kwargs=self.process_kwargs.td_kwargs,
tpe_kwargs=self.tpe_kwargs,
),
TempFileCache(
td_kwargs=self.process_kwargs.td_kwargs,
tpe_kwargs=self.tpe_kwargs,
),
FileMover(self.dirs.ordinance_files, tpe_kwargs=self.tpe_kwargs),
CleanedFileWriter(
self.dirs.clean_files, tpe_kwargs=self.tpe_kwargs
),
OrdDBFileWriter(
self.dirs.jurisdiction_dbs, tpe_kwargs=self.tpe_kwargs
),
UsageUpdater(
self.dirs.out / "usage.json", tpe_kwargs=self.tpe_kwargs
),
JurisdictionUpdater(
self.dirs.out / "jurisdictions.json",
tpe_kwargs=self.tpe_kwargs,
),
PDFLoader(**(self.process_kwargs.ppe_kwargs or {})),
HTMLFileLoader(**self.tpe_kwargs),
]
if self.web_search_params.pytesseract_exe_fp is not None:
base_services.append(
# pytesseract locks up with multiple processes, so
# hardcode to only use 1 for now
OCRPDFLoader(max_workers=1),
)
return base_services
async def run(self, jurisdiction_fp):
"""Run COMPASS for a set of jurisdictions
Parameters
----------
jurisdiction_fp : path-like
Path to CSV file containing the jurisdictions to search.
Returns
-------
str
Message summarizing run results, including total processing
time, total cost, output directory, and number of documents
found. The message is formatted for easy reading in the
terminal and may include color-coded cost information if
the terminal supports it.
"""
jurisdictions_df = _load_jurisdictions_to_process(jurisdiction_fp)
num_jurisdictions = len(jurisdictions_df)
COMPASS_PB.create_main_task(num_jurisdictions=num_jurisdictions)
start_date = datetime.now(UTC)
doc_infos, total_cost = await self._run_all(jurisdictions_df)
doc_infos = [
di
for di in doc_infos
if di is not None and di.get("ord_db_fp") is not None
]
if doc_infos:
num_docs_found = self.extractor_class.save_structured_data(
doc_infos, self.dirs.out
)
else:
num_docs_found = 0
total_time = save_run_meta(
self.dirs,
self.tech,
start_date=start_date,
end_date=datetime.now(UTC),
num_jurisdictions_searched=num_jurisdictions,
num_jurisdictions_found=num_docs_found,
total_cost=total_cost,
models=self.models,
)
run_msg = compile_run_summary_message(
total_seconds=total_time,
total_cost=total_cost,
out_dir=self.dirs.out,
document_count=num_docs_found,
)
for sub_msg in run_msg.split("\n"):
logger.info(
sub_msg.replace("[#71906e]", "").replace("[/#71906e]", "")
)
return run_msg
async def _run_all(self, jurisdictions_df):
"""Process all jurisdictions while required services run"""
services = [model.llm_service for model in set(self.models.values())]
services += self._base_services
_ = self.file_loader_kwargs # init loader kwargs once
_ = self.local_file_loader_kwargs # init local loader kwargs once
logger.info("Processing %d jurisdiction(s)", len(jurisdictions_df))
async with RunningAsyncServices(services):
tasks = []
for jurisdiction in jurisdictions_from_df(jurisdictions_df):
usage_tracker = UsageTracker(
jurisdiction.full_name, usage_from_response
)
task = asyncio.create_task(
self._processed_jurisdiction_info_with_pb(
jurisdiction,
self.known_local_docs.get(jurisdiction.code),
self.known_doc_urls.get(jurisdiction.code),
usage_tracker=usage_tracker,
),
name=jurisdiction.full_name,
)
tasks.append(task)
doc_infos = await asyncio.gather(*tasks)
total_cost = await _compute_total_cost()
return doc_infos, total_cost
async def _processed_jurisdiction_info_with_pb(
self, jurisdiction, *args, **kwargs
):
"""Process a jurisdiction while updating the progress bar"""
async with self.jurisdiction_semaphore:
with COMPASS_PB.jurisdiction_prog_bar(jurisdiction.full_name):
return await self._processed_jurisdiction_info(
jurisdiction, *args, **kwargs
)
async def _processed_jurisdiction_info(
self, jurisdiction, *args, **kwargs
):
"""Convert processed document to minimal metadata"""
extraction_context = await self._process_jurisdiction_with_logging(
jurisdiction, *args, **kwargs
)
if extraction_context is None or isinstance(
extraction_context, Exception
):
return None
doc_info = {
"jurisdiction": jurisdiction,
"ord_db_fp": extraction_context.attrs.get("ord_db_fp"),
}
logger.debug("Saving the following doc info:\n%s", doc_info)
return doc_info
async def _process_jurisdiction_with_logging(
self,
jurisdiction,
known_local_docs=None,
known_doc_urls=None,
usage_tracker=None,
):
"""Retrieve ordinance document with location-scoped logging"""
async with LocationFileLog(
self.log_listener,
self.dirs.logs,
location=jurisdiction.full_name,
level=self.log_level,
):
task = asyncio.create_task(
_SingleJurisdictionRunner(
self.extractor_class(
jurisdiction=jurisdiction,
model_configs=self.models,
usage_tracker=usage_tracker,
),
jurisdiction,
self.models,
self.web_search_params,
self.file_loader_kwargs,
local_file_loader_kwargs=self.local_file_loader_kwargs,
known_local_docs=known_local_docs,
known_doc_urls=known_doc_urls,
browser_semaphore=self.browser_semaphore,
crawl_semaphore=self.crawl_semaphore,
search_engine_semaphore=self.search_engine_semaphore,
perform_se_search=self.perform_se_search,
perform_website_search=self.perform_website_search,
usage_tracker=usage_tracker,
).run(),
name=jurisdiction.full_name,
)
try:
extraction_context, *__ = await asyncio.gather(task)
except KeyboardInterrupt:
raise
except Exception as e:
msg = "Encountered error of type %r while processing %s:"
err_type = type(e)
logger.exception(msg, err_type, jurisdiction.full_name)
extraction_context = None
return extraction_context
class _SingleJurisdictionRunner:
"""Helper class to process a single jurisdiction"""
def __init__( # noqa: PLR0913
self,
extractor,
jurisdiction,
models,
web_search_params,
file_loader_kwargs,
*,
local_file_loader_kwargs=None,
known_local_docs=None,
known_doc_urls=None,
browser_semaphore=None,
crawl_semaphore=None,
search_engine_semaphore=None,
perform_se_search=True,
perform_website_search=True,
usage_tracker=None,
):
self.extractor = extractor
self.jurisdiction = jurisdiction
self.models = models
self.web_search_params = web_search_params
self.file_loader_kwargs = file_loader_kwargs
self.local_file_loader_kwargs = local_file_loader_kwargs
self.known_local_docs = known_local_docs
self.known_doc_urls = known_doc_urls
self.browser_semaphore = browser_semaphore
self.crawl_semaphore = crawl_semaphore
self.search_engine_semaphore = search_engine_semaphore
self.usage_tracker = usage_tracker
self.perform_se_search = perform_se_search
self.perform_website_search = perform_website_search
self.jurisdiction_website = jurisdiction.website_url
self.validate_user_website_input = True
self._jsp = None
@cached_property
def file_loader_kwargs_no_ocr(self):
"""dict: Keyword arguments for `AsyncWebFileLoader` (no OCR)"""
flk = deepcopy(self.file_loader_kwargs)
flk.pop("pdf_ocr_read_coroutine", None)
return flk
@contextmanager
def _tracked_progress(self):
"""Context manager to set up jurisdiction sub-progress bar"""
loc = self.jurisdiction.full_name
with COMPASS_PB.jurisdiction_sub_prog(loc) as self._jsp:
yield
self._jsp = None
async def run(self):
"""Download and parse ordinances for a single jurisdiction
Returns
-------
BaseDocument or None
Document containing ordinance information, or ``None`` when
no valid ordinance content was identified.
"""
start_time = time.monotonic()
extraction_context = None
logger.info(
"Kicking off processing for jurisdiction: %s",
self.jurisdiction.full_name,
)
try:
extraction_context = await self._run()
finally:
await self.extractor.record_usage()
await _record_jurisdiction_info(
self.jurisdiction,
extraction_context,
start_time,
self.usage_tracker,
)
logger.info(
"Completed processing for jurisdiction: %s",
self.jurisdiction.full_name,
)
return extraction_context
async def _run(self):
"""Search for documents and parse them for ordinances"""
if self.known_local_docs:
logger.debug(
"Checking local docs for jurisdiction: %s",
self.jurisdiction.full_name,
)
extraction_context = await self._try_find_ordinances(
method=self._load_known_local_documents,
)
if extraction_context is not None:
return extraction_context
else:
logger.debug(
"%r processing had no known local docs configured",
self.jurisdiction.full_name,
)
if self.known_doc_urls:
logger.debug(
"Checking known URLs for jurisdiction: %s",
self.jurisdiction.full_name,
)
extraction_context = await self._try_find_ordinances(
method=self._download_known_url_documents,
)
if extraction_context is not None:
return extraction_context
else:
logger.debug(
"%r processing had no known URLs configured",
self.jurisdiction.full_name,
)
if self.perform_se_search:
logger.debug(
"Collecting documents using a search engine for "
"jurisdiction: %s",
self.jurisdiction.full_name,
)
extraction_context = await self._try_find_ordinances(
method=self._find_documents_using_search_engine,
)
if extraction_context is not None:
return extraction_context
else:
logger.debug(
"%r processing didn't have SE search enabled",
self.jurisdiction.full_name,
)
if self.perform_website_search:
logger.debug(
"Collecting documents from the jurisdiction website for: %s",
self.jurisdiction.full_name,
)
extraction_context = await self._try_find_ordinances(
method=self._find_documents_from_website,
)
if extraction_context is not None:
return extraction_context
else:
logger.debug(
"%r processing didn't have jurisdiction website search "
"enabled",
self.jurisdiction.full_name,
)
return None
async def _try_find_ordinances(self, method, *args, **kwargs):
"""Execute a retrieval method and parse resulting documents"""
extraction_context = await method(*args, **kwargs)
if extraction_context is None:
return None
COMPASS_PB.update_jurisdiction_task(
self.jurisdiction.full_name,
description="Extracting structured data...",
)
context = await self.extractor.parse_docs_for_structured_data(
extraction_context
)
await self._write_out_structured_data(extraction_context)
logger.debug("Final extraction context:\n%s", context)
return context
async def _load_known_local_documents(self):
"""Load ordinance documents from known local file paths"""
docs = await load_known_docs(
self.jurisdiction,
[info["source_fp"] for info in self.known_local_docs],
local_file_loader_kwargs=self.local_file_loader_kwargs,
)
if not docs:
return None
_add_known_doc_attrs_to_all_docs(
docs, self.known_local_docs, key="source_fp"
)
extraction_context = await self._filter_docs(
docs, need_jurisdiction_verification=False
)
if not extraction_context:
return None
extraction_context.attrs["jurisdiction_website"] = None
extraction_context.attrs["compass_crawl"] = False
await self.extractor.record_usage()
return extraction_context
async def _download_known_url_documents(self):
"""Download ordinance documents from pre-specified URLs"""
docs = await download_known_urls(
self.jurisdiction,
[info["source"] for info in self.known_doc_urls],
browser_semaphore=self.browser_semaphore,
file_loader_kwargs=self.file_loader_kwargs,
)
if not docs:
return None
_add_known_doc_attrs_to_all_docs(
docs, self.known_doc_urls, key="source"
)
extraction_context = await self._filter_docs(
docs, need_jurisdiction_verification=False
)
if not extraction_context:
return None
extraction_context.attrs["jurisdiction_website"] = None
extraction_context.attrs["compass_crawl"] = False
await self.extractor.record_usage()
return extraction_context
async def _find_documents_using_search_engine(self):
"""Search the web for ordinance docs using search engines"""
docs = await download_jurisdiction_ordinance_using_search_engine(
await self.extractor.get_query_templates(),
self.jurisdiction,
num_urls=self.web_search_params.num_urls_to_check_per_jurisdiction,
file_loader_kwargs=self.file_loader_kwargs,
search_semaphore=self.search_engine_semaphore,
browser_semaphore=self.browser_semaphore,
url_ignore_substrings=self.web_search_params.url_ignore_substrings,
**self.web_search_params.se_kwargs,
)
extraction_context = await self._filter_docs(
docs, need_jurisdiction_verification=True
)
if not extraction_context:
return None
extraction_context.attrs["jurisdiction_website"] = None
extraction_context.attrs["compass_crawl"] = False
await self.extractor.record_usage()
return extraction_context
async def _find_documents_from_website(self):
"""Search the jurisdiction website for ordinance documents"""
if self.jurisdiction_website and self.validate_user_website_input:
await self._validate_jurisdiction_website()
if not self.jurisdiction_website:
website = await self._try_find_jurisdiction_website()
if not website:
return None
self.jurisdiction_website = website
extraction_context, scrape_results = await self._try_elm_crawl()
found_with_compass_crawl = False
if not extraction_context:
extraction_context = await self._try_compass_crawl(scrape_results)
found_with_compass_crawl = True
if not extraction_context:
return None
extraction_context.attrs["jurisdiction_website"] = (
self.jurisdiction_website
)
extraction_context.attrs["compass_crawl"] = found_with_compass_crawl
await self.extractor.record_usage()
return extraction_context
async def _validate_jurisdiction_website(self):
"""Validate a user-supplied jurisdiction website URL"""
if self.jurisdiction_website is None:
return
self.jurisdiction_website = await get_redirected_url(
self.jurisdiction_website, timeout=30
)
COMPASS_PB.update_jurisdiction_task(
self.jurisdiction.full_name,
description=(
f"Validating user input website: {self.jurisdiction_website}"
),
)
model_config = self.models.get(
LLMTasks.DOCUMENT_JURISDICTION_VALIDATION,
self.models[LLMTasks.DEFAULT],
)
validator = JurisdictionWebsiteValidator(
browser_semaphore=self.browser_semaphore,
file_loader_kwargs=self.file_loader_kwargs_no_ocr,
usage_tracker=self.usage_tracker,
llm_service=model_config.llm_service,
**model_config.llm_call_kwargs,
)
is_website_correct = await validator.check(
self.jurisdiction_website, self.jurisdiction
)
if not is_website_correct:
self.jurisdiction_website = None
async def _try_find_jurisdiction_website(self):
"""Locate the primary jurisdiction website via search"""
COMPASS_PB.update_jurisdiction_task(
self.jurisdiction.full_name,
description="Searching for jurisdiction website...",
)
return await find_jurisdiction_website(
self.jurisdiction,
self.models,
file_loader_kwargs=self.file_loader_kwargs_no_ocr,
search_semaphore=self.search_engine_semaphore,
browser_semaphore=self.browser_semaphore,
usage_tracker=self.usage_tracker,
url_ignore_substrings=(
self.web_search_params.url_ignore_substrings
),
**self.web_search_params.se_kwargs,
)
async def _try_elm_crawl(self):
"""Crawl the jurisdiction website using the ELM crawler"""
self.jurisdiction_website = await get_redirected_url(
self.jurisdiction_website, timeout=30
)
out = await download_jurisdiction_ordinances_from_website(
self.jurisdiction_website,
heuristic=await self.extractor.get_heuristic(),
keyword_points=await self.extractor.get_website_keywords(),
file_loader_kwargs=self.file_loader_kwargs_no_ocr,
crawl_semaphore=self.crawl_semaphore,
pb_jurisdiction_name=self.jurisdiction.full_name,
return_c4ai_results=True,
)
docs, scrape_results = out
extraction_context = await self._filter_docs(
docs, need_jurisdiction_verification=True
)
return extraction_context, scrape_results
async def _try_compass_crawl(self, scrape_results):
"""Crawl the jurisdiction website using the COMPASS crawler"""
checked_urls = set()
for scrape_result in scrape_results:
checked_urls.update({sub_res.url for sub_res in scrape_result})
docs = (
await download_jurisdiction_ordinances_from_website_compass_crawl(
self.jurisdiction_website,
heuristic=await self.extractor.get_heuristic(),
keyword_points=await self.extractor.get_website_keywords(),
file_loader_kwargs=self.file_loader_kwargs_no_ocr,
already_visited=checked_urls,
crawl_semaphore=self.crawl_semaphore,
pb_jurisdiction_name=self.jurisdiction.full_name,
)
)
return await self._filter_docs(
docs, need_jurisdiction_verification=True
)
async def _filter_docs(self, docs, need_jurisdiction_verification):
if not docs:
return None
extraction_context = ExtractionContext(documents=docs)
return await self.extractor.filter_docs(
extraction_context,
need_jurisdiction_verification=need_jurisdiction_verification,
)
async def _write_out_structured_data(self, extraction_context):
"""Write cleaned text to `jurisdiction_dbs` dir"""
if extraction_context.attrs.get("structured_data") is None:
return
out_fn = extraction_context.attrs.get("out_data_fn")
if out_fn is None:
out_fn = f"{self.jurisdiction.full_name} Ordinances.csv"
out_fp = await OrdDBFileWriter.call(extraction_context, out_fn)
logger.info(
"Structured data for %s stored here: '%s'",
self.jurisdiction.full_name,
out_fp,
)
extraction_context.attrs["ord_db_fp"] = out_fp
def _setup_main_logging(log_dir, level, listener, keep_async_logs):
"""Setup main logger for catching exceptions during execution"""
fmt = logging.Formatter(fmt="[%(asctime)s] %(levelname)s: %(message)s")
handler = logging.FileHandler(log_dir / "main.log", encoding="utf-8")
handler.setFormatter(fmt)
handler.setLevel(level)
handler.addFilter(NoLocationFilter())
listener.addHandler(handler)
if keep_async_logs:
handler = logging.FileHandler(log_dir / "all.log", encoding="utf-8")
log_fmt = "[%(asctime)s] %(levelname)s - %(taskName)s: %(message)s"
fmt = logging.Formatter(fmt=log_fmt)
handler.setFormatter(fmt)
handler.setLevel(level)
listener.addHandler(handler)
logger.debug_to_file("Using async log format: %s", log_fmt)
def _log_exec_info(called_args, steps):
"""Log versions and function parameters to file"""
log_versions(logger)
logger.info(
"Using the following document acquisition step(s):\n\t%s",
" -> ".join(steps),
)
normalized_args = convert_paths_to_strings(called_args)
logger.debug_to_file(
"Called 'process_jurisdictions_with_openai' with:\n%s",
json.dumps(normalized_args, indent=4),
)
def _check_enabled_steps(
known_local_docs=None,
known_doc_urls=None,
perform_se_search=True,
perform_website_search=True,
):
"""Check that at least one processing step is enabled"""
steps = []
if known_local_docs:
steps.append("Check local document")
if known_doc_urls:
steps.append("Check known document URL")
if perform_se_search:
steps.append("Look for document using search engine")
if perform_website_search:
steps.append("Look for document on jurisdiction website")
if not steps:
msg = (
"No processing steps enabled! Please provide at least one of "
"'known_local_docs', 'known_doc_urls', or set at least one of "
"'perform_se_search' or 'perform_website_search' to True."
)
raise COMPASSValueError(msg)
return steps
def _setup_folders(out_dir, log_dir=None, clean_dir=None, ofd=None, jdd=None):
"""Setup output directory folders"""
dirs = Directories(out_dir, log_dir, clean_dir, ofd, jdd)
if dirs.out.exists():
msg = (
f"Output directory '{out_dir!s}' already exists! Please specify a "
"new directory for every COMPASS run."
)
raise COMPASSValueError(msg)
dirs.make_dirs()
return dirs
def _initialize_model_params(user_input):
"""Initialize llm caller args for models from user input"""
if isinstance(user_input, str):
return {LLMTasks.DEFAULT: OpenAIConfig(name=user_input)}
caller_instances = {}
for kwargs in user_input:
tasks = kwargs.pop("tasks", LLMTasks.DEFAULT)
if isinstance(tasks, str):
tasks = [tasks]
model_config = OpenAIConfig(**kwargs)
for task in tasks:
if task in caller_instances:
msg = (
f"Found duplicated task: {task!r}. Please ensure each "
"LLM caller definition has uniquely-assigned tasks."
)
raise COMPASSValueError(msg)
caller_instances[task] = model_config
if LLMTasks.DEFAULT not in caller_instances:
msg = (
"No 'default' LLM caller defined in the `model` portion of the "
"input config! Please ensure exactly one of the model "
"definitions has 'tasks' set to 'default' or left unspecified.\n"
f"Found tasks: {list(caller_instances)}"
)
raise COMPASSValueError(msg)
return caller_instances
def _load_jurisdictions_to_process(jurisdiction_fp):
"""Load the jurisdictions to retrieve documents for"""
if jurisdiction_fp is None:
logger.info("No `jurisdiction_fp` input! Loading all jurisdictions")
return load_all_jurisdiction_info()
return load_jurisdictions_from_fp(jurisdiction_fp)
def _configure_thread_pool_kwargs(tpe_kwargs):
"""Set thread pool workers to 5 if user didn't specify"""
tpe_kwargs = tpe_kwargs or {}
tpe_kwargs.setdefault("max_workers", 5)
return tpe_kwargs
def _configure_file_loader_kwargs(file_loader_kwargs):
"""Add PDF reading coroutine to kwargs"""
file_loader_kwargs = file_loader_kwargs or {}
file_loader_kwargs.update({"pdf_read_coroutine": read_pdf_doc})
return file_loader_kwargs
async def _record_jurisdiction_info(
loc, extraction_context, start_time, usage_tracker
):
"""Record info about jurisdiction"""
seconds_elapsed = time.monotonic() - start_time
await JurisdictionUpdater.call(
loc, extraction_context, seconds_elapsed, usage_tracker
)
def _setup_pytesseract(exe_fp):
"""Set the pytesseract command"""
import pytesseract # noqa: PLC0415
logger.debug("Setting `tesseract_cmd` to %s", exe_fp)
pytesseract.pytesseract.tesseract_cmd = exe_fp
async def _compute_total_cost():
"""Compute total cost from tracked usage"""
total_usage = await UsageUpdater.call(None)
if not total_usage:
return 0
return compute_total_cost_from_usage(total_usage)
def _add_known_doc_attrs_to_all_docs(docs, doc_infos, key):
"""Add user-defined doc attributes to all loaded docs"""
for doc in docs:
source_fp = doc.attrs.get(key)
if not source_fp:
continue
_add_known_doc_attrs(doc, source_fp, doc_infos, key)
def _add_known_doc_attrs(doc, source_fp, doc_infos, key):
"""Add user-defined doc attributes to a loaded doc"""
for info in doc_infos:
if str(info[key]) == str(source_fp):
doc.attrs.update(info)
return