Add openpilot tools
This commit is contained in:
94
tools/sim/lib/keyboard_ctrl.py
Normal file
94
tools/sim/lib/keyboard_ctrl.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import sys
|
||||
import termios
|
||||
import time
|
||||
|
||||
from termios import (BRKINT, CS8, CSIZE, ECHO, ICANON, ICRNL, IEXTEN, INPCK,
|
||||
ISTRIP, IXON, PARENB, VMIN, VTIME)
|
||||
from typing import NoReturn
|
||||
|
||||
# Indexes for termios list.
|
||||
IFLAG = 0
|
||||
OFLAG = 1
|
||||
CFLAG = 2
|
||||
LFLAG = 3
|
||||
ISPEED = 4
|
||||
OSPEED = 5
|
||||
CC = 6
|
||||
|
||||
STDIN_FD = sys.stdin.fileno()
|
||||
|
||||
|
||||
KEYBOARD_HELP = """
|
||||
| key | functionality |
|
||||
|------|-----------------------|
|
||||
| 1 | Cruise Resume / Accel |
|
||||
| 2 | Cruise Set / Decel |
|
||||
| 3 | Cruise Cancel |
|
||||
| r | Reset Simulation |
|
||||
| i | Toggle Ignition |
|
||||
| q | Exit all |
|
||||
| wasd | Control manually |
|
||||
"""
|
||||
|
||||
|
||||
def getch() -> str:
|
||||
old_settings = termios.tcgetattr(STDIN_FD)
|
||||
try:
|
||||
# set
|
||||
mode = old_settings.copy()
|
||||
mode[IFLAG] &= ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON)
|
||||
#mode[OFLAG] &= ~(OPOST)
|
||||
mode[CFLAG] &= ~(CSIZE | PARENB)
|
||||
mode[CFLAG] |= CS8
|
||||
mode[LFLAG] &= ~(ECHO | ICANON | IEXTEN)
|
||||
mode[CC][VMIN] = 1
|
||||
mode[CC][VTIME] = 0
|
||||
termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, mode)
|
||||
|
||||
ch = sys.stdin.read(1)
|
||||
finally:
|
||||
termios.tcsetattr(STDIN_FD, termios.TCSADRAIN, old_settings)
|
||||
return ch
|
||||
|
||||
def keyboard_poll_thread(q: 'Queue[str]'):
|
||||
while True:
|
||||
c = getch()
|
||||
if c == '1':
|
||||
q.put("cruise_up")
|
||||
elif c == '2':
|
||||
q.put("cruise_down")
|
||||
elif c == '3':
|
||||
q.put("cruise_cancel")
|
||||
elif c == 'w':
|
||||
q.put("throttle_%f" % 1.0)
|
||||
elif c == 'a':
|
||||
q.put("steer_%f" % 0.15)
|
||||
elif c == 's':
|
||||
q.put("brake_%f" % 1.0)
|
||||
elif c == 'd':
|
||||
q.put("steer_%f" % -0.15)
|
||||
elif c == 'z':
|
||||
q.put("blinker_left")
|
||||
elif c == 'x':
|
||||
q.put("blinker_right")
|
||||
elif c == 'i':
|
||||
q.put("ignition")
|
||||
elif c == 'r':
|
||||
q.put("reset")
|
||||
elif c == 'q':
|
||||
q.put("quit")
|
||||
break
|
||||
|
||||
def test(q: 'Queue[str]') -> NoReturn:
|
||||
while True:
|
||||
print([q.get_nowait() for _ in range(q.qsize())] or None)
|
||||
time.sleep(0.25)
|
||||
|
||||
if __name__ == '__main__':
|
||||
from multiprocessing import Process, Queue
|
||||
q: Queue[str] = Queue()
|
||||
p = Process(target=test, args=(q,))
|
||||
p.daemon = True
|
||||
p.start()
|
||||
|
||||
keyboard_poll_thread(q)
|
||||
Reference in New Issue
Block a user