Source code for mxcubecore.HardwareObjects.DESY.P11Energy

# encoding: utf-8
#
#  Project: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

__copyright__ = """Copyright The MXCuBE Collaboration"""
__license__ = "LGPLv3+"

import logging

from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects.abstract.AbstractEnergy import AbstractEnergy

log = logging.getLogger("HWR")


class P11Energy(AbstractEnergy):
    """Energy hardware object backed by the ``chanEnergy``/``chanStatus`` channels.

    The energy channel is read and written in eV (values are scaled by 1000),
    while this object exposes the energy in keV through ``get_value``,
    ``_set_value`` and the ``valueChanged``/``energyChanged`` signals.
    """

    # Energy (keV) reported by get_value() when no energy channel is configured.
    _default_energy = 12.0

    # Conversion factor used as wavelength = factor / energy[keV];
    # presumably hc in keV * Angstrom -- TODO confirm the wavelength unit.
    _hc_kev_angstrom = 12.3984

    # Minimum change (keV) of the read-back energy that triggers an update.
    _update_threshold = 1e-3

    def __init__(self, name):
        super().__init__(name)
        # Channels are resolved in init(); defining them here keeps the
        # attribute lookups safe if a method is called before init().
        self.chan_energy = None
        self.chan_status = None
        self.chan_autobrake = None

    def init(self):
        """Resolve the channels, read the configured limits and refresh state.

        If the ``limits`` property is missing or cannot be parsed as a
        comma-separated list of floats, an error is logged and the initial
        state/position refresh is skipped.
        """
        self.chan_energy = self.get_channel_object("chanEnergy")
        if self.chan_energy is not None:
            self.chan_energy.connect_signal("update", self.energy_position_changed)

        self.chan_status = self.get_channel_object("chanStatus")
        if self.chan_status is not None:
            self.chan_status.connect_signal("update", self.energy_state_changed)

        self.chan_autobrake = self.get_channel_object("chanAutoBrake")

        limits = self.get_property("limits", None)
        if limits is not None:
            try:
                limits = [float(item) for item in limits.split(",")]
            except (AttributeError, TypeError, ValueError) as exc:
                log.error("P11Energy - cannot parse limits: %s", exc)
                limits = None

        if limits is None:
            log.error(
                "P11Energy - Cannot read LIMITS from configuration xml file. Check values"
            )
            return

        self.set_limits(limits)
        self.energy_state_changed()
        self.energy_position_changed()

    def re_emit_values(self):
        """Emit ``valueChanged`` and ``energyChanged`` with the cached values."""
        self.emit("valueChanged", (self._nominal_value,))
        self.emit("energyChanged", (self._nominal_value, self._wavelength_value))

    def is_ready(self):
        """Return True when the cached state is ``STATES.READY``."""
        return self._state == self.STATES.READY

    def energy_state_changed(self, state=None):
        """Translate the status channel value into a state and emit it.

        :param state: raw status value; read from ``chanStatus`` when None
        :return: None
        """
        if state is None:
            if self.chan_status is None:
                # The status channel is optional in init(); without it there
                # is nothing to translate.
                log.warning(
                    "P11Energy - status channel not available, state not updated"
                )
                return
            state = self.chan_status.get_value()

        known_states = {
            "ON": self.STATES.READY,
            "MOVING": self.STATES.BUSY,
            "DISABLED": self.STATES.OFF,
        }
        # Any status string that is not listed above is reported as a fault.
        self._state = known_states.get(str(state), self.STATES.FAULT)
        self.emit("stateChanged", self._state)

    def energy_position_changed(self, pos=None):
        """
        Event called when energy value has been changed

        :param pos: float, energy in eV; read from ``chanEnergy`` when None
        :return: None
        """
        if pos is None:
            if self.chan_energy is None:
                # The energy channel is optional in init(); nothing to read.
                log.warning(
                    "P11Energy - energy channel not available, value not updated"
                )
                return
            pos = self.chan_energy.get_value()

        energy = pos / 1000.0

        unchanged = (
            self._nominal_value is not None
            and abs(energy - self._nominal_value) <= self._update_threshold
        )
        if unchanged:
            return

        self._nominal_value = energy
        # A zero energy has no defined wavelength; avoid a ZeroDivisionError
        # inside the channel callback and skip the wavelength signal instead.
        self._wavelength_value = self._hc_kev_angstrom / energy if energy else None

        if self._wavelength_value is not None:
            self.emit("energyChanged", (self._nominal_value, self._wavelength_value))
        self.emit("valueChanged", (self._nominal_value,))

    def _set_value(self, value):
        """Program a new energy on the energy channel.

        The auto-brake channel (when configured) is set to True first. The
        energy is only written when the object state is ``STATES.READY`` and
        the requested value differs from the current one by at least 0.01 keV.

        :param value: float, requested energy in keV
        :return: None
        """
        prog_value = value * 1000  # the channel expects eV

        if self.chan_autobrake is not None:
            self.chan_autobrake.set_value(True)

        if self.get_state() != self.STATES.READY:
            return

        current_value = self.get_value()
        if abs(current_value - value) < 0.01:
            self.log.debug(
                "The difference between the current and desired energy values is less than 0.01. No change made."
            )
            return

        self.log.debug("Programming ENERGY to %s eV", prog_value)
        self.chan_energy.set_value(prog_value)
        self.log.debug(f"Energy value set to {value} keV.")

    def get_value(self):
        """
        Returns current energy in keV

        :return: float; the default energy when no energy channel is
                 configured, or None when reading the channel fails
        """
        if self.chan_energy is None:
            return self._default_energy

        try:
            return self.chan_energy.get_value() / 1000
        except Exception:
            log.exception("Energy: could not read current energy")
            return None
def test_hwo(hwo):
    """Print the current energy reported by ``hwo.get_value()``.

    :param hwo: object exposing a ``get_value()`` method
    :return: None
    """
    current_energy = hwo.get_value()
    message = "Energy is: {0} keV".format(current_energy)
    print(message)