Source code for sardana.tango.macroserver.test.macroexecutor

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

import atexit
import copy
import threading
import time

import PyTango
from taurus.core.util.codecs import CodecFactory

from sardana import sardanacustomsettings
from sardana.macroserver.macros.test import BaseMacroExecutor


[docs] class TangoAttrCb(object): """An abstract callback class for Tango events""" def __init__(self, tango_macro_executor): self._tango_macro_executor = tango_macro_executor
[docs] class TangoResultCb(TangoAttrCb): """Callback class for Tango events of the Result attribute"""
[docs] def push_event(self, *args, **kwargs): """callback method receiving the event""" event_data = args[0] if event_data.err: result = event_data.errors else: result = event_data.attr_value.value self._tango_macro_executor._result = result
[docs] class TangoLogCb(TangoAttrCb): """Callback class for Tango events of the log attributes e.g. Output, Error, Critical """ def __init__(self, tango_macro_executor, log_name): self._tango_macro_executor = tango_macro_executor self._log_name = log_name
[docs] def push_event(self, *args, **kwargs): """callback method receiving the event""" event_data = args[0] if event_data.attr_value: log = event_data.attr_value.value log_buffer_name = "_%s" % self._log_name log_buffer = getattr(self._tango_macro_executor, log_buffer_name) log_buffer.append(log) common_buffer = self._tango_macro_executor._common if common_buffer is not None: common_buffer.append(log)
[docs] class TangoStatusCb(TangoAttrCb): """Callback class for Tango events of the MacroStatus attribute""" START_STATES = ["start"] DONE_STATES = ["finish", "stop", "exception"]
[docs] def push_event(self, *args, **kwargs): """callback method receiving the event""" event_data = args[0] if event_data.err: self._state_buffer = event_data.errors self._tango_macro_executor._done_event.set() attr_value = getattr(event_data, "attr_value") if attr_value is None: return v = attr_value.value if not len(v[1]): return fmt = v[0] codec = CodecFactory().getCodec(fmt) fmt, data = codec.decode(v) for macro_status in data: state = macro_status["state"] exc_stack = macro_status.get("exc_stack") if state == "exception": exception_str = "" for line in exc_stack: exception_str += line self._tango_macro_executor._exception = exception_str if state in self.START_STATES: # print 'TangoStatusCb.push_event: setting _started_event' self._tango_macro_executor._started_event.set() elif state in self.DONE_STATES: # print 'TangoStatusCb.push_event: setting _done_event %s' # %(state) self._tango_macro_executor._done_event.set() # else: # print 'State %s' %(state) self._tango_macro_executor._state_buffer.append(state)
[docs] class TangoMacroExecutor(BaseMacroExecutor): """ Macro executor implemented using Tango communication with the Door device """ _api_util_cleanup_registered = False def __init__(self, door_name=None): super(TangoMacroExecutor, self).__init__() if door_name is None: door_name = getattr(sardanacustomsettings, "UNITTEST_DOOR_NAME") self._door = PyTango.DeviceProxy(door_name) self._done_event = None self._started_event = None if not TangoMacroExecutor._api_util_cleanup_registered: # remove whenever PyTango#390 gets fixed atexit.register(PyTango.ApiUtil.cleanup) TangoMacroExecutor._api_util_cleanup_registered = True def _clean(self): """Recreates threading Events in case the macro executor is reused.""" super(TangoMacroExecutor, self)._clean() self._started_event = threading.Event() self._done_event = threading.Event() def _run(self, macro_name, macro_params): """reimplemented from :class:`BaseMacroExecutor`""" # preaparing RunMacro command input arguments argin = copy.copy(macro_params) argin.insert(0, macro_name) # registering for MacroStatus events status_cb = TangoStatusCb(self) self._status_id = self._door.subscribe_event( "macrostatus", PyTango.EventType.CHANGE_EVENT, status_cb ) # executing RunMacro command self._door.RunMacro(argin) def _wait(self, timeout): """reimplemented from :class:`BaseMacroExecutor`""" # TODO: In case of timeout = inf if the macro excecutor run a macro # with wrong parameters it'll never awake of the done_event wait # Pending to remove this comment when Sardana resolves the bug. if self._done_event: self._done_event.wait(timeout) # add an extra sleep time for the following cases: # - macro executing another macro - internal macro may finish but # the external may still not # - in case of stopping a macro, the events are emitted: first # finish and then stop # TODO: base the wait condition on the events from door state # attribute time.sleep(3) self._door.unsubscribe_event(self._status_id) def _stop(self, started_event_timeout=3.0): """reimplemented from :class:`BaseMacroExecutor`""" self._started_event.wait(started_event_timeout) try: self._door.StopMacro() except PyTango.DevFailed as e: raise Exception("Unable to Stop macro: %s" % e) def _registerLog(self, log_level): """reimplemented from :class:`BaseMacroExecutor`""" log_cb = TangoLogCb(self, log_level) id_log_name = "_%s_id" % log_level id_log = self._door.subscribe_event( log_level, PyTango.EventType.CHANGE_EVENT, log_cb ) setattr(self, id_log_name, id_log) def _unregisterLog(self, log_level): """reimplemented from :class:`BaseMacroExecutor`""" id_log_name = "_%s_id" % log_level id_log = getattr(self, id_log_name) self._door.unsubscribe_event(id_log) def _registerResult(self): """reimplemented from :class:`BaseMacroExecutor`""" result_cb = TangoResultCb(self) self._result_id = self._door.subscribe_event( "result", PyTango.EventType.CHANGE_EVENT, result_cb ) def _unregisterResult(self): """reimplemented from :class:`BaseMacroExecutor`""" self._door.unsubscribe_event(self._result_id)
[docs] def getData(self): """Returns the data object for the last executed macro :return: (obj) """ data = self._door.RecordData return CodecFactory().decode(data)