Source code for compass.pipeline.collection.base

"""Collection workflow for the COMPASS pipeline"""

from compass.pipeline.collection.dedupe import DocumentDeDuplicator
from compass.pipeline.collection.persistence import persist_documents
from compass.pipeline.collection.steps import (
    CompassWebsiteCrawlStep,
    ElmWebsiteCrawlStep,
    KnownLocalDocumentsStep,
    KnownUrlDocumentsStep,
    SearchEngineDocumentsStep,
)


class DocumentCollection:
    """Run an ordered, fixed series of document collection steps"""

    def __init__(self, workflow):
        """

        Parameters
        ----------
        workflow : compass.pipeline.jurisdiction.SingleJurisdictionRun
            Workflow of the jurisdiction being processed (website
            search may or may not be enabled for it). Every collection
            step receives this object and can use it to look up
            jurisdiction information and other relevant data, and to
            check whether website search is enabled.
        """
        self.workflow = workflow
        self.de_duplicator = DocumentDeDuplicator()

        # Tuple order is the execution order used by ``execute``
        step_types = (
            KnownLocalDocumentsStep,
            KnownUrlDocumentsStep,
            SearchEngineDocumentsStep,
            ElmWebsiteCrawlStep,
            CompassWebsiteCrawlStep,
        )
        self.steps = [step_type() for step_type in step_types]

    async def execute(self, *, eager_extract=False, relative_to=None):
        """Execute every collection step in its fixed order

        Steps always run in the same sequence:

            1. Known local documents
            2. Known document URLs
            3. Search engine-based search for ordinance documents
            4. Jurisdiction website crawl-based search for ordinance
               documents

        Any of these steps can be disabled by the user through the
        workflow configuration.

        Parameters
        ----------
        eager_extract : bool, optional
            If ``True``, extraction is applied to the documents of each
            step as soon as that step finishes. The first extraction
            that returns structured data ends the collection, and all
            remaining steps are skipped for this jurisdiction.
            By default, ``False``.
        relative_to : path-like, optional
            Directory to use as the root of all relative paths; it is
            forwarded to ``persist_documents``.
            By default, ``None``.

        Returns
        -------
        dict or None
            With ``eager_extract=False``: a dictionary of collection
            information and metadata, which also carries the workflow's
            ``jurisdiction_website`` under the
            ``"jurisdiction_website"`` key.
            With ``eager_extract=True``: the extraction workflow result
            for the first step that yielded structured data, or
            ``None`` if no step yielded any.
        """
        for step in self.steps:
            found = await step.collect(self.workflow)
            self.de_duplicator.add_docs(
                found,
                step_name=str(step.STEP_NAME),
                jurisdiction_name=self.workflow.jurisdiction.full_name,
            )
            if not eager_extract:
                continue

            # Eager mode: try extraction on this step's documents only
            extractor = self.workflow.extraction_workflow
            extracted = await extractor.extract_from_docs(found)
            if extracted is not None:
                return extracted

        if eager_extract:
            # Every step ran and none produced structured data; eager
            # mode never persists the collected documents
            return None

        summary = await persist_documents(
            self.workflow.jurisdiction,
            self.de_duplicator,
            relative_to=relative_to,
        )
        summary["jurisdiction_website"] = self.workflow.jurisdiction_website
        return summary