Source code for compass.pipeline.collection.dedupe

"""Document deduplication for collected artifacts"""

import logging


logger = logging.getLogger(__name__)


class DocumentDeDuplicator:
    """Domain Service that collapses collected documents by content key

    Each distinct key maps to one record of the form
    ``{"doc": <first document seen>, "from_steps": [<step names>]}``.
    """

    def __init__(self):
        # key (str) -> {"doc": document, "from_steps": list of step names}
        self._docs = {}

    def add_docs(self, docs, *, step_name, jurisdiction_name):
        """Register documents produced by one collection step

        Parameters
        ----------
        docs : list
            Collected document objects to merge into the internal
            de-duplicated mapping. Nothing is stored if this is empty
            or ``None``.
        step_name : str
            Identifier of the collection step that produced the
            documents. It is appended to the ``"from_steps"`` list of
            every record the documents map to.
        jurisdiction_name : str
            Full jurisdiction name written to ``doc.attrs`` for any
            document that does not already carry a
            ``"jurisdiction_name"`` attribute.
        """
        if not docs:
            logger.debug("No docs found to add for step %r", step_name)
            return

        logger.debug("Adding %d doc(s) to collection", len(docs))
        for document in docs:
            document.attrs.setdefault("jurisdiction_name", jurisdiction_name)

            # Prefer the checksum-only key; use the fallback chain only
            # when the strict lookup raises a KeyError
            try:
                doc_key = _collection_doc_key(document)
            except KeyError:
                doc_key = _collection_doc_key(document, use_fallback=True)

            # The first document seen for a key is the one that is kept
            record = self._docs.setdefault(
                doc_key, {"doc": document, "from_steps": []}
            )
            record["from_steps"].append(step_name)

    @property
    def values(self):
        """Deduplicated collected docs"""
        return self._docs.values()

    def __bool__(self):
        return bool(self._docs)
def _collection_doc_key(doc, use_fallback=False): """Build the deduplication key for a collected document""" if use_fallback: return str( doc.attrs.get("checksum") or doc.attrs.get("source_fp") or doc.attrs.get("source") or doc.attrs.get("cache_fn") or id(doc) ) return str(doc.attrs["checksum"])