Source code for compass.pipeline.collection.persistence

"""Persistence for collected documents"""

import json
import asyncio
from pathlib import Path
from warnings import warn
from datetime import datetime, UTC

from compass.services.threaded import (
    FileMover,
    ParsedFileWriter,
    TempFileCacheCopier,
    GenericFuncRunner,
)
from compass.services.cpu import read_docling_local_file
from compass.utilities.io import load_config
from compass.utilities.io import resolve_all_paths
from compass.utilities.parsing import convert_paths_to_strings, is_pdf_doc
from compass.warn import COMPASSWarning
from compass.exceptions import COMPASSFileNotFoundError, COMPASSValueError


COLLECTION_MANIFEST_FILENAME = "collection_manifest.json"


[docs] def build_collection_manifest(tech, jurisdictions): """Build the serialized collection manifest payload Parameters ---------- tech : str Technology specified in the pipeline request, included in the manifest for compatibility validation when loading. jurisdictions : dict Dictionary mapping jurisdiction full names to serialized collection metadata for each jurisdiction, including jurisdiction identifiers and the persisted document records. Returns ------- dict Collection manifest as a dictionary, ready to be serialized and written to disk. """ return { "tech": tech, "created_at": datetime.now(UTC).isoformat(), "jurisdictions": jurisdictions, }
async def write_collection_manifest(manifest_dir, collection_manifest):
    """Serialize a collection manifest to a JSON file on disk

    The actual write is dispatched through ``GenericFuncRunner.call``
    instead of being executed inline in the coroutine.

    Parameters
    ----------
    manifest_dir : path-like
        Directory that will receive the manifest file.
    collection_manifest : dict
        Collection manifest information to serialize and write.

    Returns
    -------
    pathlib.Path
        Location of the manifest file that was written.
    """
    manifest_fp = await GenericFuncRunner.call(
        _write_collection_manifest, manifest_dir, collection_manifest
    )
    return manifest_fp


async def write_collection_manifest_shard(shard_dir, collection_info):
    """Serialize the manifest shard of a single jurisdiction to disk

    The actual write is dispatched through ``GenericFuncRunner.call``
    instead of being executed inline in the coroutine.

    Parameters
    ----------
    shard_dir : path-like
        Directory that will receive the jurisdiction shard JSON file.
    collection_info : dict
        Serialized collection metadata of one jurisdiction.

    Returns
    -------
    pathlib.Path
        Location of the shard file that was written.
    """
    shard_fp = await GenericFuncRunner.call(
        _write_collection_manifest_shard, shard_dir, collection_info
    )
    return shard_fp


async def load_collection_manifest(manifest_fp, expected_tech):
    """Read a collection manifest from disk and validate its tech

    The load is dispatched through ``GenericFuncRunner.call``. When the
    manifest file itself is missing, the underlying loader tries to
    rebuild the manifest from jurisdiction shard files located under
    the manifest's directory and emits a ``COMPASSWarning``.

    Parameters
    ----------
    manifest_fp : path-like
        Location of the collection manifest file.
    expected_tech : str
        Technology named in the pipeline request. A manifest recording
        a different technology is rejected.

    Returns
    -------
    dict
        The loaded (or rebuilt) collection manifest.

    Raises
    ------
    COMPASSFileNotFoundError
        Raised by the underlying loader if the manifest file is missing
        and no shard files are found to rebuild it from.
    COMPASSValueError
        Raised by the underlying loader if the manifest records a
        technology other than `expected_tech`.
    """
    manifest = await GenericFuncRunner.call(
        _load_collection_manifest, manifest_fp, expected_tech
    )
    return manifest


def _write_collection_manifest(manifest_dir, collection_manifest):
    """Write a collection manifest to disk

    Parameters
    ----------
    manifest_dir : path-like
        Directory that will receive the manifest file. It is created
        (including missing parents) if it does not exist yet.
    collection_manifest : dict
        Manifest contents. Passed through ``convert_paths_to_strings``
        before being dumped as JSON.

    Returns
    -------
    pathlib.Path
        Path to the written manifest file
        (``manifest_dir / COLLECTION_MANIFEST_FILENAME``).
    """
    manifest_dir = Path(manifest_dir)
    # Create the target directory on demand, exactly like the shard
    # writer below, so that writing the top-level manifest does not fail
    # with FileNotFoundError on a fresh output directory.
    manifest_dir.mkdir(parents=True, exist_ok=True)
    manifest_fp = manifest_dir / COLLECTION_MANIFEST_FILENAME
    manifest_fp.write_text(
        json.dumps(convert_paths_to_strings(collection_manifest), indent=4),
        encoding="utf-8",
    )
    return manifest_fp


def _write_collection_manifest_shard(shard_dir, collection_info):
    """Write one jurisdiction collection manifest shard to disk

    Parameters
    ----------
    shard_dir : path-like
        Directory that will receive the shard file. It is created
        (including missing parents) if it does not exist yet.
    collection_info : dict
        Serialized collection metadata for one jurisdiction. Its
        ``"FIPS"`` and ``"full_name"`` entries determine the shard
        file name.

    Returns
    -------
    pathlib.Path
        Path to the written shard file.
    """
    shard_dir = Path(shard_dir)
    shard_dir.mkdir(parents=True, exist_ok=True)
    shard_fp = shard_dir / _collection_manifest_shard_filename(collection_info)
    shard_fp.write_text(
        json.dumps(convert_paths_to_strings(collection_info), indent=4),
        encoding="utf-8",
    )
    return shard_fp


def _load_collection_manifest(manifest_fp, expected_tech):
    """Load a collection manifest from disk

    If the manifest file is missing, the manifest is rebuilt from the
    jurisdiction shard files found under the manifest's directory and a
    ``COMPASSWarning`` is emitted. The resulting manifest is validated
    against `expected_tech` in either case.

    Parameters
    ----------
    manifest_fp : path-like
        Path to the collection manifest file.
    expected_tech : str
        Technology the manifest must be compatible with.

    Returns
    -------
    dict
        Loaded or rebuilt collection manifest.

    Raises
    ------
    COMPASSFileNotFoundError
        If the manifest file is missing and no shard files exist.
    COMPASSValueError
        If the manifest tech does not match `expected_tech`.
    """
    try:
        manifest = load_config(manifest_fp, file_name="Collection manifest")
    except COMPASSFileNotFoundError:
        manifest = _load_collection_manifest_from_shards(
            manifest_fp, expected_tech
        )
        if manifest is None:
            # Nothing to rebuild from: surface the original error
            raise
        msg = (
            f"Collection manifest file '{manifest_fp}' is missing; rebuilding "
            "collection manifest from jurisdiction shard files"
        )
        warn(msg, COMPASSWarning)

    _validate_collection_manifest(manifest, expected_tech)
    return manifest


async def persist_documents(jurisdiction, collected_docs, *, relative_to=None):
    """Persist the deduplicated documents of a single jurisdiction

    One asyncio task is created per collected document; the tasks are
    awaited together and their serialized records are returned in the
    same order as the input entries.

    Parameters
    ----------
    jurisdiction : compass.utilities.jurisdictions.Jurisdiction
        Jurisdiction the documents belong to. Its identifiers are
        copied into the returned metadata and its full name is used
        both as task name and as prefix of the output file names.
    collected_docs : \
compass.pipeline.collection.dedupe.DocumentDeDuplicator
        Deduplicated document collection. Its ``values`` attribute is
        iterated and must yield entries with the keys ``"doc"`` and
        ``"from_steps"``.
    relative_to : path-like, optional
        Base path used to store ``source_fp`` and ``parsed_fp`` as
        relative paths when possible. By default, ``None``.

    Returns
    -------
    dict
        Serialized collection metadata for the jurisdiction: the
        jurisdiction identifiers plus a ``"documents"`` list holding
        one record per persisted document.
    """
    name = jurisdiction.full_name
    # Output files are numbered starting at 1, in iteration order
    pending = [
        asyncio.create_task(
            _persist_doc(
                entry["doc"],
                out_fn=f"{name}_{position}",
                from_steps=entry["from_steps"],
                relative_to=relative_to,
            ),
            name=name,
        )
        for position, entry in enumerate(collected_docs.values, start=1)
    ]
    records = await asyncio.gather(*pending)

    collection_info = {
        "full_name": name,
        "county": jurisdiction.county,
        "state": jurisdiction.state,
        "subdivision": jurisdiction.subdivision_name,
        "jurisdiction_type": jurisdiction.type,
        "FIPS": jurisdiction.code,
    }
    collection_info["documents"] = records
    return collection_info


async def load_collected_docs(collection_info, *, task_name):
    """Load every persisted document of one jurisdiction

    Parameters
    ----------
    collection_info : dict
        Persisted collection metadata for one jurisdiction. Its
        ``"documents"`` entry lists the serialized document records; a
        missing, ``None`` or empty entry yields an empty result.
    task_name : str
        Name given to each asynchronous document-loading task.

    Returns
    -------
    list
        One item per persisted ``documents`` record, in the same order.
        An item is ``None`` when the corresponding record carries no
        ``parsed_fp`` (the loader warns and skips such records).
    """
    doc_records = collection_info.get("documents") or []
    loaders = []
    for record in doc_records:
        loader = asyncio.create_task(_load_single_doc(record), name=task_name)
        loaders.append(loader)
    return await asyncio.gather(*loaders)


def _validate_collection_manifest(manifest, tech):
    """Validate manifest tech compatibility

    Only the ``"tech"`` entry is checked. A manifest without a (truthy)
    ``"tech"`` entry is accepted as-is.

    Parameters
    ----------
    manifest : dict
        Loaded collection manifest.
    tech : str
        Technology the manifest is expected to have been built for.

    Raises
    ------
    COMPASSValueError
        If the manifest records a tech that differs from `tech`.
    """
    manifest_tech = manifest.get("tech")
    if manifest_tech and manifest_tech != tech:
        msg = (
            f"Collection manifest tech ({manifest_tech}) does not "
            f"match specified tech ({tech})"
        )
        raise COMPASSValueError(msg)


async def _load_single_doc(doc_info):
    """Load one document from persisted collection artifacts

    Parameters
    ----------
    doc_info : dict
        Serialized document record. Its ``"parsed_fp"`` entry points at
        the parsed file to read; all entries are merged into the
        attributes of the loaded document.

    Returns
    -------
    object or None
        The loaded document, or ``None`` (after emitting a
        ``COMPASSWarning``) when ``"parsed_fp"`` is missing or ``None``.
    """
    fp = doc_info.get("parsed_fp")
    if fp is None:
        msg = (
            "Parsed file path ('parsed_fp') is required to load a "
            "collected document, but it is missing from the following "
            f"doc info:\n{doc_info}\nSkipping..."
        )
        warn(msg, COMPASSWarning)
        return None

    # Only the first item returned by the reader is the document
    doc, *__ = await read_docling_local_file(fp)
    doc.attrs.update(doc_info)
    doc.remove_comments = False
    doc.attrs["cache_fn"] = await TempFileCacheCopier.call(doc)
    return doc


async def _persist_doc(doc, out_fn, from_steps, relative_to):
    """Persist one collected document and its parsed text

    The source file is moved first, then the parsed text is written;
    both steps record their output path in ``doc.attrs``.

    Returns
    -------
    dict
        Serialized document record for manifest storage.
    """
    await _move_file_to_collection_dir(doc, out_fn, relative_to)
    await _persist_parsed_text(doc, out_fn, relative_to)
    return _serialize_collection_doc_info(doc, from_steps)


async def _move_file_to_collection_dir(doc, out_fn, relative_to):
    """Move a source file to the collection output directory

    The resulting path (made relative to `relative_to` when one is
    given and a path was returned) is stored in
    ``doc.attrs["source_fp"]``; ``None`` is stored if the mover
    returned no path.
    """
    out_fp = await FileMover.call(doc, out_fn, "downloaded")
    if relative_to is not None and out_fp is not None:
        out_fp = _make_relative(out_fp, relative_to)
    doc.attrs["source_fp"] = out_fp


async def _persist_parsed_text(doc, out_fn, relative_to):
    """Write parsed text for a collected document

    The resulting path (made relative to `relative_to` when one is
    given and a path was returned) is stored in
    ``doc.attrs["parsed_fp"]``; ``None`` is stored if the writer
    returned no path.
    """
    out_fp = await ParsedFileWriter.call(doc, out_fn)
    if relative_to is not None and out_fp is not None:
        out_fp = _make_relative(out_fp, relative_to)
    doc.attrs["parsed_fp"] = out_fp


def _make_relative(fp, relative_to):
    """Make a file path relative to another path when possible

    Parameters
    ----------
    fp : pathlib.Path
        Path to convert.
    relative_to : path-like
        Base path that `fp` should be expressed relative to.

    Returns
    -------
    pathlib.Path
        `fp` relative to `relative_to`, or `fp` unchanged (after a
        ``COMPASSWarning``) if it does not lie under `relative_to`.
    """
    try:
        return fp.relative_to(relative_to)
    except ValueError:
        msg = (
            f"Could not make path {fp} relative to {relative_to}; using "
            "absolute path instead"
        )
        warn(msg, COMPASSWarning)
    return fp


def _serialize_collection_doc_info(doc, from_steps):
    """Serialize a collected document for manifest storage

    Parameters
    ----------
    doc : object
        Document exposing an ``attrs`` mapping. ``doc.pages`` is only
        accessed when ``attrs`` carries no ``"num_pages"`` entry.
    from_steps : object
        Collection step information stored under ``"from_steps"``.

    Returns
    -------
    dict
        Copy of ``doc.attrs`` without the ``"cache_fn"`` and
        ``"cleaned_fps"`` entries, with ``"check_correct_jurisdiction"``
        defaulting to ``True`` and with ``"is_pdf"``, ``"num_pages"``
        and ``"from_steps"`` filled in. ``doc.attrs`` is not modified.
    """
    serialized = dict(doc.attrs)
    serialized.pop("cache_fn", None)
    serialized.pop("cleaned_fps", None)
    serialized.setdefault("check_correct_jurisdiction", True)

    # Derive the fallbacks lazily: values already recorded on the
    # document take precedence, so there is no need to inspect the
    # document (or require it to expose ``pages``) when they are present.
    if "is_pdf" not in serialized:
        serialized["is_pdf"] = is_pdf_doc(doc)
    if "num_pages" not in serialized:
        serialized["num_pages"] = len(doc.pages)
    serialized["from_steps"] = from_steps
    return serialized


def _collection_manifest_shard_filename(collection_info):
    """Build a deterministic shard filename for one jurisdiction

    The name is ``<FIPS>_<full_name>_collection_manifest.json``, with
    empty parts left out and ``"jurisdiction"`` used as the base name
    when both parts are empty.
    """
    identifier = _clean_shard_name_part(collection_info.get("FIPS"))
    full_name = _clean_shard_name_part(collection_info.get("full_name"))
    name_parts = [part for part in (identifier, full_name) if part]
    base_name = "_".join(name_parts) or "jurisdiction"
    return f"{base_name}_collection_manifest.json"


def _clean_shard_name_part(value):
    """Normalize one shard filename component

    ``None`` becomes an empty string; any other value is converted to
    ``str``, stripped of surrounding whitespace, and has ``/``, ``\\``
    and ``:`` replaced by ``-``.
    """
    if value is None:
        return ""

    value = str(value).strip()
    for old, new in (("/", "-"), ("\\", "-"), (":", "-")):
        value = value.replace(old, new)
    return value


def _load_collection_manifest_from_shards(manifest_fp, expected_tech):
    """Rebuild a collection manifest from jurisdiction shard files

    Shard files (``*_collection_manifest.json``) are searched
    recursively under the directory containing `manifest_fp` and loaded
    in sorted path order.

    Returns
    -------
    dict or None
        Rebuilt collection manifest, or ``None`` if no shard files
        were found.
    """
    manifest_dir = Path(manifest_fp).expanduser().resolve().parent
    shard_fps = sorted(manifest_dir.rglob("*_collection_manifest.json"))
    if not shard_fps:
        return None

    # NOTE(review): the jurisdictions are collected into a list here,
    # while build_collection_manifest documents a dict keyed by
    # jurisdiction full name -- confirm that consumers accept both.
    jurisdictions = []
    for shard_fp in shard_fps:
        collection_info = load_config(
            shard_fp,
            resolve_paths=False,
            file_name="Collection manifest shard",
        )
        # Paths are resolved against the manifest directory rather than
        # the directory of the individual shard file
        jurisdictions.append(resolve_all_paths(collection_info, manifest_dir))

    return build_collection_manifest(expected_tech, jurisdictions)