Source code for sardana.macroserver.macros.env

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""Environment related macros"""

# Public names of this module: every entry is one of the macro classes
# defined further down in this file.
__all__ = [
    "dumpenv",
    "load_env",
    "lsenv",
    "senv",
    "usenv",
    "genv",
    "lsvo",
    "setvo",
    "usetvo",
    "lsgh",
    "defgh",
    "udefgh",
]

# Markup language used by the docstrings in this module.
__docformat__ = "restructuredtext"

import pprint

##########################################################################
#
# Environment related macros
#
##########################################################################
from lxml import etree
from taurus.console.list import List
from taurus.core.tango.tangovalidator import TangoDeviceNameValidator

from sardana.macroserver.macro import Macro, Optional, Type, iMacro
from sardana.macroserver.msexception import UnknownEnv


def reprValue(v, max=74):
    # cut long strings
    v = str(v)
    if len(v) > max:
        v = v[:max] + " [...]"
    return v


class dumpenv(Macro):
    """Dumps the complete environment"""

    def run(self):
        """Output a Name / Value / Type table of the global environment."""
        global_env = self.getGlobalEnv()
        table = List(["Name", "Value", "Type"])
        for name, value in global_env.items():
            # Long values are shortened by the module-level reprValue helper.
            table.appendRow([str(name), reprValue(value), type(value).__name__])
        for row in table.genOutput():
            self.output(row)
class lsvo(Macro):
    """Lists the view options"""

    def run(self):
        """Output a two-column table of view option names and their values."""
        view_options = self.getViewOptions()
        table = List(["View option", "Value"])
        # Values are stringified before being handed to the table.
        rows = [[option, str(setting)] for option, setting in view_options.items()]
        for row in rows:
            table.appendRow(row)
        for text in table.genOutput():
            self.output(text)
class setvo(Macro):
    """Sets the given view option to the given value.

    Available view options:

    - **ShowDial**: used by macro wm, pwm and wa. Default value ``False``
    - **ShowCtrlAxis**: used by macro wm, pwm and wa. Default value ``False``
    - **PosFormat**: used by macro wm, pwm, wa and umv. Default value ``-1``
    - **OutputBlock**: used by scan macros. Default value ``False``
    - **OutputDecimation**: used by scan macros. If ``True``, scan output is
      decimated with a fixed limit of 100 emitted points per second.
      Default value ``False``.
    - **DescriptionLength**: used by lsdef. Default value ``60``
    """

    param_def = [
        ["name", Type.String, None, "View option name"],
        ["value", Type.String, None, "View option value"],
    ]

    def run(self, name, value):
        """Store ``value`` under view option ``name``.

        The string is first evaluated as a Python expression, so that e.g.
        ``"True"`` or ``"-1"`` are stored as ``bool``/``int``. When the
        evaluation fails for any reason the raw string is stored instead.
        """
        # NOTE(review): eval() executes arbitrary expressions typed by the
        # macro user (with this method's locals visible). Left as-is because
        # callers may rely on full expression support; consider
        # ast.literal_eval if only literals are expected.
        try:
            value = eval(value)
        except Exception:
            # Not a valid Python expression: fall back to the plain string.
            pass
        self.setViewOption(name, value)
class usetvo(Macro):
    """Resets the value of the given view option.

    Available view options:

    - **ShowDial**: used by macro wm, pwm and wa. Default value ``False``
    - **ShowCtrlAxis**: used by macro wm, pwm and wa. Default value ``False``
    - **PosFormat**: used by macro wm, pwm, wa and umv. Default value ``-1``
    - **OutputBlock**: used by scan macros. Default value ``False``
    - **OutputDecimation**: used by scan macros. If ``True``, scan output is
      decimated with a fixed limit of 100 emitted points per second.
      Default value ``False``.
    - **DescriptionLength**: used by lsdef. Default value ``60``
    """

    param_def = [["name", Type.String, None, "View option name"]]

    def run(self, name):
        """Delegate to ``resetViewOption`` for the option called ``name``."""
        self.resetViewOption(name)
class lsenv(Macro):
    """Lists the environment in alphabetical order"""

    param_def = [
        [
            "macro_list",
            [["macro", Type.MacroClass, None, "macro name"], {"min": 0}],
            None,
            "List of macros to show environment",
        ],
    ]

    def prepare(self, macro_list, **opts):
        """Remember the keyword options passed at preparation time."""
        self.table_opts = opts

    def run(self, macro_list):
        """Output the environment as a table.

        Without macros, the whole environment seen by the current door is
        listed. Otherwise one group of rows is produced per given macro,
        with an extra leading "Macro" column. Variable names are sorted
        case-insensitively in both modes.
        """
        if macro_list:
            # Environment of the current door restricted to the given macros.
            out = List(["Macro", "Name", "Value", "Type"])
            for macro in macro_list:
                env = self.getEnv(key=None, macro_name=macro.name)
                for var_name in sorted(env.keys(), key=str.lower):
                    str_val = self.reprValue(env[var_name])
                    type_name = type(env[var_name]).__name__
                    out.appendRow([macro.name, var_name, str_val, type_name])
        else:
            # Complete environment of the current door.
            out = List(["Name", "Value", "Type"])
            env = self.getAllDoorEnv()
            for var_name in sorted(env.keys(), key=str.lower):
                str_val = self.reprValue(env[var_name])
                type_name = type(env[var_name]).__name__
                out.appendRow([var_name, str_val, type_name])

        for text in out.genOutput():
            self.output(text)

    def reprValue(self, v, max=54):
        """Return ``str(v)`` cut to ``max`` characters plus `` [...]`` if longer."""
        text = str(v)
        if len(text) <= max:
            return text
        return "%s [...]" % text[:max]
class senv(Macro):
    """Sets the given environment variable to the given value"""

    param_def = [
        [
            "name",
            Type.Env,
            None,
            "Environment variable name. Can be one of the following:\n"
            " - <name> - global variable\n"
            " - <full door name>.<name> - variable value for a specific door\n"
            " - <macro name>.<name> - variable value for a specific macro\n"
            " - <full door name>.<macro name>.<name> - variable value for a "
            "specific macro running on a specific door",
        ],
        [
            "value_list",
            [
                [
                    "value",
                    Type.String,
                    None,
                    # Fixed help text: the pieces used to be joined without a
                    # separating space ("correspondsto") and the opening
                    # parenthesis was never closed.
                    "environment value item (string value which corresponds "
                    "to a Python type e.g. int, float, Exception, etc. "
                    "must be double enclosed in quotes e.g. \"'int'\")",
                ],
                {"min": 1},
            ],
            None,
            "value(s). one item will eval to a single element. More than "
            "one item will eval to a tuple of elements",
        ],
    ]

    def run(self, env, value):
        """Set environment variable ``env`` and output the stored result.

        :param env: environment variable name (see ``param_def`` for the
            accepted forms)
        :param value: list of value strings; a single item is passed on
            unchanged, several items are combined into the text of a
            parenthesised, comma-separated tuple expression
        """
        if len(value) == 1:
            value = value[0]
        else:
            value = "(%s)" % ", ".join(value)
        # setEnv returns the name and value actually stored; echo those
        # rather than the raw input.
        k, v = self.setEnv(env, value)
        line = "%s = %s" % (k, str(v))
        self.output(line)
class genv(Macro):
    """Gets the given environment variable"""

    param_def = [
        [
            "name",
            Type.Env,
            None,
            "Environment variable name. Can be one of the following:\n"
            " - <name> - global variable\n"
            " - <full door name>.<name> - variable value for a specific "
            "door\n"
            " - <macro name>.<name> - variable value for a specific"
            " macro\n"
            " - <full door name>.<macro name>.<name> - variable value"
            " for a specific macro running on a specific door",
        ],
    ]

    def run(self, var):
        """Look up ``var`` and output it pretty-printed.

        ``var`` is split on ``.``. A single part is used as the key as-is.
        With several parts, the first one is handed to the Tango device name
        validator; when that yields no door name the first part is taken as
        a macro name. With exactly three parts the second is the macro name
        and the third the key; otherwise the second part is the key.
        """
        parts = var.split(".")
        door_name = macro_name = None
        if len(parts) == 1:
            key = parts[0]
        else:
            _, door_name, _ = TangoDeviceNameValidator().getNames(parts[0])
            if door_name is None:
                # The first part is not a door, so it names a macro.
                macro_name = parts[0]
            if len(parts) == 3:
                macro_name, key = parts[1], parts[2]
            else:
                # NOTE(review): this also covers names with more than three
                # parts, where everything after the second part is ignored.
                key = parts[1]

        found = self.getEnv(key=key, macro_name=macro_name, door_name=door_name)
        self.output("{} = \n".format(key) + pprint.pformat(found))
class usenv(Macro):
    """Unsets the given environment variable"""

    param_def = [
        [
            "environment_list",
            [["env", Type.Env, None, "Environment variable name"], {"min": 1}],
            None,
            "List of environment items to be removed",
        ],
    ]

    def run(self, env):
        """Remove the given environment item(s), then report success."""
        # ``env`` is the list collected for the "environment_list" parameter.
        self.unsetEnv(env)
        self.output("Success!")
class load_env(Macro):
    """Read environment variables from config_env.xml file"""

    # Maps the text of a section's <name> child to the environment variables
    # looked up (as descendant tags of the same name) inside that section.
    # The tuple order is the order in which the variables are loaded.
    _SECTION_VARIABLES = {
        "auto_filter": (
            "FilterMax",
            "FilterMin",
            "FilterDelta",
            "FilterSignal",
            "FilterAbsorber",
            "AutoFilter",
        ),
        "auto_beamshutter": (
            "AutoBeamshutter",
            "BeamshutterLimit",
            "BeamshutterSignal",
            "BeamshutterTime",
        ),
        "exafs": (
            "ExafsIntTimes",
            "ExafsNbIntervals",
            "ExafsRegions",
        ),
    }

    def run(self):
        """Load the known sections and the free-form "miscellaneous" section.

        ``config_env.xml`` is opened by relative path. Every child of the
        root whose ``<name>`` text is a key of ``_SECTION_VARIABLES`` has its
        variables copied into the environment, reporting for each one whether
        it was loaded or not found. In addition, every child tag of the
        root's ``<miscellaneous>`` element except ``<name>`` is stored under
        its tag name.
        """
        doc = etree.parse("config_env.xml")
        root = doc.getroot()
        for element in root:
            # findtext() yields None for children without a <name> (e.g. XML
            # comments) instead of failing on ``None.text``.
            section = element.findtext("./name")
            variables = self._SECTION_VARIABLES.get(section)
            if variables is None:
                continue
            self.output("Loading %s variables:" % section)
            for variable in variables:
                self._load_variable(element, variable)

        misc_tree = root.find("./miscellaneous")
        if misc_tree is not None:
            for parameter in misc_tree:
                # lxml also iterates over comments and processing
                # instructions, whose ``tag`` is not a string; those cannot
                # be environment variable names.
                if not isinstance(parameter.tag, str):
                    continue
                if parameter.tag != "name":
                    self.setEnv(parameter.tag, parameter.text)

    def _load_variable(self, section, variable):
        """Copy the text of descendant tag ``variable`` into the environment.

        Each variable reads its *own* element. The previous hand-written code
        read FilterMin from the FilterMax element, storing the wrong value
        (or failing when only FilterMin was present).
        """
        node = section.find(".//" + variable)
        if node is None:
            self.output("%s not found" % variable)
            return
        self.setEnv(variable, node.text)
        self.output("%s loaded" % variable)
class lsgh(Macro):
    """List general hooks.

    .. note::
        The `lsgh` macro has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including its removal) may occur if
        deemed necessary by the core developers.
    """

    def run(self):
        """Output a table of hook places and the hooks attached to each.

        Reads the ``_GeneralHooks`` environment variable, whose items are
        indexed as ``(hook, places)``. When the variable is not defined,
        "No general hooks" is printed instead.
        """
        try:
            general_hooks = self.getEnv("_GeneralHooks")
        except UnknownEnv:
            self.output("No general hooks")
            return

        table = List(["Hook place", "Hook(s)"])

        # Regroup "hook -> places" into "place -> hooks", keeping the order
        # in which places and hooks are first encountered.
        hooks_by_place = {}
        for entry in general_hooks:
            hook_name, hook_places = entry[0], entry[1]
            for hook_place in hook_places:
                hooks_by_place.setdefault(hook_place, []).append(hook_name)

        for hook_place, hook_names in hooks_by_place.items():
            for position, hook_name in enumerate(hook_names):
                # The place is printed only on the first row of its group.
                first_column = hook_place if position == 0 else ""
                table.appendRow([first_column, hook_name])

        for text in table.genOutput():
            self.output(text)
class defgh(Macro):
    """Define general hook:

    >>> defgh "mv [[mot02 9]]" pre-scan
    >>> defgh "ct 0.1" pre-scan
    >>> defgh lsm pre-scan
    >>> defgh "mv mot03 10" pre-scan
    >>> defgh "Print 'Hello world'" pre-scan

    .. note::
        The `defgh` macro has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including its removal) may occur if
        deemed necessary by the core developers.
    """

    param_def = [
        [
            "macro_name",
            Type.String,
            None,
            ('Macro name with parameters. Ex.: "mv exp_dmy01 10"'),
        ],
        [
            "hookplace_list",
            # Help text fixed: this item is a hook place, not a macro name.
            [["place", Type.String, None, "hook place"], {"min": 1}],
            None,
            "List of places where the hook has to be executed",
        ],
    ]

    def run(self, macro_name, place):
        """Append the hook to the ``_GeneralHooks`` environment variable.

        :param macro_name: macro invocation (name plus parameters) to be run
            as a hook
        :param place: list of hook places the macro is attached to

        The stored item is the tuple ``(macro_name, place)``. The variable is
        created when it does not exist yet.
        """
        try:
            macros_list = self.getEnv("_GeneralHooks")
        except UnknownEnv:
            macros_list = []

        hook_tuple = (macro_name, place)
        self.debug(hook_tuple)
        macros_list.append(hook_tuple)
        self.setEnv("_GeneralHooks", macros_list)

        # Report success only after the environment was actually updated, so
        # the message is never shown for a hook that failed to be stored.
        # NOTE(review): relies on output() applying the extra arguments to
        # the format string, logging-style -- confirm against the Macro API.
        self.output(
            "Hook: %s, hook place: %s has been defined", macro_name, ", ".join(place)
        )
        self.debug("General hooks:")
        self.debug(macros_list)
class udefgh(iMacro):
    """Undefine general hook.

    - ``udefgh`` - interactive mode, asks for confirmation. Undefines all
      hooks if the answer is "yes"
    - ``udefgh all`` - undefines all hooks
    - ``udefgh "ct 0.1"`` - undefines specified hook in all places
    - ``udefgh lsm pre-scan`` - undefines specified hook in specified place

    .. note::
        The `udefgh` macro has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including its removal) may occur if
        deemed necessary by the core developers.
    """

    param_def = [
        ["macro_name", Type.String, Optional, "General hook to be undefined"],
        [
            "hook_place",
            Type.String,
            Optional,
            ("Place to undefine the general hook from"),
        ],
    ]

    def run(self, macro_name, hook_place):
        """Remove general hooks from the ``_GeneralHooks`` variable.

        :param macro_name: hook to remove, ``"all"`` to remove every hook, or
            ``None`` to remove every hook after an interactive confirmation
        :param hook_place: place to remove the hook from, or ``None`` for
            all places of the given hook

        Nothing happens when ``_GeneralHooks`` is not defined, or when the
        interactive confirmation is declined.
        """
        try:
            gh_macros_list = self.getEnv("_GeneralHooks")
        except UnknownEnv:
            return

        if macro_name is None:
            # ask confirmation from the user
            ans = self.input(
                "You are about to undefine all general hooks. Are you sure you want to proceed? [y/n]"
            )
            if ans.lower() not in ["y", "yes"]:
                # Declined: leave the environment untouched. Previously this
                # fell through to the selective branch below and rewrote (or,
                # for an empty list, unset) the variable.
                return
            undefine_all = True
        else:
            undefine_all = macro_name == "all"

        if undefine_all:
            self.unsetEnv("_GeneralHooks")
            self.info("All general hooks have been undefined.")

            # Show what has just been removed, grouped by hook place.
            out = List(["Hook place", "Hook(s)"])
            hooks_by_place = {}
            for hook in gh_macros_list:
                name, places = hook[0], hook[1]
                for place in places:
                    hooks_by_place.setdefault(place, []).append(name)
            for place, names in hooks_by_place.items():
                for index, name in enumerate(names):
                    # The place is printed only on the first row of its group.
                    out.appendRow([place if index == 0 else "", name])
            for line in out.genOutput():
                self.output(line)
            return

        # Selective removal: rebuild the list without the matching entries.
        macros_list = []
        for el in gh_macros_list:
            defined_macro, defined_place = el[0], el[1]
            if defined_macro != macro_name:
                macros_list.append(el)
                continue
            for place in defined_place:
                if hook_place == place or hook_place is None:
                    self.info(
                        "Hook: %s, hook place: %s has been undefined"
                        % (macro_name, place)
                    )
                else:
                    # Places that stay are re-added as one entry per place.
                    macros_list.append((defined_macro, [place]))

        if macros_list:
            self.setEnv("_GeneralHooks", macros_list)
        else:
            # Nothing left: drop the variable instead of storing an empty list.
            self.unsetEnv("_GeneralHooks")