#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module contains the definition of the macroserver parameters for
macros"""
# Names exported by ``from <this module> import *``: the parameter error
# classes, the type registry (``TypeNames`` / ``Type``), the parameter type
# classes and ``ParamDecoder``.
__all__ = [
    "WrongParam",
    "MissingParam",
    "SupernumeraryParam",
    "UnknownParamObj",
    "WrongParamType",
    "MissingRepeat",
    "SupernumeraryRepeat",
    "TypeNames",
    "Type",
    "ParamType",
    "ElementParamType",
    "ElementParamInterface",
    "AttrParamType",
    "AbstractParamTypes",
    "ParamDecoder",
]
# Markup format used by the docstrings of this module.
__docformat__ = "restructuredtext"
from copy import deepcopy
from lxml import etree
from taurus.core.util.containers import CaselessDict
from sardana import INTERFACES_EXPANDED, ElementType
from sardana.macroserver.msbase import MSBaseObject
from sardana.macroserver.msexception import (
MacroServerException,
UnknownMacro,
UnknownMacroLibrary,
)
from sardana.sardanautils import is_non_str_seq
class OptionalParamClass(dict):
    """Dictionary-based marker whose instances refuse regular attribute use.

    On construction, almost every name reported by ``dir()`` is shadowed on
    the instance by :meth:`raise_error`, so e.g. ``instance.keys()`` raises
    ``RuntimeError``. A short list of names (among them ``items``) is left
    untouched.
    """

    def __init__(self, obj):
        super().__init__(obj)
        # Names that must stay usable; "items" is necessary for the
        # python 3.5 implementation of json.
        untouched = (
            "__setattr__",
            "raise_error",
            "__class__",
            "__dict__",
            "__weakref__",
            "items",
        )
        for attr_name in dir(self):
            if attr_name not in untouched:
                setattr(self, attr_name, self.raise_error)
        # Finally shadow __setattr__ itself on the instance.
        self.__setattr__ = self.raise_error

    def __repr__(self):
        return "Optional"

    def raise_error(*args, **kwargs):
        """Unconditionally raise ``RuntimeError``, whatever the arguments."""
        raise RuntimeError("can not be accessed")
# Module-level marker instance. ParamDecoder.decodeNormal compares parameter
# values against it by identity (``value is Optional``) and, on a match,
# yields None for the parameter instead of raising.
Optional = OptionalParamClass({"___optional_parameter__": True})
class WrongParam(MacroServerException):
    """Error signalling an invalid macro parameter.

    The ``type`` attribute carries a short, human readable error category.
    """

    def __init__(self, *args):
        super().__init__(*args)
        self.type = "Wrong parameter"
class MissingParam(WrongParam):
    """:class:`WrongParam` flavour tagged as "Missing parameter"."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = "Missing parameter"
class SupernumeraryParam(WrongParam):
    """:class:`WrongParam` flavour tagged as "Supernumerary parameter"."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = "Supernumerary parameter"
class UnknownParamObj(WrongParam):
    """:class:`WrongParam` flavour tagged as "Unknown parameter"."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = "Unknown parameter"
class WrongParamType(WrongParam):
    """:class:`WrongParam` flavour tagged as "Unknown parameter type"."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = "Unknown parameter type"
class MissingRepeat(WrongParam):
    """:class:`WrongParam` flavour tagged as "Missing repeat"."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = "Missing repeat"
class SupernumeraryRepeat(WrongParam):
    """:class:`WrongParam` flavour tagged as "Supernumerary repeat"."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = "Supernumerary repeat"
class TypeNames:
    """Class that holds the list of registered macro parameter types

    Every registered name is stored both in an internal dictionary and as an
    instance attribute whose value is the name itself.
    """

    def __init__(self):
        self._type_names = {}
        self._pending_type_names = {}

    def addType(self, name):
        """Register a new macro parameter type

        :param name: (str) type name; it becomes an attribute of this object
        """
        setattr(self, name, name)
        self._type_names[name] = name
        # the type is no longer pending once it has been registered
        self._pending_type_names.pop(name, None)

    def removeType(self, name):
        """Remove a macro parameter type

        :param name: (str) name of a previously registered type
        :raises AttributeError: if no attribute called ``name`` exists
        """
        delattr(self, name)
        # A missing dictionary entry is tolerated. Deleting a dict key raises
        # KeyError (not ValueError), hence pop() with a default.
        self._type_names.pop(name, None)

    def __str__(self):
        return str(list(self._type_names))

    # NOTE: lazy creation of pending type names is currently disabled:
    #    def __getattr__(self, name):
    #        if name not in self._pending_type_names:
    #            self._pending_type_names[name] = name
    #        return self._pending_type_names[name]
# This instance of TypeNames is intended to provide access to types to the
# Macros in a "Type.Motor" fashion: TypeNames.addType() stores every
# registered type name as an attribute of this object.
Type = TypeNames()
# Base class of the macro parameter types defined in this module
class ParamType(MSBaseObject):
    """Base macro parameter type.

    ``type_class`` is the callable that :meth:`getObj` applies to the string
    representation of a value; ``capabilities`` lists what the type supports.
    """

    All = "All"

    # Capabilities
    ItemList = "ItemList"
    ItemListEvents = "ItemListEvents"

    capabilities = []
    type_class = str

    def __init__(self, macro_server, name):
        base_kwargs = {
            "name": name,
            "full_name": name,
            "macro_server": macro_server,
            "elem_type": ElementType.ParameterType,
        }
        MSBaseObject.__init__(self, **base_kwargs)

    def getName(self):
        """Return the name of this parameter type."""
        return self.name

    def getObj(self, str_repr):
        """Build the parameter value by calling ``type_class`` on ``str_repr``."""
        converter = self.__class__.type_class
        return converter(str_repr)

    @classmethod
    def hasCapability(cls, cap):
        """Tell whether ``cap`` is listed in the class ``capabilities``."""
        return cap in cls.capabilities

    def serialize(self, *args, **kwargs):
        """Return the base class serialization with ``composed`` set to False."""
        serialized = MSBaseObject.serialize(self, *args, **kwargs)
        serialized["composed"] = False
        return serialized
class ElementParamType(ParamType):
    """Parameter type whose values are elements looked up by name.

    Elements are searched in the pools known to the macro server and, as a
    fallback, among the macro server macros and macro libraries.
    """

    capabilities = ParamType.ItemList, ParamType.ItemListEvents

    def __init__(self, macro_server, name):
        ParamType.__init__(self, macro_server, name)

    def accepts(self, elem):
        """Tell whether the type reported by ``elem`` equals this type name."""
        return elem.getType() == self._name

    def getObj(self, name, pool=ParamType.All, cache=False):
        """Return the element called ``name``.

        :param name: element name
        :param pool: pool name, or ``ParamType.All`` to search all the pools
        :param cache: not used by this implementation
        :return: the first accepted pool element; otherwise the macro or the
            macro library with that name
        :raises UnknownParamObj: if nothing with that name is found
        """
        macro_server = self.macro_server
        if pool == ParamType.All:
            pools = macro_server.get_pools()
        else:
            pools = (macro_server.get_pool(pool),)
        for pool_obj in pools:
            elem_info = pool_obj.getObj(name, elem_type=self._name)
            if elem_info is not None and self.accepts(elem_info):
                return elem_info
        # not a pool object, maybe it is a macro server object (perhaps a
        # macro code or a macro library)
        try:
            return macro_server.get_macro(name)
        except UnknownMacro:
            pass
        try:
            return macro_server.get_macro_lib(name)
        except UnknownMacroLibrary:
            pass
        # neither pool nor macroserver contains any element with this name
        raise UnknownParamObj("%s with name %s does not exist" % (self._name, name))

    def getObjDict(self, pool=ParamType.All, cache=False):
        """Return a caseless dictionary (name -> element) of accepted elements.

        :param pool: pool name, or ``ParamType.All`` to use all the pools
        :param cache: not used by this implementation
        """
        macro_server = self.macro_server
        objs = CaselessDict()
        if pool == ParamType.All:
            pools = macro_server.get_pools()
        else:
            pools = (macro_server.get_pool(pool),)
        for pool_obj in pools:
            for elem_info in pool_obj.getElements():
                if self.accepts(elem_info):
                    objs[elem_info.name] = elem_info
        # Macro libraries first, then macros. The libraries loop used to
        # iterate get_macros() a second time, so libraries were never listed.
        macro_libs = macro_server.get_macro_libs()
        for macro_lib_name, macro_lib in list(macro_libs.items()):
            if self.accepts(macro_lib):
                objs[macro_lib_name] = macro_lib
        for macro_name, macro in list(macro_server.get_macros().items()):
            if self.accepts(macro):
                objs[macro_name] = macro
        return objs

    def getObjListStr(self, pool=ParamType.All, cache=False):
        """Return the names of the accepted elements as a list."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.keys())

    def getObjList(self, pool=ParamType.All, cache=False):
        """Return the accepted elements as a list."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.values())

    def serialize(self, *args, **kwargs):
        """Return the base class serialization with ``composed`` set to True."""
        kwargs = ParamType.serialize(self, *args, **kwargs)
        kwargs["composed"] = True
        return kwargs
class ElementParamInterface(ElementParamType):
    """Element parameter type that matches elements by interface.

    An element is accepted when the name of this type is among the interfaces
    that ``INTERFACES_EXPANDED`` lists for the type of the element.
    """

    def __init__(self, macro_server, name):
        ElementParamType.__init__(self, macro_server, name)
        bases, _doc = INTERFACES_EXPANDED.get(name)
        self._interfaces = bases

    def accepts(self, elem):
        """Tell whether ``elem`` implements the interface named like this type.

        Falls back to the plain type name comparison of ElementParamType when
        no interface information is available for the type of the element.
        """
        elem_type = elem.getType()
        interface_info = INTERFACES_EXPANDED.get(elem_type)
        # check for a missing entry before subscripting it; an unknown
        # element type must not fail with TypeError
        elem_interfaces = None if interface_info is None else interface_info[0]
        if elem_interfaces is None:
            return ElementParamType.accepts(self, elem)
        return self._name in elem_interfaces

    def getObj(self, name, pool=ParamType.All, cache=False):
        """Return the element called ``name`` implementing this interface.

        :param name: element name
        :param pool: pool name, or ``ParamType.All`` to search all the pools
        :param cache: not used by this implementation
        :return: the pool element, or the macro / macro library with that name
        :raises RuntimeError: if the element exists in more than one pool and
            the default pool can not be obtained
        :raises UnknownParamObj: if no suitable object is found
        """
        macro_server = self.macro_server
        if pool == ParamType.All:
            poolObjs = macro_server.get_pools()
        else:
            poolObjs = (macro_server.get_pool(pool),)
        # Here, we check if the element exist in various pools:
        # if only in one pool: we use this pool
        # if in >1 pool: check env var DefaultPool and use it
        # other cases raise exception
        elements = []
        for poolObj in poolObjs:
            elem_info = poolObj.getElementWithInterface(name, self._name)
            if elem_info is not None and self.accepts(elem_info):
                elements.append(elem_info)
        if len(elements) == 1:
            # Element only exist in one Pool. We use it.
            return elements[0]
        if not elements:
            msg = "{} with name {} does not exist in any Pool {}".format(
                self._name, name, [p.name for p in poolObjs]
            )
        else:
            # Ambiguity, check for env var DefaultPool and
            # return the element in this pool if it exists
            try:
                default_pool = macro_server.get_default_pool()
            except RuntimeError as e:
                msg = (
                    "Ambiguity detected. Element defined in more "
                    "than one Pool (Hint: Try setting the DefaultPool "
                    "environmental variable e.g. by executing "
                    "`senv DefaultPool <poolName>` macro)"
                )
                raise RuntimeError(msg) from e
            for elem_pool in elements:
                if elem_pool.pool == default_pool.name:
                    self.debug(
                        "Using element {} in {}".format(elem_pool.name, elem_pool.pool)
                    )
                    return elem_pool
            # Element not in DefaultPool
            msg = (
                "{} with name {} does not exist in {} but exists in other "
                "pools {} (Hint: Try changing the DefaultPool "
                "environmental variable e.g. by executing "
                "`senv DefaultPool <poolName>` macro)".format(
                    self._name, name, default_pool.name, [e.pool for e in elements]
                )
            )
        # not a pool object, maybe it is a macro server object (perhaps a
        # macro class or a macro library)
        try:
            return macro_server.get_macro(name)
        except UnknownMacro:
            if self._name == "MacroCode":
                msg += (
                    " (Hint: if {} is not a typo, "
                    "try adding macro library first "
                    "e.g. by executing `addmaclib <macro module name>` "
                    "macro)".format(name)
                )
        try:
            return macro_server.get_macro_lib(name)
        except UnknownMacroLibrary:
            if self._name == "MacroLibrary":
                msg += (
                    " (Hint: if {0} is not a typo, "
                    "try adding macro library first "
                    "e.g. by executing `addmaclib {0}` "
                    "macro)"
                ).format(name)
        # neither pool nor macroserver contains any element with this name
        raise UnknownParamObj(msg)

    def getObjDict(self, pool=ParamType.All, cache=False):
        """Return the elements implementing this interface, keyed by name.

        For interfaces that the macro server reports as its own, the result of
        ``macro_server.get_elements_with_interface`` is returned as is;
        otherwise accepted pool elements are collected in a caseless dict.

        :param pool: pool name, or ``ParamType.All`` to use all the pools
        :param cache: not used by this implementation
        """
        macro_server = self.macro_server
        if macro_server.is_macroserver_interface(self._name):
            return macro_server.get_elements_with_interface(self._name)
        objs = CaselessDict()
        if pool == ParamType.All:
            pools = macro_server.get_pools()
        else:
            pools = (macro_server.get_pool(pool),)
        for pool_obj in pools:
            elements = pool_obj.getElementsWithInterface(self._name)
            for elem_info in list(elements.values()):
                if self.accepts(elem_info):
                    objs[elem_info.name] = elem_info
        return objs

    def getObjListStr(self, pool=ParamType.All, cache=False):
        """Return the names of the accepted elements as a list."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.keys())

    def getObjList(self, pool=ParamType.All, cache=False):
        """Return the accepted elements as a list."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.values())
class AttrParamType(ParamType):
    """Subclass of :class:`ParamType` that defines no behaviour of its own."""
# Tuple grouping the generic parameter type classes defined in this module
AbstractParamTypes = ParamType, ElementParamType, ElementParamInterface, AttrParamType
class ParamDecoder:
    """Decode raw macro parameters (XML element or nested lists) into the
    objects described by a macro parameter definition.

    Decoding takes place on construction; the result is available through
    :meth:`getParamList`, by iterating the decoder, or through the list
    attributes which the decoder forwards to the decoded list.
    """

    def __init__(self, type_manager, params_def, raw_params):
        """Create ParamDecoder object and decode macro parameters

        :param type_manager: (sardana.macroserver.mstypemanager.TypeManager)
            type manager object
        :param params_def: list<list> macro parameter definition
        :param raw_params: (lxml.etree._Element or list) xml element
            representing macro with subelements representing parameters or
            list with parameter values
        """
        self.type_manager = type_manager
        self.params_def = params_def
        self.raw_params = raw_params
        self.params = None
        self.decode()

    def decode(self):
        """Decode raw representation of parameters to parameters as passed
        to the prepare or run methods.

        :return: (list) decoded parameters, also stored in ``self.params``
        :raises SupernumeraryParam: if more raw parameters than definitions
            were passed
        """
        # make a copy since in case of XML it could be necessary to modify
        # the raw_params - filter out elements different than params
        raw_params = deepcopy(self.raw_params)
        params_def = self.params_def
        # ignore other tags than "param" and "paramrepeat"
        # e.g. sequencer may create tags like "hookPlace"
        if isinstance(raw_params, etree._Element):
            # iterate over a snapshot, children get removed on the way
            for raw_param in list(raw_params):
                if raw_param.tag not in ("param", "paramrepeat"):
                    raw_params.remove(raw_param)
        # check if too many parameters were passed
        len_params_def = len(params_def)
        if len(raw_params) > len_params_def:
            # one-element tuple: the slice itself could be a tuple
            msg = "%r are supernumerary with respect to definition" % (
                raw_params[len_params_def:],
            )
            raise SupernumeraryParam(msg)
        params = []
        # iterate over definition since missing values may just mean using
        # the default values
        for i, param_def in enumerate(params_def):
            try:
                raw_param = raw_params[i]
            except IndexError:
                raw_param = None
            params.append(self.decodeNormal(raw_param, param_def))
        self.params = params
        return self.params

    def decodeNormal(self, raw_param, param_def):
        """Decode and validate parameter

        :param raw_param: (lxml.etree._Element or object) xml element
            representing parameter or the parameter value
        :param param_def: (dict) parameter definition
        :return: decoded parameter: the object built by the parameter type,
            a list of repetitions for repeat parameters, or None when the
            value is the ``Optional`` marker
        :raises MissingParam: if there is neither a value nor a default value
        :raises WrongParamType: if the parameter type raises ValueError
        :raises WrongParam: if the object is unknown or could not be created
        """
        param_type = param_def["type"]
        name = param_def["name"]
        if isinstance(param_type, list):
            return self.decodeRepeat(raw_param, param_def)
        param_type = self.type_manager.getTypeObj(param_type)
        optional_param = False
        try:
            if isinstance(raw_param, etree._Element):
                value = raw_param.get("value")
            else:
                value = raw_param
            # None or [] indicates default value
            if value is None or (isinstance(value, list) and not value):
                value = param_def["default_value"]
            if value is None:
                raise MissingParam("'%s' not specified" % name)
            elif value is Optional:
                param = None
                optional_param = True
            else:
                # cast to string to fulfill with ParamType API
                param = param_type.getObj(str(value))
        except ValueError as e:
            raise WrongParamType(str(e)) from e
        except UnknownParamObj as e:
            raise WrongParam(str(e)) from e
        if param is None and not optional_param:
            msg = 'Could not create %s parameter "%s" for "%s"' % (
                param_type.getName(),
                name,
                raw_param,
            )
            raise WrongParam(msg)
        return param

    def decodeRepeat(self, raw_param_repeat, param_repeat_def):
        """Decode and validate repeat parameter

        :param raw_param_repeat: (lxml.etree._Element or list) xml element
            representing param repeat with subelements representing
            repetitions or list representing repetitions
        :param param_repeat_def: (dict) repeat parameter definition
        :return: (list): list with decoded parameter repetitions
        :raises MissingRepeat: if there are fewer repetitions than ``min``
        :raises SupernumeraryRepeat: if there are more repetitions than
            ``max``
        :raises WrongParam: if a single member repetition is given as a
            non-empty list
        """
        name = param_repeat_def["name"]
        param_type = param_repeat_def["type"]
        min_rep = param_repeat_def["min"]
        max_rep = param_repeat_def["max"]
        param_repeat = []
        if raw_param_repeat is None:
            raw_param_repeat = param_repeat_def["default_value"]
        if raw_param_repeat is None:
            raw_param_repeat = []
        # repeat params with only one member and only one repetition value are
        # allowed - encapsulate it in list and try to decode anyway;
        # for the moment this only works for non XML decoding but could be
        # extended in the future to support XML as well.
        # This must happen before counting the repetitions, otherwise the
        # characters of a lone string value would be counted.
        if not is_non_str_seq(raw_param_repeat) and not isinstance(
            raw_param_repeat, etree._Element
        ):
            raw_param_repeat = [raw_param_repeat]
        len_rep = len(raw_param_repeat)
        if min_rep and len_rep < min_rep:
            msg = "Found %d repetitions of param %s, min is %d" % (
                len_rep,
                name,
                min_rep,
            )
            raise MissingRepeat(msg)
        if max_rep and len_rep > max_rep:
            msg = "Found %d repetitions of param %s, max is %d" % (
                len_rep,
                name,
                max_rep,
            )
            raise SupernumeraryRepeat(msg)
        for raw_repeat in raw_param_repeat:
            if len(param_type) > 1:
                repeat = []
                for i, member_type in enumerate(param_type):
                    try:
                        member_raw = raw_repeat[i]
                    except IndexError:
                        # missing member: let the default value apply
                        member_raw = None
                    repeat.append(self.decodeNormal(member_raw, member_type))
            else:
                # if the repeat parameter is composed of just one member
                # do not encapsulate it in list and pass directly the item
                if isinstance(raw_repeat, etree._Element):
                    raw_repeat = raw_repeat[0]
                # check if one tries to decode repeat parameter of just one
                # member encapsulated in a list, empty lists are still allowed
                # to indicate default value
                elif isinstance(raw_repeat, list) and len(raw_repeat) > 0:
                    msg = "Repetitions of just one member must not be lists"
                    raise WrongParam(msg)
                repeat = self.decodeNormal(raw_repeat, param_type[0])
            param_repeat.append(repeat)
        return param_repeat

    def getParamList(self):
        """Return the decoded parameters."""
        return self.params

    def __iter__(self):
        return iter(self.params)

    def __getattr__(self, name):
        # Forward unknown attributes to the decoded parameters. "params"
        # itself is excluded: if it is not set yet (e.g. on an instance
        # created without __init__) looking it up here would recurse forever.
        if name == "params":
            raise AttributeError(name)
        return getattr(self.params, name)
class FlatParamDecoder:
    """Parameter decoder useful for macros with only one repeat parameter
    located at the very last place. It requires that the raw parameters are
    passed as a flat list of strings.
    """

    def __init__(self, type_manager, params_def, raw_params):
        """Create the decoder and decode the parameters

        :param type_manager: type manager object providing ``getTypeObj``
        :param params_def: (list<dict>) macro parameter definition
        :param raw_params: (list<str>) flat list of parameter values
        :raises AttributeError: if ``params_def`` can not be decoded from a
            flat list (see :meth:`isPossible`)
        """
        self.type_manager = type_manager
        self.params_def = params_def
        self.raw_params = raw_params
        self.params = None
        if not self.isPossible(params_def):
            msg = (
                "%s parameter definition is not compatible with"
                " FlatParamDecoder" % (params_def,)
            )
            raise AttributeError(msg)
        self.decode()

    @staticmethod
    def isPossible(params_def):
        """Tell whether ``params_def`` can be decoded from a flat list.

        This is the case when the definition has no repeat parameter, or when
        its only repeat parameter is the last one and none of its members is
        a repeat parameter itself.
        """
        last_idx = len(params_def) - 1
        for idx, param_def in enumerate(params_def):
            param_type = param_def["type"]
            if not isinstance(param_type, list):
                continue
            # compare positions, not values: an earlier definition equal to
            # the last one is still not the last one
            if idx != last_idx:
                # repeat parameter is not the last one
                # it won't be possible to decode it
                return False
            for sub_param_def in param_type:
                # members are definitions (dicts); a member is a repeat
                # parameter when its own type is a list
                sub_type = sub_param_def
                if isinstance(sub_param_def, dict):
                    sub_type = sub_param_def.get("type")
                if isinstance(sub_type, list):
                    # nested repeat parameter
                    # it won't be possible to decode it
                    return False
        return True

    def decode(self):
        """Decode the raw parameters and store them in ``self.params``.

        :return: (list) decoded parameters
        """
        # NOTE(review): the number of consumed values is discarded, so raw
        # values beyond the definition are silently ignored - confirm this
        # is intended.
        _, self.params = self.decodeNormal(self.raw_params, self.params_def)
        return self.params

    def decodeNormal(self, raw_params, params_def):
        """Decode the values of ``raw_params`` according to ``params_def``.

        :param raw_params: (list<str>) flat list of values
        :param params_def: (list<dict>) parameter definitions
        :return: (tuple) number of consumed values and list of decoded objects
        :raises MissingParam: if a value is missing and there is no default
        :raises WrongParam: if a repeat parameter got no value although it
            demands some, or if an object is unknown or can not be created
        :raises WrongParamType: if the parameter type raises ValueError
        """
        str_len = len(raw_params)
        obj_list = []
        str_idx = 0
        for par_def in params_def:
            name = par_def["name"]
            param_type = par_def["type"]
            def_val = par_def["default_value"]
            is_repeat = isinstance(param_type, list)
            if str_idx == str_len:
                # no raw values left: use the default value if possible
                if def_val is not None:
                    new_obj = def_val
                elif not is_repeat:
                    raise MissingParam("'%s' not specified" % name)
                else:
                    min_rep = par_def["min"]
                    if min_rep and min_rep > 0:
                        msg = "'%s' demands at least %d values" % (name, min_rep)
                        raise WrongParam(msg)
                    # repeat parameter which may be empty: no repetitions
                    new_obj = []
            elif is_repeat:
                dec_token, new_obj = self.decodeRepeat(raw_params[str_idx:], par_def)
                str_idx += dec_token
            else:
                par_type = self.type_manager.getTypeObj(param_type)
                par_str = raw_params[str_idx]
                try:
                    val = par_type.getObj(par_str)
                except ValueError as e:
                    raise WrongParamType(str(e)) from e
                except UnknownParamObj as e:
                    raise WrongParam(str(e)) from e
                if val is None:
                    msg = 'Could not create %s parameter "%s" for "%s"' % (
                        par_type.getName(),
                        name,
                        par_str,
                    )
                    raise WrongParam(msg)
                new_obj = val
                str_idx += 1
            obj_list.append(new_obj)
        return str_idx, obj_list

    def decodeRepeat(self, raw_params, par_def):
        """Decode as many repetitions as ``raw_params`` and ``max`` allow.

        :param raw_params: (list<str>) values left to decode
        :param par_def: (dict) repeat parameter definition
        :return: (tuple) number of consumed values and list of repetitions;
            repetitions of a single member are not wrapped in a list
        :raises MissingRepeat: if fewer than ``min`` repetitions were found
        """
        name = par_def["name"]
        param_def = par_def["type"]
        min_rep = par_def["min"]
        max_rep = par_def["max"]
        dec_token = 0
        obj_list = []
        rep_nr = 0
        while dec_token < len(raw_params):
            if max_rep is not None and rep_nr == max_rep:
                break
            new_token, new_obj_list = self.decodeNormal(
                raw_params[dec_token:], param_def
            )
            dec_token += new_token
            if len(new_obj_list) == 1:
                new_obj_list = new_obj_list[0]
            obj_list.append(new_obj_list)
            rep_nr += 1
        # a falsy minimum (None or 0) means no lower limit
        if min_rep and rep_nr < min_rep:
            msg = "Found %d repetitions of param %s, min is %d" % (
                rep_nr,
                name,
                min_rep,
            )
            raise MissingRepeat(msg)
        return dec_token, obj_list

    def getParamList(self):
        """Return the decoded parameters."""
        return self.params

    def __iter__(self):
        return iter(self.params)

    def __getattr__(self, name):
        # Forward unknown attributes to the decoded parameters. "params"
        # itself is excluded: if it is not set yet (e.g. on an instance
        # created without __init__) looking it up here would recurse forever.
        if name == "params":
            raise AttributeError(name)
        return getattr(self.params, name)