# Source code for compass.web.file_loader

"""COMPASS file loader for web files using Docling"""

import os
import asyncio
import logging

from elm.web.file_loader import (
    AsyncFetchWithRetry,
    AsyncHTMLLoader,
    BaseAsyncFileLoader,
    AsyncWebFileLoader,
    AsyncLocalFileLoader,
)
from elm.web.document import MDDocument
from docling_core.utils.file import resolve_remote_filename, AnyHttpUrl

from compass.services.cpu import read_docling_web_file, read_docling_local_file
from compass.services.threaded import TempFileCache


logger = logging.getLogger(__name__)


class _AsyncHTMLOnlyLoader(BaseAsyncFileLoader):
    """Loader that retrieves documents exclusively via the HTML loader

    Every fetch is delegated to an internal ``AsyncHTMLLoader``
    instance, and ``TempFileCache.call`` is always registered as the
    file caching coroutine of the base loader.
    """

    def __init__(
        self,
        pw_launch_kwargs=None,
        html_read_kwargs=None,
        html_read_coroutine=None,
        browser_semaphore=None,
        use_scrapling_stealth=False,
        num_pw_html_retries=3,
        **__,  # consume any extra kwargs
    ):
        """

        Parameters
        ----------
        pw_launch_kwargs : dict, optional
            Keyword-value argument pairs forwarded to
            ``async_playwright.chromium.launch`` (only relevant when
            reading HTML). By default, ``None``.
        html_read_kwargs : dict, optional
            Keyword-value argument pairs forwarded to the
            `html_read_coroutine`. By default, ``None``.
        html_read_coroutine : callable, optional
            Coroutine used to read HTML files. Must be an async
            function that takes the HTML text as its first argument and
            kwargs as the rest, and returns a
            :obj:`elm.web.document.HTMLDocument`. If ``None``, a
            default function that runs in the main thread is used.
            By default, ``None``.
        browser_semaphore : asyncio.Semaphore, optional
            Semaphore that can be used to cap the number of playwright
            browsers open at the same time. If ``None``, no limit is
            applied. By default, ``None``.
        use_scrapling_stealth : bool, default=False
            Option to use scrapling stealth scripts in place of
            playwright-stealth. By default, ``False``.
        num_pw_html_retries : int, default=3
            Number of attempts made to load HTML content. Retrying
            helps because the playwright parameters are stochastic and
            some combinations of them can fail to load HTML. The
            default is likely a good balance between processing
            attempts and retrieval success. Note that at least 2
            attempts are always made, even if a smaller value is
            given. By default, ``3``.
        """
        super().__init__(file_cache_coroutine=TempFileCache.call)
        # All HTML-related options are handed to the wrapped loader as-is
        html_loader_options = {
            "pw_launch_kwargs": pw_launch_kwargs,
            "html_read_kwargs": html_read_kwargs,
            "html_read_coroutine": html_read_coroutine,
            "browser_semaphore": browser_semaphore,
            "use_scrapling_stealth": use_scrapling_stealth,
            "num_pw_html_retries": num_pw_html_retries,
        }
        self._html_loader = AsyncHTMLLoader(**html_loader_options)

    async def _fetch_doc(self, url):
        """Fetch a doc using the wrapped HTML loader

        Parameters
        ----------
        url
            Source to fetch; passed unchanged to the HTML loader.

        Returns
        -------
        tuple
            Two-item tuple holding the document returned by the HTML
            loader and that document's ``text``.
        """
        html_doc = await self._html_loader.fetch(url)
        return html_doc, html_doc.text


class AsyncDoclingWebFileLoader(BaseAsyncFileLoader):
    """Async loader that parses web content with Docling

    Raw content is downloaded with ``AsyncFetchWithRetry`` and parsed
    via ``read_docling_web_file``. When fetching several sources at
    once, sources detected as HTML are additionally loaded with the
    Playwright-based HTML loader.
    """

    def __init__(  # noqa: PLR0913, PLR0917
        self,
        header_template=None,
        verify_ssl=True,
        aget_kwargs=None,
        pw_launch_kwargs=None,
        html_read_kwargs=None,
        html_read_coroutine=None,
        file_cache_coroutine=None,
        browser_semaphore=None,
        use_scrapling_stealth=False,
        num_pw_html_retries=3,
        to_md_kwargs=None,
        pytesseract_exe_fp=None,
        **__,  # consume any extra kwargs
    ):
        """

        Parameters
        ----------
        header_template : dict, optional
            Optional GET header template. If not given,
            :obj:`~elm.web.utilities.DEFAULT_HEADERS` is used.
            By default, ``None``.
        verify_ssl : bool, optional
            Option to use aiohttp's default SSL check. If ``False``,
            SSL certificate validation is skipped.
            By default, ``True``.
        aget_kwargs : dict, optional
            Additional kwargs forwarded to
            :meth:`aiohttp.ClientSession.get`. By default, ``None``.
        pw_launch_kwargs : dict, optional
            Keyword-value argument pairs forwarded to
            ``async_playwright.chromium.launch`` (only relevant when
            reading HTML). By default, ``None``.
        html_read_kwargs : dict, optional
            Keyword-value argument pairs forwarded to the
            `html_read_coroutine`. By default, ``None``.
        html_read_coroutine : callable, optional
            Coroutine used to read HTML files. Must be an async
            function that takes the HTML text as its first argument and
            kwargs as the rest, and returns a
            :obj:`elm.web.document.HTMLDocument`. If ``None``, a
            default function that runs in the main thread is used.
            By default, ``None``.
        file_cache_coroutine : callable, optional
            File caching coroutine that can be used to cache the files
            downloaded by this class. Must accept an
            :obj:`~elm.web.document.BaseDocument` instance as the first
            argument and the file content to be written as the second
            argument. If not provided, no document caching is
            performed. By default, ``None``.
        browser_semaphore : asyncio.Semaphore, optional
            Semaphore that can be used to cap the number of playwright
            browsers open at the same time. If ``None``, no limit is
            applied. By default, ``None``.
        use_scrapling_stealth : bool, default=False
            Option to use scrapling stealth scripts in place of
            playwright-stealth. By default, ``False``.
        num_pw_html_retries : int, default=3
            Number of attempts made to load HTML content. Retrying
            helps because the playwright parameters are stochastic and
            some combinations of them can fail to load HTML. The
            default is likely a good balance between processing
            attempts and retrieval success. Note that at least 2
            attempts are always made, even if a smaller value is
            given. By default, ``3``.
        to_md_kwargs : dict, optional
            Keyword-value argument pairs forwarded to Docling's
            :func:`~docling_core.types.doc.DoclingDocument.export_to_markdown`
            method, which converts the raw content to a markdown
            document. Can be useful to specify image placeholders
            (i.e. ``"image_placeholder"=""``) or page break
            placeholders
            (i.e. ``"page_break_placeholder"="<!-- page break -->"``).
            By default, ``None``.
        pytesseract_exe_fp : path-like, optional
            Path to the `pytesseract` executable. If specified, OCR
            will be used to extract text from scanned PDFs using
            Google's Tesseract. By default ``None``.
        """
        super().__init__(file_cache_coroutine=file_cache_coroutine)

        fetcher_options = {
            "header_template": header_template,
            "verify_ssl": verify_ssl,
            "aget_kwargs": aget_kwargs,
        }
        self.content_fetcher = AsyncFetchWithRetry(**fetcher_options)

        html_options = {
            "pw_launch_kwargs": pw_launch_kwargs,
            "html_read_kwargs": html_read_kwargs,
            "html_read_coroutine": html_read_coroutine,
            "browser_semaphore": browser_semaphore,
            "use_scrapling_stealth": use_scrapling_stealth,
            "num_pw_html_retries": num_pw_html_retries,
        }
        self.html_loader = _AsyncHTMLOnlyLoader(**html_options)

        self.to_md_kwargs = to_md_kwargs if to_md_kwargs else {}
        self.pytesseract_exe_fp = pytesseract_exe_fp

    async def fetch_all(self, *sources):
        """Fetch documents for all requested sources.

        Each source is first fetched through :meth:`fetch`; empty or
        missing results are dropped. Sources whose resulting document
        type is HTML are then loaded again with the Playwright-based
        HTML loader, and those documents are appended to the output
        (the documents from the initial fetch are kept as well).

        Parameters
        ----------
        *sources
            Iterable of sources (as strings) used to fetch the
            documents.

        Returns
        -------
        list
            List of parsed documents.
        """
        # Child tasks reuse the name of the task running this coroutine
        task_name = asyncio.current_task().get_name()
        pending = []
        for source in sources:
            pending.append(
                asyncio.create_task(self.fetch(source), name=task_name)
            )

        fetched = await asyncio.gather(*pending)
        docs = [doc for doc in fetched if not (doc is None or doc.empty)]

        if docs:
            type_summary = "\n\t- ".join(
                f"{doc.attrs['source']} -> {doc.attrs['doc_type']!r}"
                for doc in docs
            )
            logger.debug(
                "Got the following doc types from initial fetch:\n\t- %s",
                type_summary,
            )

        html_sources = []
        for doc in docs:
            if doc.attrs["doc_type"].casefold() == "html":
                html_sources.append(doc.attrs["source"])

        if html_sources:
            logger.debug(
                "Loading HTML with Playwright for %d source(s):\n%r",
                len(html_sources),
                html_sources,
            )
            docs += await self.html_loader.fetch_all(*html_sources)

        return docs

    async def _fetch_doc(self, url):
        """Fetch a doc using Docling

        Parameters
        ----------
        url
            Web address of the content to download and parse.

        Returns
        -------
        tuple
            Two-item tuple of the document and its content: ``None``
            when nothing was downloaded or the parsed document is
            empty, the document text for HTML documents, and the raw
            downloaded content for every other document type.
        """
        response = await self.content_fetcher.fetch(url)
        if response is None:
            return MDDocument(pages=[]), None

        logger.debug("Got content from %r", url)
        raw_content, __, __, headers = response
        remote_filename = resolve_remote_filename(
            http_url=AnyHttpUrl(url), response_headers=dict(headers)
        )
        doc = await read_docling_web_file(
            raw_content,
            url=remote_filename,
            source_uri=url,
            headers=dict(headers),
            pytesseract_exe_fp=self.pytesseract_exe_fp,
            **self.to_md_kwargs,
        )

        if doc.empty:
            logger.info("Docling could not parse content from %s", url)
            return doc, None

        if doc.attrs["doc_type"].casefold() == "html":
            return doc, doc.text

        # Non-HTML content is returned raw: binary write mode, with the
        # detected doc type used as the file extension
        doc.WRITE_KWARGS = {"mode": "wb"}
        doc.FILE_EXTENSION = doc.attrs["doc_type"]
        return doc, raw_content
class AsyncLocalDoclingFileLoader(BaseAsyncFileLoader):
    """Async loader that parses local files with Docling"""

    def __init__(
        self,
        file_cache_coroutine=None,
        doc_attrs=None,
        to_md_kwargs=None,
        pytesseract_exe_fp=None,
        **__,  # consume any extra kwargs
    ):
        """

        Parameters
        ----------
        file_cache_coroutine : callable, optional
            File caching coroutine that can be used to cache the files
            loaded by this class. Must accept an
            :obj:`~elm.web.document.BaseDocument` instance as the first
            argument and the file content to be written as the second
            argument. If not provided, no document caching is
            performed. By default, ``None``.
        doc_attrs : dict, optional
            Extra document attributes to add to every loaded document.
            By default, ``None``.
        to_md_kwargs : dict, optional
            Keyword-value argument pairs forwarded to Docling's
            :func:`~docling_core.types.doc.DoclingDocument.export_to_markdown`
            method, which converts the raw content to a markdown
            document. Can be useful to specify image placeholders
            (i.e. ``"image_placeholder"=""``) or page break
            placeholders
            (i.e. ``"page_break_placeholder"="<!-- page break -->"``).
            By default, ``None``.
        pytesseract_exe_fp : path-like, optional
            Path to the `pytesseract` executable. If specified, OCR
            will be used to extract text from scanned PDFs using
            Google's Tesseract. By default ``None``.
        """
        super().__init__(file_cache_coroutine=file_cache_coroutine)
        self.to_md_kwargs = to_md_kwargs if to_md_kwargs else {}
        self.doc_attrs = doc_attrs if doc_attrs else {}
        self.pytesseract_exe_fp = pytesseract_exe_fp

    async def _fetch_doc(self, source):
        """Load a doc by reading a local file with Docling

        Parameters
        ----------
        source
            Local file to read; passed unchanged to
            ``read_docling_local_file``.

        Returns
        -------
        tuple
            Two-item tuple of the document and its content: ``None``
            when the parsed document is empty, the document text for
            HTML documents, and the raw file content for every other
            document type.
        """
        doc, raw_content = await read_docling_local_file(
            source,
            pytesseract_exe_fp=self.pytesseract_exe_fp,
            **self.to_md_kwargs,
        )

        if doc.empty:
            logger.info("Docling could not parse content from %s", source)
            return doc, None

        if doc.attrs["doc_type"].casefold() == "html":
            return doc, doc.text

        # Non-HTML content is returned raw: binary write mode, with the
        # detected doc type used as the file extension
        doc.WRITE_KWARGS = {"mode": "wb"}
        doc.FILE_EXTENSION = doc.attrs["doc_type"]
        return doc, raw_content

    async def _fetch_doc_with_url_in_metadata(self, source):
        """Fetch doc contents and record the source in its metadata

        The user-supplied ``doc_attrs`` are copied onto the document
        first; the ``"source_fp"`` attribute is set afterwards and so
        overrides any ``doc_attrs`` entry with the same key.

        Parameters
        ----------
        source
            Local file to read.

        Returns
        -------
        tuple
            The same two-item tuple produced by :meth:`_fetch_doc`.
        """
        doc, raw_content = await self._fetch_doc(source)
        for attr_name, attr_value in self.doc_attrs.items():
            doc.attrs[attr_name] = attr_value
        doc.attrs["source_fp"] = source
        return doc, raw_content
# Select the loader implementations once, at import time. The Docling
# loaders are used only when the ``COMPASS_FILE_LOAD_BACKEND``
# environment variable is exactly "docling"; any other value (or an
# unset variable, which defaults to "elm") selects the ELM loaders.
COMPASSWebFileLoader, COMPASSLocalFileLoader = (
    (AsyncDoclingWebFileLoader, AsyncLocalDoclingFileLoader)
    if os.environ.get("COMPASS_FILE_LOAD_BACKEND", "elm") == "docling"
    else (AsyncWebFileLoader, AsyncLocalFileLoader)
)