#!/usr/bin/env python
"""
Copyright (c) 2013, Luke Fitzgerald
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The views and conclusions contained in the software and documentation are those
of the authors and should not be interpreted as representing official policies,
either expressed or implied, of the FreeBSD Project."""
import asyncio
import logging
import socket
import struct
HELLO_STR = "hello-000"
HEADER_STRUCT = struct.Struct("<B21s21sBBH")
COMMAND_MAP = {
"NULL": 0,
"SET": 1,
"GET": 2,
"IO": 3,
"KEEPALIVE": 4,
"RSS": 5,
"RCU": 6,
}
SOCKET_TIMEOUT = 1
[docs]class DXPCommand(object):
COMMAND = None
DESCRIPTOR_MAP = None
DESCRIPTOR = None
PAYLOAD_STRUCT = None
def __init__(self, interface):
self.interface = interface
def _build_header(self):
if not self.COMMAND:
raise Exception("'COMMAND' type not specified for class")
if not self.DESCRIPTOR_MAP:
raise Exception("'DESCRIPTOR_MAP' not specified for class")
if not self.DESCRIPTOR:
raise Exception("'DESCRIPTOR' type not specified for class")
return HEADER_STRUCT.pack(
COMMAND_MAP[self.COMMAND],
bytes(self.interface.username, "utf-8"),
bytes(self.interface.password, "utf-8"),
self.DESCRIPTOR_MAP[self.DESCRIPTOR],
0,
self.interface.get_seq_num(),
)
def _build_payload(self, *pack_args):
if not self.PAYLOAD_STRUCT:
raise Exception("'PAYLOAD_STRUCT' not specified for class")
return self.PAYLOAD_STRUCT.pack(*pack_args)
async def _get_response(self, socket):
"""
Parse the response from the request
"""
raise Exception("get_response method not implemented")
async def _get_boolean_response(self):
# response = self.interface.socket.recv(1)
response = await self.interface.loop.sock_recv(self.interface.socket, 1)
if not response:
return False
self.interface.increment_seq_num()
return self._parse_bool(response)
def _parse_bool(self, string):
return not struct.unpack("?", string)[0]
[docs] async def do_request(self):
header = self._build_header()
payload = self._build_payload()
request = header + payload
# self.interface.socket.sendall(request)
# self.interface.socket.sendall(bytes(request, 'utf-8'))
await self.interface.loop.sock_sendall(self.interface.socket, request)
return await self._get_response()
async def _do_payloadless_request(self):
request = self._build_header()
# self.interface.socket.sendall(request)
# self.interface.socket.sendall(bytes(request, 'utf-8'))
await self.interface.loop.sock_sendall(self.interface.socket, request)
return await self._get_response()
[docs]class IOCommand(DXPCommand):
COMMAND = "IO"
DESCRIPTOR_MAP = {
"NULL": 0,
"CHANGE_RELAY": 1,
"CHANGE_RELAYS": 2,
"GET_RELAY": 3,
"GET_RELAYS": 4,
"GET_INPUT": 5,
"GET_INPUTS": 6,
"PULSE_RELAY": 7,
}
[docs]class RelayCommand(IOCommand):
STATE_MAP = {True: 1, False: 0, "NO_CHANGE": 2}
async def _get_response(self):
return await self._get_boolean_response()
[docs]class ChangeRelayCommand(RelayCommand):
DESCRIPTOR = "CHANGE_RELAY"
PAYLOAD_STRUCT = struct.Struct("<BB")
def __init__(self, interface, relay, state):
super(ChangeRelayCommand, self).__init__(interface)
self.relay = relay
self.state = state
def _build_payload(self):
return super(ChangeRelayCommand, self)._build_payload(
self.relay, self.STATE_MAP[self.state]
)
[docs]class ChangeRelaysCommand(RelayCommand):
DESCRIPTOR = "CHANGE_RELAYS"
PAYLOAD_STRUCT = struct.Struct("<" + ("B" * 32)) # 32 unsigned chars
def __init__(self, interface, relay_state_dict):
super(ChangeRelaysCommand, self).__init__(interface)
self.relay_state_dict = relay_state_dict
def _build_payload(self):
state_list = []
self.interface.logger.debug(self.relay_state_dict)
for relay in range(32):
if (relay + 1) not in self.relay_state_dict:
state_list.append(self.STATE_MAP["NO_CHANGE"])
else:
state_list.append(self.STATE_MAP[self.relay_state_dict[relay + 1]])
self.interface.logger.debug(state_list)
return super(ChangeRelaysCommand, self)._build_payload(*state_list)
[docs]class GetRelaysRequest(IOCommand):
DESCRIPTOR = "GET_RELAYS"
[docs] async def do_request(self):
return await self._do_payloadless_request()
async def _get_response(self):
# response = self.interface.socket.recv(self.interface.num_relays)
response = await self.interface.loop.sock_recv(
self.interface.socket, self.interface.num_relays
)
if not response:
return None
self.interface.increment_seq_num()
# return [True if ord(str(relay_status)) == 1 else False
return [True if relay_status == 1 else False for relay_status in response]
[docs]class PulseRelayRequest(RelayCommand):
DESCRIPTOR = "PULSE_RELAY"
PAYLOAD_STRUCT = struct.Struct("<BBH")
def __init__(self, interface, relay, state, width):
super(PulseRelayRequest, self).__init__(interface)
self.relay = relay
self.state = state
self.width = width
def _build_payload(self):
return super(PulseRelayRequest, self)._build_payload(
self.relay, self.STATE_MAP[self.state], self.width
)
[docs]class iBootInterface(object):
def __init__(self, ip, username, password, num_relays=3, log=None, port=9100):
self.ip = ip
self.username = username
self.password = password
self.port = port
self.num_relays = num_relays
self.seq_num = None
self.socket = None
if log:
self.logger = log
else:
logging.basicConfig()
self.logger = logging.getLogger("iBootInterface")
self.logger.setLevel(logging.DEBUG)
self.loop = asyncio.get_event_loop()
[docs] def get_seq_num(self):
seq_num = self.seq_num
self.seq_num += 1
return seq_num
[docs] def increment_seq_num(self):
self.seq_num += 1
[docs] async def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(SOCKET_TIMEOUT)
try:
# self.socket.connect((self.ip, self.port))
await self.loop.sock_connect(self.socket, (self.ip, self.port))
except socket.error:
self.logger.error("Socket failed to connect")
return False
try:
# self.socket.sendall(bytes(HELLO_STR, 'utf-8'))
await self.loop.sock_sendall(self.socket, bytes(HELLO_STR, "utf-8"))
return await self._get_initial_seq_num()
except socket.error:
self.logger.error("Socket error")
return False
return True
async def _get_initial_seq_num(self):
# response = self.socket.recv(2)
response = await self.loop.sock_recv(self.socket, 2)
if not response:
return False
self.seq_num = struct.unpack("H", response)[0] + 1
return True
[docs] def disconnect(self):
try:
self.socket.close()
except socket.error:
pass
[docs] async def switch(self, relay, on):
"""Switch the given relay on or off"""
await self.connect()
request = ChangeRelayCommand(self, relay, on)
try:
return await request.do_request()
except socket.error:
return False
finally:
self.disconnect()
[docs] async def switch_multiple(self, relay_state_dict):
"""
Change the state of multiple relays at once
State dictionary should be of the form:
{1: True}
where the key is the relay and the value is the new state
"""
await self.connect()
for relay, new_state in list(relay_state_dict.items()):
request = ChangeRelayCommand(self, relay, new_state)
try:
result = await request.do_request()
if not result:
return False
except socket.error:
self.disconnect()
return False
self.disconnect()
return True
[docs] async def get_relays(self):
await self.connect()
request = GetRelaysRequest(self)
try:
return await request.do_request()
except socket.error:
return False
finally:
self.disconnect()
[docs] async def pulse_relay(self, relay, on, length):
await self.connect()
request = PulseRelayRequest(self, relay, on, length)
try:
return await request.do_request()
except socket.error:
return False
finally:
self.disconnect()
if __name__ == "__main__":
interface = iBootInterface("192.168.0.105", "admin", "admin")
print(str(interface.get_relays()))
print(str(interface.switch(1, False)))