diff options
Diffstat (limited to 'brokenithm-evolved-umi.py')
| -rw-r--r-- | brokenithm-evolved-umi.py | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/brokenithm-evolved-umi.py b/brokenithm-evolved-umi.py new file mode 100644 index 0000000..b1b72b1 --- /dev/null +++ b/brokenithm-evolved-umi.py @@ -0,0 +1,96 @@ +import mmap +import ctypes +import time +import key_config +import umi_led +import threading +import keyboard + +class SharedMemoryData(ctypes.Structure): + _fields_ = [ + ("airIoStatus", ctypes.c_uint8 * 6), + ("sliderIoStatus", ctypes.c_uint8 * 32), + ("ledRgbData", ctypes.c_uint8 * 96), + ("reserved", ctypes.c_uint8 * 4) + ] + +# Shared memory details +SHARED_MEMORY_NAME = "Local\\BROKENITHM_SHARED_BUFFER" +SHARED_MEMORY_SIZE = ctypes.sizeof(ctypes.c_uint8) * ctypes.sizeof(SharedMemoryData) + + +def initialize_shared_mem(): + try: + shared_memory = mmap.mmap(-1, SHARED_MEMORY_SIZE, tagname=SHARED_MEMORY_NAME, access=mmap.ACCESS_WRITE) + slider_status = [0] * 32 # Default: All slider zones inactive + air_status = [0] * 6 # Default: All air zones inactive + shared_memory.seek(0) + shared_memory.write(bytearray(air_status)) + print("[Initialization] Resetting Air Note Status") + shared_memory.seek(6) + shared_memory.write(bytearray(slider_status)) + print("[Initialization] Resetting Slider Status") + shared_memory.seek(6 + 32) + print("[Initialization] Setting Default Slider LED state") + shared_memory.write(bytearray(umi_led.DEFAULT_LED_STATE)) + print("[Initialization] Success. Complete") + return shared_memory + except Exception: + print("[Error] A Fatal Error occured while trying to write to the Shared Memory while initializing") + return None + +def monitor_key_presses_and_air(controller: key_config.InputConfig): + try: + shared_memory = mmap.mmap(-1, SHARED_MEMORY_SIZE, tagname=SHARED_MEMORY_NAME, access=mmap.ACCESS_READ) + key_states = {zone: False for zone in controller.get_keyzone_layout().keys()} + air_states = {zone: False for zone in controller.get_airzone_layout().keys()} + print("Now Monitoring for Inputs") + while True: + shared_memory.seek(0) + current_air_status = list(shared_memory.read(6)) + shared_memory.seek(6) + current_slider_status = list(shared_memory.read(32)) + for zone, key in controller.get_keyzone_layout().items(): + is_pressed = current_slider_status[zone] == 128 + if is_pressed != key_states[zone]: + if is_pressed: + keyboard.press(key) + print(f"Key {key} pressed") + else: + keyboard.release(key) + print(f"Key {key} released") + key_states[zone] = is_pressed + for zone, key in controller.get_airzone_layout().items(): + is_active = current_air_status[zone] == 128 + if is_active != air_states[zone]: + if is_active: + keyboard.press(key) + print(f"Air {key} activated") + else: + keyboard.release(key) + print(f"Air {key} deactivated") + air_states[zone] = is_active + time.sleep(0.01) + except Exception: + print("[Error] A Fatal Error occured while monitoring and translating inputs") + except KeyboardInterrupt: + for key in controller.get_keyzone_layout().values(): + keyboard.release(key) + for key in controller.get_airzone_layout().values(): + keyboard.release(key) + print("Stopping monitoring.") + + +if __name__ == "__main__": + shared_memory = initialize_shared_mem() + controller = key_config.Umiguri32KeyZone() + if shared_memory is not None: + stop_event = threading.Event() + server_thread = threading.Thread(target=umi_led.start_umiguri_websocket_server, args=(shared_memory, stop_event)) + server_thread.start() + monitor_key_presses_and_air(controller) + print("Shutting down...") + stop_event.set() + shared_memory.close() + exit() + |
