#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
""" """
__all__ = ["Controller", "ControllerClass"]
__docformat__ = "restructuredtext"
import time
from PyTango import (
READ,
READ_WRITE,
SCALAR,
SPECTRUM,
AttrQuality,
DevFailed,
DevLong,
DevState,
DevString,
DevVarLongArray,
DevVarStringArray,
DevVoid,
DispLevel,
Except,
Util,
)
from taurus.core.util.containers import CaselessDict
from taurus.core.util.log import DebugIt
from sardana import DataFormat, DataType, SardanaServer, State, sardanacustomsettings
from sardana.sardanaattribute import SardanaAttribute
from sardana.sardanautils import interleave_two_lists, interleaved_list_to_dict
from sardana.tango.core.util import to_tango_attr_info
from .PoolDevice import PoolDevice, PoolDeviceClass
def to_bool(s):
return s.lower() == "true"
[docs]
class Controller(PoolDevice):
def __init__(self, dclass, name):
self._use_physical_roles_property = getattr(
sardanacustomsettings, "USE_PHYSICAL_ROLES_PROPERTY", True
)
PoolDevice.__init__(self, dclass, name)
[docs]
def init(self, name):
PoolDevice.init(self, name)
[docs]
def get_ctrl(self):
return self.element
[docs]
def set_ctrl(self, ctrl):
self.element = ctrl
ctrl = property(get_ctrl, set_ctrl)
[docs]
@DebugIt()
def delete_device(self):
PoolDevice.delete_device(self)
ctrl = self.ctrl
if ctrl is not None:
ctrl.remove_listener(self.on_controller_changed)
self.ctrl = None
[docs]
@DebugIt()
def init_device(self):
PoolDevice.init_device(self)
detect_evts = "state", "status"
non_detect_evts = ("elementlist",)
self.set_change_events(detect_evts, non_detect_evts)
if not self.pool.use_numeric_element_ids:
db = self.get_database()
if len(db.get_device_property(self.get_name(), "id")["id"]) > 0:
self.warning(
"Forcing sardanacustomsettings.USE_NUMERIC_ELEMENT_IDS=True "
"because numeric element ids are still configured in Tango DB."
)
self.pool.use_numeric_element_ids = True
id_ = self.get_id()
try:
self.ctrl = ctrl = self.pool.get_element_by_id(id_)
except KeyError:
role_ids = self.get_role_ids()
full_name = self.get_full_name()
name = self.alias or full_name
args = dict(
type=self.Type,
name=name,
full_name=full_name,
library=self.Library,
klass=self.Klass,
id=id_,
role_ids=role_ids,
properties=self._get_ctrl_properties(),
)
ctrl = self.pool.create_controller(**args)
self.ctrl = ctrl
self.set_state(DevState.ON)
# self.set_state(to_tango_state(ctrl.get_state()))
# self.set_status(ctrl.get_status())
else:
# TODO: consider adding `_properties` either as
# a `PoolController` property or a `re_init()` argument
ctrl._properties = self._get_ctrl_properties()
ctrl.re_init()
ctrl.add_listener(self.on_controller_changed)
[docs]
def initialize_attribute_values(self):
"""Initialize attribute values."""
memorized_values = self.get_memorized_values()
self.ctrl.init_attribute_values(memorized_values)
def _migrate_role_property(self, db, old_property_name, old_property_value):
physical_roles = interleave_two_lists(
self._get_ctrl_roles(), old_property_value
)
db.put_device_property(self.get_name(), {"physical_roles": physical_roles})
db.delete_device_property(self.get_name(), old_property_name)
def _get_role_ids_using_old_properties(self, db):
property_name = None
role_ids = db.get_device_property(self.get_name(), ["motor_role_ids"])[
"motor_role_ids"
]
if len(role_ids) == 0:
role_ids = db.get_device_property(self.get_name(), ["counter_role_ids"])[
"counter_role_ids"
]
if len(role_ids) == 0:
role_ids = self.Role_ids
if len(role_ids) > 0:
property_name = "Role_ids"
else:
property_name = "counter_role_ids"
else:
property_name = "motor_role_ids"
return role_ids, property_name
[docs]
def get_role_ids(self):
db = Util.instance().get_database()
if db is None:
return []
if self._use_physical_roles_property:
role_ids_prop = db.get_device_property(self.get_name(), ["physical_roles"])[
"physical_roles"
]
if len(role_ids_prop) == 0:
role_ids, property_to_migrate = self._get_role_ids_using_old_properties(
db
)
if property_to_migrate is not None:
self._migrate_role_property(db, property_to_migrate, role_ids)
else:
role_ids_map = interleaved_list_to_dict(role_ids_prop)
role_ids = []
for role in self._get_ctrl_roles():
role_ids.append(role_ids_map[role])
else:
role_ids, _ = self._get_role_ids_using_old_properties(db)
if self.pool.use_numeric_element_ids:
role_ids = list(map(int, role_ids))
return role_ids
def _get_ctrl_roles(self):
ctrl_info = self.pool.get_controller_class_info(self.Klass)
try:
return ctrl_info.motor_roles
except AttributeError:
return ctrl_info.counter_roles
def _get_ctrl_properties(self):
try:
ctrl_info = self.pool.get_controller_class_info(self.Klass)
prop_infos = ctrl_info.ctrl_properties
except Exception:
return {}
db = Util.instance().get_database()
if db is None:
return {}
props = {}
if prop_infos:
props.update(
db.get_device_property(self.get_name(), list(prop_infos.keys()))
)
for p in list(props.keys()):
if len(props[p]) == 0:
props[p] = None
ret = {}
missing_props = []
for prop_name, prop_value in list(props.items()):
if prop_value is None:
dv = prop_infos[prop_name].default_value
if dv is None:
missing_props.append(prop_name)
ret[prop_name] = dv
continue
prop_info = prop_infos[prop_name]
dtype, dformat = prop_info.dtype, prop_info.dformat
op = str
if dtype == DataType.Integer:
op = int
elif dtype == DataType.Double:
op = float
elif dtype == DataType.Boolean:
op = to_bool
prop_value = list(map(op, prop_value))
if dformat == DataFormat.Scalar:
prop_value = prop_value[0]
ret[prop_name] = prop_value
if missing_props:
self.set_state(DevState.ALARM)
missing_props = ", ".join(missing_props)
self.set_status("Controller has missing properties: %s" % missing_props)
return ret
[docs]
def always_executed_hook(self):
pass
def read_attr_hardware(self, data):
pass
[docs]
def dev_state(self):
if self.ctrl is None or not self.ctrl.is_online():
return DevState.FAULT
return DevState.ON
[docs]
def dev_status(self):
if self.ctrl is None or not self.ctrl.is_online():
self._status = self.ctrl.get_ctrl_error_str()
else:
self._status = PoolDevice.dev_status(self)
return self._status
[docs]
def read_ElementList(self, attr):
attr.set_value(self.get_element_names())
[docs]
def CreateElement(self, argin):
pass
[docs]
def DeleteElement(self, argin):
pass
[docs]
def get_element_names(self):
elements = self.ctrl.get_elements()
return [elements[id].get_name() for id in sorted(elements)]
[docs]
def on_controller_changed(self, event_src, event_type, event_value):
# during server startup and shutdown avoid processing element
# creation events
if SardanaServer.server_state != State.Running:
return
timestamp = time.time()
name = event_type.name.lower()
multi_attr = self.get_device_attr()
try:
attr = multi_attr.get_attr_by_name(name)
except DevFailed:
return
quality = AttrQuality.ATTR_VALID
priority = event_type.priority
error = None
if name == "state":
event_value = self.calculate_tango_state(event_value)
elif name == "status":
event_value = self.calculate_tango_status(event_value)
else:
if isinstance(event_value, SardanaAttribute):
if event_value.error:
error = Except.to_dev_failed(*event_value.exc_info)
timestamp = event_value.timestamp
event_value = event_value.value
self.set_attribute(
attr,
value=event_value,
timestamp=timestamp,
quality=quality,
priority=priority,
error=error,
synch=False,
)
[docs]
def get_dynamic_attributes(self):
if hasattr(self, "_dynamic_attributes_cache"):
return self._standard_attributes_cache, self._dynamic_attributes_cache
info = self.ctrl.ctrl_info
if info is None:
self.warning("Controller %s doesn't have any information", self.ctrl)
return PoolDevice.get_dynamic_attributes(self)
self._dynamic_attributes_cache = dyn_attrs = CaselessDict()
self._standard_attributes_cache = std_attrs = CaselessDict()
for attr_data in list(info.ctrl_attributes.values()):
attr_name = attr_data.name
name, tg_info = to_tango_attr_info(attr_name, attr_data)
dyn_attrs[attr_name] = attr_name, tg_info, attr_data
return std_attrs, dyn_attrs
[docs]
def read_DynamicAttribute(self, attr):
attr_name = attr.get_name()
attr.set_value(self.ctrl.get_ctrl_attr(attr_name))
[docs]
def write_DynamicAttribute(self, attr):
v = attr.get_write_value()
attr_name = attr.get_name()
self.ctrl.set_ctrl_attr(attr_name, v)
[docs]
def read_LogLevel(self, attr):
log_level = self.ctrl.get_log_level()
self.debug(log_level)
attr.set_value(log_level)
[docs]
def write_LogLevel(self, attr):
self.ctrl.set_log_level(attr.get_write_value())
[docs]
class ControllerClass(PoolDeviceClass):
# Class Properties
class_property_list = {}
class_property_list.update(PoolDeviceClass.class_property_list)
# Device Properties
device_property_list = {
"Type": [DevString, "", None],
"Library": [DevString, "", None],
"Klass": [DevString, "", None],
"Role_ids": [DevVarLongArray, "", []],
}
device_property_list.update(PoolDeviceClass.device_property_list)
# Command definitions
cmd_list = {
"CreateElement": [[DevVarStringArray, ""], [DevVoid, ""]],
"DeleteElement": [[DevString, ""], [DevVoid, ""]],
}
cmd_list.update(PoolDeviceClass.cmd_list)
# Attribute definitions
attr_list = {
"ElementList": [[DevString, SPECTRUM, READ, 4096]],
"LogLevel": [
[DevLong, SCALAR, READ_WRITE],
{
"Memorized": "true_without_hard_applied",
"label": "Log level",
"Display level": DispLevel.EXPERT,
},
],
}
attr_list.update(PoolDeviceClass.attr_list)
def _get_class_properties(self):
ret = PoolDeviceClass._get_class_properties(self)
ret["Description"] = "Controller device class"
ret["InheritedFrom"].insert(0, "PoolDevice")
return ret