summaryrefslogtreecommitdiffstats
path: root/brokenithm-evolved-umi.py
blob: b1b72b188d1ea852471e6943cfc1d92d3fa31e0e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import mmap
import ctypes
import time
import key_config
import umi_led
import threading
import keyboard

class SharedMemoryData(ctypes.Structure):
    _fields_ = [
        ("airIoStatus", ctypes.c_uint8 * 6),
        ("sliderIoStatus", ctypes.c_uint8 * 32),
        ("ledRgbData", ctypes.c_uint8 * 96),
        ("reserved", ctypes.c_uint8 * 4)
    ]

# Shared memory details
SHARED_MEMORY_NAME = "Local\\BROKENITHM_SHARED_BUFFER"
SHARED_MEMORY_SIZE = ctypes.sizeof(ctypes.c_uint8) * ctypes.sizeof(SharedMemoryData)


def initialize_shared_mem():
    try:
        shared_memory = mmap.mmap(-1, SHARED_MEMORY_SIZE, tagname=SHARED_MEMORY_NAME, access=mmap.ACCESS_WRITE)
        slider_status = [0] * 32  # Default: All slider zones inactive
        air_status = [0] * 6  # Default: All air zones inactive
        shared_memory.seek(0)
        shared_memory.write(bytearray(air_status))
        print("[Initialization] Resetting Air Note Status")
        shared_memory.seek(6)
        shared_memory.write(bytearray(slider_status))
        print("[Initialization] Resetting Slider Status")
        shared_memory.seek(6 + 32) 
        print("[Initialization] Setting Default Slider LED state")
        shared_memory.write(bytearray(umi_led.DEFAULT_LED_STATE))
        print("[Initialization] Success. Complete")
        return shared_memory
    except Exception:
        print("[Error] A Fatal Error occured while trying to write to the Shared Memory while initializing")
        return None

def monitor_key_presses_and_air(controller: key_config.InputConfig):
    try:
        shared_memory = mmap.mmap(-1, SHARED_MEMORY_SIZE, tagname=SHARED_MEMORY_NAME, access=mmap.ACCESS_READ)
        key_states = {zone: False for zone in controller.get_keyzone_layout().keys()}
        air_states = {zone: False for zone in controller.get_airzone_layout().keys()}
        print("Now Monitoring for Inputs")
        while True:
            shared_memory.seek(0)
            current_air_status = list(shared_memory.read(6))
            shared_memory.seek(6)
            current_slider_status = list(shared_memory.read(32))
            for zone, key in controller.get_keyzone_layout().items():
                is_pressed = current_slider_status[zone] == 128
                if is_pressed != key_states[zone]:
                    if is_pressed:
                        keyboard.press(key)
                        print(f"Key {key} pressed")
                    else:
                        keyboard.release(key)
                        print(f"Key {key} released")
                    key_states[zone] = is_pressed
            for zone, key in controller.get_airzone_layout().items():
                is_active = current_air_status[zone] == 128
                if is_active != air_states[zone]:
                    if is_active:
                        keyboard.press(key)
                        print(f"Air {key} activated")
                    else:
                        keyboard.release(key)
                        print(f"Air {key} deactivated")
                    air_states[zone] = is_active
            time.sleep(0.01)
    except Exception:
        print("[Error] A Fatal Error occured while monitoring and translating inputs")
    except KeyboardInterrupt:
        for key in controller.get_keyzone_layout().values():
            keyboard.release(key)
        for key in controller.get_airzone_layout().values():
            keyboard.release(key)
        print("Stopping monitoring.")


if __name__ == "__main__":
    shared_memory = initialize_shared_mem()
    controller = key_config.Umiguri32KeyZone()
    if shared_memory is not None:
        stop_event = threading.Event()
        server_thread = threading.Thread(target=umi_led.start_umiguri_websocket_server, args=(shared_memory, stop_event))
        server_thread.start()
        monitor_key_presses_and_air(controller)
        print("Shutting down...")
        stop_event.set()
        shared_memory.close()
        exit()

send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage