#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the base classes
for ZeroDExpChannel"""
__all__ = ["Pool0DExpChannel"]
__docformat__ = 'restructuredtext'
import numpy
import time
from sardana import ElementType
from sardana.sardanaevent import EventType
from sardana.sardanaattribute import SardanaAttribute
from sardana.sardanavalue import SardanaValue
from sardana.pool.poolbasechannel import PoolBaseChannel
from sardana.pool.poolacquisition import Pool0DAcquisition
class BaseAccumulation(object):
def __init__(self):
self.buffer = numpy.zeros(shape=(2, 16384), dtype=numpy.float64)
self.clear()
def clear(self):
self.nb_points = 0
self.value = None
self.timestamp = None
def get_value_buffer(self):
return self.buffer[0][:self.nb_points]
def get_time_buffer(self):
return self.buffer[1][:self.nb_points]
def append(self, value, timestamp=None):
if timestamp is None:
timestamp = time.time()
idx = self.nb_points
self.nb_points += 1
self.buffer[0][idx] = value
self.buffer[1][idx] = timestamp
self.update_value(value, timestamp)
def update_value(self, value, timestamp):
self.value = value
self.timestamp = timestamp
LastAccumulation = BaseAccumulation
class SumAccumulation(BaseAccumulation):
def clear(self):
BaseAccumulation.clear(self)
self.sum = 0.0
def update_value(self, value, timestamp):
BaseAccumulation.update_value(self, value, timestamp)
if not value is None:
self.sum += value
self.value = self.sum
class AverageAccumulation(SumAccumulation):
def clear(self):
SumAccumulation.clear(self)
self.nb_valid_points = 0
def update_value(self, value, timestamp):
SumAccumulation.update_value(self, value, timestamp)
if not value is None:
self.nb_valid_points += 1
self.value = self.sum / self.nb_valid_points
class IntegralAccumulation(BaseAccumulation):
def clear(self):
BaseAccumulation.clear(self)
self.sum = 0.0
self.last_value = None
self.start_time = None
def update_value(self, value, timestamp):
if self.nb_points == 1:
self.last_value = value, timestamp
self.start_time = timestamp
self.value = value
else:
last_value, last_timestamp = self.last_value
dt = timestamp - last_timestamp
self.sum += dt * (last_value + value) / 2
total_dt = timestamp - self.start_time
self.value = self.sum / total_dt
self.last_value = value, timestamp
def get_accumulation_class(ctype):
return globals()[ctype + "Accumulation"]
class CurrentValue(SardanaAttribute):
def update(self, cache=True, propagate=1):
if not cache or not self.has_value():
value = self.obj.read_current_value()
self.set_value(value, propagate=propagate)
class Value(SardanaAttribute):
DefaultAccumulationType = "Average"
def __init__(self, *args, **kwargs):
accumulation_type = kwargs.pop(
'accumulation_type', self.DefaultAccumulationType)
super(Value, self).__init__(*args, **kwargs)
self.set_accumulation_type(accumulation_type)
def get_val(self):
return self.obj.get_value_attribute()
def set_accumulation_type(self, ctype):
klass = get_accumulation_class(ctype)
self._accumulation = klass()
def get_accumulation_type(self):
klass_name = self._accumulation.__class__.__name__
return klass_name[:klass_name.index("Accumulation")]
def get_accumulation(self):
return self._accumulation
accumulation = property(get_accumulation)
def _get_value(self):
value = self._accumulation.value
if value is None:
raise Exception("Value not available: no successful acquisition"
" done so far!")
return value
def _get_value_obj(self):
'''Override superclass method and compose a SardanaValue object from
the present values.
'''
value = self._accumulation.value
# use timestamp of the last acquired sample as timestamp of
# accumulation
timestamp = self._accumulation.get_time_buffer()[-1]
value_obj = SardanaValue(value=value, timestamp=timestamp)
return value_obj
def _in_error(self):
# for the moment let's assume that 0D is never in error
# this could be improved by searching the accumulation buffer
# in presence of readout errors
return False
def _has_value(self):
return self.accumulation.value is not None
def _get_timestamp(self):
return self.accumulation.timestamp
def get_accumulation_buffer(self):
return self.accumulation.get_value_buffer()
def get_time_buffer(self):
return self.accumulation.get_time_buffer()
def clear_buffer(self):
self.accumulation.clear()
def append_buffer(self, value, propagate=1):
self.accumulation.append(value.value, value.timestamp)
if propagate > 0:
evt_type = EventType(self.name, priority=propagate)
self.fire_event(evt_type, self)
def update(self, cache=True, propagate=1):
# it is the Pool0DAcquisition action which is allowed to update
raise Exception("0D Value can not be updated from outside"
" of acquisition")
[docs]class Pool0DExpChannel(PoolBaseChannel):
ValueAttributeClass = Value
AcquisitionClass = Pool0DAcquisition
def __init__(self, **kwargs):
kwargs['elem_type'] = ElementType.ZeroDExpChannel
PoolBaseChannel.__init__(self, **kwargs)
self._current_value = CurrentValue(self, listeners=self.on_change)
# -------------------------------------------------------------------------
# Accumulation
# -------------------------------------------------------------------------
[docs] def get_accumulation_type(self):
return self.get_value_attribute().get_accumulation_type()
[docs] def get_accumulation(self):
return self.get_value_attribute().get_accumulation()
[docs] def set_accumulation_type(self, ctype):
return self.get_value_attribute().set_accumulation_type(ctype)
accumulation = property(get_accumulation)
accumulation_type = property(get_accumulation_type, set_accumulation_type)
# -------------------------------------------------------------------------
# value
# -------------------------------------------------------------------------
[docs] def get_accumulated_value_attribute(self):
"""Returns the accumulated value attribute object for this 0D.
:return: the accumulated value attribute
:rtype: :class:`~sardana.sardanaattribute.SardanaAttribute`"""
return self.get_value_attribute()
[docs] def get_current_value_attribute(self):
"""Returns the current value attribute object for this 0D.
:return: the current value attribute
:rtype: :class:`~sardana.sardanaattribute.SardanaAttribute`"""
return self._current_value
[docs] def get_accumulated_value(self):
"""Gets the accumulated value for this 0D.
:return:
a :class:`~sardana.sardanavalue.SardanaValue` containing the 0D
value
:rtype:
:class:`~sardana.sardanaattribute.SardanaAttribute`
:raises: Exception if no acquisition has been done yet on this 0D"""
return self.get_accumulated_value_attribute()
[docs] def read_current_value(self):
"""Reads the 0D value from hardware.
:return:
a :class:`~sardana.sardanavalue.SardanaValue` containing the counter
value
:rtype:
:class:`~sardana.sardanavalue.SardanaValue`"""
return self.acquisition.read_value()[self]
[docs] def put_current_value(self, value, propagate=1):
"""Put a current value.
:param value:
the new value
:type value:
:class:`~sardana.sardanavalue.SardanaValue`
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:type propagate:
int"""
curr_val_attr = self.get_current_value_attribute()
curr_val_attr.set_value(value, propagate=propagate)
if self.is_in_operation():
acc_val_attr = self.get_accumulated_value_attribute()
acc_val_attr.append_buffer(value, propagate=propagate)
[docs] def get_current_value(self, cache=True, propagate=1):
"""Returns the counter value.
:return:
the 0D accumulated value
:rtype:
:class:`~sardana.sardanaattribute.SardanaAttribute`"""
curr_val_attr = self.get_current_value_attribute()
curr_val_attr.update(cache=cache, propagate=propagate)
return curr_val_attr
current_value = property(get_current_value, doc="0D value")
accumulated_value = property(get_accumulated_value, doc="0D value")
[docs] def clear_buffer(self):
self.get_accumulated_value_attribute().clear_buffer()
# -------------------------------------------------------------------------
# accumulation buffer
# -------------------------------------------------------------------------
[docs] def get_accumulation_buffer(self):
return self.get_accumulated_value_attribute().get_accumulation_buffer()
accumulation_buffer = property(get_accumulation_buffer)
# -------------------------------------------------------------------------
# time buffer
# -------------------------------------------------------------------------
[docs] def get_time_buffer(self):
return self.get_accumulated_value_attribute().get_time_buffer()
time_buffer = property(get_time_buffer)
[docs] def start_acquisition(self, value=None):
self._aborted = False
self.clear_buffer()
if value is None:
raise Exception(
"Invalid integration_time '%s'. Hint set a new value for 'value' first" % value)
if not self._simulation_mode:
acq = self.acquisition.run()