##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""Environment related macros"""
__all__ = [
"dumpenv",
"load_env",
"lsenv",
"senv",
"usenv",
"genv",
"lsvo",
"setvo",
"usetvo",
"lsgh",
"defgh",
"udefgh",
]
__docformat__ = "restructuredtext"
import pprint
##########################################################################
#
# Environment related macros
#
##########################################################################
from lxml import etree
from taurus.console.list import List
from taurus.core.tango.tangovalidator import TangoDeviceNameValidator
from sardana.macroserver.macro import Macro, Optional, Type, iMacro
from sardana.macroserver.msexception import UnknownEnv
def reprValue(v, max=74):
# cut long strings
v = str(v)
if len(v) > max:
v = v[:max] + " [...]"
return v
[docs]
class dumpenv(Macro):
"""Dumps the complete environment"""
[docs]
def run(self):
env = self.getGlobalEnv()
out = List(["Name", "Value", "Type"])
for k, v in env.items():
str_v = reprValue(v)
type_v = type(v).__name__
out.appendRow([str(k), str_v, type_v])
for line in out.genOutput():
self.output(line)
[docs]
class lsvo(Macro):
"""Lists the view options"""
[docs]
def run(self):
vo = self.getViewOptions()
out = List(["View option", "Value"])
for key, value in list(vo.items()):
out.appendRow([key, str(value)])
for line in out.genOutput():
self.output(line)
[docs]
class setvo(Macro):
"""Sets the given view option to the given value.
Available view options:
- **ShowDial**: used by macro wm, pwm and wa. Default value ``False``
- **ShowCtrlAxis**: used by macro wm, pwm and wa. Default value ``False``
- **PosFormat**: used by macro wm, pwm, wa and umv. Default value ``-1``
- **OutputBlock**: used by scan macros. Default value ``False``
- **OutputDecimation**: used by scan macros. If ``True``, scan output is
decimated with a fixed limit of 100 emitted points per second.
Default value ``False``.
- **DescriptionLength**: used by lsdef. Default value ``60``
"""
param_def = [
["name", Type.String, None, "View option name"],
["value", Type.String, None, "View option value"],
]
[docs]
def run(self, name, value):
try:
value = eval(value)
except Exception:
pass
self.setViewOption(name, value)
[docs]
class usetvo(Macro):
"""Resets the value of the given view option.
Available view options:
- **ShowDial**: used by macro wm, pwm and wa. Default value ``False``
- **ShowCtrlAxis**: used by macro wm, pwm and wa. Default value ``False``
- **PosFormat**: used by macro wm, pwm, wa and umv. Default value ``-1``
- **OutputBlock**: used by scan macros. Default value ``False``
- **OutputDecimation**: used by scan macros. If ``True``, scan output is
decimated with a fixed limit of 100 emitted points per second.
Default value ``False``.
- **DescriptionLength**: used by lsdef. Default value ``60``
"""
param_def = [["name", Type.String, None, "View option name"]]
[docs]
def run(self, name):
self.resetViewOption(name)
[docs]
class lsenv(Macro):
"""Lists the environment in alphabetical order"""
param_def = [
[
"macro_list",
[["macro", Type.MacroClass, None, "macro name"], {"min": 0}],
None,
"List of macros to show environment",
],
]
[docs]
def prepare(self, macro_list, **opts):
self.table_opts = opts
[docs]
def run(self, macro_list):
# list the environment for the current door
if len(macro_list) == 0:
# list All the environment for the current door
out = List(["Name", "Value", "Type"])
env = self.getAllDoorEnv()
names_list = list(env.keys())
names_list.sort(key=str.lower)
for k in names_list:
str_val = self.reprValue(env[k])
type_name = type(env[k]).__name__
out.appendRow([k, str_val, type_name])
# list the environment for the current door for the given macros
else:
out = List(["Macro", "Name", "Value", "Type"])
for macro in macro_list:
env = self.getEnv(key=None, macro_name=macro.name)
names_list = list(env.keys())
names_list.sort(key=str.lower)
for k in names_list:
str_val = self.reprValue(env[k])
type_name = type(env[k]).__name__
out.appendRow([macro.name, k, str_val, type_name])
for line in out.genOutput():
self.output(line)
def reprValue(self, v, max=54):
# cut long strings
v = str(v)
if len(v) > max:
v = "%s [...]" % v[:max]
return v
[docs]
class senv(Macro):
"""Sets the given environment variable to the given value"""
param_def = [
[
"name",
Type.Env,
None,
"Environment variable name. Can be one of the following:\n"
" - <name> - global variable\n"
" - <full door name>.<name> - variable value for a specific door\n"
" - <macro name>.<name> - variable value for a specific macro\n"
" - <full door name>.<macro name>.<name> - variable value for a "
"specific macro running on a specific door",
],
[
"value_list",
[
[
"value",
Type.String,
None,
"environment value item (string value which corresponds"
+ "to a Python type e.g. int, float, Exception, etc. "
+ "must be double enclosed in quotes e.g. \"'int'\"",
],
{"min": 1},
],
None,
"value(s). one item will eval to a single element. More than "
"one item will eval to a tuple of elements",
],
]
[docs]
def run(self, env, value):
if len(value) == 1:
value = value[0]
else:
value = "(%s)" % ", ".join(value)
k, v = self.setEnv(env, value)
line = "%s = %s" % (k, str(v))
self.output(line)
[docs]
class genv(Macro):
"""Gets the given environment variable"""
param_def = [
[
"name",
Type.Env,
None,
"Environment variable name. Can be one of the following:\n"
" - <name> - global variable\n"
" - <full door name>.<name> - variable value for a specific "
"door\n"
" - <macro name>.<name> - variable value for a specific"
" macro\n"
" - <full door name>.<macro name>.<name> - variable value"
" for a specific macro running on a specific door",
],
]
[docs]
def run(self, var):
pars = var.split(".")
door_name = None
macro_name = None
if len(pars) == 1:
key = pars[0]
elif len(pars) > 1:
_, door_name, _ = TangoDeviceNameValidator().getNames(pars[0])
if door_name is None: # first string is a Macro name
macro_name = pars[0]
if len(pars) == 3:
macro_name = pars[1]
key = pars[2]
else:
key = pars[1]
env = self.getEnv(key=key, macro_name=macro_name, door_name=door_name)
env = pprint.pformat(env)
self.output("{} = \n".format(key) + env)
[docs]
class usenv(Macro):
"""Unsets the given environment variable"""
param_def = [
[
"environment_list",
[["env", Type.Env, None, "Environment variable name"], {"min": 1}],
None,
"List of environment items to be removed",
],
]
[docs]
def run(self, env):
self.unsetEnv(env)
self.output("Success!")
[docs]
class load_env(Macro):
"""Read environment variables from config_env.xml file"""
[docs]
def run(self):
doc = etree.parse("config_env.xml")
root = doc.getroot()
for element in root:
if element.find("./name").text == "auto_filter":
self.output("Loading auto_filter variables:")
filter_max_elem = element.find(".//FilterMax")
if filter_max_elem is not None:
filter_max = filter_max_elem.text
self.setEnv("FilterMax", filter_max)
self.output("FilterMax loaded")
else:
self.output("FilterMax not found")
filter_min_elem = element.find(".//FilterMin")
if filter_min_elem is not None:
filter_min = filter_max_elem.text
self.setEnv("FilterMin", filter_min)
self.output("FilterMin loaded")
else:
self.output("FilterMin not found")
filter_delta_elem = element.find(".//FilterDelta")
if filter_delta_elem is not None:
filter_delta = filter_delta_elem.text
self.setEnv("FilterDelta", filter_delta)
self.output("FilterDelta loaded")
else:
self.output("FilterDelta not found")
filter_signal_elem = element.find(".//FilterSignal")
if filter_signal_elem is not None:
filter_signal = filter_signal_elem.text
self.setEnv("FilterSignal", filter_signal)
self.output("FilterSignal loaded")
else:
self.output("FilterSignal not found")
filter_absorber_elem = element.find(".//FilterAbsorber")
if filter_absorber_elem is not None:
filter_absorber = filter_absorber_elem.text
self.setEnv("FilterAbsorber", filter_absorber)
self.output("FilterAbsorber loaded")
else:
self.output("FilterAbsorber not found")
auto_filter_elem = element.find(".//AutoFilter")
if auto_filter_elem is not None:
auto_filter = auto_filter_elem.text
self.setEnv("AutoFilter", auto_filter)
self.output("AutoFilter loaded")
else:
self.output("AutoFilter not found")
if element.find("./name").text == "auto_beamshutter":
self.output("Loading auto_beamshutter variables:")
auto_beamshutter_elem = element.find(".//AutoBeamshutter")
if auto_beamshutter_elem is not None:
auto_beamshutter = auto_beamshutter_elem.text
self.setEnv("AutoBeamshutter", auto_beamshutter)
self.output("AutoBeamshutter loaded")
else:
self.output("AutoBeamshutter not found")
beamshutter_limit_elem = element.find(".//BeamshutterLimit")
if beamshutter_limit_elem is not None:
beamshutter_limit = beamshutter_limit_elem.text
self.setEnv("BeamshutterLimit", beamshutter_limit)
self.output("BeamshutterLimit loaded")
else:
self.output("BeamshutterLimit not found")
beamshutter_signal_elem = element.find(".//BeamshutterSignal")
if beamshutter_signal_elem is not None:
beamshutter_signal = beamshutter_signal_elem.text
self.setEnv("BeamshutterSignal", beamshutter_signal)
self.output("BeamshutterSignal loaded")
else:
self.output("BeamshutterSignal not found")
beamshutter_time_elem = element.find(".//BeamshutterTime")
if beamshutter_time_elem is not None:
beamshutter_time = beamshutter_time_elem.text
self.setEnv("BeamshutterTime", beamshutter_time)
self.output("BeamshutterTime loaded")
else:
self.output("BeamshutterTime not found")
if element.find("./name").text == "exafs":
self.output("Loading exafs variables:")
exafs_int_times_elem = element.find(".//ExafsIntTimes")
if exafs_int_times_elem is not None:
exafs_int_times = exafs_int_times_elem.text
self.setEnv("ExafsIntTimes", exafs_int_times)
self.output("ExafsIntTimes loaded")
else:
self.output("ExafsIntTimes not found")
exafs_nb_intervals_elem = element.find(".//ExafsNbIntervals")
if exafs_nb_intervals_elem is not None:
exafs_nb_intervals = exafs_nb_intervals_elem.text
self.setEnv("ExafsNbIntervals", exafs_nb_intervals)
self.output("ExafsNbIntervals loaded")
else:
self.output("ExafsNbIntervals not found")
exafs_regions_elem = element.find(".//ExafsRegions")
if exafs_regions_elem is not None:
exafs_regions = exafs_regions_elem.text
self.setEnv("ExafsRegions", exafs_regions)
self.output("ExafsRegions loaded")
else:
self.output("ExafsRegions not found")
misc_tree = root.find("./miscellaneous")
if misc_tree is not None:
for parameter in misc_tree:
if parameter.tag != "name":
self.setEnv(parameter.tag, parameter.text)
[docs]
class lsgh(Macro):
"""List general hooks.
.. note::
The `lsgh` macro has been included in Sardana
on a provisional basis. Backwards incompatible changes
(up to and including its removal) may occur if
deemed necessary by the core developers.
"""
[docs]
def run(self):
try:
general_hooks = self.getEnv("_GeneralHooks")
except UnknownEnv:
self.output("No general hooks")
return
out = List(["Hook place", "Hook(s)"])
default_dict = {}
for hook in general_hooks:
name = hook[0]
places = hook[1]
for place in places:
if place not in list(default_dict.keys()):
default_dict[place] = []
default_dict[place].append(name)
for place in list(default_dict.keys()):
place_set = 0
for hook in default_dict[place]:
if place_set:
out.appendRow(["", hook])
else:
out.appendRow([place, hook])
place_set = 1
for line in out.genOutput():
self.output(line)
[docs]
class defgh(Macro):
"""Define general hook:
>>> defgh "mv [[mot02 9]]" pre-scan
>>> defgh "ct 0.1" pre-scan
>>> defgh lsm pre-scan
>>> defgh "mv mot03 10" pre-scan
>>> defgh "Print 'Hello world'" pre-scan
.. note::
The `defgh` macro has been included in Sardana
on a provisional basis. Backwards incompatible changes
(up to and including its removal) may occur if
deemed necessary by the core developers.
"""
param_def = [
[
"macro_name",
Type.String,
None,
('Macro name with parameters. Ex.: "mv exp_dmy01 10"'),
],
[
"hookplace_list",
[["place", Type.String, None, "macro name"], {"min": 1}],
None,
"List of places where the hook has to be executed",
],
]
[docs]
def run(self, macro_name, place):
self.output(
"Hook: %s, hook place: %s has been defined", macro_name, ", ".join(place)
)
try:
macros_list = self.getEnv("_GeneralHooks")
except UnknownEnv:
macros_list = []
hook_tuple = (macro_name, place)
self.debug(hook_tuple)
macros_list.append(hook_tuple)
self.setEnv("_GeneralHooks", macros_list)
self.debug("General hooks:")
self.debug(macros_list)
[docs]
class udefgh(iMacro):
"""Undefine general hook.
>>> udefgh - interactive mode, asks for confirmation. Undefines all hooks if the answer is "yes"
>>> udefgh all - undefines all hooks
>>> udefgh "ct 0.1" - undefines specified hook in all places
>>> udefgh lsm pre-scan - undefines specified hook in specified place
.. note::
The `udefgh` macro has been included in Sardana
on a provisional basis. Backwards incompatible changes
(up to and including its removal) may occur if
deemed necessary by the core developers.
"""
param_def = [
["macro_name", Type.String, Optional, "General hook to be undefined"],
[
"hook_place",
Type.String,
Optional,
("Place to undefine the general hook from"),
],
]
[docs]
def run(self, macro_name, hook_place):
try:
gh_macros_list = self.getEnv("_GeneralHooks")
except UnknownEnv:
return
user_confirmation = False
if macro_name is None:
# ask confirmation from the user
ans = self.input(
"You are about to undefine all general hooks. Are you sure you want to proceed? [y/n]"
)
if ans.lower() in ["y", "yes"]:
user_confirmation = True
if macro_name == "all" or user_confirmation:
self.unsetEnv("_GeneralHooks")
self.info("All general hooks have been undefined.")
out = List(["Hook place", "Hook(s)"])
default_dict = {}
for hook in gh_macros_list:
name = hook[0]
places = hook[1]
for place in places:
if place not in list(default_dict.keys()):
default_dict[place] = []
default_dict[place].append(name)
for place in list(default_dict.keys()):
place_set = 0
for hook in default_dict[place]:
if place_set:
out.appendRow(["", hook])
else:
out.appendRow([place, hook])
place_set = 1
for line in out.genOutput():
self.output(line)
else:
macros_list = []
for el in gh_macros_list:
defined_macro, defined_place = el[0], el[1]
if defined_macro != macro_name:
macros_list.append(el)
else:
for place in defined_place:
if hook_place == place or hook_place is None:
self.info(
"Hook: %s, hook place: %s has been undefined"
% (macro_name, place)
)
else:
macros_list.append((defined_macro, [place]))
if macros_list:
self.setEnv("_GeneralHooks", macros_list)
else:
self.unsetEnv("_GeneralHooks")