"""Rule definitions and helpers for the translation pipeline."""
from __future__ import annotations
from collections.abc import Callable, Mapping
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeAlias
from pydantic import BaseModel, PrivateAttr, model_validator
from rust_ok import Result
if TYPE_CHECKING:
pass
class RuleFilter(BaseModel):
    """Declarative predicate used to pick out source components.

    A filter takes one of two shapes, which cannot be combined:

    * a *leaf*: ``field`` + ``op`` + ``values`` (or ``prefixes``);
    * a *composition*: a non-empty ``any_of`` or ``all_of`` list of nested
      filters (only one of the two may be set).

    The shape is enforced by ``_validate_structure``; the actual matching is
    delegated to ``utils.evaluate_rule_filter`` via :meth:`matches`.
    """

    # --- leaf form -------------------------------------------------------
    field: str | None = None
    op: Literal[
        "eq",
        "neq",
        "in",
        "not_in",
        "geq",
        "startswith",
        "not_startswith",
        "endswith",
    ] | None = None
    values: list[Any] | None = None
    # Alternative spelling of ``values`` for the prefix operations; it is
    # copied into ``values`` during validation.
    prefixes: list[str] | None = None

    # --- composition form ------------------------------------------------
    any_of: list[RuleFilter] | None = None
    all_of: list[RuleFilter] | None = None

    # --- options ---------------------------------------------------------
    # When true, string entries of ``values`` are casefolded while
    # ``_normalized_values`` is precomputed.
    casefold: bool = True
    # NOTE(review): not read in this module; presumably consulted by
    # ``evaluate_rule_filter`` when ``field`` is absent on a component.
    on_missing: Literal["include", "exclude"] = "exclude"

    # Cache filled in by ``_validate_structure`` for leaf filters.
    _normalized_values: list[Any] | None = PrivateAttr(None)

    @model_validator(mode="after")
    def _validate_structure(self) -> RuleFilter:
        """Check that the filter is a well-formed leaf or composition.

        For leaf filters this also moves ``prefixes`` into ``values`` for the
        ``startswith`` / ``not_startswith`` operations and precomputes
        ``_normalized_values``.

        Returns:
            The validated filter itself.

        Raises:
            ValueError: If leaf and composition attributes are mixed, if
                neither form is supplied, if both ``any_of`` and ``all_of``
                are set, or if a leaf is missing ``field``/``op``/values,
                passes a wrong number of values to ``geq``, or supplies
                non-string prefixes.
        """
        leaf_attrs = (self.field, self.op, self.values, self.prefixes)
        is_leaf = any(attr is not None for attr in leaf_attrs)
        has_children = bool(self.any_of or self.all_of)

        if is_leaf and has_children:
            raise ValueError("RuleFilter cannot mix field/op/values with any_of/all_of")
        if not (is_leaf or has_children):
            raise ValueError("RuleFilter requires field/op/values or any_of/all_of")
        if self.any_of and self.all_of:
            raise ValueError("RuleFilter cannot set both any_of and all_of")

        # Compositions need no further checks.
        if not is_leaf:
            return self

        if not self.field:
            raise ValueError("RuleFilter.field is required for leaf filters")
        if self.op is None:
            raise ValueError("RuleFilter.op is required for leaf filters")
        if not self.values and not self.prefixes:
            raise ValueError("RuleFilter.values must contain at least one value")
        if self.op == "geq" and len(self.values or []) != 1:
            raise ValueError("RuleFilter.geq expects exactly one comparison value")

        if self.op in ("startswith", "not_startswith"):
            # ``prefixes`` wins over ``values`` when both are given.
            candidates = self.prefixes or self.values
            if not candidates:
                raise ValueError(
                    "RuleFilter.prefixes must provide at least one entry for prefix operations"
                )
            if not all(isinstance(item, str) for item in candidates):
                raise ValueError("RuleFilter.prefixes entries must be strings")
            object.__setattr__(self, "values", candidates)

        # Casefold the string values once here so that evaluate_rule_filter
        # does not have to redo it for every component.
        fold = self.casefold
        normalized = [
            str(item).casefold() if fold and isinstance(item, str) else item
            for item in self.values or []
        ]
        object.__setattr__(self, "_normalized_values", normalized)
        return self

    def matches(self, component: Any) -> bool:
        """Return the result of evaluating this filter against ``component``.

        The work is done by ``evaluate_rule_filter``, imported inside the
        method rather than at module level.
        """
        from .utils import evaluate_rule_filter

        return evaluate_rule_filter(component, rule_filter=self)
# Getter signature: any callable (arbitrary arguments) that returns a
# ``Result`` parameterised with ``Any`` and ``ValueError``.
RuleGetter: TypeAlias = Callable[..., Result[Any, ValueError]]
class RuleLike(Protocol):
    """Structural interface for objects that can drive target kwargs building.

    Any object exposing the three read-only mappings below satisfies the
    protocol.
    """

    @property
    def field_map(self) -> Mapping[str, str | list[str]]:
        """Target field name -> source field name (or list of names)."""

    @property
    def getters(self) -> Mapping[str, RuleGetter | str]:
        """Target field name -> getter callable (or a string entry)."""

    @property
    def defaults(self) -> Mapping[str, Any]:
        """Target field name -> default value."""
[docs]
@dataclass(frozen=True, slots=True)
class Rule:
"""Declarative rule for converting one component type to another."""
source_type: str | list[str]
target_type: str | list[str]
version: int
field_map: dict[str, str | list[str]] = field(default_factory=dict)
getters: dict[str, RuleGetter | str] = field(default_factory=dict)
defaults: dict[str, Any] = field(default_factory=dict)
filter: RuleFilter | None = field(default=None)
system: Literal["source", "target"] = "source"
name: str | None = None
depends_on: list[str] | None = None
def __str__(self) -> str:
"""Represent string."""
return f"{self.source_type}->{self.target_type}(v{self.version})"
def __post_init__(self) -> Any:
"""Validate init."""
if self.has_multiple_sources() and self.has_multiple_targets():
raise NotImplementedError(
f"Rule cannot have both multiple sources and multiple targets. "
f"source_type={self.source_type}, target_type={self.target_type}"
)
for target_field, source_fields in self.field_map.items():
if isinstance(source_fields, list) and target_field not in self.getters:
msg = f"Multi-field mapping for '{target_field}' requires a getter function"
raise ValueError(msg)
if self.filter is not None and not isinstance(self.filter, RuleFilter):
raise TypeError(f"Rule.filter must be a RuleFilter, not {type(self.filter).__name__}")
def __hash__(self) -> int:
"""Hash based on rule's unique identifier."""
source_key = tuple(self.source_type) if isinstance(self.source_type, list) else self.source_type
target_key = tuple(self.target_type) if isinstance(self.target_type, list) else self.target_type
return hash((source_key, target_key, self.version))
def __eq__(self, other: object) -> bool:
"""Equality based on rule's unique identifier."""
if not isinstance(other, Rule):
return NotImplemented
return (
self.source_type == other.source_type
and self.target_type == other.target_type
and self.version == other.version
)
[docs]
def has_multiple_sources(self) -> bool:
"""Check if rule applies to multiple source types."""
return isinstance(self.source_type, list)
[docs]
def has_multiple_targets(self) -> bool:
"""Check if rule creates multiple target types."""
return isinstance(self.target_type, list)
[docs]
def get_source_types(self) -> list[str]:
"""Return source types as list."""
return self.source_type if isinstance(self.source_type, list) else [self.source_type]
[docs]
def get_target_types(self) -> list[str]:
"""Return target types as list."""
return self.target_type if isinstance(self.target_type, list) else [self.target_type]
[docs]
@classmethod
def from_records(cls, records: list[dict[str, Any]]) -> list[Rule]:
"""Create rules from json objects."""
from .getters import _preprocess_rule_getters
rules_list = []
for rule in records:
if getters := rule.get("getters"):
rule["getters"] = _preprocess_rule_getters(getters).unwrap_or_raise()
if "filter" in rule:
rule["filter"] = (
RuleFilter.model_validate(rule["filter"]) if rule["filter"] is not None else None
)
rules_list.append(cls(**rule))
return rules_list
# Rebuild the model schema now that the class is fully defined: ``any_of`` and
# ``all_of`` are annotated with ``RuleFilter`` itself.
RuleFilter.model_rebuild()