Source code for lvmnps.switch.dli.powerswitch

# -*- coding: utf-8 -*-
#
# @Author: Florian Briegel (briegel@mpia.de)
# @Date: 2021-06-24
# @Filename: lvmnps/switch/iboot/powerswitch.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import datetime

from sdsstools.logger import SDSSLogger

from lvmnps.switch.dli.lvmpower import PowerSwitch as DliPowerSwitch
from lvmnps.switch.powerswitchbase import PowerSwitchBase


# Todo: Dont inherit clu.Device in lvmnps.switch.dli.dlipower.PowerSwitch if you are not using it.


__all__ = ["PowerSwitch"]


[docs]class PowerSwitch(PowerSwitchBase): """Powerswitch class to manage the iboot power switch""" def __init__(self, name: str, config: [], log: SDSSLogger): super().__init__(name, config, log) self.hostname = self.config_get("hostname") self.username = self.config_get("user", "admin") self.password = self.config_get("password", "admin") self.use_https = self.config_get("use_https", False) self.dli = None self.reachable = None
[docs] async def start(self): if not await self.isReachable(): self.log.warning(f"{self.name} not reachable on start up") await self.update(self.outlets)
[docs] async def stop(self): try: pass except Exception as ex: self.log.error(f"Unexpected exception {type(ex)}: {ex}") return False self.log.debug("So Long, and Thanks for All the Fish ...")
[docs] async def isReachable(self): try: if not self.dli: self.dli = DliPowerSwitch( name=self.name, userid=self.username, password=self.password, hostname=self.hostname, use_https=self.use_https, ) # reachable = self.statuslist() self.reachable = await self.dli.verify() if not self.reachable: self.dli = None return self.reachable except Exception as ex: self.log.error( f"Unexpected exception is {type(ex)}: {ex}" ) # help me please.... to ck self.dli = None return False
[docs] async def update(self, outlets): # outlets contains all targeted ports self.log.debug(f"{outlets}") # flag = await self.isReachable() try: if await self.isReachable(): # get a list [] of port states, use outlets for a subset. currentstatus = await self.dli.statusdictionary() # print(currentstatus) for o in outlets: o.setState(currentstatus[o.portnum]) else: for o in outlets: o.setState(-1) except Exception as ex: self.log.error(f"Unexpected exception for {type(ex)}: {ex}")
[docs] async def switch(self, state, outlets): # outlets contains all targeted ports self.log.debug(f"{outlets} = {state}") try: if await self.isReachable(): # either loop over the outlets or pass the outlet list. current_time = datetime.datetime.now() print(f"after isReachable : {current_time}") for o in outlets: await self.dli.on(o.portnum) if state else await self.dli.off( o.portnum ) current_time = datetime.datetime.now() print(f"after dli : {current_time}") await self.update(outlets) print(outlets) current_time = datetime.datetime.now() print(f"after update : {current_time}") except Exception as ex: self.log.error(f"Unexpected exception to {type(ex)}: {ex}")
[docs] async def cycle(self, name, portnum): outlets = self.collectOutletsByNameAndPort(name, portnum) for o in outlets: await self.dli.cycle(o.portnum)