Source code for r2x_core.rules_executor

"""Execute a set of rules for a given translation context."""

from __future__ import annotations

from collections.abc import Callable
from typing import Any, cast
from uuid import uuid4

from infrasys import Component, SupplementalAttribute
from loguru import logger
from rust_ok import Err, Ok, Result

from .plugin_context import PluginContext
from .result import RuleApplicationStats, RuleResult, TranslationResult
from .rules import Rule
from .time_series import transfer_time_series_metadata
from .utils import (
    build_target_fields,
    create_target_component,
    evaluate_rule_filter,
    iter_components,
    resolve_component_type,
    sort_rules_by_dependencies,
)


[docs] def apply_rules_to_context(context: PluginContext) -> TranslationResult: """Apply all transformation rules defined in a PluginContext. Parameters ---------- context : PluginContext The plugin context containing rules and systems Returns ------- TranslationResult Rich result object with detailed statistics and per-rule results Raises ------ ValueError If the context has no rules defined or if circular dependencies are detected """ if not context.rules: raise ValueError(f"{type(context).__name__} has no rules. Use context.list_rules().") sorted_rules = sort_rules_by_dependencies(context.list_rules()).unwrap_or_raise(exc_type=ValueError) rule_results: list[RuleResult] = [] total_converted = 0 successful_rules = 0 failed_rules = 0 for rule in sorted_rules: logger.debug("Applying rule: {}", rule) result = apply_single_rule(rule, context=context) match result: case Ok(stats): rule_results.append( RuleResult( rule=rule, converted=stats.converted, skipped=stats.skipped, success=True, error=None, ) ) total_converted += stats.converted successful_rules += 1 case Err(_): error = str(result.err()) logger.error("Rule {} failed: {}", rule, error) rule_results.append( RuleResult( rule=rule, converted=0, skipped=0, success=False, error=error, ) ) failed_rules += 1 ts_result = transfer_time_series_metadata(context) return TranslationResult( total_rules=len(context.rules), successful_rules=successful_rules, failed_rules=failed_rules, total_converted=total_converted, rule_results=rule_results, time_series_transferred=ts_result.transferred, time_series_updated=ts_result.updated, )
[docs] def apply_single_rule(rule: Rule, *, context: PluginContext) -> Result[RuleApplicationStats, ValueError]: """Apply one transformation rule across matching components.""" converted = 0 target_types = rule.get_target_types() should_regenerate_uuid = len(target_types) > 1 read_system = context.target_system if rule.system == "target" else context.source_system if read_system is None: return Err(ValueError(f"System '{rule.system}' is not set in context")) source_class_result = _resolve_source_class(rule, context=context) if source_class_result.is_err(): return source_class_result.map(lambda _: RuleApplicationStats(converted=0, skipped=0)) source_class = cast(type[Component], source_class_result.ok()) resolved_targets: list[type] = [] for target_type in target_types: target_class_result = _resolve_component_class( target_type, context=context, label="target", allow_supplemental=True ) if target_class_result.is_err(): return target_class_result.map(lambda _: RuleApplicationStats(converted=0, skipped=0)) resolved_class = target_class_result.ok() assert resolved_class is not None resolved_targets.append(resolved_class) filter_func: Callable[[Any], bool] | None = None if rule.filter is not None: rule_filter = rule.filter filter_func = lambda comp, rf=rule_filter: evaluate_rule_filter(comp, rule_filter=rf) # noqa: E731 found_component = False for src_component in iter_components(read_system, class_type=source_class, filter_func=filter_func): found_component = True for target_class in resolved_targets: fields_result = build_target_fields(src_component, rule=rule, context=context).map_err( lambda e: ValueError(f"Failed to build fields for {src_component.label}: {e}") # noqa: B023 ) if fields_result.is_err(): return fields_result.map(lambda _: RuleApplicationStats(converted=0, skipped=0)) kwargs = cast(dict[str, Any], fields_result.ok()) if should_regenerate_uuid and "uuid" in kwargs: kwargs = dict(kwargs) kwargs["uuid"] = str(uuid4()) component = create_target_component(target_class, kwargs=kwargs) attach_result = _attach_component(component, src_component, context) if attach_result.is_err(): return attach_result.map(lambda _: RuleApplicationStats(converted=0, skipped=0)) if _is_supplemental_attribute(component): logger.trace( "Rule {}: attached SA {} to {}", rule, type(component).__name__, src_component.label, ) converted += 1 if not found_component: logger.warning("No components found for source type '{}' in rule {}", rule.get_source_types(), rule) logger.debug("Rule {}: {} converted", rule, converted) return Ok(RuleApplicationStats(converted=converted, skipped=0))
def _convert_component_with_class( rule: Rule, source_component: Any, target_class: type, context: PluginContext, regenerate_uuid: bool, ) -> Result[Any, ValueError]: """Convert a single source component to a pre-resolved target class. Separated from type resolution so callers can resolve once and reuse. """ fields_result = build_target_fields(source_component, rule=rule, context=context).map_err( lambda e: ValueError(f"Failed to build fields for {source_component.label}: {e}") ) def create_component(kwargs: dict[str, Any]) -> Result[Any, ValueError]: """ Create a target component instance with the given keyword arguments. If `regenerate_uuid` is True and 'uuid' is present in kwargs, a new UUID is generated. Returns an Ok result with the created component, or an Err if creation fails. Parameters ---------- kwargs : dict[str, Any] The keyword arguments to use for constructing the component. Returns ------- Result[Any, ValueError] Ok(component) if successful, Err(ValueError) if creation fails. """ if regenerate_uuid and "uuid" in kwargs: kwargs = dict(kwargs) kwargs["uuid"] = str(uuid4()) return Ok(create_target_component(target_class, kwargs=kwargs)) return fields_result.and_then(create_component) def _convert_component( rule: Rule, source_component: Any, target_type: str, context: PluginContext, regenerate_uuid: bool, ) -> Result[Any, ValueError]: """Convert a single source component to a target type. Resolves the target class on every call. Prefer _convert_component_with_class when converting many components with the same rule to avoid repeated resolution. """ target_class_result = _resolve_component_class( target_type, context=context, label="target", allow_supplemental=True ) return target_class_result.and_then( lambda target_class: _convert_component_with_class( rule, source_component, target_class, context, regenerate_uuid ) ) def _resolve_component_class( type_name: str, *, context: PluginContext, label: str, allow_supplemental: bool = False ) -> Result[type, ValueError]: """Resolve a named type and verify it is an infrasys component-compatible class.""" class_result = resolve_component_type(type_name, context=context).map_err( lambda e: ValueError(f"Failed to resolve {label} type '{type_name}': {e}") ) if class_result.is_err(): return class_result.map(lambda _: Component) resolved_class = class_result.ok() is_component = isinstance(resolved_class, type) and issubclass(resolved_class, Component) is_supplemental = ( allow_supplemental and isinstance(resolved_class, type) and issubclass(resolved_class, SupplementalAttribute) ) if not (is_component or is_supplemental): expected = "Component or SupplementalAttribute" if allow_supplemental else "Component" return Err(ValueError(f"Resolved {label} type '{type_name}' is not a {expected} subclass")) assert resolved_class is not None return Ok(resolved_class) def _resolve_source_class(rule: Rule, *, context: PluginContext) -> Result[type[Component], ValueError]: """Resolve all source types for a rule into a single component class. Rules with multiple source types are not supported here; the caller is responsible for deciding how to handle that case. """ source_types = rule.get_source_types() if not source_types: return Err(ValueError(f"Rule '{rule}' has no source types defined")) # For now rules only support a single source type source_type = source_types[0] if len(source_types) > 1: logger.warning("Rule '{}' defines multiple source types; only '{}' will be used", rule, source_type) return _resolve_component_class(source_type, context=context, label="source").map( lambda resolved_class: cast(type[Component], resolved_class) ) def _is_supplemental_attribute(component: Component) -> bool: """Check if a component is a supplemental attribute. Parameters ---------- component : Any The component to check Returns ------- bool True if the component is a supplemental attribute, False otherwise """ return isinstance(component, SupplementalAttribute) def _attach_component( component: Any, source_component: Any, context: PluginContext, ) -> Result[None, ValueError]: """Attach a component to the target system. For regular components, adds them directly to the system. For supplemental attributes, finds the corresponding target component and attaches the supplemental attribute to it. Parameters ---------- component : Any The component or supplemental attribute to attach source_component : Any The source component that was converted context : PluginContext The plugin context Returns ------- Result[None, ValueError] Ok if attachment succeeds, Err otherwise """ if context.target_system is None: return Err(ValueError("target_system must be set in context")) if not _is_supplemental_attribute(component): context.target_system.add_component(component) return Ok(None) # Find the target component that corresponds to the source component # We look for a component with the same UUID in the target system try: target_component = context.target_system.get_component_by_uuid(source_component.uuid) except Exception as e: logger.error( "Failed to find target component with UUID {} for supplemental attribute attachment: {}", source_component.uuid, e, ) return Err( ValueError( f"Cannot attach supplemental attribute: target component with UUID " f"{source_component.uuid} not found in target system" ) ) context.target_system.add_supplemental_attribute(target_component, component) return Ok(None)