from pydantic import ConfigDict, BaseModel, model_validator
from typing import List, Optional, Any, Union, Dict
from enum import Enum
import helics
import os
import re
from loguru import logger
from pydss.simulation_input_models import SimulationSettingsModel
from pydss.common import SUBSCRIPTIONS_FILENAME, ExportMode
from pydss.utils.utils import load_data
TYPE_INFO = {
'CurrentsMagAng': 'vector',
'Currents': 'vector',
'RatedCurrent': 'double',
'EmergAmps': 'double',
'NormalAmps': 'double',
'normamps': 'double',
'Losses': 'vector',
'PhaseLosses': 'vector',
'Powers': 'vector',
'TotalPower': 'vector',
'LineLosses': 'vector',
'SubstationLosses': 'vector',
'kV': 'double',
'kVARated': 'double',
'kvar': 'double',
'kW': 'double',
'kVABase': 'double',
'kWh': 'double',
'puVmagAngle': 'vector',
'VoltagesMagAng': 'vector',
'VMagAngle': 'vector',
'Voltages': 'vector',
'Vmaxpu': 'double',
'Vminpu': 'double',
'Frequency': 'double',
'Taps': 'vector',
'%stored': 'double',
'Distance': 'double'
}
class DataType(Enum):
DOUBLE = "double"
VECTOR = "vector"
STRING = "string"
BOOLEAN = "boolean"
INTEGER = "integer"
[docs]
class Subscription(BaseModel):
model: str
property: str
id: str
unit: Optional[str] = None
subscribe: bool = True
data_type: DataType
multiplier: float = 1.0
object: Any = None
states: List[Union[float, int, bool]] = [0.0, 0.0, 0.0, 0.0, 0.0]
sub: Any = None
model_config = ConfigDict(arbitrary_types_allowed=True)
[docs]
class Publication(BaseModel):
model: str
property: str
id: str
object: Any = None
pub: Any = None
data_type: DataType
[docs]
class Subscriptions(BaseModel):
federate: Any = None
opendss_models: Dict
subscriptions: List[Subscription]
[docs]
@model_validator(mode='after')
def is_in_opendss_model(self)-> 'Subscriptions':
for subscription in self.subscriptions:
if subscription.model not in self.opendss_models:
raise AssertionError(f"The loaded OpenDSS model does not have an element define with the name {subscription}")
if subscription.subscribe:
subscription.object = self.opendss_models[subscription.model]
subscription.sub = helics.helicsFederateRegisterSubscription(
self.federate,
subscription.id,
subscription.unit
)
return self
[docs]
class Publications(BaseModel):
federate: Any = None
federate_name: str
opendss_models: Dict
publications: List[Publication] = []
legacy_input: Dict = {}
input: Dict = {}
[docs]
@model_validator(mode='after')
def build_from_legacy(self)-> 'Publications':
publications = []
for object_type, k in self.legacy_input.items():
if object_type in self.opendss_models:
models = self.opendss_models[object_type]
for model in models:
for ppty in k['Publish']:
name = '{}.{}.{}'.format(self.federate_name, model, ppty)
pub_dict = {
"model" : model,
"object" : models[model],
"id" : name,
"property" : ppty,
"data_type" : TYPE_INFO[ppty],
"pub" : helics.helicsFederateRegisterGlobalTypePublication(
self.federate,
name,
TYPE_INFO[ppty],
''
)
}
publications.append(Publication.model_validate(pub_dict))
self.publications = publications
return self
[docs]
@model_validator(mode='after')
def build_from_export(self)-> 'Publications':
publications = []
for object_type, export_properties in self.input.items():
if object_type in self.opendss_models:
models = self.opendss_models[object_type]
for export_property in export_properties:
filtered_models = {}
if export_property['publish']:
if 'names' in export_property and export_property['names']:
for k, v in models.items():
if k in export_property['names']:
filtered_models[k] = models[k]
elif 'name_regexes' in export_property and export_property['name_regexes']:
for regex_expression in export_property['name_regexes']:
r = re.compile(regex_expression)
matches = list(filter(r.match, models.keys()))
for match in matches:
filtered_models[match] = models[match]
else:
filtered_models = models
for model_name, model_obj in filtered_models.items():
if object_type == "Buses":
name = '{}.Bus.{}.{}'.format(self.federate_name, model_name, export_property["property"])
else:
name = '{}.{}.{}'.format(self.federate_name, model_name, export_property["property"])
pub_dict = {
"model" : model_name,
"object" : model_obj,
"id" : name,
"property" : export_property["property"],
"data_type" : TYPE_INFO[export_property["property"]],
"pub" : helics.helicsFederateRegisterGlobalTypePublication(
self.federate,
name,
TYPE_INFO[export_property["property"]],
''
)
}
publications.append(Publication.model_validate(pub_dict))
self.publications = publications
return self
class helics_interface:
n_states = 5
init_state = 1
def __init__(self, dss_solver, objects_by_name, objects_by_class, settings: SimulationSettingsModel, system_paths, default=True):
self.itr = 0
self.c_seconds = 0
self.c_seconds_old = -1
self._settings = settings
self._co_convergance_error_tolerance = settings.helics.error_tolerance
self._co_convergance_max_iterations = self._settings.helics.max_co_iterations
self._publications = {}
self._system_paths = system_paths
self._objects_by_element = objects_by_name
self._objects_by_class = objects_by_class
self._create_helics_federate()
self._dss_solver = dss_solver
if default:
self.registerPubSubTags()
def registerPubSubTags(self, pubs=None, subs=None):
self._registerFederateSubscriptions(subs)
self._registerFederatePublications(pubs)
helics.helicsFederateEnterExecutingModeIterative(
self._federate,
helics.helics_iteration_request_iterate_if_needed
)
logger.info('Entered HELICS execution mode')
def _create_helics_federate(self):
self.fedinfo = helics.helicsCreateFederateInfo()
helics.helicsFederateInfoSetCoreName(self.fedinfo, self._settings.helics.federate_name)
helics.helicsFederateInfoSetCoreTypeFromString(self.fedinfo, self._settings.helics.core_type)
helics.helicsFederateInfoSetCoreInitString(self.fedinfo, f"--federates=1")
IP = self._settings.helics.broker
Port = self._settings.helics.broker_port
logger.info("Connecting to broker @ {}".format(f"{IP}:{Port}" if Port else IP))
if self._settings.helics.broker:
helics.helicsFederateInfoSetBroker(self.fedinfo, str(self._settings.helics.broker))
if self._settings.helics.broker_port:
helics.helicsFederateInfoSetBrokerPort(self.fedinfo, self._settings.helics.broker_port)
helics.helicsFederateInfoSetTimeProperty(self.fedinfo, helics.helics_property_time_delta,
self._settings.helics.time_delta)
helics.helicsFederateInfoSetIntegerProperty(self.fedinfo, helics.helics_property_int_log_level,
self._settings.helics.logging_level)
helics.helicsFederateInfoSetIntegerProperty(self.fedinfo, helics.helics_property_int_max_iterations,
self._settings.helics.max_co_iterations)
self._federate = helics.helicsCreateValueFederate(self._settings.helics.federate_name, self.fedinfo)
return
def _registerFederateSubscriptions(self, subscriptions:Subscriptions = None):
"""
:param subs:
:return:
"""
if subscriptions is None:
subscription_file = os.path.join(
self._system_paths["ExportLists"],
SUBSCRIPTIONS_FILENAME,
)
assert os.path.exists(subscription_file), f"The following file does not exist: {subscription_file}"
file_data = load_data(subscription_file)
file_data["opendss_models"] = self._objects_by_element
file_data["federate"] = self._federate
self.subscriptions = Subscriptions.model_validate(file_data)
else:
self.subscriptions = subscriptions
logger.info(str(self.subscriptions.subscriptions))
for subscription in self.subscriptions.subscriptions:
logger.info(f"subscription created: {subscription}")
return
def updateHelicsSubscriptions(self):
for subscription in self.subscriptions.subscriptions:
if subscription.subscribe:
value = None
if subscription.data_type == DataType.DOUBLE:
value = helics.helicsInputGetDouble(subscription.sub)
elif subscription.data_type == DataType.VECTOR:
value = helics.helicsInputGetVector(subscription.sub)
elif subscription.data_type == DataType.STRING:
value = helics.helicsInputGetString(subscription.sub)
elif subscription.data_type == DataType.BOOLEAN:
value = helics.helicsInputGetBoolean(subscription.sub)
elif subscription.data_type == DataType.INTEGER:
value = helics.helicsInputGetInteger(subscription.sub)
if value and value != 0:
if value > 1e6 or value < -1e6:
value = 1.0
value = value * subscription.multiplier
subscription.object.SetParameter(subscription.property, value)
logger.info('Value for "{}.{}" changed to "{}"'.format(
subscription.model,
subscription.property,
value
))
if self._settings.helics.iterative_mode:
if self.c_seconds != self.c_seconds_old:
subscription.states = [self.init_state] * self.n_states
else:
subscription.states.insert(0, subscription.states.pop())
subscription.states[0] = value
self.c_seconds_old = self.c_seconds
def _registerFederatePublications(self, publications:Publications = None):
if publications:
self.publicatiuons = publications
else:
legacy_export_file = os.path.join(
self._system_paths ["ExportLists"],
ExportMode.BY_CLASS.value + ".toml",
)
export_file = os.path.join(
self._system_paths ["ExportLists"],
ExportMode.EXPORTS.value + ".toml",
)
publication_dict = {
"opendss_models" : self._objects_by_class,
"federate_name" : self._settings.helics.federate_name,
"federate" : self._federate,
}
if os.path.exists(export_file):
export_data = load_data(export_file)
publication_dict["input"] = export_data
elif os.path.exists(legacy_export_file):
legacy_data = load_data(legacy_export_file)
publication_dict["legacy_input"] = legacy_data
else:
raise FileNotFoundError("No valid export settings found for the current scenario")
self.publications = Publications.model_validate(publication_dict)
logger.info(str(self.publications.publications))
for publication in self.publications.publications:
logger.info(f"pubscription created: {publication}")
return
def updateHelicsPublications(self):
for publication in self.publications.publications:
value = publication.object.GetValue(publication.property)
if publication.data_type == DataType.VECTOR:
helics.helicsPublicationPublishVector(publication.pub, value)
elif publication.data_type == DataType.DOUBLE:
helics.helicsPublicationPublishDouble(publication.pub, value)
elif publication.data_type == DataType.STRING:
helics.helicsPublicationPublishString(publication.pub, value)
elif publication.data_type == DataType.BOOLEAN:
helics.helicsPublicationPublishBoolean(publication.pub, value)
elif publication.data_type == DataType.INTEGER:
helics.helicsPublicationPublishInteger(publication.pub, value)
else:
raise ValueError("Unsupported data type forr teh HELICS interface")
logger.info(f"{publication} - {value}")
return
def request_time_increment(self):
error = sum([abs(sub.states[0] - sub.states[1]) for sub in self.subscriptions.subscriptions])
r_seconds = self._dss_solver.GetTotalSeconds() #- self._dss_solver.GetStepResolutionSeconds()
if not self._settings.helics.iterative_mode:
while self.c_seconds < r_seconds:
self.c_seconds = helics.helicsFederateRequestTime(self._federate, r_seconds)
logger.info('Time requested: {} - time granted: {} '.format(r_seconds, self.c_seconds))
return True, self.c_seconds
else:
self.c_seconds, iteration_state = helics.helicsFederateRequestTimeIterative(
self._federate,
r_seconds,
helics.helics_iteration_request_iterate_if_needed
)
logger.info('Time requested: {} - time granted: {} error: {} it: {}'.format(
r_seconds, self.c_seconds, error, self.itr))
if error > -1 and self.itr < self._co_convergance_max_iterations - 1:
self.itr += 1
return False, self.c_seconds
else:
self.itr = 0
return True, self.c_seconds
def __del__(self):
helics.helicsFederateDisconnect(self._federate)
state = helics.helicsFederateGetState(self._federate)
helics.helicsFederateInfoFree(self.fedinfo)
helics.helicsFederateFree(self._federate)
logger.info('HELICS federate for pydss destroyed')