Source code for sardana.pool.pool

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module contains the main pool class"""

__all__ = ["Pool"]

__docformat__ = "restructuredtext"

import gc
import logging.handlers
import os.path
from typing import Optional

from taurus.core.tango.tangovalidator import TangoAttributeNameValidator
from taurus.core.util.containers import CaselessDict

from sardana import (
    TYPE_ACQUIRABLE_ELEMENTS,
    TYPE_MOVEABLE_ELEMENTS,
    TYPE_PHYSICAL_ELEMENTS,
    TYPE_PSEUDO_ELEMENTS,
    ElementType,
    InvalidId,
    sardanacustomsettings,
)
from sardana.pool.poolcontainer import PoolContainer
from sardana.pool.poolcontroller import PoolController
from sardana.pool.poolcontrollermanager import ControllerManager
from sardana.pool.poolmeasurementgroup import PoolMeasurementGroup
from sardana.pool.poolmetacontroller import TYPE_MAP_OBJ
from sardana.pool.poolmonitor import PoolMonitor
from sardana.pool.poolobject import PoolObject
from sardana.sardanaevent import EventType
from sardana.sardanamanager import SardanaElementManager, SardanaIDManager
from sardana.sardanamodulemanager import ModuleManager


class Graph(dict):
    def find_path(self, start, end, path=[]):
        path = path + [start]
        if start == end:
            return path
        if start not in self:
            return None
        for node in self[start]:
            if node not in path:
                newpath = self.find_path(node, end, path)
                if newpath:
                    return newpath
        return None

    def find_all_paths(self, start, end, path=[]):
        path = path + [start]
        if start == end:
            return [path]
        if start not in self:
            return []
        paths = []
        for node in self[start]:
            if node not in path:
                newpaths = self.find_all_paths(node, end, path)
                for newpath in newpaths:
                    paths.append(newpath)
        return paths

    def find_shortest_path(self, start, end, path=[]):
        path = path + [start]
        if start == end:
            return path
        if start not in self:
            return None
        shortest = None
        for node in self[start]:
            if node not in path:
                newpath = self.find_shortest_path(node, end, path)
                if newpath:
                    if not shortest or len(newpath) < len(shortest):
                        shortest = newpath
        return shortest


[docs] class Pool(PoolContainer, PoolObject, SardanaElementManager, SardanaIDManager): """The central pool class.""" #: Default value representing the number of state reads per position #: read during a motion loop Default_MotionLoop_StatesPerPosition = 10 #: Default value representing the sleep time for each motion loop Default_MotionLoop_SleepTime = 0.01 #: Default value representing the number of state reads per value #: read during a motion loop Default_AcqLoop_StatesPerValue = 10 #: Default value representing the sleep time for each acquisition loop Default_AcqLoop_SleepTime = 0.01 Default_DriftCorrection = True def __init__(self, full_name, name=None): self.use_numeric_element_ids = getattr( sardanacustomsettings, "USE_NUMERIC_ELEMENT_IDS", False ) self._path_id = None self._motion_loop_states_per_position = ( self.Default_MotionLoop_StatesPerPosition ) self._motion_loop_sleep_time = self.Default_MotionLoop_SleepTime self._acq_loop_states_per_value = self.Default_AcqLoop_StatesPerValue self._acq_loop_sleep_time = self.Default_AcqLoop_SleepTime self._drift_correction = self.Default_DriftCorrection self._remote_log_handler = None # dict<str, dict<str, str>> # keys are acquisition channel names and value is a dict describing the # channel containing: # - 'name': with value being the channel name (given by user) # - 'full_name': acq channel full name (ex: tango attribute) # - 'origin': 'local' if local to this server or 'remote' if a remote # channel self._extra_acquisition_element_names = CaselessDict() PoolContainer.__init__(self) PoolObject.__init__( self, full_name=full_name, name=name, id=InvalidId, pool=self, elem_type=ElementType.Pool, ) self._monitor = PoolMonitor(self, "PMonitor", auto_start=False) # To be used if the user wants to use sardana without tango. # self.init_local_logging() ControllerManager().set_pool(self) # TODO: not ready to use. path must be the same as the one calculated in # sardana.tango.core.util:prepare_logging
[docs] def init_local_logging(self): log = logging.getLogger("Controller") log.propagate = 0 path = os.path.join(os.sep, "tmp", "tango") log_file_name = os.path.join(path, "controller.log.txt") maxBytes = getattr(sardanacustomsettings, "POOL_LOG_FILES_SIZE", 1e7) backupCount = getattr(sardanacustomsettings, "POOL_LOG_BCK_COUNT", 5) try: if not os.path.exists(path): os.makedirs(path, 0o777) f_h = logging.handlers.RotatingFileHandler( log_file_name, maxBytes=maxBytes, backupCount=backupCount ) f_h.setFormatter(self.getLogFormat()) log.addHandler(f_h) self.info("Controller logs stored in %s", log_file_name) except Exception: self.warning("Controller logs could not be created!") self.debug("Details:", exc_info=1)
[docs] def clear_remote_logging(self): rh = self._remote_log_handler if rh is None: return log = logging.getLogger("Controller") log.removeHandler(rh) self._remote_log_handler = None
[docs] def init_remote_logging( self, host: Optional[str] = None, port: Optional[int] = None ) -> None: """Initializes remote logging. :param host: host name [default: None, meaning use the machine host name as returned by :func:`socket.gethostname`]. :param port: port number [default: None, meaning use :data:`logging.handlers.DEFAULT_TCP_LOGGING_PORT`""" log = logging.getLogger("Controller") # port 0 means no remote logging if port == 0: return # first check that the handler has not been initialized yet for handler in log.handlers: if isinstance(handler, logging.handlers.SocketHandler): return if host is None: import socket host = socket.gethostname() # host = socket.getfqdn() if port is None: port = logging.handlers.DEFAULT_TCP_LOGGING_PORT handler = logging.handlers.SocketHandler(host, port) if hasattr(handler, "retryMax"): # default max retry is 30s which seems too much. Let's make it that # the pool tries to reconnect to a client every 10s (similar to the # tango event reconnection handler.retryMax = 10.0 log.addHandler(handler) self.info("Remote logging initialized for host '%s' on port %d", host, port)
[docs] def serialize(self, *args, **kwargs): kwargs = PoolObject.serialize(self, *args, **kwargs) kwargs["type"] = self.__class__.__name__ kwargs["id"] = InvalidId kwargs["parent"] = None return kwargs
[docs] def set_motion_loop_sleep_time(self, motion_loop_sleep_time): self._motion_loop_sleep_time = motion_loop_sleep_time
[docs] def get_motion_loop_sleep_time(self): return self._motion_loop_sleep_time
motion_loop_sleep_time = property( get_motion_loop_sleep_time, set_motion_loop_sleep_time, doc="motion sleep time (s)", )
[docs] def set_motion_loop_states_per_position(self, motion_loop_states_per_position): self._motion_loop_states_per_position = motion_loop_states_per_position
[docs] def get_motion_loop_states_per_position(self): return self._motion_loop_states_per_position
motion_loop_states_per_position = property( get_motion_loop_states_per_position, set_motion_loop_states_per_position, doc="Number of State reads done before doing a position read in the " "motion loop", )
[docs] def set_acq_loop_sleep_time(self, acq_loop_sleep_time): self._acq_loop_sleep_time = acq_loop_sleep_time
[docs] def get_acq_loop_sleep_time(self): return self._acq_loop_sleep_time
acq_loop_sleep_time = property( get_acq_loop_sleep_time, set_acq_loop_sleep_time, doc="acquisition sleep time (s)", )
[docs] def set_acq_loop_states_per_value(self, acq_loop_states_per_value): self._acq_loop_states_per_value = acq_loop_states_per_value
[docs] def get_acq_loop_states_per_value(self): return self._acq_loop_states_per_value
acq_loop_states_per_value = property( get_acq_loop_states_per_value, set_acq_loop_states_per_value, doc="Number of State reads done before doing a value read in the " "acquisition loop", )
[docs] def set_drift_correction(self, drift_correction): self._drift_correction = drift_correction
[docs] def get_drift_correction(self): return self._drift_correction
drift_correction = property( get_drift_correction, set_drift_correction, doc="drift correction" ) @property def monitor(self): return self._monitor @property def ctrl_manager(self): return ControllerManager()
[docs] def set_python_path(self, path): mod_man = ModuleManager() if self._path_id is not None: mod_man.remove_python_path(self._path_id) self._path_id = mod_man.add_python_path(path)
[docs] def set_pool_path(self, path): self.ctrl_manager.setControllerPath(path, reload=False)
[docs] def get_pool_path(self): return self.ctrl_manager.getControllerPath()
pool_path = property(get_pool_path, set_pool_path)
[docs] def get_controller_libs(self): return self.ctrl_manager.getControllerLibs()
[docs] def get_controller_lib_names(self): return self.ctrl_manager.getControllerLibNames()
[docs] def get_controller_class_names(self): return self.ctrl_manager.getControllerNames()
[docs] def get_controller_classes(self): return self.ctrl_manager.getControllers()
[docs] def get_controller_class_info(self, name): return self.ctrl_manager.getControllerMetaClass(name)
[docs] def get_controller_classes_info(self, names): return self.ctrl_manager.getControllerMetaClasses(names)
[docs] def get_controller_libs_summary_info(self): libs = self.get_controller_libs() ret = [] for ctrl_lib_info in libs: elem = "%s (%s)" % (ctrl_lib_info.getName(), ctrl_lib_info.getFileName()) ret.append(elem) return ret
[docs] def get_controller_classes_summary_info(self): ctrl_classes = self.get_controller_classes() ret = [] for ctrl_class_info in ctrl_classes: types = ctrl_class_info.getTypes() types_str = [ TYPE_MAP_OBJ[t].name for t in types if t != ElementType.Controller ] types_str = ", ".join(types_str) elem = "%s (%s) %s" % ( ctrl_class_info.getName(), ctrl_class_info.getFileName(), types_str, ) ret.append(elem) return ret
[docs] def get_elements_str_info(self, obj_type=None): if obj_type is None: objs = list(self.get_element_id_map().values()) objs.extend(self.get_controller_classes()) objs.extend(self.get_controller_libs()) elif obj_type == ElementType.ControllerClass: objs = self.get_controller_classes() elif obj_type == ElementType.ControllerLibrary: objs = self.get_controller_libs() else: objs = self.get_elements_by_type(obj_type) name = self.full_name return [obj.str(pool=name) for obj in objs]
[docs] def get_elements_info(self, obj_type=None): if obj_type is None: objs = list(self.get_element_id_map().values()) objs.extend(self.get_controller_classes()) objs.extend(self.get_controller_libs()) objs.append(self) elif obj_type == ElementType.ControllerClass: objs = self.get_controller_classes() elif obj_type == ElementType.ControllerLibrary: objs = self.get_controller_libs() else: objs = self.get_elements_by_type(obj_type) name = self.full_name return [obj.serialize(pool=name) for obj in objs]
[docs] def get_acquisition_elements_info(self): ret = [] for _, element in list(self.get_element_name_map().items()): if element.get_type() not in TYPE_ACQUIRABLE_ELEMENTS: continue acq_channel = element.get_default_acquisition_channel() full_name = "{0}/{1}".format(element.full_name, acq_channel) info = dict(name=element.name, full_name=full_name, origin="local") ret.append(info) ret.extend(list(self._extra_acquisition_element_names.values())) return ret
[docs] def get_acquisition_elements_str_info(self): return list(map(self.str_object, self.get_acquisition_elements_info()))
def _fill_kwargs_with_id(self, kwargs, name=None): if self.use_numeric_element_ids: eid = kwargs.get("id") if eid is None: kwargs["id"] = eid = self.get_new_id() else: self.reserve_id(eid) else: kwargs["id"] = name
[docs] def create_controller(self, **kwargs): ctrl_type = kwargs["type"] lib = kwargs["library"] class_name = kwargs["klass"] name = kwargs["name"] elem_type = ElementType[ctrl_type] mod_name, _ = os.path.splitext(lib) kwargs["module"] = mod_name td = TYPE_MAP_OBJ[ElementType.Controller] klass_map = td.klass auto_full_name = td.auto_full_name kwargs["full_name"] = full_name = kwargs.get( "full_name", auto_full_name.format(**kwargs) ) self.check_element(name, full_name) ctrl_class_info = None ctrl_lib_info = self.ctrl_manager.getControllerLib(mod_name) if ctrl_lib_info is not None: ctrl_class_info = ctrl_lib_info.get_controller(class_name) kwargs["pool"] = self kwargs["class_info"] = ctrl_class_info kwargs["lib_info"] = ctrl_lib_info self._fill_kwargs_with_id(kwargs, name) # For pseudo controllers make sure 'role_ids' is given klass = klass_map.get(elem_type, PoolController) if elem_type in TYPE_PSEUDO_ELEMENTS: kwargs["role_ids"] # make sure the properties (that may have come from a case insensitive # environment like tango) are made case sensitive props = {} if ctrl_class_info is None: ctrl_prop_info = {} else: ctrl_prop_info = ctrl_class_info.ctrl_properties for k, v in list(kwargs["properties"].items()): info = ctrl_prop_info.get(k) if info is None: props[k] = v else: props[info.name] = v kwargs["properties"] = props ctrl = klass(**kwargs) ret = self.add_element(ctrl) self.fire_event(EventType("ElementCreated"), ctrl) return ret
[docs] def create_element(self, **kwargs): etype = kwargs["type"] ctrl_id = kwargs["ctrl_id"] axis = kwargs["axis"] elem_type = ElementType[etype] name = kwargs["name"] handle_ctrl_err = kwargs.get("handle_ctrl_err", True) try: ctrl = self.get_element(id=ctrl_id) except Exception: raise RuntimeError("No controller with id: {} found".format(ctrl_id)) elem_axis = ctrl.get_element(axis=axis) if elem_axis is not None: raise RuntimeError( "Controller already contains axis %d (%s)" % (axis, elem_axis.get_name()) ) kwargs["pool"] = self kwargs["ctrl"] = ctrl kwargs["ctrl_name"] = ctrl.get_name() td = TYPE_MAP_OBJ[elem_type] klass = td.klass auto_full_name = td.auto_full_name full_name = kwargs.get("full_name", auto_full_name.format(**kwargs)) self.check_element(name, full_name) if ctrl.is_online(): ctrl_types, ctrl_id = ctrl.get_ctrl_types(), ctrl.get_id() if elem_type not in ctrl_types: ctrl_type_str = ElementType.whatis(ctrl_types[0]) raise Exception( "Cannot create %s in %s controller" % (etype, ctrl_type_str) ) # check if controller is online # check if axis is allowed # create the element in the controller self._fill_kwargs_with_id(kwargs, name) elem = klass(**kwargs) ctrl.add_element(elem, handle_ctrl_err=handle_ctrl_err) ret = self.add_element(elem) self.fire_event(EventType("ElementCreated"), elem) return ret
[docs] def create_motor_group(self, **kwargs): name = kwargs["name"] elem_ids = kwargs["user_elements"] kwargs["pool"] = self kwargs["pool_name"] = self.name td = TYPE_MAP_OBJ[ElementType.MotorGroup] klass = td.klass auto_full_name = td.auto_full_name full_name = kwargs.get("full_name", auto_full_name.format(**kwargs)) kwargs.pop("pool_name") self.check_element(name, full_name) for elem_id in elem_ids: elem = self.pool.get_element(id=elem_id) if elem.get_type() not in (ElementType.Motor, ElementType.PseudoMotor): raise Exception("%s is not a motor" % elem.name) self._fill_kwargs_with_id(kwargs, name) elem = klass(**kwargs) ret = self.add_element(elem) self.fire_event(EventType("ElementCreated"), elem) return ret
[docs] def create_measurement_group(self, **kwargs): name = kwargs["name"] elem_ids = kwargs["user_elements"] kwargs["pool"] = self kwargs["pool_name"] = self.name td = TYPE_MAP_OBJ[ElementType.MeasurementGroup] klass = td.klass auto_full_name = td.auto_full_name full_name = kwargs.get("full_name", auto_full_name.format(**kwargs)) kwargs.pop("pool_name") self.check_element(name, full_name) for elem_id in elem_ids: tg_attr_validator = TangoAttributeNameValidator() try: params = tg_attr_validator.getUriGroups(elem_id) except TypeError: params = None if params is None: try: self.pool.get_element(id=elem_id) except KeyError: raise Exception("Invalid channel name %s" % elem_id) self._fill_kwargs_with_id(kwargs, name) elem = klass(**kwargs) ret = self.add_element(elem) self.fire_event(EventType("ElementCreated"), elem) return ret
[docs] def rename_element(self, old_name, new_name): if not self.use_numeric_element_ids: raise NotImplementedError( "can not rename elements at runtime when USE_NUMERIC_ELEMENT_IDS=False" ) elem = self.get_element_by_name(old_name) if isinstance(elem, PoolMeasurementGroup): elem.rename_element(old_name, new_name) else: elem.controller.rename_element(old_name, new_name) PoolContainer.rename_element(self, old_name, new_name) elem = self.get_element_by_name(new_name) self.fire_event(EventType("ElementChanged"), elem)
[docs] def delete_element(self, name): try: elem = self.get_element(name=name) except Exception: try: elem = self.get_element(full_name=name) except Exception: raise Exception("There is no element with name '%s'" % name) # cycle-reference may exist between the element and a traceback # stored in SardanaValue or SardanaAttribute as a consequence of # getting sys.exc_info() - try to delete them with gc.collect() if elem.has_dependent_elements(): gc.collect() dependent_elements = elem.get_dependent_elements() if len(dependent_elements) > 0: names = [elem.name for elem in dependent_elements] raise Exception( "The element {} can't be deleted because {} depend on it." "\n\nIf the name of the dependent element starts with " "'_mg_ms_*' it means that are motor groups, execute " "DeleteElement(<motor_group_name>) command on the Pool e.g. " "Pool_demo1_1.DeleteElement('_mg_ms_20671_1') in Spock.".format( name, ", ".join(names) ) ) elem_type = elem.get_type() if elem_type == ElementType.Controller: if not elem.is_online(): raise Exception( "Cannot delete offline controller - it may " "have elements. Delete elements first and then " "use other means to delete the controller " "e.g. Jive." ) if len(elem.get_elements()) > 0: raise Exception( "Cannot delete controller with elements. Delete elements first" ) elif elem_type == ElementType.Instrument: if elem.has_elements(): raise Exception("Cannot delete instrument with elements") parent_instrument = elem.parent_instrument if parent_instrument is not None: parent_instrument.remove_instrument(elem) elif hasattr(elem, "get_controller"): ctrl = elem.get_controller() ctrl.remove_element(elem) instrument = elem.instrument if instrument is not None: instrument.remove_element(elem) self.remove_element(elem) self.fire_event(EventType("ElementDeleted"), elem) if hasattr(elem, "get_controller"): elem.set_deleted(True)
[docs] def create_instrument(self, **kwargs): full_name = kwargs["full_name"] is_root = full_name.count("/") == 1 if is_root: parent_full_name, _ = "", full_name[1:] parent = None else: parent_full_name, _ = full_name.rsplit("/", 1) try: parent = self.get_element_by_full_name(parent_full_name) except Exception: raise Exception( "No parent instrument named '%s' found" % parent_full_name ) if parent.get_type() != ElementType.Instrument: raise Exception( "%s is not an instrument as expected" % parent_full_name ) kwargs["parent"] = parent kwargs["name"] = full_name kwargs["pool"] = self self.check_element(full_name, full_name) td = TYPE_MAP_OBJ[ElementType.Instrument] klass = td.klass self._fill_kwargs_with_id(kwargs, full_name) elem = klass(**kwargs) if parent: parent.add_instrument(elem) ret = self.add_element(elem) self.fire_event(EventType("ElementCreated"), elem) return ret
[docs] def stop(self): msg = "" controllers = self.get_elements_by_type(ElementType.Controller) for controller in controllers: if not controller.is_online(): continue if controller.is_pseudo(): continue elif ElementType.IORegister in controller.get_ctrl_types(): # Skip IOR since they are not stoppable continue error_elements = controller.stop_elements() if len(error_elements) > 0: element_names = "" for element in error_elements: element_names += element.name + " " msg += "Controller %s -> %s\n" % (controller.name, element_names) self.error( "Unable to stop %s controller: " "Stop of elements %s failed" % (controller.name, element_names) ) if msg: msg_init = "Elements which could not be stopped:\n" raise Exception(msg_init + msg)
[docs] def abort(self): msg = "" controllers = self.get_elements_by_type(ElementType.Controller) for controller in controllers: if controller.is_pseudo(): continue elif ElementType.IORegister in controller.get_ctrl_types(): # Skip IOR since they are not stoppable continue error_elements = controller.abort_elements() if len(error_elements) > 0: element_names = "" for element in error_elements: element_names += element.name + " " msg += "Controller %s -> %s\n" % (controller.name, element_names) self.error( "Unable to abort %s controller: " "Abort of elements %s failed" % (controller.name, element_names) ) if msg: msg_init = "Elements which could not be aborted:\n" raise Exception(msg_init + msg)
# -------------------------------------------------------------------------- # (Re)load code # --------------------------------------------------------------------------
[docs] def reload_controller_lib(self, lib_name): manager = self.ctrl_manager old_lib = manager.getControllerLib(lib_name) new_elements, changed_elements, deleted_elements = [], [], [] if old_lib is not None: ctrl_infos = old_lib.get_controllers() pool_ctrls = self.get_elements_by_type(ElementType.Controller) init_pool_ctrls = [] for pool_ctrl in pool_ctrls: if pool_ctrl.get_ctrl_info() in ctrl_infos: init_pool_ctrls.append(pool_ctrl) changed_elements.append(old_lib) new_lib = manager.reloadControllerLib(lib_name) if old_lib is None: new_elements.extend(new_lib.get_controllers()) new_elements.append(new_lib) else: new_names = {ctrl.name for ctrl in new_lib.get_controllers()} old_names = {ctrl.name for ctrl in old_lib.get_controllers()} changed_names = set.intersection(new_names, old_names) deleted_names = old_names.difference(new_names) new_names = new_names.difference(old_names) for new_name in new_names: new_elements.append(new_lib.get_controller(new_name)) for changed_name in changed_names: changed_elements.append(new_lib.get_controller(changed_name)) for deleted_name in deleted_names: deleted_elements.append(old_lib.get_controller(deleted_name)) evt = {"new": new_elements, "change": changed_elements, "del": deleted_elements} self.fire_event(EventType("ElementsChanged"), evt) if old_lib is not None: for pool_ctrl in init_pool_ctrls: pool_ctrl.re_init()
[docs] def reload_controller_class(self, class_name): ctrl_info = self.ctrl_manager.getControllerMetaClass(class_name) lib_name = ctrl_info.module_name self.reload_controller_lib(lib_name)
[docs] def get_element_id_graph(self): physical_elems_id_map = {} elem_type_map = self.get_element_type_map() for elem_type in TYPE_PHYSICAL_ELEMENTS: physical_elems_id_map.update(elem_type_map[elem_type])
# TODO def _build_element_id_dependencies(self, elem_id, graph=None): if graph is None: graph = Graph() elem = self.get_element_by_id(elem_id) if elem.get_id() in graph or elem.get_type() in TYPE_PHYSICAL_ELEMENTS: return graph graph[elem_id] = list(elem.get_user_element_ids()) return graph
[docs] def get_moveable_id_graph(self): moveable_elems_id_map = {} elem_type_map = self.get_element_type_map() for elem_type in TYPE_MOVEABLE_ELEMENTS: moveable_elems_id_map.update(elem_type_map[elem_type]) graph = Graph() for moveable_id in moveable_elems_id_map: self._build_element_id_dependencies(moveable_id, graph) return graph
def _build_element_dependencies(self, elem, graph=None): if graph is None: graph = Graph() if elem.get_id() in graph or elem.get_type() in TYPE_PHYSICAL_ELEMENTS: return graph graph[elem] = list(elem.get_user_elements()) return graph
[docs] def get_moveable_graph(self): moveable_elems_map = {} elem_type_map = self.get_element_type_map() for elem_type in TYPE_MOVEABLE_ELEMENTS: moveable_elems_map.update(elem_type_map[elem_type]) graph = Graph() for moveable in list(moveable_elems_map.values()): self._build_element_dependencies(moveable, graph) return graph