Source code for compass.pipeline.extraction

"""Extraction workflow for prepared documents"""

import logging

from compass.extraction.context import ExtractionContext
from compass.services.threaded import OrdDBFileWriter
from compass.pb import COMPASS_PB


# Module-level logger named after this module (via ``__name__``)
logger = logging.getLogger(__name__)


class DocumentExtraction:
    """Run the fixed filter -> parse -> write extraction sequence

    All collaborators (extractor, jurisdiction, jurisdiction website)
    are read from the workflow object supplied at construction time.
    """

    def __init__(self, workflow):
        self.workflow = workflow

    async def extract_from_docs(self, docs):
        """Filter a set of docs and extract structured data from them

        Parameters
        ----------
        docs : iterable
            The documents to filter and extract structured data from.

        Returns
        -------
        compass.extraction.context.ExtractionContext or None
            Whatever the extractor's ``parse_docs_for_structured_data``
            call returns, or ``None`` when `docs` is empty or the
            extractor's ``filter_docs`` call yields a falsy context.
        """
        if not docs:
            return None

        workflow = self.workflow
        candidates = ExtractionContext(documents=docs)
        filtered = await workflow.extractor.filter_docs(candidates)
        if not filtered:
            return None

        filtered.attrs["jurisdiction_website"] = workflow.jurisdiction_website
        COMPASS_PB.update_jurisdiction_task(
            workflow.jurisdiction.full_name,
            description="Extracting structured data...",
        )

        parsed = await workflow.extractor.parse_docs_for_structured_data(
            filtered
        )
        # The filtered context (not the parse result) is what gets
        # written out.
        # NOTE(review): presumably the extractor fills the filtered
        # context in place during parsing -- confirm.
        await self._write_out_structured_data(filtered)
        logger.debug("Final extraction context:\n%s", parsed)
        return parsed

    async def _write_out_structured_data(self, extraction_context):
        """Persist the context's structured data, if it has any

        Nothing happens when the context's ``"structured_data"`` attr
        is missing or ``None``. Otherwise the context is handed to
        ``OrdDBFileWriter`` together with an output file name (the
        ``"out_data_fn"`` attr, or a name built from the jurisdiction's
        full name when that attr is missing or ``None``), and the value
        the writer returns is stored under the ``"ord_db_fp"`` attr.
        """
        attrs = extraction_context.attrs
        if attrs.get("structured_data") is None:
            return

        file_name = attrs.get("out_data_fn")
        if file_name is None:
            file_name = (
                f"{self.workflow.jurisdiction.full_name} Ordinances.csv"
            )

        stored_at = await OrdDBFileWriter.call(extraction_context, file_name)
        logger.info(
            "Structured data for %s stored here: '%s'",
            self.workflow.jurisdiction.full_name,
            stored_at,
        )
        extraction_context.attrs["ord_db_fp"] = stored_at