# advanced-python-homework/noblocking_turtle_shell.py
#
# 75 lines
# 2.5 KiB
# Python

import cmd
import threading
from queue import Queue
from functools import reduce
from typing import Collection, Any, Optional
from controls.device import ActionDescriptor, DeviceError
from controls.either import Either, Left, Right
from equipment.turtle_device import TurtleDevice
def select(fr: Collection[ActionDescriptor], where: tuple[str, Any]) -> Optional[ActionDescriptor]:
    """Return the first item of *fr* whose attribute matches *where*.

    Args:
        fr: Items to search, visited in iteration order.
        where: ``(attribute_name, expected_value)``; an item matches when its
            attribute of that name compares equal to the expected value.

    Returns:
        The first matching item, or ``None`` when nothing matches (which
        includes an empty *fr*).
    """
    field, expected = where[0], where[1]
    # Private sentinel so that "attribute absent" is distinguishable from any
    # real attribute value, including None.
    missing = object()
    for item in fr:
        # getattr (rather than indexing ``__dict__``) also works for slotted
        # objects, properties and class-level attributes. Items that do not
        # have the attribute at all simply do not match.
        value = getattr(item, field, missing)
        if value is not missing and value == expected:
            return item  # stop at the first hit; no intermediate list needed
    return None
class TurtleDeviceThread():
    """Owns a ``TurtleDevice`` and executes queued commands on it in FIFO order.

    Producers hand commands over with :meth:`send`; a consumer calls
    :meth:`run`, which blocks and processes commands until it receives
    ``['stop']``. Despite its name this class is not a ``threading.Thread``
    subclass: :meth:`run` executes on whichever thread calls it.
    """

    def __init__(self):
        super().__init__()
        # The device is created and opened eagerly; run() closes it.
        self._device = TurtleDevice()
        self._device.open()
        # queue.Queue does its own locking, so send() and run() may be called
        # from different threads.
        self._queue = Queue()

    def send(self, task: list[str]) -> None:
        """Enqueue *task* (``[action_name, arg1, ...]``) for :meth:`run`.

        The special task ``['stop']`` makes :meth:`run` close the device and
        return.
        """
        self._queue.put(task)

    def _parse_args(self, args: list[str]) -> Either[tuple, str]:
        """Validate a non-empty command and convert its arguments to ints.

        Args:
            args: ``[action_name, arg1, ...]``; must contain at least the
                action name.

        Returns:
            ``Left((action_name, int_arg1, ...))`` when the action is known to
            the device and every argument parses as an integer, otherwise
            ``Right(message)`` describing the problem. Note the convention
            used in this file: ``Left`` carries success, ``Right`` the error.
        """
        action_name, action_args = args[0], args[1:]
        try:
            descr = select(self._device.action_descriptors(), ('name', action_name))
        except DeviceError as e:
            return Right(f'Device error: {e}')
        if descr is None:
            return Right(f'Unknown action: "{action_name}"')
        try:
            return Left((action_name, *(int(a) for a in action_args)))
        except ValueError:
            # A non-numeric argument is a user mistake; report it instead of
            # letting the exception terminate the processing loop.
            return Right(f'Invalid arguments for "{action_name}": {action_args} (integers expected)')

    def run(self) -> None:
        """Process queued tasks until ``['stop']`` arrives, then close the device.

        Blocks while the queue is empty. Problems with an individual command
        are printed with a ``***`` prefix and the loop continues. The device
        is closed when this method exits, whether through ``stop`` or through
        an unexpected exception.
        """
        try:
            while True:
                task = self._queue.get()  # blocks until a task is available
                if not task:
                    # Nothing to dispatch on; task[0] below would raise.
                    print('*** Empty command')
                    continue
                if task[0] == 'stop':
                    return
                parsed = self._parse_args(task)
                if parsed.is_right():
                    print(f'*** {parsed.get_right()}')
                    continue
                try:
                    self._device.execute(*parsed.get_left())
                except DeviceError as e:
                    # NOTE(review): assumes execute() reports failures with
                    # DeviceError, as action_descriptors() does - confirm.
                    print(f'*** Device error: {e}')
        finally:
            self._device.close()
class NoBlockingTurtleShell(cmd.Cmd):
    """Interactive shell that forwards turtle commands to a device worker.

    Commands are not executed by the shell itself: they are handed to the
    ``turtle_thread`` object through its ``send`` method, so the prompt
    returns immediately without waiting for the command to be carried out.
    """

    intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n'
    prompt = '(turtle) '
    # Not referenced anywhere in this file; kept because it is part of the
    # class's public attributes.
    file = None

    def __init__(self, turtle_thread: TurtleDeviceThread):
        super().__init__()
        # Worker that receives every command as a list of string tokens.
        self._turtle_thread = turtle_thread

    # The single-line docstrings of the do_* methods are shown verbatim by
    # cmd's built-in ``help`` command, so their text is user-facing.
    def do_execute(self, arg: str):
        'Execute a turtle command: EXECUTE COMMAND ARG1 ARG2 ...'
        # split() without a separator collapses repeated blanks and ignores
        # leading/trailing ones, so no empty tokens reach the worker.
        tokens = arg.split()
        if not tokens:
            print('*** Usage: execute COMMAND ARG1 ARG2 ...')
            return
        self._turtle_thread.send(tokens)

    def do_exit(self, arg):
        'close the turtle window, and exit: EXIT'
        print('Waiting for the turtle to finish jobs...')
        # 'stop' is queued behind any pending commands, so they run first.
        self._turtle_thread.send(['stop'])
        # A true return value tells cmd.Cmd.cmdloop() to terminate.
        return True
if __name__ == '__main__':
    # Create the device worker first: the shell needs it to forward commands.
    device_worker = TurtleDeviceThread()
    console = NoBlockingTurtleShell(device_worker)

    # The interactive prompt reads input on a background thread ...
    threading.Thread(target=console.cmdloop).start()

    # ... while the device loop occupies the main thread until it receives
    # 'stop'. NOTE(review): presumably the turtle device has to be driven from
    # the main thread - confirm against TurtleDevice.
    device_worker.run()