#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
""" """
__all__ = ["TwoDExpChannel", "TwoDExpChannelClass"]
__docformat__ = "restructuredtext"
import sys
import time
from PyTango import (
READ,
SCALAR,
AttrQuality,
DevEncoded,
DevFailed,
DevState,
DevString,
DevVoid,
Except,
)
from taurus.core.util.log import DebugIt
from sardana import DataFormat, SardanaServer, State
from sardana.pool.controller import MaxDimSize, TwoDController, Type
from sardana.sardanaattribute import SardanaAttribute
from sardana.tango.core.util import (
exception_str,
memorize_write_attribute,
to_tango_type_format,
)
from sardana.tango.pool.PoolDevice import PoolTimerableDevice, PoolTimerableDeviceClass
[docs]
class TwoDExpChannel(PoolTimerableDevice):
def __init__(self, dclass, name):
PoolTimerableDevice.__init__(self, dclass, name)
self._first_read_cache = False
self._first_read_ref_cache = False
[docs]
def init(self, name):
PoolTimerableDevice.init(self, name)
[docs]
def get_twod(self):
return self.element
[docs]
def set_twod(self, twod):
self.element = twod
twod = property(get_twod, set_twod)
[docs]
@DebugIt()
def delete_device(self):
PoolTimerableDevice.delete_device(self)
twod = self.twod
if twod is not None:
twod.remove_listener(self.on_twod_changed)
self.twod = None
[docs]
@DebugIt()
def init_device(self):
PoolTimerableDevice.init_device(self)
id_ = self.get_id()
try:
self.twod = twod = self.pool.get_element_by_id(id_)
except KeyError:
full_name = self.get_full_name()
name = self.alias or full_name
svr_running = SardanaServer.server_state == State.Running
self.twod = twod = self.pool.create_element(
type="TwoDExpChannel",
name=name,
full_name=full_name,
id=id_,
axis=self.Axis,
ctrl_id=self.get_ctrl_id(),
handle_ctrl_err=not svr_running,
)
if self.instrument is not None:
twod.set_instrument(self.instrument)
else:
ctrl = self.twod.get_controller()
ctrl.add_element(self.twod, propagate=0)
twod.add_listener(self.on_twod_changed)
# force a state read to initialize the state attribute
# state = ct.state
self.set_state(DevState.ON)
[docs]
def on_twod_changed(self, event_source, event_type, event_value):
try:
self._on_twod_changed(event_source, event_type, event_value)
except DevFailed:
raise
except Exception:
msg = 'Error occurred "on_twod_changed(%s.%s): %s"'
exc_info = sys.exc_info()
self.error(
msg, self.motor.name, event_type.name, exception_str(*exc_info[:2])
)
self.debug("Details", exc_info=exc_info)
def _on_twod_changed(self, event_source, event_type, event_value):
# during server startup and shutdown avoid processing element
# creation events
if SardanaServer.server_state != State.Running:
return
timestamp = time.time()
name = event_type.name.lower()
name = name.replace("_", "") # for integration_time events
try:
attr = self.get_attribute_by_name(name)
except DevFailed:
return
quality = AttrQuality.ATTR_VALID
priority = event_type.priority
value, w_value, error = None, None, None
if name == "state":
value = self.calculate_tango_state(event_value)
elif name == "status":
value = self.calculate_tango_status(event_value)
elif name == "valuebuffer":
value = self._encode_value_chunk(event_value)
self._first_read_cache = True
elif name == "valuerefbuffer":
value = self._encode_value_ref_chunk(event_value)
self._first_read_ref_cache = True
else:
if isinstance(event_value, SardanaAttribute):
if event_value.error:
error = Except.to_dev_failed(*event_value.exc_info)
else:
value = event_value.value
timestamp = event_value.timestamp
else:
value = event_value
if (
name in ("timer", "valuereftemplate", "valuerefenabled")
and value is None
):
value = "None"
elif name == "datasource" and value is None:
full_name = self.get_full_name()
# for Taurus 3/4 compatibility
if not full_name.startswith("tango://"):
full_name = "tango://{0}".format(full_name)
value = "{0}/value".format(full_name)
elif name == "value":
state = self.twod.get_state()
if state == State.Moving:
quality = AttrQuality.ATTR_CHANGING
self.set_attribute(
attr,
value=value,
w_value=w_value,
timestamp=timestamp,
quality=quality,
priority=priority,
error=error,
synch=False,
)
[docs]
def always_executed_hook(self):
# state = to_tango_state(self.twod.get_state(cache=False))
pass
def read_attr_hardware(self, data):
pass
[docs]
def get_dynamic_attributes(self):
cache_built = hasattr(self, "_dynamic_attributes_cache")
std_attrs, dyn_attrs = PoolTimerableDevice.get_dynamic_attributes(self)
if not cache_built:
# For value attribute, listen to what the controller says for data
# type (between long and float) and length
value = std_attrs.get("value")
if value is not None:
_, data_info, attr_info = value
ttype, _ = to_tango_type_format(attr_info.dtype)
data_info[0][0] = ttype
shape = attr_info.maxdimsize
data_info[0][3] = shape[0]
data_info[0][4] = shape[1]
return std_attrs, dyn_attrs
[docs]
def initialize_dynamic_attributes(self):
attrs = PoolTimerableDevice.initialize_dynamic_attributes(self)
# referable channels
# TODO: not 100% sure if valuereftemplate and valuerefenabled should
# not belong to detect_evts
non_detect_evts = (
"valuebuffer",
"valueref",
"valuerefbuffer",
"valuereftemplate",
"valuerefenabled",
)
for attr_name in non_detect_evts:
if attr_name in attrs:
self.set_change_event(attr_name, True, False)
[docs]
def read_Value(self, attr):
twod = self.twod
# TODO: decide if we force the controller developers to store the
# last acquired value in the controllers or we always will use
# cache. This is due to the fact that the clients (MS) read the value
# after the acquisition had finished.
use_cache = twod.is_in_operation() and not self.Force_HW_Read
# For the moment we just check if we recently receive ValueBuffer.
# event. In this case, we use cache and clean the flag
# so the cached value will be returned only at the first readout
# after the acquisition. This is a workaround for the count executed
# by the MacroServer e.g. step scans or ct which read the value after
# the acquisition.
if not use_cache and self._first_read_cache:
use_cache = True
self._first_read_cache = False
value = twod.get_value(cache=use_cache, propagate=0)
if value.error:
Except.throw_python_exception(*value.exc_info)
state = twod.get_state(cache=use_cache, propagate=0)
quality = None
if state == State.Moving:
quality = AttrQuality.ATTR_CHANGING
self.set_attribute(
attr,
value=value.value,
quality=quality,
timestamp=value.timestamp,
priority=0,
)
[docs]
def is_Value_allowed(self, req_type):
if self.get_state() in [DevState.FAULT, DevState.UNKNOWN]:
return False
return True
[docs]
def read_ValueRef(self, attr):
twod = self.twod
value_ref = twod.get_value_ref()
if value_ref.error:
Except.throw_python_exception(*value_ref.exc_info)
use_cache = twod.is_in_operation() and not self.Force_HW_Read
state = twod.get_state(cache=use_cache, propagate=0)
quality = None
if state == State.Moving:
quality = AttrQuality.ATTR_CHANGING
self.set_attribute(
attr,
value=value_ref.value,
quality=quality,
timestamp=value_ref.timestamp,
priority=0,
)
[docs]
def read_ValueRefPattern(self, attr):
value_ref_pattern = self.twod.get_value_ref_pattern()
if value_ref_pattern is None:
value_ref_pattern = "None"
attr.set_value(value_ref_pattern)
[docs]
@memorize_write_attribute
def write_ValueRefPattern(self, attr):
value_ref_pattern = attr.get_write_value()
if value_ref_pattern == "None":
value_ref_pattern = None
self.twod.value_ref_pattern = value_ref_pattern
[docs]
def read_ValueRefEnabled(self, attr):
value_ref_enabled = self.twod.is_value_ref_enabled()
if value_ref_enabled is None:
raise Exception("value reference enabled flag is unknown")
attr.set_value(value_ref_enabled)
[docs]
@memorize_write_attribute
def write_ValueRefEnabled(self, attr):
value_ref_enabled = attr.get_write_value()
if value_ref_enabled == "None":
value_ref_enabled = None
self.twod.value_ref_enabled = value_ref_enabled
[docs]
def read_DataSource(self, attr):
data_source = self.twod.get_data_source()
if data_source is None:
full_name = self.get_full_name()
# for Taurus 3/4 compatibility
if not full_name.startswith("tango://"):
full_name = "tango://{0}".format(full_name)
data_source = "{0}/value".format(full_name)
attr.set_value(data_source)
[docs]
def Start(self):
self.twod.start_acquisition()
_DFT_VALUE_INFO = TwoDController.standard_axis_attributes["Value"]
_DFT_VALUE_MAX_SHAPE = _DFT_VALUE_INFO[MaxDimSize]
_DFT_VALUE_TYPE, _DFT_VALUE_FORMAT = to_tango_type_format(
_DFT_VALUE_INFO[Type], DataFormat.TwoD
)
[docs]
class TwoDExpChannelClass(PoolTimerableDeviceClass):
# Class Properties
class_property_list = {}
# Device Properties
device_property_list = {}
device_property_list.update(PoolTimerableDeviceClass.device_property_list)
# Command definitions
cmd_list = {
"Start": [[DevVoid, ""], [DevVoid, ""]],
}
cmd_list.update(PoolTimerableDeviceClass.cmd_list)
# Attribute definitions
attr_list = {
"DataSource": [[DevString, SCALAR, READ]],
}
attr_list.update(PoolTimerableDeviceClass.attr_list)
standard_attr_list = {
"Value": [
[
_DFT_VALUE_TYPE,
_DFT_VALUE_FORMAT,
READ,
_DFT_VALUE_MAX_SHAPE[0],
_DFT_VALUE_MAX_SHAPE[1],
],
{
"abs_change": "1.0",
},
],
# TODO: here we override type by DevEncoded
"ValueRefBuffer": [[DevEncoded, SCALAR, READ]],
}
standard_attr_list.update(PoolTimerableDeviceClass.standard_attr_list)
def _get_class_properties(self):
ret = PoolTimerableDeviceClass._get_class_properties(self)
ret["Description"] = "2D device class"
ret["InheritedFrom"].insert(0, "PoolTimerableDevice")
return ret