Source code for compass.web.website_crawl

"""Custom COMPASS website crawler

Much more simplistic than the Crawl4AI crawler, but designed to access
some links that Crawl4AI cannot (such as those behind a button
interface).
"""

import logging
import operator
from collections import Counter
from contextlib import AsyncExitStack
from urllib.parse import urljoin, urlsplit

from crawl4ai.models import Link as c4AILink
from bs4 import BeautifulSoup
from rebrowser_playwright.async_api import async_playwright
from rebrowser_playwright.async_api import Error as RBPlaywrightError
from playwright._impl._errors import Error as PlaywrightError  # noqa: PLC2701
from elm.web.utilities import pw_page
from elm.web.document import HTMLDocument
from elm.web.website_crawl import ELMLinkScorer, _SCORE_KEY  # noqa: PLC2701
from compass.web.url_utils import sanitize_url

from compass.web.file_loader import COMPASSWebFileLoader
from compass.utilities.parsing import is_pdf_doc


logger = logging.getLogger(__name__)

# Key under which the crawl depth of a document is stored in ``doc.attrs``
_DEPTH_KEY = "web_crawl_depth"

# Selectors of page elements that are clicked to reveal extra content.
# Only buttons are clicked; "a" and "p" are listed as disabled candidates.
_CLICKABLE_SELECTORS = [
    "button",  # "a", "p"
]

# Links whose title or href contains any of these substrings (compared in
# lower case) are dropped during link extraction.
# NOTE(review): matching is by plain substring, so e.g. "x.com" also
# matches any host that merely ends in "x.com", and "event" already
# covers "events" - confirm this breadth is intended.
_BLACKLIST_SUBSTRINGS = [
    "facebook",
    "twitter",
    "linkedin",
    "instagram",
    "youtube",
    "pinterest",  # cspell: disable-line
    "tiktok",  # cspell: disable-line
    "x.com",
    "snapchat",
    "reddit",
    "mailto:",
    "tel:",
    "javascript:",
    "login",
    "signup",
    "sign up",
    "signin",
    "sign in",
    "register",
    "subscribe",
    "donate",
    "shop",
    "cart",
    "careers",
    "event",
    "events",
    "calendar",
]
DOC_THRESHOLD = 5
"""Default max documents to collect before terminating COMPASS crawl"""


class COMPASSLinkScorer(ELMLinkScorer):
    """Link scorer tailored to the COMPASS website crawl"""

    def _assign_value(self, text):
        """Sum the points of every keyword found in the link text

        The text is case-folded and every occurrence of "plant" is
        removed before the keywords of ``self.keyword_points`` are
        matched as plain substrings.

        Parameters
        ----------
        text : str
            Link text to score.

        Returns
        -------
        int or float
            Total of the points of all matching keywords (``0`` if
            nothing matches).
        """
        # NOTE(review): "plant" is presumably stripped so that it cannot
        # trigger a shorter keyword contained in it - confirm.
        searchable = text.casefold().replace("plant", "")
        return sum(
            points
            for keyword, points in self.keyword_points.items()
            if keyword in searchable
        )
class _Link(c4AILink):
    """Crawl4AI ``Link`` with case-insensitive identity and helpers

    Two links (or a link and a plain string) compare equal when their
    case-folded ``href`` values match; the hash follows the same rule.
    """

    def __eq__(self, other):
        # Accept raw URL strings as well as any Crawl4AI link
        if isinstance(other, str):
            other_href = other
        elif isinstance(other, c4AILink):
            other_href = other.href
        else:
            return NotImplemented
        return self.href.casefold() == other_href.casefold()

    def __hash__(self):
        # Must agree with ``__eq__``: hash the case-folded href
        return hash(self.href.casefold())

    def __str__(self):
        return f"{self.title} ({self.href})"

    def __repr__(self):
        details = (
            f"title={self.title!r}, href={self.href!r}, "
            f"base_domain={self.base_domain!r}"
        )
        return f"Link({details})"

    @property
    def consistent_domain(self):
        """bool: ``True`` if the base domain appears in the href"""
        domain = self.base_domain.casefold()
        return domain in self.href.casefold()

    @property
    def resembles_pdf(self):
        """bool: ``True`` if "pdf" occurs in the title or the href"""
        return any(
            "pdf" in field.casefold() for field in (self.title, self.href)
        )
class COMPASSCrawler:
    """A simple website crawler to search for ordinance documents"""

    def __init__(
        self,
        validator,
        url_scorer,
        file_loader_kwargs=None,
        already_visited=None,
        num_link_scores_to_check_per_page=4,
        max_pages=100,
        browser_semaphore=None,
    ):
        """Initialize the crawler

        Parameters
        ----------
        validator : callable
            An async callable that takes a document instance
            (containing the text from a PDF or a webpage) and returns a
            boolean indicating whether the text passes the validation
            check. This is used to determine whether or not to keep
            (i.e. return) the document.
        url_scorer : callable
            An async callable that takes a list of dictionaries
            containing URL information and assigns each dictionary a
            `score` key representing the score for that URL. The input
            URL dictionaries will each have at least one key: "href".
            This key will contain the URL of the link. The dictionary
            may also have other attributes such as "title", which
            contains the link title text.
        file_loader_kwargs : dict, optional
            Additional keyword-value argument pairs to pass to the
            file loader (``COMPASSWebFileLoader``). ``verify_ssl`` is
            set to ``False`` unless this dictionary overrides it. If
            this dictionary contains the ``pw_launch_kwargs`` key, its
            value (assumed to be another dictionary) will be used to
            initialize the playwright instances used for the crawl.
            By default, ``None``.
        already_visited : set, optional
            A set of URLs (either strings or ``Link`` objects) that have
            already been visited. This is used to avoid re-visiting
            links that have already been checked. By default, ``None``.
        num_link_scores_to_check_per_page : int, default=4
            Number of top unique-scoring links per page to use for
            recursive crawling. This helps the crawl stay focused on the
            most likely links to contain documents of interest.
        max_pages : int, default=100
            Maximum number of pages to crawl before terminating,
            regardless of whether the document was found or not.
            By default, ``100``.
        browser_semaphore : :class:`asyncio.Semaphore`, optional
            Semaphore instance that can be used to limit the number of
            playwright browsers open concurrently. If ``None``, no
            limits are applied. By default, ``None``.
        """
        self.validator = validator
        self.url_scorer = url_scorer
        self.num_scores_to_check_per_page = num_link_scores_to_check_per_page
        self.checked_previously = already_visited or set()
        self.max_pages = max_pages

        file_loader_kwargs = file_loader_kwargs or {}
        # user-supplied kwargs take precedence over the default
        flk = {"verify_ssl": False}
        flk.update(file_loader_kwargs)
        self.afl = COMPASSWebFileLoader(**flk)
        self.pw_launch_kwargs = (
            file_loader_kwargs.get("pw_launch_kwargs") or {}
        )
        # An empty ``AsyncExitStack`` serves as a no-op async context
        # manager when no semaphore is given
        self.browser_semaphore = (
            AsyncExitStack()
            if browser_semaphore is None
            else browser_semaphore
        )

        self._out_docs = []
        # maps each visited link to its ``(depth, score)``
        self._already_visited = {}
        self._failed_external_domains = set()
        self._should_stop = None

    async def run(
        self, base_url, termination_callback=None, on_new_page_visit_hook=None
    ):
        """Run the COMPASS website crawler

        Parameters
        ----------
        base_url : str
            URL of the website to start crawling from.
        termination_callback : callable, optional
            An async callable that takes a list of documents and returns
            a boolean indicating whether to stop crawling. If ``None``,
            the crawl simply terminates when :obj:`DOC_THRESHOLD`
            documents have been found. By default, ``None``.
        on_new_page_visit_hook : callable, optional
            An async callable that is called every time a new page is
            found during the crawl. The callable should accept a single
            argument, which is the page ``Link`` instance. If ``None``,
            no additional processing is done on new pages.
            By default, ``None``.

        Returns
        -------
        list
            List of document instances (PDFs or HTML pages) collected
            during the crawl, sorted by descending link score. This
            list may be empty if no documents are found.
        """
        self._should_stop = termination_callback or _default_found_enough_docs
        try:
            await self._run(
                base_url, on_new_page_visit_hook=on_new_page_visit_hook
            )
        finally:
            # never leave a stale callback behind, even if the crawl fails
            self._should_stop = None

        self._log_crawl_stats()

        if self._out_docs:
            self._out_docs.sort(key=lambda x: -1 * x.attrs[_SCORE_KEY])
        return self._out_docs

    async def _run(
        self,
        base_url,
        link=None,
        depth=0,
        score=0,
        on_new_page_visit_hook=None,
    ):
        """Recursive web crawl function

        Called without `link` to start a fresh crawl at `base_url`;
        recursive calls pass the link to visit along with its depth and
        score.
        """
        if link is None:
            base_url, link = self._reset_crawl(base_url)

        if link in self._already_visited:
            return

        if on_new_page_visit_hook:
            await on_new_page_visit_hook(link)

        self._already_visited[link] = (depth, score)
        logger.trace("self._already_visited=%r", self._already_visited)

        # Documents are leaves of the crawl: nothing to recurse into
        if await self._website_link_is_doc(link, depth, score):
            return

        num_urls_checked_on_this_page = 0
        curr_url_score = None
        for next_link in await self._get_links_from_page(link, base_url):
            prev_len = len(self._out_docs)
            await self._run(
                base_url,
                link=_Link(
                    title=next_link["title"],
                    href=next_link["href"],
                    base_domain=base_url,
                ),
                depth=depth + 1,
                score=next_link["score"],
                on_new_page_visit_hook=on_new_page_visit_hook,
            )
            # Only validate a document that was added by the direct
            # child (depth + 1); deeper finds were already validated by
            # their own parent call
            doc_was_just_found = (  # fmt: off
                len(self._out_docs) == (prev_len + 1)
                and (
                    self._out_docs[-1].attrs.get(_DEPTH_KEY, -1)
                    == (depth + 1)
                )
            )
            if doc_was_just_found:
                if await self.validator(self._out_docs[-1]):
                    logger.debug(" - Document passed validation check!")
                else:
                    self._out_docs = self._out_docs[:-1]
            elif (
                not link.resembles_pdf
                and curr_url_score != next_link["score"]
            ):
                # Links arrive sorted by score, so a change in score
                # means one more unique score has been exhausted
                logger.trace(
                    "Finished checking score %d at depth %d. Next score: %d",
                    curr_url_score or -1,
                    depth,
                    next_link["score"],
                )
                num_urls_checked_on_this_page += 1
                curr_url_score = next_link["score"]

            if await self._should_terminate_crawl(
                num_urls_checked_on_this_page, link
            ):
                break

        return

    def _reset_crawl(self, base_url):
        """Reset crawl state and initialize crawling link

        Returns the sanitized base URL and the landing page link.
        """
        self._out_docs = []
        self._already_visited = {}
        self._failed_external_domains = set()

        base_url = sanitize_url(base_url)
        return base_url, _Link(
            title="Landing Page",
            href=sanitize_url(urljoin(base_url, base_url.split(" ")[0])),
            base_domain=base_url,
        )

    async def _website_link_is_doc(self, link, depth, score):
        """Check if website link contains doc

        Returns ``True`` if the link was stored as a document (PDF or
        external HTML page) and ``False`` if it is a regular page of the
        main website that should be crawled for further links.
        """
        if link in self.checked_previously and link.consistent_domain:
            # Don't re-check pages on main website
            return False

        if await self._website_link_is_pdf(link, depth, score):
            return True

        # at this point the page is NOT a PDF. However, it could still
        # just be a normal webpage on the main domain that we haven't
        # visited before. In that case, just return False so that the
        # crawl keeps following the links on that page
        if link.consistent_domain:
            return False

        # now we are on an external page that we either have not visited
        # before or that we have seen but determined is NOT a PDF file
        return await self._website_link_as_html_doc(link, depth, score)

    async def _website_link_is_pdf(self, link, depth, score):
        """Fetch page content; keep only PDFs

        Returns ``True`` (and stores the document) only if the fetched
        content is a PDF. The fetch is skipped for external links that
        do not resemble a PDF, for non-http(s) URLs, and for external
        domains where a previous fetch failed.
        """
        if not link.consistent_domain and not link.resembles_pdf:
            logger.debug(
                "Skipping external non-PDF candidate: %s",
                link.href,
            )
            return False

        parsed = urlsplit(link.href)
        if parsed.scheme not in {"http", "https"}:
            logger.debug("Skipping non-http URL: %s", link.href)
            return False

        if (
            not link.consistent_domain
            and parsed.netloc.casefold() in self._failed_external_domains
        ):
            logger.debug(
                "Skipping external domain after previous fetch failure: %s",
                parsed.netloc,
            )
            return False

        logger.debug("Loading Link: %s", link)
        try:
            doc = await self.afl.fetch(link.href)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            msg = (
                "Encountered error of type %r while trying to fetch "
                "content from %s"
            )
            err_type = type(e)
            logger.exception(msg, err_type, link)
            # remember failing external hosts so they are not retried
            if not link.consistent_domain and parsed.netloc:
                self._failed_external_domains.add(parsed.netloc.casefold())
            return False

        if is_pdf_doc(doc):
            logger.debug(" - Found PDF!")
            doc.attrs[_DEPTH_KEY] = depth
            doc.attrs[_SCORE_KEY] = score
            self._out_docs.append(doc)
            return True

        return False

    async def _website_link_as_html_doc(self, link, depth, score):
        """Fetch page content as HTML doc

        The page is always stored (even if no text could be loaded), so
        this always returns ``True``.
        """
        logger.debug("Loading Link as HTML: %s", link)
        html_text = await self._get_text_no_err(link.href)
        attrs = {_DEPTH_KEY: depth, _SCORE_KEY: score}
        doc = HTMLDocument([html_text], attrs=attrs)
        self._out_docs.append(doc)
        return True

    async def _get_links_from_page(self, link, base_url):
        """Get all links from a page sorted by relevance score

        Returns a list of scored link dictionaries, highest score
        first. Pages outside of the base domain yield no links.
        """
        if not link.consistent_domain:
            logger.debug("Detected new domain, stopping link discovery")
            return []

        html_text = await self._get_text_no_err(link.href)
        page_links = []
        if html_text:
            page_links = _extract_links_from_html(html_text, base_url=base_url)
            page_links = await self.url_scorer(
                [dict(page_link) for page_link in page_links]
            )
            page_links = sorted(
                page_links, key=operator.itemgetter("score"), reverse=True
            )
        _debug_info_on_links(page_links)
        return page_links

    async def _get_text_no_err(self, url):
        """Get all text from a page; return empty string if pw error"""
        try:
            text = await self._get_text(url)
        except (PlaywrightError, RBPlaywrightError):
            text = ""
        return text

    async def _get_text(self, url):
        """Get all html text from a page

        The returned text is the page content followed by the page
        content captured after clicking each clickable element, joined
        by newlines.
        """
        all_text = []
        pw_page_kwargs = {
            "intercept_routes": True,
            "ignore_https_errors": True,
            # 60 seconds (in ms), same as the network-idle wait below
            "timeout": 60_000,
        }
        async with async_playwright() as p, self.browser_semaphore:
            browser = await p.chromium.launch(**self.pw_launch_kwargs)
            async with pw_page(browser, **pw_page_kwargs) as page:
                await page.goto(url)
                await page.wait_for_load_state("networkidle", timeout=60_000)
                all_text.append(await page.content())
                all_text += await _get_text_from_all_locators(page)

        return "\n".join(all_text)

    async def _should_terminate_crawl(
        self, num_urls_checked_on_this_page, link
    ):
        """Check if crawl should terminate

        The crawl of the current page stops when enough unique link
        scores were checked, when the termination callback is satisfied
        with the collected documents, or when the page budget is used
        up.
        """
        if num_urls_checked_on_this_page >= self.num_scores_to_check_per_page:
            logger.debug(
                "Already checked %d unique link scores from %s",
                self.num_scores_to_check_per_page,
                link.href,
            )
            return True

        if await self._should_stop(self._out_docs):
            logger.debug("Exiting crawl early due to user condition")
            return True

        if len(self._already_visited) >= self.max_pages:
            logger.debug(" - Too many links visited, stopping recursion")
            return True

        logger.trace(
            "Only checked %d pages, continuing crawl...",
            len(self._already_visited),
        )
        return False

    def _log_crawl_stats(self):
        """Log statistics about crawled pages and depths"""
        logger.info("Crawled %d pages", len(self._already_visited))
        logger.info("Found %d potential documents", len(self._out_docs))
        logger.debug("Average score: %.2f", self._compute_avg_link_score())
        logger.debug("Pages crawled by depth:")
        for depth, count in sorted(self._crawl_depth_counts().items()):
            logger.debug(" Depth %d: %d pages", depth, count)

    def _compute_avg_link_score(self):
        """Compute the average score of the crawled results

        Returns ``0.0`` if no page has been visited.
        """
        if not self._already_visited:
            return 0.0
        return sum(
            score for __, score in self._already_visited.values()
        ) / len(self._already_visited)

    def _crawl_depth_counts(self):
        """Compute number of pages per depth"""
        return Counter(depth for depth, __ in self._already_visited.values())
async def _default_found_enough_docs(out_docs):  # noqa: RUF029
    """Tell whether the default number of documents has been collected

    The crawl is considered done once at least :obj:`DOC_THRESHOLD`
    documents are in `out_docs`.
    """
    num_found = len(out_docs)
    return num_found >= DOC_THRESHOLD


def _debug_info_on_links(links):
    """Log a short summary (at most three entries) of the given links"""
    total = len(links)
    if not total:
        logger.debug("Found no links!")
        return

    logger.debug("Found %d links:", total)
    preview = links[:3]
    for entry in preview:
        logger.debug(
            " - %d: %s (%s)", entry["score"], entry["title"], entry["href"]
        )
    # signal that the list was truncated
    if total > 3:  # noqa: PLR2004
        logger.debug(" ...")


def _extract_links_from_html(text, base_url):
    """Collect the usable links of an HTML page as a set of ``_Link``

    Anchors without text or href, anchors matching the blacklist (in
    title or path), and anchors that do not resolve to an http(s) URL
    are ignored. Relative paths are resolved against `base_url`.
    """
    parsed_html = BeautifulSoup(text, "html.parser")
    collected = set()
    for anchor in parsed_html.find_all("a", href=True):
        title = anchor.get_text().strip()
        path = anchor["href"]
        if not (title and path):
            continue

        # title is checked before path, each against every substring
        lowered_title = title.lower()
        if any(bad in lowered_title for bad in _BLACKLIST_SUBSTRINGS):
            continue
        lowered_path = path.lower()
        if any(bad in lowered_path for bad in _BLACKLIST_SUBSTRINGS):
            continue

        href = sanitize_url(urljoin(base_url, path))
        scheme = urlsplit(href).scheme
        if scheme in {"http", "https"}:
            collected.add(
                _Link(title=title, href=href, base_domain=base_url)
            )

    return collected


async def _get_text_from_all_locators(page):
    """Click every clickable element and gather the resulting page text

    Elements that raise a playwright error or yield no text are
    skipped.
    """
    gathered = []
    for selector in _CLICKABLE_SELECTORS:
        logger.trace("Checking selector %r", selector)
        matches = page.locator(selector)
        num_matches = await matches.count()
        logger.trace(" - Found %d instances", num_matches)

        for position in range(num_matches):
            try:
                clicked_text = await _get_locator_text(
                    matches, position, page
                )
            except (PlaywrightError, RBPlaywrightError):
                continue
            else:
                if clicked_text:
                    gathered.append(clicked_text)

    return gathered


async def _get_locator_text(locators, index, page):
    """Click the locator at `index` and return the page content

    Returns ``None`` without clicking if the element is not visible or
    not enabled.
    """
    target = locators.nth(index)
    # visibility is checked first; enabled state only for visible ones
    clickable = await target.is_visible() and await target.is_enabled()
    if not clickable:
        return None

    await target.click(timeout=10_000)
    return await page.content()