advanced-python-homework/noblocking_turtle_shell.py

76 lines
2.5 KiB
Python
Raw Permalink Normal View History

2023-11-04 19:05:45 +03:00
import cmd
import threading
from queue import Queue
2023-11-15 23:55:37 +03:00
from functools import reduce
from typing import Collection, Any, Optional
from controls.device import ActionDescriptor, DeviceError
from controls.either import Either, Left, Right
2023-11-04 19:05:45 +03:00
from equipment.turtle_device import TurtleDevice
2023-11-15 23:55:37 +03:00
def select(fr: Collection[ActionDescriptor], where: tuple[str, Any]) -> Optional[ActionDescriptor]:
results = list(filter(lambda r: r.__dict__[where[0]] == where[1], fr))
return None if len(results) == 0 else results[0]
2023-11-16 18:57:59 +03:00
class TurtleDeviceController():
2023-11-04 19:05:45 +03:00
def __init__(self):
super().__init__()
2023-11-15 23:55:37 +03:00
self._device = TurtleDevice()
self._device.open()
self._queue = Queue()
def send(self, task: list[str]) -> None:
self._queue.put(task)
def _parse_args(self, args: list[str]) -> Either[tuple, str]:
action_name, action_args = args[0], args[1:]
try:
descr = select(self._device.action_descriptors(), ('name', action_name))
except DeviceError as e:
return Right(f'Device error: {e}')
if descr is None:
return Right(f'Unknown action: "{action_name}"')
return Left(reduce(lambda acc, a: acc + (int(a),), action_args, (action_name,)))
def run(self): # ! NoReturn
while (True):
task = self._queue.get(block=True, timeout=None)
if task[0] == 'stop':
self._device.close()
return
args = self._parse_args(task)
if args.is_right():
print(f'*** {args.get_right()}')
else:
self._device.execute(*args.get_left())
2023-11-04 19:05:45 +03:00
class NoBlockingTurtleShell(cmd.Cmd):
intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n'
prompt = '(turtle) '
file = None
2023-11-16 18:57:59 +03:00
def __init__(self, turtle_thread: TurtleDeviceController):
2023-11-15 23:55:37 +03:00
super(NoBlockingTurtleShell, self).__init__()
2023-11-16 18:57:59 +03:00
self._turtle_controller = turtle_thread
2023-11-04 19:05:45 +03:00
2023-11-15 23:55:37 +03:00
def do_execute(self, arg: str):
'Execute a turtle command: EXECUTE COMMAND ARG1 ARG2 ...'
2023-11-16 18:57:59 +03:00
self._turtle_controller.send(arg.split(' '))
2023-11-04 19:05:45 +03:00
def do_exit(self, arg):
2023-11-15 23:55:37 +03:00
'close the turtle window, and exit: EXIT'
print('Waiting for the turtle to finish jobs...')
2023-11-16 18:57:59 +03:00
self._turtle_controller.send(['stop'])
2023-11-15 23:55:37 +03:00
return True
2023-11-04 19:05:45 +03:00
if __name__ == '__main__':
2023-11-16 18:57:59 +03:00
turtle_controller = TurtleDeviceController()
shell = NoBlockingTurtleShell(turtle_controller)
2023-11-15 23:55:37 +03:00
cmd_thread = threading.Thread(target=shell.cmdloop)
cmd_thread.start()
2023-11-16 18:57:59 +03:00
turtle_controller.run()