#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""The sardana tango measurement group module"""
__all__ = ["MeasurementGroup", "MeasurementGroupClass"]
__docformat__ = "restructuredtext"
import sys
import time
from PyTango import (
READ,
READ_WRITE,
SCALAR,
AttrQuality,
DevDouble,
DevLong,
DevState,
DevString,
DevVoid,
DispLevel,
Except,
Util,
)
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.log import DebugIt
from sardana import SardanaServer, State
from sardana.pool import AcqMode
from sardana.pool.pooldefs import SynchDomain
from sardana.sardanaattribute import SardanaAttribute
from sardana.tango.core.util import exception_str
from sardana.tango.pool.PoolDevice import PoolGroupDevice, PoolGroupDeviceClass
def _decode_configuration(configuration):
    """Decode a JSON encoded measurement group configuration.

    :param configuration: configuration in its JSON string form
    :return: whatever the "json" codec decodes ``configuration`` into
    """
    codec_factory = CodecFactory()
    encoded = ("json", configuration)
    return codec_factory.decode(encoded)
def _decode_moveable(moveable):
if moveable == "None":
moveable = None
return moveable
def _decode_software_synchronizer_initial_domain(domain):
    """Look up the ``SynchDomain`` member named ``domain``.

    :param domain: name of the synchronization domain
    :return: the ``SynchDomain`` item registered under ``domain``
    :raises Exception: if the lookup raises ``KeyError``
    """
    try:
        synch_domain = SynchDomain[domain]
    except KeyError:
        raise Exception("Invalid domain (can be either Position or Time)")
    else:
        return synch_domain
[docs]
class MeasurementGroup(PoolGroupDevice):
    """Tango device exposing a sardana measurement group.

    The underlying measurement group object is stored in ``self.element``
    and is reachable through the :attr:`measurement_group` property. The
    ``read_*``/``write_*`` methods translate between the Tango attribute
    values and the attributes of that object.
    """

    def __init__(self, dclass, name):
        PoolGroupDevice.__init__(self, dclass, name)

    def init(self, name):
        """Delegate the initialization to ``PoolGroupDevice.init``."""
        PoolGroupDevice.init(self, name)

    def get_measurement_group(self):
        """Return the measurement group object (``self.element``)."""
        return self.element

    def set_measurement_group(self, measurement_group):
        """Store ``measurement_group`` as ``self.element``."""
        self.element = measurement_group

    measurement_group = property(get_measurement_group, set_measurement_group)

    @DebugIt()
    def delete_device(self):
        """Delete the device and detach it from the measurement group.

        After the base class clean-up, the change listener is removed from
        the measurement group (if any) and the reference to it is dropped.
        """
        PoolGroupDevice.delete_device(self)
        mg = self.measurement_group
        if mg is not None:
            mg.remove_listener(self.on_measurement_group_changed)
            self.measurement_group = None

    @DebugIt()
    def init_device(self):
        """Initialize the device.

        Configures the change events, normalizes the ``Elements`` property,
        creates the measurement group in the pool when the device does not
        have one yet, registers the change listener and sets the device
        state to ``ON``.
        """
        PoolGroupDevice.init_device(self)
        # state and status are already set by the super class
        detect_evts = (
            "moveable",
            "synchdescription",
            "softwaresynchronizerinitialdomain",
        )
        # TODO: nbstarts could be moved to detect events with
        # abs_change criteria of 1, but be careful with
        # tango-controls/pytango#302
        non_detect_evts = (
            "configuration",
            "integrationtime",
            "monitorcount",
            "acquisitionmode",
            "elementlist",
            "latencytime",
            "nbstarts",
        )
        self.set_change_events(detect_evts, non_detect_evts)

        # Best effort conversion: entries that can be converted to int are
        # converted, all the others are deliberately kept as they are.
        self.Elements = list(self.Elements)
        for index, element in enumerate(self.Elements):
            try:
                self.Elements[index] = int(element)
            except Exception:
                pass

        mg = self.measurement_group
        if mg is None:
            full_name = self.get_full_name()
            name = self.alias or full_name
            svr_running = SardanaServer.server_state == State.Running
            self.measurement_group = mg = self.pool.create_measurement_group(
                name=name,
                full_name=full_name,
                id=self.get_id(),
                user_elements=self.Elements,
                svr_running=svr_running,
            )
        mg.add_listener(self.on_measurement_group_changed)

        # force a state read to initialize the state attribute
        # state = self.measurement_group.state
        self.set_state(DevState.ON)

    def initialize_attribute_values(self):
        """Initialize attribute values.

        The memorized "Configuration", "Moveable" and
        "SoftwareSynchronizerInitialDomain" values (when present) are
        decoded before the memorized values are handed over to the
        measurement group. If a configuration was memorized, the
        "elements" device property is updated afterwards.
        """
        memorized_values = self.get_memorized_values()

        configuration = memorized_values.get("Configuration", None)
        if configuration is not None:
            configuration = _decode_configuration(configuration)
            memorized_values["Configuration"] = configuration

        moveable = memorized_values.get("Moveable", None)
        if moveable is not None:
            moveable = _decode_moveable(moveable)
            memorized_values["Moveable"] = moveable

        domain = memorized_values.get("SoftwareSynchronizerInitialDomain", None)
        if domain is not None:
            domain = _decode_software_synchronizer_initial_domain(domain)
            memorized_values["SoftwareSynchronizerInitialDomain"] = domain

        self.measurement_group.init_attribute_values(memorized_values)
        if configuration is not None:
            self._update_elements_property()

    def on_measurement_group_changed(self, event_source, event_type, event_value):
        """Listener registered on the measurement group.

        Forwards the event to :meth:`_on_measurement_group_changed`; any
        exception raised while processing it is logged and not propagated.
        """
        try:
            self._on_measurement_group_changed(event_source, event_type, event_value)
        except Exception:
            msg = 'Error occured "on_measurement_group_changed(%s.%s): %s"'
            exc_info = sys.exc_info()
            self.error(
                msg,
                self.measurement_group.name,
                event_type.name,
                exception_str(*exc_info[:2]),
            )
            self.debug("Details", exc_info=exc_info)

    def _on_measurement_group_changed(self, event_source, event_type, event_value):
        """Convert a measurement group event into a Tango attribute update.

        The attribute is looked up by the event type name with the
        underscores removed. The event value is converted to the form
        used by the corresponding Tango attribute and passed on to
        ``set_attribute``.
        """
        # during server startup and shutdown avoid processing element
        # creation events
        if SardanaServer.server_state != State.Running:
            return

        timestamp = time.time()
        name = event_type.name
        name = name.replace("_", "")
        multi_attr = self.get_device_attr()
        attr = multi_attr.get_attr_by_name(name)
        quality = AttrQuality.ATTR_VALID
        priority = event_type.priority
        error = None

        if name == "state":
            event_value = self.calculate_tango_state(event_value)
        elif name == "status":
            event_value = self.calculate_tango_status(event_value)
        elif name == "acquisitionmode":
            event_value = AcqMode.whatis(event_value)
        elif name == "configuration":
            # the event value is ignored: the user configuration is always
            # re-read from the measurement group
            cfg = self.measurement_group.get_user_configuration()
            codec = CodecFactory().getCodec("json")
            _, event_value = codec.encode(("", cfg))
        elif name == "synchdescription":
            codec = CodecFactory().getCodec("json")
            _, event_value = codec.encode(("", event_value))
        elif name == "integrationtime" and event_value is None:
            event_value = float("NaN")
        elif name == "moveable" and event_value is None:
            event_value = "None"
        elif isinstance(event_value, SardanaAttribute):
            # unpack the attribute: its error (if any), timestamp and value
            if event_value.error:
                error = Except.to_dev_failed(*event_value.exc_info)
            timestamp = event_value.timestamp
            event_value = event_value.value

        self.set_attribute(
            attr,
            value=event_value,
            timestamp=timestamp,
            quality=quality,
            priority=priority,
            error=error,
            synch=False,
        )

    def always_executed_hook(self):
        """Do nothing."""
        pass

    def read_attr_hardware(self, data):
        """Do nothing."""
        pass

    def read_IntegrationTime(self, attr):
        """Read the integration time; ``None`` is reported as NaN."""
        it = self.measurement_group.integration_time
        if it is None:
            it = float("nan")
        attr.set_value(it)

    def write_IntegrationTime(self, attr):
        """Set the integration time from the attribute write value."""
        self.measurement_group.integration_time = attr.get_write_value()

    def read_MonitorCount(self, attr):
        """Read the monitor count; ``None`` is reported as 0."""
        it = self.measurement_group.monitor_count
        if it is None:
            it = 0
        attr.set_value(it)

    def write_MonitorCount(self, attr):
        """Set the monitor count from the attribute write value."""
        self.measurement_group.monitor_count = attr.get_write_value()

    def read_AcquisitionMode(self, attr):
        """Read the acquisition mode as the string given by ``AcqMode.whatis``."""
        acq_mode = self.measurement_group.acquisition_mode
        acq_mode_str = AcqMode.whatis(acq_mode)
        attr.set_value(acq_mode_str)

    def write_AcquisitionMode(self, attr):
        """Set the acquisition mode from its string representation.

        :raises Exception: if the written string is not a key of
            ``AcqMode.lookup``
        """
        acq_mode_str = attr.get_write_value()
        try:
            acq_mode = AcqMode.lookup[acq_mode_str]
        except KeyError:
            raise Exception(
                "Invalid acquisition mode. Must be one of "
                + ", ".join(AcqMode.keys())
            )
        self.measurement_group.acquisition_mode = acq_mode

    def read_Configuration(self, attr):
        """Read the user configuration encoded with the "json" codec."""
        cfg = self.measurement_group.get_user_configuration()
        codec = CodecFactory().getCodec("json")
        data = codec.encode(("", cfg))
        attr.set_value(data[1])

    def _update_elements_property(self):
        """Store the user element ids in the "elements" device property."""
        util = Util.instance()
        db = util.get_database()
        elem_ids = self.measurement_group.user_element_ids
        data = {"elements": elem_ids}
        db.put_device_property(self.get_name(), data)

    def write_Configuration(self, attr):
        """Apply a JSON encoded user configuration.

        The "elements" device property is updated after the configuration
        has been applied.
        """
        data = attr.get_write_value()
        cfg = _decode_configuration(data)
        self.measurement_group.set_configuration_from_user(cfg)
        self._update_elements_property()

    def read_NbStarts(self, attr):
        """Read the number of starts; ``None`` is reported as 0."""
        nb_starts = self.measurement_group.nb_starts
        if nb_starts is None:
            # The former ``int("nan")`` always raised ValueError: integers
            # have no NaN. Report 0 instead, as read_MonitorCount does for
            # the other DevLong attribute.
            nb_starts = 0
        attr.set_value(nb_starts)

    def write_NbStarts(self, attr):
        """Set the number of starts from the attribute write value."""
        self.measurement_group.nb_starts = attr.get_write_value()

    def read_Moveable(self, attr):
        """Read the moveable; ``None`` is reported as the string "None"."""
        moveable = self.measurement_group.moveable
        if moveable is None:
            moveable = "None"
        attr.set_value(moveable)

    def write_Moveable(self, attr):
        """Set the moveable; the string "None" is stored as ``None``."""
        moveable = attr.get_write_value()
        moveable = _decode_moveable(moveable)
        self.measurement_group.moveable = moveable

    def read_SynchDescription(self, attr):
        """Read the synchronization description encoded with the "json" codec."""
        synch_description = self.measurement_group.synch_description
        codec = CodecFactory().getCodec("json")
        data = codec.encode(("", synch_description))
        attr.set_value(data[1])

    def write_SynchDescription(self, attr):
        """Set the synchronization description from its JSON encoded form."""
        synch_description_json = attr.get_write_value()
        synch_description = CodecFactory().decode(("json", synch_description_json))
        self.measurement_group.synch_description = synch_description

    def read_LatencyTime(self, attr):
        """Read the latency time of the measurement group."""
        latency_time = self.measurement_group.latency_time
        attr.set_value(latency_time)

    def read_SoftwareSynchronizerInitialDomain(self, attr):
        """Read the software synchronizer initial domain as its name."""
        domain = self.measurement_group.software_synchronizer_initial_domain
        d = SynchDomain(domain).name
        attr.set_value(d)

    def write_SoftwareSynchronizerInitialDomain(self, attr):
        """Set the software synchronizer initial domain from its name.

        :raises Exception: if the written name is not a valid domain
        """
        data = attr.get_write_value()
        domain = _decode_software_synchronizer_initial_domain(data)
        self.measurement_group.software_synchronizer_initial_domain = domain

    def Prepare(self):
        """Call ``prepare()`` on the measurement group."""
        self.measurement_group.prepare()

    def Start(self):
        """Start the acquisition.

        :raises Exception: if ``wait_for_operation`` raises any exception
        """
        try:
            self.wait_for_operation()
        except Exception:
            raise Exception("Cannot acquire: already involved in an operation")
        self.measurement_group.start_acquisition()

    def Stop(self):
        """Call ``stop()`` on the measurement group."""
        self.measurement_group.stop()
[docs]
class MeasurementGroupClass(PoolGroupDeviceClass):
    """Tango class description (properties, commands and attributes) of
    the measurement group device.

    The definitions declared here are merged with the ones of
    ``PoolGroupDeviceClass``; on a name clash the entry of
    ``PoolGroupDeviceClass`` is the one kept.
    """

    # Class Properties
    class_property_list = {}

    # Device Properties: a copy of the ones declared by the base class
    device_property_list = dict(PoolGroupDeviceClass.device_property_list)

    # Command definitions
    cmd_list = {
        "Prepare": [[DevVoid, ""], [DevVoid, ""]],
        "Start": [[DevVoid, ""], [DevVoid, ""]],
    }
    cmd_list.update(PoolGroupDeviceClass.cmd_list)

    # Attribute definitions
    attr_list = {
        "IntegrationTime": [
            [DevDouble, SCALAR, READ_WRITE],
            {
                "Memorized": "false",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        "MonitorCount": [
            [DevLong, SCALAR, READ_WRITE],
            {
                "Memorized": "false",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        "AcquisitionMode": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "true_without_hard_applied",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        "Configuration": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "true_without_hard_applied",
                "Display level": DispLevel.EXPERT,
            },
        ],
        "NbStarts": [
            [DevLong, SCALAR, READ_WRITE],
            {
                "Memorized": "false",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        # TODO: Does it have sense to memorize Moveable?
        "Moveable": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "true_without_hard_applied",
                "Display level": DispLevel.EXPERT,
            },
        ],
        "SynchDescription": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "false",
                "Display level": DispLevel.EXPERT,
            },
        ],
        "LatencyTime": [
            [DevDouble, SCALAR, READ],
            {
                "Display level": DispLevel.EXPERT,
            },
        ],
        "SoftwareSynchronizerInitialDomain": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "true_without_hard_applied",
                "Display level": DispLevel.OPERATOR,
            },
        ],
    }
    attr_list.update(PoolGroupDeviceClass.attr_list)

    def _get_class_properties(self):
        """Return the base class properties with the description replaced
        and "PoolGroupDevice" inserted first in the "InheritedFrom" list.
        """
        properties = PoolGroupDeviceClass._get_class_properties(self)
        properties["Description"] = "Measurement group device class"
        inherited_from = properties["InheritedFrom"]
        inherited_from.insert(0, "PoolGroupDevice")
        return properties