1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
import mmap
import ctypes
import time
import key_config
import umi_led
import threading
import keyboard
class SharedMemoryData(ctypes.Structure):
    """Byte layout of the shared buffer, declared as a ctypes structure.

    Every member is an array of unsigned bytes, so the structure size is the
    sum of the array lengths: 6 + 32 + 96 + 4 = 138 bytes.
    """

    _fields_ = [
        # One status byte per air zone.
        ("airIoStatus", ctypes.c_uint8 * 6),
        # One status byte per slider zone.
        ("sliderIoStatus", ctypes.c_uint8 * 32),
        # LED colour bytes (presumably 3 bytes per LED for 32 LEDs; confirm
        # against the producer of this data).
        ("ledRgbData", ctypes.c_uint8 * 96),
        # Trailing bytes not referenced elsewhere in this file.
        ("reserved", ctypes.c_uint8 * 4),
    ]
# Tag name and size (in bytes) of the named shared-memory mapping.
# ctypes.sizeof() already reports bytes, so the structure size is used as-is.
SHARED_MEMORY_NAME = "Local\\BROKENITHM_SHARED_BUFFER"
SHARED_MEMORY_SIZE = ctypes.sizeof(SharedMemoryData)
def initialize_shared_mem():
    """Create the named shared-memory buffer and write its initial contents.

    The buffer starts with 6 air-zone status bytes, followed by 32
    slider-zone status bytes, followed by the LED data. Both status regions
    are zeroed (all zones inactive) and ``umi_led.DEFAULT_LED_STATE`` is
    written directly after them.

    Returns:
        The open ``mmap.mmap`` object on success, or ``None`` when the
        mapping could not be created or initialised. On failure, a mapping
        that was already opened is closed again before returning.
    """
    # Region sizes; they mirror the array lengths in SharedMemoryData.
    air_zone_count = 6
    slider_zone_count = 32

    shared_memory = None
    try:
        shared_memory = mmap.mmap(
            -1,
            SHARED_MEMORY_SIZE,
            tagname=SHARED_MEMORY_NAME,
            access=mmap.ACCESS_WRITE,
        )

        shared_memory.seek(0)
        shared_memory.write(bytes(air_zone_count))  # all air zones inactive
        print("[Initialization] Resetting Air Note Status")

        shared_memory.seek(air_zone_count)
        shared_memory.write(bytes(slider_zone_count))  # all slider zones inactive
        print("[Initialization] Resetting Slider Status")

        shared_memory.seek(air_zone_count + slider_zone_count)
        print("[Initialization] Setting Default Slider LED state")
        shared_memory.write(bytearray(umi_led.DEFAULT_LED_STATE))

        print("[Initialization] Success. Complete")
        return shared_memory
    except Exception as exc:
        # Report the cause instead of discarding it, and do not leak a
        # half-initialised mapping to the caller.
        print(
            "[Error] A Fatal Error occured while trying to write to the "
            f"Shared Memory while initializing: {exc!r}"
        )
        if shared_memory is not None:
            shared_memory.close()
        return None
def _release_mapped_keys(*layouts):
    """Send a key-up event for every key bound in the given zone layouts."""
    for layout in layouts:
        for key in layout.values():
            keyboard.release(key)


def _sync_zone_keys(layout, statuses, states, label, down_word, up_word):
    """Press or release the key of every zone whose active state changed.

    A zone counts as active when its status byte equals 128. ``states`` holds
    the previously seen state per zone and is updated in place.
    """
    for zone, key in layout.items():
        is_active = statuses[zone] == 128
        if is_active == states[zone]:
            continue
        if is_active:
            keyboard.press(key)
            print(f"{label} {key} {down_word}")
        else:
            keyboard.release(key)
            print(f"{label} {key} {up_word}")
        states[zone] = is_active


def monitor_key_presses_and_air(controller: key_config.InputConfig):
    """Poll the shared buffer and translate zone changes into key events.

    Every 10 ms the 6 air status bytes and the 32 slider status bytes are read
    from the start of the shared buffer. A zone whose byte equals 128 is
    treated as active; when a zone's state changes, the key mapped to it by
    ``controller`` is pressed or released through the ``keyboard`` module.

    The loop runs until interrupted. On ``KeyboardInterrupt`` and on any
    unexpected error, every mapped key is released so that no key is left
    held down, and the read-only mapping is closed on every exit path.

    Args:
        controller: Provides ``get_keyzone_layout()`` and
            ``get_airzone_layout()``, each returning a zone-index -> key
            mapping.
    """
    shared_memory = None
    key_layout = {}
    air_layout = {}
    try:
        shared_memory = mmap.mmap(
            -1,
            SHARED_MEMORY_SIZE,
            tagname=SHARED_MEMORY_NAME,
            access=mmap.ACCESS_READ,
        )
        # Fetch the layouts once instead of on every poll; this assumes the
        # bindings do not change while monitoring is running.
        key_layout = controller.get_keyzone_layout()
        air_layout = controller.get_airzone_layout()
        key_states = dict.fromkeys(key_layout, False)
        air_states = dict.fromkeys(air_layout, False)
        print("Now Monitoring for Inputs")

        while True:
            # The two status regions are contiguous, so the second read
            # continues where the first one stopped (offset 6).
            shared_memory.seek(0)
            current_air_status = shared_memory.read(6)
            current_slider_status = shared_memory.read(32)

            _sync_zone_keys(
                key_layout, current_slider_status, key_states,
                "Key", "pressed", "released",
            )
            _sync_zone_keys(
                air_layout, current_air_status, air_states,
                "Air", "activated", "deactivated",
            )
            time.sleep(0.01)
    except KeyboardInterrupt:
        _release_mapped_keys(key_layout, air_layout)
        print("Stopping monitoring.")
    except Exception as exc:
        print(
            "[Error] A Fatal Error occured while monitoring and translating "
            f"inputs: {exc!r}"
        )
        # Best effort: do not leave keys stuck down after a failure, but do
        # not let a second failure escape from a function that never raised.
        try:
            _release_mapped_keys(key_layout, air_layout)
        except Exception as release_exc:
            print(f"[Error] Could not release mapped keys: {release_exc!r}")
    finally:
        if shared_memory is not None:
            shared_memory.close()
if __name__ == "__main__":
    # Script entry point: initialise the shared buffer, start the LED
    # websocket server on a background thread, then translate inputs on the
    # main thread until monitoring stops.
    shared_memory = initialize_shared_mem()
    controller = key_config.Umiguri32KeyZone()
    if shared_memory is not None:
        stop_event = threading.Event()
        server_thread = threading.Thread(
            target=umi_led.start_umiguri_websocket_server,
            args=(shared_memory, stop_event),
        )
        server_thread.start()
        try:
            monitor_key_presses_and_air(controller)
        finally:
            # Always run the shutdown sequence, even if monitoring raised.
            print("Shutting down...")
            stop_event.set()
            # Give the server thread a bounded amount of time to stop before
            # the buffer it was handed is closed underneath it.
            # NOTE(review): presumably the server watches stop_event; confirm
            # in umi_led.
            server_thread.join(timeout=5)
            shared_memory.close()
    # exit() is a helper injected by the site module for interactive use;
    # raising SystemExit has the same effect without depending on it.
    raise SystemExit
|