Source code for lvmnps.switch.iboot.powerswitch

# -*- coding: utf-8 -*-
#
# @Author: Florian Briegel (briegel@mpia.de)
# @Date: 2021-06-24
# @Filename: lvmnps/switch/iboot/powerswitch.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from sdsstools.logger import SDSSLogger

from lvmnps.switch.iboot.iboot import iBootInterface
from lvmnps.switch.powerswitchbase import PowerSwitchBase


__all__ = ["PowerSwitch"]


[docs]class PowerSwitch(PowerSwitchBase): """Powerswitch class to manage the iboot power switch""" def __init__(self, name: str, config: [], log: SDSSLogger): super().__init__(name, config, log) self.hostname = self.config_get("hostname") self.username = self.config_get("username", "admin") self.password = self.config_get("password", "admin") self.portsnum = int(self.config_get("ports.num", "1")) self.iboot = iBootInterface( self.hostname, self.username, self.password, self.portsnum, self.log )
[docs] async def start(self): if not await self.isReachable(): self.log.warning(f"{self.name} not reachable on start up") await self.update(self.outlets)
[docs] async def stop(self): try: return self.iboot.disconnect() except Exception as ex: self.log.error(f"Unexpected exception {type(ex)}: {ex}") return False self.log.debug("So Long, and Thanks for All the Fish ...")
[docs] async def isReachable(self): try: return self.iboot.connect() except Exception as ex: self.log.error(f"Unexpected exception {type(ex)}: {ex}") return False
[docs] async def update(self, outlets): try: if await self.isReachable(): relays = self.iboot.get_relays() for o in outlets: o.setState(relays[o.portnum - 1]) else: for o in outlets: o.setState(-1) except Exception as ex: self.log.error(f"Unexpected exception {type(ex)}: {ex}")
[docs] async def switch(self, state, outlets): self.log.debug(str(state)) try: if await self.isReachable(): for o in outlets: self.iboot.switch(o.portnum, state) await self.update(outlets) except Exception as ex: self.log.error(f"Unexpected exception {type(ex)}: {ex}")