Source code for sardana.tango.pool.MeasurementGroup

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""The sardana tango measurement group module"""

__all__ = ["MeasurementGroup", "MeasurementGroupClass"]

__docformat__ = "restructuredtext"

import sys
import time

from PyTango import (
    READ,
    READ_WRITE,
    SCALAR,
    AttrQuality,
    DevDouble,
    DevLong,
    DevState,
    DevString,
    DevVoid,
    DispLevel,
    Except,
    Util,
)
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.log import DebugIt

from sardana import SardanaServer, State
from sardana.pool import AcqMode
from sardana.pool.pooldefs import SynchDomain
from sardana.sardanaattribute import SardanaAttribute
from sardana.tango.core.util import exception_str
from sardana.tango.pool.PoolDevice import PoolGroupDevice, PoolGroupDeviceClass


def _decode_configuration(configuration):
    """Decode a JSON encoded measurement group configuration.

    :param configuration: configuration as written to (or memorized for)
        the ``Configuration`` attribute
    :return: the result of running the ``("json", configuration)`` pair
        through :class:`CodecFactory`'s ``decode``
    """
    encoded = ("json", configuration)
    factory = CodecFactory()
    return factory.decode(encoded)


def _decode_moveable(moveable):
    """Translate the ``Moveable`` attribute value into its core value.

    The literal string ``"None"`` stands for "no moveable" and is mapped
    to ``None``; any other value is handed back untouched.

    :param moveable: value of the ``Moveable`` attribute
    :return: ``None`` if *moveable* equals ``"None"``, otherwise *moveable*
    """
    return None if moveable == "None" else moveable


def _decode_software_synchronizer_initial_domain(domain):
    """Look up the :class:`SynchDomain` member called *domain*.

    :param domain: member name, as written to the
        ``SoftwareSynchronizerInitialDomain`` attribute
    :return: ``SynchDomain[domain]``
    :raises Exception: if the lookup fails with :exc:`KeyError`
    """
    try:
        synch_domain = SynchDomain[domain]
    except KeyError:
        raise Exception("Invalid domain (can be either Position or Time)")
    return synch_domain


class MeasurementGroup(PoolGroupDevice):
    """Tango device exposing a pool measurement group.

    The core measurement group object is kept in ``self.element`` and is
    reachable through the :attr:`measurement_group` property. Attribute
    read/write methods and commands delegate to that object, and events
    fired by it are forwarded to the Tango attributes by
    :meth:`on_measurement_group_changed`.
    """

    def __init__(self, dclass, name):
        """Delegate construction to :class:`PoolGroupDevice`.

        :param dclass: passed through to ``PoolGroupDevice.__init__``
        :param name: passed through to ``PoolGroupDevice.__init__``
        """
        PoolGroupDevice.__init__(self, dclass, name)

    def init(self, name):
        """Delegate to ``PoolGroupDevice.init``.

        :param name: passed through to ``PoolGroupDevice.init``
        """
        PoolGroupDevice.init(self, name)

    def get_measurement_group(self):
        """Return the core measurement group (``self.element``)."""
        return self.element

    def set_measurement_group(self, measurement_group):
        """Store *measurement_group* as ``self.element``."""
        self.element = measurement_group

    measurement_group = property(get_measurement_group, set_measurement_group)

    @DebugIt()
    def delete_device(self):
        """Unregister from the core measurement group and forget it."""
        PoolGroupDevice.delete_device(self)
        mg = self.measurement_group
        if mg is not None:
            mg.remove_listener(self.on_measurement_group_changed)
        self.measurement_group = None

    @DebugIt()
    def init_device(self):
        """Configure change events and bind to the core measurement group.

        If no core object is attached yet, one is created through
        ``self.pool.create_measurement_group`` using the ``Elements``
        property. In all cases this device registers itself as listener
        of the core object and sets its state to ``DevState.ON``.
        """
        PoolGroupDevice.init_device(self)
        # state and status are already set by the super class
        detect_evts = (
            "moveable",
            "synchdescription",
            "softwaresynchronizerinitialdomain",
        )
        # TODO: nbstarts could be moved to detect events with
        # abs_change criteria of 1, but be careful with
        # tango-controls/pytango#302
        non_detect_evts = (
            "configuration",
            "integrationtime",
            "monitorcount",
            "acquisitionmode",
            "elementlist",
            "latencytime",
            "nbstarts",
        )
        self.set_change_events(detect_evts, non_detect_evts)

        # Best effort conversion: entries that look like integers become
        # ints, anything else is kept exactly as it was.
        self.Elements = list(self.Elements)
        for index, element in enumerate(self.Elements):
            try:
                self.Elements[index] = int(element)
            except (TypeError, ValueError):
                pass

        mg = self.measurement_group
        if mg is None:
            full_name = self.get_full_name()
            name = self.alias or full_name
            svr_running = SardanaServer.server_state == State.Running
            self.measurement_group = mg = self.pool.create_measurement_group(
                name=name,
                full_name=full_name,
                id=self.get_id(),
                user_elements=self.Elements,
                svr_running=svr_running,
            )
        mg.add_listener(self.on_measurement_group_changed)

        self.set_state(DevState.ON)

    def initialize_attribute_values(self):
        """Initialize attribute values.

        The memorized ``Configuration``, ``Moveable`` and
        ``SoftwareSynchronizerInitialDomain`` values (when present) are
        decoded in place before the whole mapping is handed to the core
        measurement group. When a configuration was memorized, the
        ``elements`` device property is refreshed afterwards.
        """
        memorized_values = self.get_memorized_values()

        configuration = memorized_values.get("Configuration", None)
        if configuration is not None:
            configuration = _decode_configuration(configuration)
            memorized_values["Configuration"] = configuration

        moveable = memorized_values.get("Moveable", None)
        if moveable is not None:
            moveable = _decode_moveable(moveable)
            memorized_values["Moveable"] = moveable

        domain = memorized_values.get("SoftwareSynchronizerInitialDomain", None)
        if domain is not None:
            domain = _decode_software_synchronizer_initial_domain(domain)
            memorized_values["SoftwareSynchronizerInitialDomain"] = domain

        self.measurement_group.init_attribute_values(memorized_values)
        if configuration is not None:
            self._update_elements_property()

    def on_measurement_group_changed(self, event_source, event_type, event_value):
        """Listener registered on the core measurement group.

        Delegates to :meth:`_on_measurement_group_changed`; any exception
        raised there is logged (error summary plus debug details) and not
        propagated to the caller.

        :param event_source: passed through unchanged
        :param event_type: event type; its ``name`` is used for logging
        :param event_value: passed through unchanged
        """
        try:
            self._on_measurement_group_changed(event_source, event_type, event_value)
        except Exception:
            msg = 'Error occured "on_measurement_group_changed(%s.%s): %s"'
            exc_info = sys.exc_info()
            self.error(
                msg,
                self.measurement_group.name,
                event_type.name,
                exception_str(*exc_info[:2]),
            )
            self.debug("Details", exc_info=exc_info)

    def _on_measurement_group_changed(self, event_source, event_type, event_value):
        """Translate a core event into a Tango attribute update.

        The attribute is found from ``event_type.name`` with underscores
        removed. The value is converted according to the attribute and
        then pushed with :meth:`set_attribute`.

        :param event_source: not used by this method
        :param event_type: provides the ``name`` and ``priority`` of the
            event
        :param event_value: new value (possibly a
            :class:`SardanaAttribute`)
        """
        # during server startup and shutdown avoid processing element
        # creation events
        if SardanaServer.server_state != State.Running:
            return

        timestamp = time.time()
        name = event_type.name
        name = name.replace("_", "")
        multi_attr = self.get_device_attr()
        attr = multi_attr.get_attr_by_name(name)
        quality = AttrQuality.ATTR_VALID
        priority = event_type.priority
        error = None

        if name == "state":
            event_value = self.calculate_tango_state(event_value)
        elif name == "status":
            event_value = self.calculate_tango_status(event_value)
        elif name == "acquisitionmode":
            event_value = AcqMode.whatis(event_value)
        elif name == "configuration":
            # the event value is ignored: the user configuration is
            # fetched again from the core object and JSON encoded
            cfg = self.measurement_group.get_user_configuration()
            codec = CodecFactory().getCodec("json")
            _, event_value = codec.encode(("", cfg))
        elif name == "synchdescription":
            codec = CodecFactory().getCodec("json")
            _, event_value = codec.encode(("", event_value))
        elif name == "integrationtime" and event_value is None:
            event_value = float("NaN")
        elif name == "moveable" and event_value is None:
            event_value = "None"
        else:
            if isinstance(event_value, SardanaAttribute):
                # unwrap the sardana attribute: error, timestamp and value
                if event_value.error:
                    error = Except.to_dev_failed(*event_value.exc_info)
                timestamp = event_value.timestamp
                event_value = event_value.value

        self.set_attribute(
            attr,
            value=event_value,
            timestamp=timestamp,
            quality=quality,
            priority=priority,
            error=error,
            synch=False,
        )

    def always_executed_hook(self):
        """Do nothing."""
        pass

    def read_attr_hardware(self, data):
        """Do nothing.

        :param data: not used
        """
        pass

    def read_IntegrationTime(self, attr):
        """Read the integration time; ``None`` is reported as NaN."""
        it = self.measurement_group.integration_time
        if it is None:
            it = float("nan")
        attr.set_value(it)

    def write_IntegrationTime(self, attr):
        """Write the integration time to the core measurement group."""
        self.measurement_group.integration_time = attr.get_write_value()

    def read_MonitorCount(self, attr):
        """Read the monitor count; ``None`` is reported as 0."""
        monitor_count = self.measurement_group.monitor_count
        if monitor_count is None:
            monitor_count = 0
        attr.set_value(monitor_count)

    def write_MonitorCount(self, attr):
        """Write the monitor count to the core measurement group."""
        self.measurement_group.monitor_count = attr.get_write_value()

    def read_AcquisitionMode(self, attr):
        """Read the acquisition mode as ``AcqMode.whatis`` reports it."""
        acq_mode = self.measurement_group.acquisition_mode
        acq_mode_str = AcqMode.whatis(acq_mode)
        attr.set_value(acq_mode_str)

    def write_AcquisitionMode(self, attr):
        """Write the acquisition mode given by its name.

        :raises Exception: if the written string is not a key of
            ``AcqMode.lookup``
        """
        acq_mode_str = attr.get_write_value()
        try:
            acq_mode = AcqMode.lookup[acq_mode_str]
        except KeyError:
            raise Exception(
                "Invalid acquisition mode. Must be one of "
                + ", ".join(AcqMode.keys())
            )
        self.measurement_group.acquisition_mode = acq_mode

    def read_Configuration(self, attr):
        """Read the user configuration, JSON encoded."""
        cfg = self.measurement_group.get_user_configuration()
        codec = CodecFactory().getCodec("json")
        data = codec.encode(("", cfg))
        attr.set_value(data[1])

    def _update_elements_property(self):
        """Store the current user element ids in the ``elements`` property."""
        util = Util.instance()
        db = util.get_database()
        elem_ids = self.measurement_group.user_element_ids
        data = {"elements": elem_ids}
        db.put_device_property(self.get_name(), data)

    def write_Configuration(self, attr):
        """Apply a JSON encoded configuration and refresh ``elements``."""
        data = attr.get_write_value()
        cfg = _decode_configuration(data)
        self.measurement_group.set_configuration_from_user(cfg)
        self._update_elements_property()

    def read_NbStarts(self, attr):
        """Read the number of starts; ``None`` is reported as 0.

        There is no integer NaN (``int("nan")`` raises
        :exc:`ValueError`), so an unset value falls back to 0, the same
        fallback used by :meth:`read_MonitorCount`.
        """
        nb_starts = self.measurement_group.nb_starts
        if nb_starts is None:
            nb_starts = 0
        attr.set_value(nb_starts)

    def write_NbStarts(self, attr):
        """Write the number of starts to the core measurement group."""
        self.measurement_group.nb_starts = attr.get_write_value()

    def read_Moveable(self, attr):
        """Read the moveable; ``None`` is reported as the string "None"."""
        moveable = self.measurement_group.moveable
        if moveable is None:
            moveable = "None"
        attr.set_value(moveable)

    def write_Moveable(self, attr):
        """Write the moveable; the string "None" clears it."""
        moveable = attr.get_write_value()
        moveable = _decode_moveable(moveable)
        self.measurement_group.moveable = moveable

    def read_SynchDescription(self, attr):
        """Read the synchronization description, JSON encoded."""
        synch_description = self.measurement_group.synch_description
        codec = CodecFactory().getCodec("json")
        data = codec.encode(("", synch_description))
        attr.set_value(data[1])

    def write_SynchDescription(self, attr):
        """Write a JSON encoded synchronization description."""
        synch_description_json = attr.get_write_value()
        synch_description = CodecFactory().decode(("json", synch_description_json))
        self.measurement_group.synch_description = synch_description

    def read_LatencyTime(self, attr):
        """Read the latency time of the core measurement group."""
        latency_time = self.measurement_group.latency_time
        attr.set_value(latency_time)

    def read_SoftwareSynchronizerInitialDomain(self, attr):
        """Read the initial domain as the ``SynchDomain`` member name."""
        domain = self.measurement_group.software_synchronizer_initial_domain
        d = SynchDomain(domain).name
        attr.set_value(d)

    def write_SoftwareSynchronizerInitialDomain(self, attr):
        """Write the initial domain given by its ``SynchDomain`` name.

        :raises Exception: if the written string is not a valid name
        """
        data = attr.get_write_value()
        domain = _decode_software_synchronizer_initial_domain(data)
        self.measurement_group.software_synchronizer_initial_domain = domain

    def Prepare(self):
        """Tango command: call ``prepare`` on the core measurement group."""
        self.measurement_group.prepare()

    def Start(self):
        """Tango command: start the acquisition.

        :raises Exception: if ``wait_for_operation`` raises
        """
        try:
            self.wait_for_operation()
        except Exception:
            raise Exception("Cannot acquire: already involved in an operation")
        self.measurement_group.start_acquisition()

    def Stop(self):
        """Tango command: call ``stop`` on the core measurement group."""
        self.measurement_group.stop()
class MeasurementGroupClass(PoolGroupDeviceClass):
    """Tango class description of the :class:`MeasurementGroup` device.

    Declares the commands and attributes specific to the measurement
    group and merges in the definitions of :class:`PoolGroupDeviceClass`
    (on a name clash the inherited definition wins).
    """

    # Class Properties
    class_property_list = {}

    # Device Properties: nothing specific, only the inherited ones
    device_property_list = dict(PoolGroupDeviceClass.device_property_list)

    # Command definitions
    cmd_list = {
        "Prepare": [[DevVoid, ""], [DevVoid, ""]],
        "Start": [[DevVoid, ""], [DevVoid, ""]],
        **PoolGroupDeviceClass.cmd_list,
    }

    # Attribute definitions
    attr_list = {
        "IntegrationTime": [
            [DevDouble, SCALAR, READ_WRITE],
            {
                "Memorized": "false",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        "MonitorCount": [
            [DevLong, SCALAR, READ_WRITE],
            {
                "Memorized": "false",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        "AcquisitionMode": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "true_without_hard_applied",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        "Configuration": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "true_without_hard_applied",
                "Display level": DispLevel.EXPERT,
            },
        ],
        "NbStarts": [
            [DevLong, SCALAR, READ_WRITE],
            {
                "Memorized": "false",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        # TODO: Does it have sense to memorize Moveable?
        "Moveable": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "true_without_hard_applied",
                "Display level": DispLevel.EXPERT,
            },
        ],
        "SynchDescription": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "false",
                "Display level": DispLevel.EXPERT,
            },
        ],
        # the only read-only attribute declared here
        "LatencyTime": [
            [DevDouble, SCALAR, READ],
            {"Display level": DispLevel.EXPERT},
        ],
        "SoftwareSynchronizerInitialDomain": [
            [DevString, SCALAR, READ_WRITE],
            {
                "Memorized": "true_without_hard_applied",
                "Display level": DispLevel.OPERATOR,
            },
        ],
        **PoolGroupDeviceClass.attr_list,
    }

    def _get_class_properties(self):
        """Return the inherited class properties adapted to this class.

        The mapping obtained from :class:`PoolGroupDeviceClass` gets its
        ``Description`` replaced and ``"PoolGroupDevice"`` prepended to
        its ``InheritedFrom`` list.

        :return: the adapted class properties mapping
        """
        properties = PoolGroupDeviceClass._get_class_properties(self)
        properties["Description"] = "Measurement group device class"
        properties["InheritedFrom"].insert(0, "PoolGroupDevice")
        return properties