diff options
Diffstat (limited to 'umi_led.py')
| -rw-r--r-- | umi_led.py | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/umi_led.py b/umi_led.py new file mode 100644 index 0000000..e5718ff --- /dev/null +++ b/umi_led.py @@ -0,0 +1,203 @@ +""" +Umiguri LED Controller Protocol. Some commands have been stubbed +Thanks inonote for the great resources on the protocol +https://gist.github.com/inonote/00251fed881a82c9df1e505eef1722bc + +The implementation below is largely derived from +https://github.com/inonote/UmiguriSampleLedServer/blob/master/src/App.cpp +""" + +import asyncio +import websockets + +kLedServerPort = 7124 +kLedServerName = "BrokenithmLedServer" +kLedServerVersion = (123, 456) +kLedServerHardwareName = "Brokenithm" +kLedServerHardwareVersion = (987, 654) + +kLedProtocolVersion = 0x01 + +ULED_COMMAND_SET_LED = 0x10 +ULED_COMMAND_INITIALIZE = 0x11 +ULED_COMMAND_READY = ULED_COMMAND_INITIALIZE | 0x08 # 0x19 +ULED_COMMAND_PING = 0x12 +ULED_COMMAND_PONG = ULED_COMMAND_PING | 0x08 # 0x1A +ULED_COMMAND_REQUEST_SERVER_INFO = 0xD0 +ULED_COMMAND_REPORT_SERVER_INFO = ULED_COMMAND_REQUEST_SERVER_INFO | 0x08 # 0xD8 + +DEFAULT_LED_STATE = [ + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255, 0, 0, 0, + 255, 255, 255 +] + +SHOW_LOG = False + +def log_message(message: str): + if not SHOW_LOG: + return + print(message) + +def validate_message(payload: bytes) -> bool: + """ + Checks if the incoming payload conforms to the protocol: + 1. At least 3 bytes: [protocolVersion, command, length]. + 2. The length byte must match (payload.size() - 3). + 3. For known commands, check expected payload size if needed. + """ + if len(payload) < 3: + return False + + if payload[0] != kLedProtocolVersion: + return False + + length_byte = payload[2] + if length_byte != (len(payload) - 3): + return False + + command = payload[1] + if command == ULED_COMMAND_SET_LED: + if len(payload) != 106: # 103 bytes of LED data + 3 header bytes + return False + elif command == ULED_COMMAND_PING: + if len(payload) != 7: # 4 bytes payload + 3 header bytes + return False + + return True + +def get_brokenithm_led_array(payload): + led_state = [] + land_colors = [] + border_colors = [] + for i in range(16): + offset = 1 + i * 3 + land_colors.append(tuple(payload[offset:offset + 3])) + + for i in range(15): + offset = 49 + i * 3 + border_colors.append(tuple(payload[offset:offset + 3])) + land_colors = list(reversed(land_colors)) + border_colors = list(reversed(border_colors)) + try: + for i in range(15): + led_state.append(land_colors[i][2]) + led_state.append(land_colors[i][0]) + led_state.append(land_colors[i][1]) + led_state.append(border_colors[i][2]) + led_state.append(border_colors[i][0]) + led_state.append(border_colors[i][1]) + led_state.append(land_colors[15][2]) + led_state.append(land_colors[15][0]) + led_state.append(land_colors[15][1]) + except Exception: + log_message("[LEDServer] Invalid LED State received ignoring") + return None + return led_state + +# R,G,B -> B,R,G + +async def handle_message(websocket, payload: bytes, shared_memory): + """ + Process incoming (validated) payload and respond if needed. + """ + command = payload[1] + log_message(f"Received raw data ({len(payload)} bytes): {payload}") + + if command == ULED_COMMAND_PING: + log_message("-> Handling PING command") + custom_data = payload[3:7] + response = bytearray([kLedProtocolVersion, ULED_COMMAND_PONG, 6]) + custom_data + bytearray([0x51, 0xED]) + await websocket.send(response) + + elif command == ULED_COMMAND_INITIALIZE: + log_message("-> Handling INITIALIZE command") + response = bytearray([kLedProtocolVersion, ULED_COMMAND_READY, 0]) + await websocket.send(response) + + elif command == ULED_COMMAND_REQUEST_SERVER_INFO: + log_message("-> Handling REQUEST_SERVER_INFO command") + buf = bytearray(3 + 44) + buf[0] = kLedProtocolVersion + buf[1] = ULED_COMMAND_REPORT_SERVER_INFO + buf[2] = 44 + + name_bytes = kLedServerName.encode('ascii')[:16] + buf[3:3+len(name_bytes)] = name_bytes + + ver0, ver1 = kLedServerVersion + buf[3+16] = ver0 & 0xFF + buf[3+17] = (ver0 >> 8) & 0xFF + buf[3+18] = ver1 & 0xFF + buf[3+19] = (ver1 >> 8) & 0xFF + + hw_bytes = kLedServerHardwareName.encode('ascii')[:16] + buf[3+22:3+22+len(hw_bytes)] = hw_bytes + + hwver0, hwver1 = kLedServerHardwareVersion + buf[3+38] = hwver0 & 0xFF + buf[3+39] = (hwver0 >> 8) & 0xFF + buf[3+40] = hwver1 & 0xFF + buf[3+41] = (hwver1 >> 8) & 0xFF + + await websocket.send(buf) + + elif command == ULED_COMMAND_SET_LED: + log_message("-> Handling SET_LED command") + brightness = payload[3] + led_data = payload[3:] + log_message(f" Brightness: {brightness}") + log_message(f" LED color data length: {len(led_data)}") + led_arr = get_brokenithm_led_array(led_data) + if led_arr is None: + return + shared_memory.seek(6 + 32) + shared_memory.write(bytearray(led_arr)) + + else: + log_message(f"-> Unknown command 0x{command:02X}") + +async def handle_client(websocket, shared_memory): + """ + Main entry point for each connected client. + Listens for messages, validates them, and calls handle_message. + """ + print(f"[LEDServer] Client Detected from: {websocket.remote_address}") + try: + async for raw_message in websocket: + if isinstance(raw_message, bytes): + if validate_message(raw_message): + await handle_message(websocket, raw_message, shared_memory) + else: + log_message(f"Invalid message received: {raw_message}") + else: + log_message(f"Received text message (ignored): {raw_message}") + except websockets.exceptions.ConnectionClosed as e: + log_message(f"Client disconnected: {e.reason}") + +async def run_websocket_server(shared_memory, stop_event): + print(f"[LEDerver] Starting WebSocket server on port {kLedServerPort}...") + async with websockets.serve(lambda ws: handle_client(ws, shared_memory), "0.0.0.0", kLedServerPort): + print("[LEDServer] LEDServer started. Awaiting connections...") + while not stop_event.is_set(): + await asyncio.sleep(0.1) + print("[LEDServer] Websocket Server Shutting Down") + +def start_umiguri_websocket_server(shared_memory, stop_event): + try: + asyncio.run(run_websocket_server(shared_memory, stop_event)) + except KeyboardInterrupt: + log_message("Keyboard Interrupt Received - Server stopped.") |
