Source code for compass.web.search

"""COMPASS ordinance document web search functionality"""

import logging
from warnings import warn

from elm.web.search.run import search_with_fallback, search_all_se

from compass.warn import COMPASSWarning


logger = logging.getLogger(__name__)


async def search_single_jurisdiction(
    query_templates,
    jurisdiction,
    num_urls=5,
    browser_semaphore=None,
    url_ignore_substrings=None,
    url_keep_substrings=None,
    simple=True,
    **se_kwargs,
):
    """Run a web search for one jurisdiction and summarize the outcome

    Every query template is formatted with the jurisdiction's full
    name, the resulting queries are handed to either the simple or the
    holistic search routine, and the links that come back are packaged
    together with the jurisdiction metadata. A failure raised by the
    search routine is logged and reported in the output instead of
    being propagated to the caller.

    Parameters
    ----------
    query_templates : iterable of str
        Query templates to format with the jurisdiction name and
        search. Each template should include a ``{jurisdiction}``
        placeholder for the jurisdiction name.
    jurisdiction : Jurisdiction
        Jurisdiction instance representing the jurisdiction to search
        documents for. The ``full_name``, ``state``, ``county`` and
        ``subdivision_name`` attributes are read.
    num_urls : int, optional
        Number of unique search result URL's to check for each
        jurisdiction. By default, ``5``.
    browser_semaphore : asyncio.Semaphore, optional
        Semaphore instance that can be used to limit the number of
        playwright browsers used to submit search engine queries open
        concurrently. By default, ``None``.
    url_ignore_substrings : list of str, optional
        URL substrings that should be excluded from search results.
        Substrings are applied case-insensitively.
        By default, ``None``.
    url_keep_substrings : list of str, optional
        URL substrings that should be included in search results even
        if they match an ignore substring. Substrings are applied
        case-insensitively. By default, ``None``.
    simple : bool, optional
        Flag indicating whether to use a simple top-n sort from the
        first search engine that gives results (``True``) or to apply a
        holistic link sorting based on all results from all search
        engines (``False``). By default, ``True``.
    **se_kwargs
        Additional keyword arguments passed through unchanged to the
        selected search routine (and from there to
        ``search_with_fallback`` for simple sorting or to
        ``search_all_se`` for holistic sorting).

    Returns
    -------
    dict
        Dictionary containing the following keys:

        - ``jurisdiction``: Full jurisdiction name
        - ``state``: Jurisdiction state
        - ``county``: Jurisdiction county
        - ``subdivision``: Jurisdiction subdivision name
        - ``queries``: List of formatted query strings that were
          searched
        - ``results``: List of search results dictionaries, with at
          least one key: ``"url"``, which contains the URL of the
          search result. Left empty if the search raised an error.
        - ``error``: ``None`` on success, otherwise a string of the
          form ``"<ExceptionType>: <message>"`` describing the
          exception raised by the search.
    """
    full_name = jurisdiction.full_name
    formatted_queries = [
        template.format(jurisdiction=full_name)
        for template in query_templates
    ]

    summary = {
        "jurisdiction": full_name,
        "state": jurisdiction.state,
        "county": jurisdiction.county,
        "subdivision": jurisdiction.subdivision_name,
        "queries": formatted_queries,
        "results": [],
        "error": None,
    }

    if simple:
        search_runner = _run_simple_sort_search
    else:
        search_runner = _run_holistic_sort_search

    try:
        # The assignment only happens if the search completes, so
        # "results" stays an empty list whenever the search raises.
        summary["results"] = await search_runner(
            formatted_queries,
            num_urls,
            url_ignore_substrings,
            url_keep_substrings,
            browser_semaphore,
            full_name,
            **se_kwargs,
        )
    except Exception as exc:
        # Best-effort boundary: record the failure for this
        # jurisdiction rather than letting it bubble up to the caller.
        logger.exception("Search failed for %s", full_name)
        summary["error"] = f"{type(exc).__name__}: {exc}"

    return summary
async def _run_simple_sort_search(
    queries,
    num_urls,
    ignore_url_parts,
    url_keep_substrings,
    search_semaphore,
    jurisdiction_full_name,
    **se_kwargs,
):
    """Run search with fallback search engines, applying simple sort

    Parameters
    ----------
    queries : list of str
        Fully formatted search queries.
    num_urls : int
        Number of URLs requested from the search.
    ignore_url_parts : list of str or None
        URL substrings forwarded to ``search_with_fallback`` as
        ``ignore_url_parts``.
    url_keep_substrings : list of str or None
        URL keep-list. Not supported by this search mode; a
        :class:`COMPASSWarning` is emitted if it is non-empty and the
        value is otherwise ignored.
    search_semaphore : asyncio.Semaphore or None
        Forwarded to ``search_with_fallback`` as ``browser_semaphore``.
    jurisdiction_full_name : str
        Forwarded to ``search_with_fallback`` as ``task_name``.
    **se_kwargs
        Extra keyword arguments forwarded to ``search_with_fallback``.

    Returns
    -------
    list of dict
        One ``{"url": <url>}`` dictionary per URL returned by
        ``search_with_fallback``, in the order they were returned.
    """
    if url_keep_substrings:
        # NOTE: the trailing space after "simple" matters - adjacent
        # string literals are concatenated verbatim.
        msg = (
            "url_keep_substrings is not currently implemented for simple "
            "search result sorting. Consider using holistic sorting to "
            "apply the url whitelist."
        )
        warn(msg, COMPASSWarning)

    urls = await search_with_fallback(
        queries,
        num_urls=num_urls,
        ignore_url_parts=ignore_url_parts,
        browser_semaphore=search_semaphore,
        task_name=jurisdiction_full_name,
        **se_kwargs,
    )
    return [{"url": url} for url in urls]


async def _run_holistic_sort_search(
    queries,
    num_urls,
    url_blacklist,
    url_whitelist,
    browser_semaphore,
    jurisdiction_full_name,
    **se_kwargs,
):
    """Run search with all search engines and apply holistic sorting

    Parameters
    ----------
    queries : list of str
        Fully formatted search queries.
    num_urls : int
        Number of top-ranked links to keep unfiltered after sorting.
    url_blacklist : list of str or None
        URL substrings that mark a link as filtered out.
    url_whitelist : list of str or None
        URL substrings that exempt a link from the blacklist.
    browser_semaphore : asyncio.Semaphore or None
        Forwarded to ``search_all_se`` as ``browser_semaphore``.
    jurisdiction_full_name : str
        Forwarded to ``search_all_se`` as ``task_name``.
    **se_kwargs
        Extra keyword arguments forwarded to ``search_all_se``.

    Returns
    -------
    list of dict
        All link entries returned by the search, annotated and sorted
        by :func:`_apply_filters`.
    """
    out = await search_all_se(
        queries,
        num_urls=10,  # Need as many results as possible for holistic sort
        ignore_url_parts=None,  # custom filters applied later
        browser_semaphore=browser_semaphore,
        task_name=jurisdiction_full_name,
        **se_kwargs,
    )
    return _apply_filters(out, url_blacklist, url_whitelist, num_urls)


def _apply_filters(results, url_blacklist, url_whitelist, num_urls):
    """Mark blacklisted URLs, duplicates, and beyond top-N entries

    No entry is ever dropped: filtered entries stay in the output with
    ``"filtered_reason"`` set to explain why they were excluded.

    Parameters
    ----------
    results : iterable
        Nested search output; see :func:`_flatten_results` for the
        expected nesting. The link dictionaries are modified in place.
    url_blacklist : list of str or None
        URL substrings that mark a link as filtered out.
    url_whitelist : list of str or None
        URL substrings that exempt a link from the blacklist.
    num_urls : int
        Number of top-ranked links to leave unfiltered.

    Returns
    -------
    list of dict
        Flat list of link entries ordered by ``"overall_rank"``, with
        unranked (blacklisted or duplicate) entries last.
    """
    results = _flatten_results(results)
    _apply_blacklist_filters(results, url_blacklist, url_whitelist)
    _apply_duplicate_filters(results)
    _apply_top_n_filters(results, num_urls)

    # Strip the bookkeeping keys that were only needed for ranking
    for entry in results:
        entry.pop("_order", None)
        entry.pop("query_index", None)
        entry.pop("se_order", None)

    return sorted(results, key=_overall_sort_key)


def _flatten_results(results):
    """Flatten results from nested structure to a single list

    Parameters
    ----------
    results : iterable
        Three-level nesting of link dictionaries. Based on how the
        loops are written, the outer level is assumed to be one item
        per search engine and the middle level one item per query.

    Returns
    -------
    list of dict
        The same link dictionaries (not copies), each extended with
        ``"filtered_reason"`` and ``"overall_rank"`` (both ``None``),
        plus the 1-based bookkeeping keys ``"query_index"``,
        ``"se_order"`` and ``"_order"`` (position in the flat list).
    """
    flat = []
    result_order = 1
    for se_ind, se_results in enumerate(results, start=1):
        for query_ind, single_query_results in enumerate(se_results, start=1):
            for link_info in single_query_results:
                link_info["filtered_reason"] = None
                link_info["overall_rank"] = None
                link_info["query_index"] = query_ind
                link_info["se_order"] = se_ind
                link_info["_order"] = result_order
                flat.append(link_info)
                result_order += 1
    return flat


def _apply_blacklist_filters(results, url_blacklist, url_whitelist):
    """Mark rows that match any blacklist substring

    Matching is case-insensitive (both sides are case-folded). A URL
    that contains any whitelist substring is never blacklisted.
    Matching rows get ``"filtered_reason"`` set to
    ``"blacklist:<term>"``, where ``<term>`` is the first (case-folded)
    blacklist substring found in the URL.

    Parameters
    ----------
    results : list of dict
        Flat link entries, each with a ``"url"`` key; modified in
        place.
    url_blacklist : list of str or None
        Substrings that exclude a URL. Empty strings are ignored.
    url_whitelist : list of str or None
        Substrings that protect a URL from the blacklist. Empty
        strings are ignored.
    """
    blacklist_terms = [sub.casefold() for sub in url_blacklist or [] if sub]
    whitelist_terms = [sub.casefold() for sub in url_whitelist or [] if sub]

    for entry in results:
        url_cf = entry["url"].casefold()
        if any(term in url_cf for term in whitelist_terms):
            continue

        # Terms are guaranteed non-empty, so ``None`` means "no match"
        matched_term = next(
            (term for term in blacklist_terms if term in url_cf), None
        )
        if matched_term is not None:
            entry["filtered_reason"] = f"blacklist:{matched_term}"


def _apply_duplicate_filters(results):
    """Mark duplicate rows per search engine and URL

    Among the still-active rows sharing the same
    ``(search_engine, url)`` pair, the best-ranked one (per
    :func:`_link_sort_key`) is kept. Every other row is marked with
    ``"filtered_reason" = "duplicate"`` and summarized in the kept
    row's ``"duplicates"`` list.

    Parameters
    ----------
    results : list of dict
        Flat link entries with ``"url"``, ``"query"``,
        ``"search_engine"`` and ``"query_rank"`` keys; modified in
        place.
    """
    winners = {}
    for entry in _active_results_sorted(results):
        key = (entry["search_engine"], entry["url"])
        winner = winners.get(key)
        if winner is None:
            winners[key] = entry
            continue

        winner.setdefault("duplicates", []).append(
            {
                "url": entry["url"],
                "query": entry["query"],
                "search_engine": entry["search_engine"],
                "query_rank": entry["query_rank"],
            }
        )
        entry["filtered_reason"] = "duplicate"


def _apply_top_n_filters(results, num_urls):
    """Mark entries past top-N after filtering

    Every still-active row receives a 1-based ``"overall_rank"``. Rows
    ranked after `num_urls` are additionally marked with
    ``"filtered_reason" = "beyond_top_n"`` (they keep their rank).

    Parameters
    ----------
    results : list of dict
        Flat link entries; modified in place.
    num_urls : int
        Number of top-ranked rows to leave unfiltered.
    """
    ranked = enumerate(_active_results_sorted(results), start=1)
    for overall_rank, entry in ranked:
        entry["overall_rank"] = overall_rank
        if overall_rank > num_urls:
            entry["filtered_reason"] = "beyond_top_n"


def _active_results_sorted(results):
    """Return filtered-in rows sorted by ranking priority

    Parameters
    ----------
    results : list of dict
        Flat link entries.

    Returns
    -------
    list of dict
        New list holding the entries whose ``"filtered_reason"`` is
        ``None``, ordered by :func:`_link_sort_key` (best first).
    """
    active_results = [
        entry for entry in results if entry["filtered_reason"] is None
    ]
    active_results.sort(key=_link_sort_key)
    return active_results


def _link_sort_key(entry):
    """Get a sort key for a search result entry

    Lower values indicate more confidence in result. Entries with more
    recorded duplicates come first; ties are broken by query rank,
    query index, search engine name and finally flattening order.
    """
    duplicate_count = len(entry.get("duplicates", []))
    return (  # lower is better
        -duplicate_count,
        entry["query_rank"],
        entry["query_index"],
        entry["search_engine"],
        entry["_order"],
    )


def _overall_sort_key(result):
    """Get overall sort key for a search result item

    Ranked items sort by ``"overall_rank"``; items without a rank sort
    last (rank treated as infinity), ordered by their
    ``"filtered_reason"`` string.
    """
    return (
        result.get("overall_rank") or float("inf"),
        result.get("filtered_reason") or "",
    )