# Source code for mxcubecore.HardwareObjects.LNLS.EPICSActuator

"""
Superclass for EPICS actuators.

Should be put as the first superclass,
e.g. class EPICSMotor(EPICSActuator, AbstractMotor):

Example of xml file:

<object class="LNLS.EPICSActuator">
    <channel type="epics" name="epicsActuator_val">MNC:B:LUCIOLE01:LIGHT_CH1</channel>
    <channel type="epics" name="epicsActuator_rbv" polling="500">MNC:B:LUCIOLE01:LIGHT_CH1</channel>
    <username>BackLight</username>
    <motor_name>BackLight</motor_name>
    <default_limits>(0, 8000)</default_limits>
</object>
"""

import random
import time

import gevent

from mxcubecore.HardwareObjects.abstract import AbstractActuator


class EPICSActuator(AbstractActuator.AbstractActuator):
    """EPICS actuator class.

    Writes the target value to the ``epicsActuator_val`` channel and reads
    the current value back from the ``epicsActuator_rbv`` channel.
    """

    ACTUATOR_VAL = 'epicsActuator_val'  # target
    ACTUATOR_RBV = 'epicsActuator_rbv'  # readback

    def __init__(self, name):
        super(EPICSActuator, self).__init__(name)
        # Greenlet spawned by a non-blocking set_value(); None until then.
        self.__wait_actuator_task = None
        self._nominal_limits = (-1E4, 1E4)

    def init(self):
        """Initialization method: run the parent init and mark the actuator READY."""
        super(EPICSActuator, self).init()
        self.update_state(self.STATES.READY)

    def _wait_actuator(self):
        """Wait for the actuator to be ready.

        This does not poll the readback channel: it pauses for a fixed
        0.3 s and then sets the state to READY.
        """
        # gevent.sleep yields to the hub, so the gevent.Timeout in set_value()
        # and the kill() in abort() can interrupt the wait even when the
        # standard library has not been monkey-patched.
        gevent.sleep(0.3)
        self.update_state(self.STATES.READY)

    def get_value(self):
        """Override AbstractActuator method.

        Returns:
            The current value of the readback (``ACTUATOR_RBV``) channel.
        """
        return self.get_channel_value(self.ACTUATOR_RBV)

    def set_value(self, value, timeout=0):
        """Override AbstractActuator method.

        Args:
            value: Target value; must pass ``validate_value``.
            timeout: ``0`` (default) returns immediately and waits in a
                background greenlet; a positive number blocks for at most
                that many seconds; ``None`` blocks without a time limit.

        Raises:
            ValueError: The actuator is read-only or ``value`` is invalid.
            RuntimeError: The blocking wait exceeded ``timeout``.
        """
        if self.read_only:
            raise ValueError("Attempt to set value for read-only Actuator")

        if not self.validate_value(value):
            raise ValueError(
                "Invalid value %s; limits are %s" % (value, self.get_limits())
            )

        self.update_state(self.STATES.BUSY)
        if timeout or timeout is None:
            # Blocking mode: gevent.Timeout(None) never fires, i.e. wait forever.
            with gevent.Timeout(
                timeout, RuntimeError("Motor %s timed out" % self.username)
            ):
                self._set_value(value)
                # _wait_actuator takes no arguments; passing ``value`` here
                # used to raise TypeError on every blocking call.
                self._wait_actuator()
        else:
            # Non-blocking mode: keep the greenlet so abort() can kill it.
            self._set_value(value)
            self.__wait_actuator_task = gevent.spawn(self._wait_actuator)

    def abort(self):
        """Immediately halt movement. By default self.stop = self.abort"""
        if self.__wait_actuator_task is not None:
            self.__wait_actuator_task.kill()
        self.update_state(self.STATES.READY)

    def _set_value(self, value):
        """Override AbstractActuator method: write ``value`` to the target channel."""
        self.set_channel_value(self.ACTUATOR_VAL, value)