#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""Generic Tango Pool Device base classes"""
__all__ = [
"PoolDevice",
"PoolDeviceClass",
"PoolElementDevice",
"PoolElementDeviceClass",
"PoolExpChannelDevice",
"PoolExpChannelDeviceClass",
"PoolGroupDevice",
"PoolGroupDeviceClass",
]
__docformat__ = "restructuredtext"
import functools
import time
import warnings
from copy import deepcopy
from typing import Any, Callable, Optional, Sequence, Tuple, Union
import PyTango
import taurus
from packaging.version import Version
from PyTango import (
IMAGE,
READ,
READ_WRITE,
SCALAR,
SPECTRUM,
AttrData,
CmdArgType,
DevBoolean,
DevDouble,
DevEncoded,
DevFailed,
DevLong64,
DevState,
DevString,
DevVarStringArray,
DevVoid,
DispLevel,
ErrSeverity,
Except,
Util,
seqStr_2_obj,
)
from tango.server import command
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.containers import CaselessDict
import sardana
from sardana import ElementType, InvalidAxis, InvalidId, sardanacustomsettings
from sardana.pool.poolmetacontroller import DataInfo
from sardana.tango.core.SardanaDevice import SardanaDevice, SardanaDeviceClass
from sardana.tango.core.util import (
GenericImageAttr,
GenericScalarAttr,
GenericSpectrumAttr,
to_tango_attr_info,
)
from sardana.util.wrappers import wrap_method
PYTANGO_VERSION = Version(PyTango.__version__)
[docs]
class PoolDevice(SardanaDevice):
"""Base Tango Pool device class"""
#: list of extreme error states
ExtremeErrorStates = DevState.FAULT, DevState.UNKNOWN
#: list of busy states
BusyStates = DevState.MOVING, DevState.RUNNING
#: Maximum number of retries in a busy state
BusyRetries = 3
def __init__(self, dclass, name):
"""Constructor"""
SardanaDevice.__init__(self, dclass, name)
[docs]
def init(self, name: str):
"""initialize the device once in the object lifetime. Override when
necessary but **always** call the method from your super class
:param name: device name"""
SardanaDevice.init(self, name)
util = Util.instance()
self._pool_device = util.get_device_list_by_class("Pool")[0]
self._element = None
[docs]
def get_id(self) -> Union[str, int]:
"""Returns the element id from database
(name or numeric id).
:return: the element id
"""
if self.pool.use_numeric_element_ids:
return self.Id
else:
return self.alias
[docs]
def get_ctrl_id(self) -> Union[str, int]:
"""Returns the controller id from database
(name or numeric id).
:return: the controller id
"""
if self.pool.use_numeric_element_ids:
return int(self.Ctrl_id)
else:
return self.Ctrl_id
[docs]
def get_instrument_id(self) -> Union[str, int]:
"""Returns the instrument id from database
(name or numeric id).
:return: the instrument id
"""
if self.pool.use_numeric_element_ids:
return int(self.Instrument_id)
else:
return self.Instrument_id
@property
def pool_device(self):
"""The tango pool device"""
return self._pool_device
@property
def pool(self):
"""The sardana pool object"""
return self.pool_device.pool
[docs]
def get_element(self) -> sardana.pool.poolelement.PoolElement:
"""Returns the underlying pool element object
:return: the underlying pool element object
"""
return self._element
[docs]
def set_element(self, element: sardana.pool.poolelement.PoolElement) -> None:
"""Associates this device with the sardana element
:param element: the sardana element
"""
self._element = element
element = property(get_element, set_element, doc="The underlying sardana element")
[docs]
def init_device(self):
"""Initialize the device. Called during startup after :meth:`init` and
every time the tango ``Init`` command is executed.
Override when necessary but **always** call the method from your super
class"""
SardanaDevice.init_device(self)
[docs]
def delete_device(self):
"""Clean the device. Called during shutdown and every time the tango
``Init`` command is executed.
Override when necessary but **always** call the method from your super
class"""
SardanaDevice.delete_device(self)
[docs]
def Abort(self):
"""The tango abort command. Aborts the active operation"""
self.element.abort()
try:
self.element.get_state(cache=False, propagate=2)
except Exception:
self.warning("Abort: failed to read state")
[docs]
def is_Abort_allowed(self) -> bool:
"""Returns True if it is allowed to execute the tango abort command
:return: True if it is allowed to execute the tango abort command or
False otherwise
"""
return self.get_state() != DevState.UNKNOWN
[docs]
def Stop(self):
"""The tango stop command. Stops the active operation"""
self.element.stop()
try:
self.element.get_state(cache=False, propagate=2)
except Exception:
self.info("Stop: failed to read state")
[docs]
def is_Stop_allowed(self) -> bool:
"""Returns True if it is allowed to execute the tango stop command
:return: True if it is allowed to execute the tango stop command or
False otherwise
"""
return self.get_state() != DevState.UNKNOWN
[docs]
def Release(self):
"""The tango release command. Release the active operation"""
self.element.release()
[docs]
def is_Release_allowed(self) -> bool:
"""Returns True if it is allowed to execute the tango release command
:return: True if it is allowed to execute the tango release command or
False otherwise
"""
return self.get_state() != DevState.UNKNOWN
def _is_allowed(self, req_type):
"""Generic is_allowed"""
# state = self.get_state()
# if state in self.ExtremeErrorStates:
# return False
# if req_type == AttReqType.WRITE_REQ:
# if state in self.BusyStates:
# return False
return True
[docs]
def get_dynamic_attributes(
self,
) -> Tuple[taurus.core.util.CaselessDict, taurus.core.util.CaselessDict]:
"""Returns the standard dynamic and fully dynamic attributes for this
device. The return is a tuple of two dictionaries:
- standard attributes: caseless dictionary with key being the attribute
name and value is a tuple of attribute name(str), tango information,
attribute information
- dynamic attributes: caseless dictionary with key being the attribute
name and value is a tuple of attribute name(str), tango information,
attribute information
**tango information**
seq< :class:`~PyTango.CmdArgType`, :class:`~PyTango.AttrDataFormat`, :class:`~PyTango.AttrWriteType` >
**attribute information**
attribute information as returned by the sardana controller
:return: the standard dynamic and fully dynamic attributes
"""
return CaselessDict(), CaselessDict()
[docs]
def get_dynamic_commands(
self,
) -> list:
"""Override of :class:`PoolDevice.get_dynamic_commands`.
Returns the standard dynamic commands for this
device. The return is a :class:`list` containing :class:`tuple` entries with command parameters:
* fname(str) method name implemented in controller,
* dtype_in :class:`~PyTango.CmdArgType`, dformat_in :class:`~PyTango.AttrDataFormat`, doc_in(str),
* dtype_out :class:`~PyTango.CmdArgType`, dformat_out :class:`~PyTango.AttrDataFormat`, doc_out(str),
* display_level
:return: the standard commands
"""
dynamic_commands = self._get_dynamic_commands()
return dynamic_commands
def _get_dynamic_commands(self):
if not hasattr(self, "ctrl"):
return []
ctrl = self.ctrl
if ctrl is None:
self.warning("no controller: dynamic commands NOT created")
return []
if not ctrl.is_online():
self.warning("controller offline: dynamic commands NOT created")
return []
ctrl_commands = ctrl.get_ctrl_commands()
return ctrl_commands
[docs]
def initialize_dynamic_attributes(self):
"""Initializes this device dynamic attributes"""
self._attributes = attrs = CaselessDict()
self._commands = []
attr_data = self.get_dynamic_attributes()
command_data = self.get_dynamic_commands()
std_attrs, dyn_attrs = attr_data
self.remove_unwanted_dynamic_attributes(std_attrs, dyn_attrs)
if std_attrs is not None:
read = self._read_DynamicAttribute
write = self._write_DynamicAttribute
is_allowed = self._is_DynamicAttribute_allowed
for attr_name, data_info in list(std_attrs.items()):
attr_name, data_info, attr_info = data_info
attr = self.add_standard_attribute(
attr_name, data_info, attr_info, read, write, is_allowed
)
attrs[attr.get_name()] = None
if dyn_attrs is not None:
read = self._read_DynamicAttribute
write = self._write_DynamicAttribute
is_allowed = self._is_DynamicAttribute_allowed
for attr_name, data_info in list(dyn_attrs.items()):
attr_name, data_info, attr_info = data_info
attr = self.add_dynamic_attribute(
attr_name, data_info, attr_info, read, write, is_allowed
)
attrs[attr.get_name()] = None
if command_data is not None:
for cmd in command_data:
try:
cmd = self.add_standard_command(*cmd)
except AttributeError as e:
self.warning(
f"Could not initialize cmd with parameters {cmd}. Reason: ({type(e).__name__}): {str(e)}"
)
return attrs
[docs]
def add_standard_command(
self,
fname: str,
dtype_in=None,
dformat_in=None,
doc_in="",
dtype_out=None,
dformat_out=None,
doc_out="",
display_level=None,
cmd_green_mode=True,
) -> Optional[Callable]:
"""Adds a single standard dynamic command.
This method registers a new dynamic command in the device server,
using the provided Python callable as the command implementation.
Input/output data types, formats, and documentation strings may be
specified to fully describe the command for Tango clients.
:param fname: The name of the Python method implementing the command.
:param dtype_in: Tango input data type for the command (e.g., `PyTango.CmdArgType`).
:param dformat_in: Tango input data format (e.g., `PyTango.AttrDataFormat`).
:param doc_in: Documentation string describing the command input.
:param dtype_out: Tango output data type for the command.
:param dformat_out: Tango output data format.
:param doc_out: Documentation string describing the command output.
:param display_level: Optional Tango display level for the command
(e.g., `PyTango.DispLevel`).
:param cmd_green_mode: Green mode for command method. If True: run with green mode executor,
if False: run directly
:return: The created Tango command object.
:raises AttributeError:
If the controller instance cannot be obtained from
``self.element.controller.ctrl_obj``, or if the controller does not
define a callable attribute for the provided ``fname``.
"""
if PYTANGO_VERSION < Version("10.0"):
warnings.warn(
"Dynamic commands require PyTango >= 10.0.0 "
"Command registration skipped.",
RuntimeWarning,
stacklevel=2,
)
return None
if not hasattr(self, "ctrl"):
return None
try:
ctrl_obj = self.ctrl.ctrl
except AttributeError:
self.warning("Could not get element attributes.")
raise
assert hasattr(ctrl_obj, fname), "Controller does not implement %s" % fname
method = wrap_method(ctrl_obj, fname)
# In previous version dynamic command registration with method constructed by `command` and green mode
# does not allow to pass instance method
if PYTANGO_VERSION < Version("10.1.0"):
cmd_green_mode = False
warnings.warn(
"Dynamic commands with green mode require PyTango >= 10.1.0 "
"Green mode set to False.",
RuntimeWarning,
stacklevel=2,
)
command_data = command(
method,
dtype_in=dtype_in,
dformat_in=dformat_in,
doc_in=doc_in,
dtype_out=dtype_out,
dformat_out=dformat_out,
doc_out=doc_out,
display_level=display_level,
cmd_green_mode=cmd_green_mode,
)
cmd = self.add_command(command_data)
return cmd
[docs]
def remove_unwanted_dynamic_attributes(self, new_std_attrs, new_dyn_attrs):
"""Removes unwanted dynamic attributes from previous device creation"""
dev_class = self.get_device_class()
multi_attr = self.get_device_attr()
multi_class_attr = dev_class.get_class_attr()
static_attr_names = list(map(str.lower, list(dev_class.attr_list.keys())))
static_attr_names.extend(("state", "status"))
new_attrs = CaselessDict(new_std_attrs)
new_attrs.update(new_dyn_attrs)
device_attr_names = []
for i in range(multi_attr.get_attr_nb()):
device_attr_names.append(multi_attr.get_attr_by_ind(i).get_name())
# in case of calling DevRestart() on the admin device
# we don't want to remove attribute configuration from the DB
util = Util.instance()
# TODO: remove this try..except whenever pytango#541
try:
is_device_restarting = util.is_device_restarting(self.get_name())
except Exception:
self.warning(
"Could not verify if device is being restarted. "
"This may lead to undesired removal of attribute configuration."
)
is_device_restarting = False
for attr_name in device_attr_names:
attr_name_lower = attr_name.lower()
if attr_name_lower in static_attr_names:
continue
keep_attr_conf = is_device_restarting and attr_name in new_attrs
# TODO: when pytango#540 gets implemented always remove
# the attr and to keep the attr conf use clean_db=False
if keep_attr_conf:
continue
try:
self.remove_attribute(attr_name)
except Exception:
self.warning("Error removing dynamic attribute %s", attr_name_lower)
self.debug("Details:", exc_info=1)
klass_attr_names = []
klass_attrs = multi_class_attr.get_attr_list()
for ind in range(len(klass_attrs)):
klass_attr_names.append(klass_attrs[ind].get_name())
for attr_name in klass_attr_names:
attr_name_lower = attr_name.lower()
if attr_name_lower in static_attr_names:
continue
# if new dynamic attribute is in class attribute then delete it
# from class attribute to be later on added again (eventually
# with diffent data type or data format)
if attr_name_lower in new_attrs:
try:
attr = multi_class_attr.get_attr(attr_name)
old_type = CmdArgType(attr.get_type())
old_format = attr.get_format()
old_access = attr.get_writable()
new_attr = new_attrs[attr_name]
new_type, new_format, new_access = new_attr[1][0][:3]
differ = (
new_type != old_type
or new_format != old_format
or new_access != old_access
)
if differ:
self.info("Replacing dynamic attribute %s", attr_name)
self.debug("old type: %s, new type: %s", old_type, new_type)
self.debug(
"old format: %s, new format: %s", old_format, new_format
)
self.debug(
"old access: %s, new access: %s", old_access, new_access
)
multi_class_attr.remove_attr(
attr.get_name(), attr.get_cl_name()
)
except Exception:
self.warning(
"Error removing dynamic attribute %s from device class",
attr_name,
)
self.debug("Details:", exc_info=1)
[docs]
def add_dynamic_attribute(
self,
attr_name: str,
data_info: Tuple[
PyTango.CmdArgType, PyTango.AttrDataFormat, PyTango.AttrWriteType
],
attr_info: Any,
read: Any,
write: Any,
is_allowed: Any,
) -> PyTango.Attr:
"""Adds a single dynamic attribute
:param attr_name: the attribute name
:param data_info: tango attribute information
:param attr_info: attribute information
:param read: read method for the attribute
:param write: write method for the attribute
:param is_allowed: is allowed method"""
tg_type, tg_format, tg_access = data_info[0]
if tg_access == READ:
write = None
if tg_format == SCALAR:
attr = GenericScalarAttr(attr_name, tg_type, tg_access)
if tg_format == SPECTRUM:
dim_x = attr_info.maxdimsize[0]
attr = GenericSpectrumAttr(attr_name, tg_type, tg_access, dim_x=dim_x)
elif tg_format == IMAGE:
dim_x, dim_y = attr_info.maxdimsize
attr = GenericImageAttr(
attr_name, tg_type, tg_access, dim_x=dim_x, dim_y=dim_y
)
if tg_access == READ_WRITE and tg_format == SCALAR:
memorized = attr_info.memorized.lower()
# sardana takes care of restoring memorized values
if memorized in ("true", "true_without_hard_applied"):
attr.set_memorized()
attr.set_memorized_init(False)
attr.set_disp_level(DispLevel.EXPERT)
return self.add_attribute(attr, read, write, is_allowed)
[docs]
def add_standard_attribute(
self,
attr_name: str,
data_info: Tuple[
PyTango.CmdArgType, PyTango.AttrDataFormat, PyTango.AttrWriteType
],
attr_info: Any,
read: Any,
write: Any,
is_allowed: Any,
) -> PyTango.Attr:
"""Adds a single standard dynamic attribute
:param attr_name: the attribute name
:param data_info: tango attribute information
:param attr_info: attribute information
:param read: read method for the attribute
:param write: write method for the attribute
:param is_allowed: is allowed method"""
dev_class = self.get_device_class()
attr_data = AttrData(attr_name, dev_class.get_name(), data_info)
attr = self.add_attribute(attr_data, read, write, is_allowed)
return attr
[docs]
def read_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
"""Generic read dynamic attribute.
Default implementation raises :exc:`NotImplementedError`
:param attr: attribute to be read
:raises: :exc:`NotImplementedError`"""
raise NotImplementedError
[docs]
def write_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
"""Generic write dynamic attribute.
Default implementation raises :exc:`NotImplementedError`
:param attr: attribute to be written
:raises: :exc:`NotImplementedError`"""
raise NotImplementedError
[docs]
def is_DynamicAttribute_allowed(self, req_type: Any) -> bool:
"""Generic is dynamic attribute allowed.
Default implementation calls :meth:`_is_allowed`
:param req_type: request type
"""
return self._is_allowed(req_type)
def _read_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
"""Generic internal read dynamic attribute.
Checks if this object has a 'read_'+<attr_name> method and calls it.
If not calls :meth:`read_DynamicAttribute`.
:param attr: attribute to be read
"""
name = attr.get_name()
read_name = "read_" + name
if hasattr(self, read_name):
read = getattr(self, read_name)
return read(attr)
return self.read_DynamicAttribute(attr)
def _write_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
"""Generic internal write dynamic attribute.
Checks if this object has a 'write_'+<attr_name> method and calls it.
If not calls :meth:`write_DynamicAttribute`.
:param attr: attribute to be written
"""
name = attr.get_name()
write_name = "write_" + name
if hasattr(self, write_name):
write = getattr(self, write_name)
return write(attr)
return self.write_DynamicAttribute(attr)
def _is_DynamicAttribute_allowed(self, req_type: Any) -> bool:
"""Generic is dynamic attribute allowed.
Default implementation calls :meth:`is_DynamicAttribute_allowed`
:param req_type: request type
"""
return self.is_DynamicAttribute_allowed(req_type)
[docs]
def dev_state(self) -> PyTango.DevState:
"""Calculates and returns the device state. Called by Tango on a read
state request.
:return: the device state
"""
element = self.element
try:
use_cache = element.is_in_operation() and not self.Force_HW_Read
ctrl_state = element.get_state(cache=use_cache, propagate=0)
state = self.calculate_tango_state(ctrl_state)
return state
except Exception:
self.error("Exception trying to return state")
self.debug("Details:", exc_info=1)
return DevState.FAULT
[docs]
def dev_status(self) -> str:
"""
Calculates and returns the device status. Called by Tango on a read
status request.
:return: the device status
"""
element = self.element
try:
use_cache = element.is_in_operation() and not self.Force_HW_Read
ctrl_status = self.element.get_status(cache=use_cache, propagate=0)
status = self.calculate_tango_status(ctrl_status)
return status
except Exception as e:
msg = "Exception trying to return status: %s" % str(e)
self.error(msg)
self.debug("Details:", exc_info=1)
return msg
[docs]
def wait_for_operation(self):
"""Waits for an operation to finish. It uses the maxumum number of
retries. Sleeps 0.01s between retries.
:raises: :exc:`Exception` in case of a timeout"""
element, n = self.element, self.BusyRetries
while element.is_in_operation():
if n == 0:
raise Exception("Wait for operation timedout")
time.sleep(0.01)
self.warning("waited for operation")
n = n - 1
[docs]
def Restore(self):
"""Restore tango command. Restores the attributes to their former glory.
This applies to memorized writable attributes which have a set point
stored in the database"""
restore_attributes, db_values = self.get_restore_data()
multi_attribute = self.get_device_attr()
for attr_name in restore_attributes:
props = db_values[attr_name]
if props is None or "__value" not in props:
continue
attribute = multi_attribute.get_w_attr_by_name(attr_name)
write_meth_name = "write_" + attr_name
write_meth = getattr(self, write_meth_name, None)
if write_meth is None:
self.warning(
"Could not recover %s: %s does not exist",
attr_name,
write_meth_name,
)
continue
self.restore_attribute(attribute, write_meth, props["__value"])
[docs]
def get_restore_data(self):
restore_attributes = self.get_attributes_to_restore()
db = Util.instance().get_database()
db_values = db.get_device_attribute_property(
self.get_name(), restore_attributes
)
return restore_attributes, db_values
[docs]
def get_attributes_to_restore(self):
std_attrs, dyn_attrs = self.get_dynamic_attributes()
multi_attribute = self.get_device_attr()
restore = []
for attr_name in std_attrs:
try:
attribute = multi_attribute.get_w_attr_by_name(attr_name)
except DevFailed:
continue
restore.append(attribute.get_name())
for attr_name in dyn_attrs:
try:
attribute = multi_attribute.get_w_attr_by_name(attr_name)
except DevFailed:
continue
restore.append(attribute.get_name())
return restore
def _get_attribute_value_from_db_value(self, attribute, db_value):
value = seqStr_2_obj(
db_value, attribute.get_data_type(), attribute.get_data_format()
)
return value
[docs]
def restore_attribute(self, attribute, write_meth, db_value):
value = self._get_attribute_value_from_db_value(attribute, db_value)
attr_name = attribute.get_name()
try:
attribute.set_write_value(value)
self.info("Restoring %s", attr_name)
write_meth(attribute)
except Exception:
self.warning("Could not recover %s: Error in write", attr_name)
self.debug("Details:", exc_info=1)
[docs]
class PoolDeviceClass(SardanaDeviceClass):
"""Base Tango Pool Device Class class"""
#:
#: Sardana device class properties definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
class_property_list = SardanaDeviceClass.class_property_list
#:
#: Sardana device properties definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
device_property_list = {
"Id": [DevLong64, "Internal ID", InvalidId],
"Force_HW_Read": [
DevBoolean,
"Force a hardware read of value even when in operation (motion/acquisition",
False,
],
}
device_property_list.update(SardanaDeviceClass.device_property_list)
#:
#: Sardana device command definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
cmd_list = {
"Stop": [[DevVoid, ""], [DevVoid, ""]],
"Abort": [[DevVoid, ""], [DevVoid, ""]],
"Release": [[DevVoid, ""], [DevVoid, ""]],
"Restore": [[DevVoid, ""], [DevVoid, ""]],
}
cmd_list.update(SardanaDeviceClass.cmd_list)
#:
#: Sardana device attribute definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
attr_list = {}
attr_list.update(SardanaDeviceClass.attr_list)
standard_attr_list = {}
def _get_class_properties(self):
ret = SardanaDeviceClass._get_class_properties(self)
ret["Description"] = "Generic Pool device class"
ret["InheritedFrom"].insert(0, "SardanaDevice")
return ret
[docs]
class PoolElementDevice(PoolDevice):
"""Base Tango Pool Element Device class"""
ignore_memorized_attrs_during_initialization = tuple()
[docs]
def init_device(self):
"""Initialize the device. Called during startup after :meth:`init` and
every time the tango ``Init`` command is executed.
Override when necessary but **always** call the method from your super
class"""
PoolDevice.init_device(self)
detect_evts = ()
non_detect_evts = ("instrument",)
self.set_change_events(detect_evts, non_detect_evts)
self.instrument = None
self.ctrl = None
instrument_id = self.get_instrument_id()
if instrument_id != InvalidId:
self.instrument = self.pool.get_element_by_id(instrument_id)
ctrl_id = self.get_ctrl_id()
self.ctrl = self.pool.get_element_by_id(ctrl_id)
[docs]
def add_standard_command(
self,
fname: str,
dtype_in=None,
dformat_in=None,
doc_in="",
dtype_out=None,
dformat_out=None,
doc_out="",
display_level=None,
cmd_green_mode=True,
):
"""Overrides PoolDevice add_standard_command.
Adds a single standard dynamic command for given axis (axis is injected into device command).
This method registers a new dynamic command in the device server,
using the provided Python callable as the command implementation.
Input/output data types, formats, and documentation strings may be
specified to fully describe the command for Tango clients.
:param fname: The name of the Python method implementing the command.
:param dtype_in: Tango input data type for the command (e.g., `PyTango.CmdArgType`).
:param dformat_in: Tango input data format (e.g., `PyTango.AttrDataFormat`).
:param doc_in: Documentation string describing the command input.
:param dtype_out: Tango output data type for the command.
:param dformat_out: Tango output data format.
:param doc_out: Documentation string describing the command output.
:param display_level: Optional Tango display level for the command
(e.g., `PyTango.DispLevel`).
:param cmd_green_mode: Green mode for command method. If True: run with green mode executor,
if False: run directly
:return: The created Tango command object.
:raises AttributeError:
If the controller instance cannot be obtained from
``self.element.controller.ctrl_obj``, or if the controller does not
define a callable attribute for the provided ``fname``.
"""
if PYTANGO_VERSION < Version("10.0"):
warnings.warn(
"Dynamic commands require PyTango >= 10.0.0 "
"Command registration skipped.",
RuntimeWarning,
stacklevel=2,
)
return None
try:
ctrl_obj = self.ctrl.ctrl
axis = self.element.axis
except AttributeError:
self.warning("Could not get element attributes.")
raise
# Pre-bind the axis so the Tango command receives only the user argument
assert hasattr(ctrl_obj, fname), "Controller does not implement %s" % fname
method = functools.partial(getattr(ctrl_obj, fname), axis)
functools.update_wrapper(method, getattr(ctrl_obj, fname))
# In previous version dynamic command registration with method constructed by `command` and green mode
# does not allow to pass instance method
if PYTANGO_VERSION < Version("10.1.0"):
cmd_green_mode = False
warnings.warn(
"Dynamic commands with green mode require PyTango >= 10.1.0 "
"Green mode set to False.",
RuntimeWarning,
)
command_data = command(
method,
dtype_in=dtype_in,
dformat_in=dformat_in,
doc_in=doc_in,
dtype_out=dtype_out,
dformat_out=dformat_out,
doc_out=doc_out,
display_level=display_level,
cmd_green_mode=cmd_green_mode,
)
cmd = self.add_command(command_data)
return cmd
[docs]
def initialize_attribute_values(self):
"""Initialize attribute values."""
memorized_values = self.get_memorized_values()
for attr in self.ignore_memorized_attrs_during_initialization:
memorized_values.pop(attr, None)
self.element.init_attribute_values(memorized_values)
[docs]
def delete_device(self):
element = self.element
if not element.deleted:
ctrl = element.controller
ctrl.remove_element(element, propagate=0)
PoolDevice.delete_device(self)
[docs]
def read_Instrument(self, attr: PyTango.Attribute) -> None:
"""Read the value of the ``Instrument`` tango attribute.
Returns the instrument full name or empty string if this element doesn't
belong to any instrument
:param attr: tango instrument attribute
"""
instrument = self.element.instrument
if instrument is None:
attr.set_value("")
else:
attr.set_value(instrument.full_name)
[docs]
def write_Instrument(self, attr: PyTango.Attribute) -> None:
"""Write the value of the ``Instrument`` tango attribute.
Sets a new instrument full name or empty string if this element doesn't
belong to any instrument.
The instrument **must** have been previously created.
:param attr: tango instrument attribute
"""
name = attr.get_write_value()
instrument = None
if name:
instrument = self.pool.get_element(full_name=name)
if instrument.get_type() != ElementType.Instrument:
raise Exception("%s is not an instrument" % name)
self.element.instrument = instrument
db = Util.instance().get_database()
db.put_device_property(self.get_name(), {"Instrument_id": instrument.id})
[docs]
def get_dynamic_attributes(
self,
) -> Tuple[taurus.core.util.CaselessDict, taurus.core.util.CaselessDict]:
"""Override of :class:`PoolDevice.get_dynamic_attributes`.
Returns the standard dynamic and fully dynamic attributes for this
device. The return is a tuple of two dictionaries:
- standard attributes: caseless dictionary with key being the attribute
name and value is a tuple of attribute name(str), tango information,
attribute information
- dynamic attributes: caseless dictionary with key being the attribute
name and value is a tuple of attribute name(str), tango information,
attribute information
**tango information**
seq< :class:`~PyTango.CmdArgType`, :class:`~PyTango.AttrDataFormat`, :class:`~PyTango.AttrWriteType` >
**attribute information**
attribute information as returned by the sardana controller
:return: the standard dynamic and fully dynamic attributes
"""
if hasattr(self, "_dynamic_attributes_cache"):
return self._standard_attributes_cache, self._dynamic_attributes_cache
std_attrs, dyn_attrs = self._get_dynamic_attributes()
self._standard_attributes_cache = std_attrs
self._dynamic_attributes_cache = dyn_attrs
return std_attrs, dyn_attrs
def _get_dynamic_commands(self):
ctrl = self.ctrl
if ctrl is None:
self.warning("no controller: dynamic commands NOT created")
return PoolDevice.get_dynamic_commands(self)
if not ctrl.is_online():
self.warning("controller offline: dynamic commands NOT created")
return PoolDevice.get_dynamic_commands(self)
axis_commands = ctrl.get_axis_commands(self.element.axis)
return axis_commands
def _get_dynamic_attributes(self):
ctrl = self.ctrl
if ctrl is None:
self.warning("no controller: dynamic attributes NOT created")
return PoolDevice.get_dynamic_attributes(self)
if not ctrl.is_online():
self.warning("controller offline: dynamic attributes NOT created")
return PoolDevice.get_dynamic_attributes(self)
dyn_attrs = CaselessDict()
std_attrs = CaselessDict()
dev_class = self.get_device_class()
axis_attrs = ctrl.get_axis_attributes(self.element.axis)
std_attrs_lower = [attr.lower() for attr in dev_class.standard_attr_list]
for attr_name, attr_info in list(axis_attrs.items()):
attr_name_lower = attr_name.lower()
if attr_name_lower in std_attrs_lower:
data_info = DataInfo.toDataInfo(attr_name, attr_info)
# copy in order to leave the class attributes untouched
# the downstream code can append MaxDimSize to the attr. info
tg_info = deepcopy(dev_class.standard_attr_list[attr_name])
std_attrs[attr_name] = attr_name, tg_info, data_info
else:
data_info = DataInfo.toDataInfo(attr_name, attr_info)
name, tg_info = to_tango_attr_info(attr_name, data_info)
dyn_attrs[attr_name] = name, tg_info, data_info
return std_attrs, dyn_attrs
[docs]
def read_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
"""Read a generic dynamic attribute. Calls the controller of this
element to get the dynamic attribute value
:param attr: tango attribute
"""
name = attr.get_name()
ctrl = self.ctrl
if ctrl is None:
raise Exception("Cannot read %s. Controller not build!" % name)
v = ctrl.get_axis_attr(self.element.axis, name)
if v is None:
raise TypeError("Cannot read %s. Controller returns None" % (name,))
attr.set_value(v)
[docs]
def write_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
"""Write a generic dynamic attribute. Calls the controller of this
element to get the dynamic attribute value
:param attr: tango attribute
"""
name = attr.get_name()
value = attr.get_write_value()
self.debug("writing dynamic attribute %s with value %s", name, value)
ctrl = self.ctrl
if ctrl is None:
raise Exception("Cannot write %s. Controller not build!" % name)
ctrl.set_axis_attr(self.element.axis, name, value)
[docs]
def read_SimulationMode(self, attr: PyTango.Attribute) -> None:
"""Read the current simulation mode.
:param attr: tango attribute
"""
attr.set_value(self.element.simulation_mode)
[docs]
def write_SimulationMode(self, attr: PyTango.Attribute) -> None:
"""Sets the simulation mode.
:param attr: tango attribute
"""
self.element.simulation_mode = attr.get_write_value()
[docs]
class PoolElementDeviceClass(PoolDeviceClass):
"""Base Tango Pool Element Device Class class"""
#:
#: Sardana device properties definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
device_property_list = {
"Axis": [DevLong64, "Axis in the controller", [InvalidAxis]],
"Ctrl_id": [DevString, "Controller ID", [InvalidId]],
"Instrument_id": [DevString, "Instrument ID", [InvalidId]],
}
device_property_list.update(PoolDeviceClass.device_property_list)
#:
#: Sardana device attribute definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
attr_list = {
"Instrument": [
[DevString, SCALAR, READ_WRITE],
{"label": "Instrument", "Display level": DispLevel.EXPERT},
],
"SimulationMode": [
[DevBoolean, SCALAR, READ_WRITE],
{"label": "Simulation mode"},
],
}
attr_list.update(PoolDeviceClass.attr_list)
cmd_list = {}
cmd_list.update(PoolDeviceClass.cmd_list)
[docs]
def get_standard_attr_info(self, attr: str):
"""Returns information about the standard attribute
:param attr: attribute name
:return: a sequence of tango data_type, data format"""
return self.standard_attr_list[attr]
def _get_class_properties(self):
ret = PoolDeviceClass._get_class_properties(self)
ret["Description"] = "Generic Pool element device class"
ret["InheritedFrom"].insert(0, "PoolDevice")
return ret
[docs]
class PoolGroupDevice(PoolDevice):
"""Base Tango Pool Group Device class"""
[docs]
def read_ElementList(self, attr: PyTango.Attribute) -> None:
"""Read the element list.
:param attr: tango attribute
"""
attr.set_value(self.get_element_names())
[docs]
def get_element_names(self):
"""Returns the list of element names.
:return: a list of attribute names"""
elements = self.element.get_user_elements()
return [element.name for element in elements]
[docs]
def elements_changed(self, evt_src, evt_type, evt_value):
"""Callback for when the elements of this group changed"""
self.push_change_event("ElementList", self.get_element_names())
[docs]
class PoolGroupDeviceClass(PoolDeviceClass):
"""Base Tango Pool Group Device Class class"""
#:
#: Sardana device properties definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
device_property_list = {
"Elements": [DevVarStringArray, "elements in the group", []],
}
device_property_list.update(PoolDeviceClass.device_property_list)
#:
#: Sardana device command definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
cmd_list = {}
cmd_list.update(PoolDeviceClass.cmd_list)
#:
#: Sardana device attribute definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
attr_list = {
"ElementList": [[DevString, SPECTRUM, READ, 4096]],
}
attr_list.update(PoolDeviceClass.attr_list)
def _get_class_properties(self):
ret = PoolDeviceClass._get_class_properties(self)
ret["Description"] = "Generic Pool group device class"
ret["InheritedFrom"].insert(0, "PoolDevice")
return ret
class PoolExpChannelDevice(PoolElementDevice):
def __init__(self, dclass, name):
"""Constructor"""
PoolElementDevice.__init__(self, dclass, name)
codec_name = getattr(sardanacustomsettings, "VALUE_BUFFER_CODEC")
self._value_buffer_codec = CodecFactory().getCodec(codec_name)
codec_name = getattr(sardanacustomsettings, "VALUE_REF_BUFFER_CODEC")
self._value_ref_buffer_codec = CodecFactory().getCodec(codec_name)
def _encode_value_chunk(
self, value_chunk: Sequence[sardana.sardanavalue.SardanaValue]
) -> str:
"""Prepare value chunk to be passed via communication channel.
:param value_chunk: value chunk
:return: json string representing value chunk
"""
index = []
value = []
for idx, sdn_value in value_chunk.items():
index.append(idx)
value.append(sdn_value.value)
data = dict(index=index, value=value)
encoded_data = self._value_buffer_codec.encode(("", data))
return encoded_data
def _encode_value_ref_chunk(self, value_ref_chunk: Any) -> str:
"""Prepare value ref chunk to be passed via communication channel.
:param value_ref_chunk: value ref chunk
:return: json string representing value chunk
"""
index = []
value_ref = []
for idx, sdn_value in value_ref_chunk.items():
index.append(idx)
value_ref.append(sdn_value.value)
data = dict(index=index, value_ref=value_ref)
encoded_data = self._value_ref_buffer_codec.encode(("", data))
return encoded_data
def initialize_dynamic_attributes(self):
attrs = PoolElementDevice.initialize_dynamic_attributes(self)
non_detect_evts = "integrationtime", "shape"
for attr_name in non_detect_evts:
if attr_name in attrs:
self.set_change_event(attr_name, True, False)
return attrs
def read_ValueBuffer(self, _):
desc = (
"ValueBuffer attribute is not foreseen for reading. It is "
"used only as the communication channel for the continuous "
"acquisitions."
)
Except.throw_exception(
"UnsupportedFeature",
desc,
"PoolExpChannelDevice.read_ValueBuffer",
ErrSeverity.WARN,
)
def read_ValueRefBuffer(self, _):
desc = (
"ValueRefBuffer attribute is not foreseen for reading. "
"It is used only as the communication channel for the "
"continuous acquisitions."
)
Except.throw_exception(
"UnsupportedFeature",
desc,
"PoolExpChannelDevice.read_ValueRefBuffer",
ErrSeverity.WARN,
)
def read_IntegrationTime(self, attr: PyTango.Attribute) -> None:
"""Reads the integration time.
:param attr: tango attribute
"""
attr.set_value(self.element.integration_time)
def write_IntegrationTime(self, attr: PyTango.Attribute) -> None:
"""Sets the integration time.
:param attr: tango attribute
"""
self.element.integration_time = attr.get_write_value()
def read_Shape(self, attr: PyTango.Attribute) -> None:
"""Reads the shape.
:param attr: tango attribute
"""
attr.set_value(self.element.get_shape(cache=False))
class PoolExpChannelDeviceClass(PoolElementDeviceClass):
#:
#: Sardana device attribute definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
attr_list = {"IntegrationTime": [[DevDouble, SCALAR, READ_WRITE]]}
attr_list.update(PoolElementDeviceClass.attr_list)
standard_attr_list = {
"ValueBuffer": [[DevEncoded, SCALAR, READ]],
"Shape": [
[DevLong64, SPECTRUM, READ, 2],
{
"label": "Shape (X,Y)",
"description": "Shape of the value. It is an array with \n"
"at most 2 elements: X and Y dimensions. \n"
"0-element array - scalar\n"
"1-element array (X) - spectrum\n"
"2-element array (X, Y) - image",
},
],
}
standard_attr_list.update(PoolElementDeviceClass.standard_attr_list)
class PoolTimerableDevice(PoolExpChannelDevice):
def __init__(self, dclass, name):
"""Constructor"""
PoolExpChannelDevice.__init__(self, dclass, name)
def initialize_dynamic_attributes(self):
attrs = PoolExpChannelDevice.initialize_dynamic_attributes(self)
detect_evts = ("timer",)
for attr_name in detect_evts:
if attr_name in attrs:
self.set_change_event(attr_name, True, True)
return attrs
def read_Timer(self, attr: PyTango.Attribute) -> None:
"""Reads the timer for this channel.
:param attr: tango attribute
"""
timer = self.element.timer
if timer is None:
timer = "None"
attr.set_value(timer)
def write_Timer(self, attr: PyTango.Attribute) -> None:
"""Sets the timer for this channel.
:param attr: tango attribute
"""
timer = attr.get_write_value()
if timer == "None":
timer = None
self.element.timer = timer
class PoolTimerableDeviceClass(PoolExpChannelDeviceClass):
#:
#: Sardana device attribute definition
#:
#: .. seealso:: :ref:`server-old-api`
#:
# Attribute definitions
attr_list = {
"Timer": [
[DevString, SCALAR, READ_WRITE],
{
"Memorized": "true_without_hard_applied",
},
]
}
attr_list.update(PoolExpChannelDeviceClass.attr_list)
standard_attr_list = {}
standard_attr_list.update(PoolExpChannelDeviceClass.standard_attr_list)