"""Time series utils."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from sqlite3 import Connection
from typing import TYPE_CHECKING, cast
from uuid import UUID
from loguru import logger
if TYPE_CHECKING:
from .plugin_context import PluginContext
[docs]
@dataclass(frozen=True, slots=True)
class TimeSeriesTransferResult:
"""Transfer status of time series metadata."""
transferred: int
updated: int
children_remapped: int
# Columns that together identify a single time series association row.
# The same seven columns, in the same order, make up the unique index
# created by `_finalize_transfer`; keep the two lists in sync.
UNIQUE_TS_COLUMNS: tuple[str, ...] = (
    "owner_uuid",
    "owner_type",
    "owner_category",
    "time_series_uuid",
    "name",
    "time_series_type",
    "features",
)
def _main_db_path(conn: Connection) -> str | None:
"""Return filesystem path for the main SQLite database, if present."""
try:
for _, name, path in conn.execute("PRAGMA database_list").fetchall():
if name == "main" and path:
return str(path)
except Exception:
return None
return None
def _ts_columns(conn: Connection) -> list[str]:
"""Return ordered column names excluding the autoincrement primary key."""
rows = conn.execute("PRAGMA table_info(time_series_associations)").fetchall()
return [row[1] for row in rows if row[1] != "id"]
def _deduplicate_ts_associations(conn: Connection, unique_cols: tuple[str, ...]) -> int:
"""Remove duplicate association rows using the unique key columns."""
group_by = ",".join(unique_cols)
before = conn.total_changes
conn.execute(
f"""
DELETE FROM time_series_associations
WHERE rowid NOT IN (
SELECT MIN(rowid)
FROM time_series_associations
GROUP BY {group_by}
)
"""
)
return conn.total_changes - before
def _count_ts_associations(conn: Connection) -> int:
"""Return total rows in time series associations."""
count = conn.execute("SELECT COUNT(*) FROM time_series_associations").fetchone()[0]
return int(count)
def _setup_target_and_child_tables(
tgt_metadata: Connection,
src_associations: Connection,
uuid_map: dict,
) -> tuple[list[tuple], dict[str, str]]:
"""Set up temporary tables for target components and child mapping.
Returns
-------
Tuple of (child_remapping, uuid_to_type) where
- child_remapping is a list of (child_uuid, parent_uuid, parent_type) tuples
- uuid_to_type maps UUID strings to component type names
"""
uuid_to_type = {str(uuid): type(comp).__name__ for uuid, comp in uuid_map.items()}
tgt_metadata.execute("DROP TABLE IF EXISTS target_components")
tgt_metadata.execute("CREATE TEMP TABLE target_components (uuid TEXT PRIMARY KEY, type TEXT)")
tgt_metadata.executemany("INSERT INTO target_components VALUES (?, ?)", list(uuid_to_type.items()))
target_uuids = list(uuid_to_type.keys())
if not target_uuids:
# No target components exist; skip the query and return empty results.
tgt_metadata.execute("DROP TABLE IF EXISTS child_mapping")
tgt_metadata.execute(
"CREATE TEMP TABLE child_mapping (child_uuid TEXT, parent_uuid TEXT, parent_type TEXT)"
)
return [], uuid_to_type
# Build placeholders for the IN clause
placeholders = ",".join("?" for _ in target_uuids)
child_parent_rows = src_associations.execute(
f"""
SELECT component_uuid, attached_component_uuid
FROM component_associations
WHERE attached_component_uuid IN ({placeholders})
""",
target_uuids,
).fetchall()
child_remapping = [
(child_uuid, parent_uuid, type(uuid_map[UUID(parent_uuid)]).__name__)
for child_uuid, parent_uuid in child_parent_rows
if parent_uuid in uuid_to_type
]
tgt_metadata.execute("DROP TABLE IF EXISTS child_mapping")
tgt_metadata.execute(
"CREATE TEMP TABLE child_mapping (child_uuid TEXT, parent_uuid TEXT, parent_type TEXT)"
)
if child_remapping:
tgt_metadata.executemany("INSERT INTO child_mapping VALUES (?, ?, ?)", child_remapping)
return child_remapping, uuid_to_type
def _transfer_associations(
    src_metadata: Connection,
    tgt_metadata: Connection,
    uuid_to_type: dict,
    columns: list[str],
) -> None:
    """Transfer time series associations from source to target.

    Only rows owned by a target component are copied, and rows of the
    excluded time series type are skipped. Two strategies are used:

    - When the source database has a file path, it is attached to the target
      connection as ``src_ts`` and rows are copied with one SQL statement.
      That statement reads the ``target_components`` table on
      ``tgt_metadata`` and skips rows already present in the target.
    - Otherwise rows are read from ``src_metadata``, filtered in Python
      against the keys of ``uuid_to_type``, and inserted with
      ``INSERT OR IGNORE``.

    Parameters
    ----------
    src_metadata
        Connection holding the source ``time_series_associations`` table.
    tgt_metadata
        Connection holding the target ``time_series_associations`` table.
    uuid_to_type
        Mapping whose keys are the UUID strings of the target components.
    columns
        Column names to copy; must include ``owner_uuid`` and
        ``time_series_type``. They are interpolated into SQL text, so they
        must be trusted identifiers.
    """
    # NOTE(review): the two branches used to exclude different type names
    # ('Deterministic' in SQL, 'DeterministicSingleTimeSeries' in Python).
    # They now share one value so both transfer the same rows; confirm that
    # this is the type that must be skipped.
    excluded_ts_type = "DeterministicSingleTimeSeries"
    column_csv = ",".join(columns)

    src_db_path = _main_db_path(src_metadata)
    if src_db_path:
        tgt_metadata.execute("ATTACH DATABASE ? AS src_ts", (src_db_path,))
        try:
            tgt_metadata.execute(
                f"""
                INSERT OR IGNORE INTO time_series_associations ({column_csv})
                SELECT {column_csv}
                FROM src_ts.time_series_associations s
                WHERE s.owner_uuid IN (SELECT uuid FROM target_components)
                  AND s.time_series_type != ?
                  AND NOT EXISTS (
                      SELECT 1 FROM time_series_associations t
                      WHERE t.owner_uuid = s.owner_uuid
                        AND t.owner_type = s.owner_type
                        AND t.owner_category = s.owner_category
                        AND t.time_series_uuid = s.time_series_uuid
                        AND t.name = s.name
                        AND t.time_series_type = s.time_series_type
                        AND t.features = s.features
                  )
                """,
                (excluded_ts_type,),
            )
        finally:
            # Detaching is best-effort: a failure is logged, not raised, so it
            # cannot mask an error from the insert above.
            try:
                tgt_metadata.execute("DETACH DATABASE src_ts")
            except Exception as exc:
                logger.warning("Could not detach src_ts during time series transfer: {}", exc)
        return

    # No file path is available to attach, so copy through Python.
    owner_idx = columns.index("owner_uuid")
    ts_type_idx = columns.index("time_series_type")
    src_rows = src_metadata.execute(f"SELECT {column_csv} FROM time_series_associations").fetchall()
    filtered_rows = [
        row
        for row in src_rows
        if row[owner_idx] in uuid_to_type and row[ts_type_idx] != excluded_ts_type
    ]
    if filtered_rows:
        placeholders = ",".join(["?"] * len(columns))
        tgt_metadata.executemany(
            f"INSERT OR IGNORE INTO time_series_associations ({column_csv}) VALUES ({placeholders})",
            filtered_rows,
        )
def _remove_duplicate_rows_before_remap(tgt_metadata: Connection) -> None:
"""Remove rows that would become duplicates after remapping."""
tgt_metadata.execute("""
WITH owner_resolution AS (
SELECT
ts.rowid as rowid,
ts.owner_uuid as original_uuid,
COALESCE(tc_direct.uuid, cm.parent_uuid) as resolved_uuid,
ts.time_series_type,
ts.name,
ts.resolution,
ts.features,
ROW_NUMBER() OVER (
PARTITION BY
COALESCE(tc_direct.uuid, cm.parent_uuid),
ts.time_series_type,
ts.name,
ts.resolution,
ts.features
ORDER BY ts.rowid
) as rn
FROM time_series_associations ts
LEFT JOIN target_components tc_direct ON ts.owner_uuid = tc_direct.uuid
LEFT JOIN child_mapping cm ON ts.owner_uuid = cm.child_uuid
WHERE tc_direct.uuid IS NOT NULL OR cm.parent_uuid IS NOT NULL
)
DELETE FROM time_series_associations
WHERE rowid IN (
SELECT rowid FROM owner_resolution WHERE rn > 1
)
""")
def _remap_child_associations(tgt_metadata: Connection) -> int:
"""Remap child associations with new owner UUIDs and types.
Returns
-------
Number of associations updated.
"""
result = tgt_metadata.execute("""
WITH owner_resolution AS (
SELECT
ts.owner_uuid as original_uuid,
COALESCE(tc_direct.uuid, cm.parent_uuid) as resolved_uuid,
COALESCE(tc_direct.type, cm.parent_type) as resolved_type
FROM time_series_associations ts
LEFT JOIN target_components tc_direct ON ts.owner_uuid = tc_direct.uuid
LEFT JOIN child_mapping cm ON ts.owner_uuid = cm.child_uuid
WHERE tc_direct.uuid IS NOT NULL OR cm.parent_uuid IS NOT NULL
)
UPDATE time_series_associations
SET
owner_uuid = (SELECT resolved_uuid FROM owner_resolution WHERE original_uuid = time_series_associations.owner_uuid),
owner_type = (SELECT resolved_type FROM owner_resolution WHERE original_uuid = time_series_associations.owner_uuid)
WHERE owner_uuid IN (SELECT original_uuid FROM owner_resolution)
""")
return max(result.rowcount if result.rowcount is not None else 0, 0)
def _finalize_transfer(tgt_metadata: Connection) -> None:
"""Create unique index and prepare for metadata reload."""
tgt_metadata.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS idx_ts_owner_series_unique
ON time_series_associations (
owner_uuid,
owner_type,
owner_category,
time_series_uuid,
name,
time_series_type,
features
)
"""
)