From e70077c9bc5f5dc401326821a7b189b642ed8bbc Mon Sep 17 00:00:00 2001 From: Igor Dunaev Date: Wed, 15 Nov 2023 23:55:37 +0300 Subject: [PATCH] Add sort of non-blockong shell --- controls/either.py | 110 +++++++++++++++++++++++++++++++++++++ equipment/turtle_device.py | 4 +- noblocking_turtle_shell.py | 62 +++++++++++++++++---- pyproject.toml | 2 +- 4 files changed, 164 insertions(+), 14 deletions(-) create mode 100644 controls/either.py diff --git a/controls/either.py b/controls/either.py new file mode 100644 index 0000000..3c7fdee --- /dev/null +++ b/controls/either.py @@ -0,0 +1,110 @@ +from abc import ABC, abstractmethod +from typing import Generic, TypeVar, Callable + +L = TypeVar('L') +R = TypeVar('R') +N = TypeVar('N') + +class Either(ABC, Generic[L, R]): + 'A monad-like object to handle errors in a functional style.' + @abstractmethod + def is_left(self) -> bool: + 'Returns `True` if the objects belongs to `Left` class' + pass + + @abstractmethod + def is_right(self) -> bool: + 'Returns `True` if the objects belongs to `Right` class' + pass + + @abstractmethod + def map_left(self, f: Callable[[L], N]) -> 'Either[N, R]': + '''Applies transform `f` to the value if type is `Left`, + but does not touch the value if type is `Right`.''' + pass + + @abstractmethod + def map_right(self, f: Callable[[R], N]) -> 'Either[L, N]': + '''Applies transform `f` to the right if type is `Right`, + but does not touch the value if type is `Left`.''' + pass + + @abstractmethod + def get_left_or(self, default: L) -> L: + '''Unwraps the underlying value if `Left`, otherwise returns the provided default value.''' + pass + + @abstractmethod + def get_left(self) -> L: + '''Unwraps the underlying value if `Left`, panics if `Right`.''' + pass + + @abstractmethod + def get_right_or(self, default: R) -> R: + '''Unwraps the underlying value if `Right`, otherwise returns the provided default value.''' + pass + + @abstractmethod + def get_right(self) -> R: + '''Unwraps the underlying error if `Right`, panics if `Left`.''' + pass + + +class Left(Either[L, R]): + def __init__(self, value: L) -> None: + super().__init__() + self._left_value = value + + def is_left(self) -> bool: + return True + + def is_right(self) -> bool: + return not self.is_left() + + def map_left(self, f: Callable[[L], N]) -> 'Either[N, R]': + return Left(f(self._left_value)) + + def map_right(self, f: Callable[[R], N]) -> 'Either[L, N]': + return Left(self._left_value) + + def get_left_or(self, default: L) -> L: + return self._left_value + + def get_left(self) -> L: + return self._left_value + + def get_right_or(self, default: R) -> R: + return default + + def get_right(self) -> R: + raise RuntimeError(f'Object {self.__repr__()} is nor of Error type') + + +class Right(Either[L, R]): + def __init__(self, value: R) -> None: + super().__init__() + self._right_value = value + + def is_left(self) -> bool: + return False + + def is_right(self) -> bool: + return not self.is_left() + + def map_left(self, f: Callable[[L], N]) -> 'Either[N, R]': + return Right(self._right_value) + + def map_right(self, f: Callable[[R], N]) -> 'Either[L, N]': + return Right(f(self._right_value)) + + def get_left_or(self, default: L) -> L: + return default + + def get_left(self) -> L: + raise RuntimeError(f'Attempt to unwrap object {self.__repr__()}, which is of Error type') + + def get_right_or(self, default: R) -> R: + return self._right_value + + def get_right(self) -> R: + return self._right_value diff --git a/equipment/turtle_device.py b/equipment/turtle_device.py index 0e4689d..d88943f 100644 --- a/equipment/turtle_device.py +++ b/equipment/turtle_device.py @@ -1,4 +1,4 @@ -from turtle import Turtle +from turtle import Turtle, bye from typing import Any, Collection, Optional from types import FunctionType, MethodType from inspect import getfullargspec @@ -29,7 +29,7 @@ class TurtleDevice(SynchronyDevice): if self.state == DeviceLifecycleState.CLOSE: return self._logical_traits = {k: None for k in self._logical_traits.keys()} - self._device = None + bye() super().close() def action_descriptors(self) -> Collection[ActionDescriptor]: diff --git a/noblocking_turtle_shell.py b/noblocking_turtle_shell.py index 394fe37..c1f5b1b 100644 --- a/noblocking_turtle_shell.py +++ b/noblocking_turtle_shell.py @@ -2,16 +2,49 @@ import cmd import threading from queue import Queue - +from functools import reduce +from typing import Collection, Any, Optional +from controls.device import ActionDescriptor, DeviceError +from controls.either import Either, Left, Right from equipment.turtle_device import TurtleDevice -class TurtleDeviceThread(threading.Thread): - # TODO(Homework 4) +def select(fr: Collection[ActionDescriptor], where: tuple[str, Any]) -> Optional[ActionDescriptor]: + results = list(filter(lambda r: r.__dict__[where[0]] == where[1], fr)) + return None if len(results) == 0 else results[0] + + +class TurtleDeviceThread(): def __init__(self): super().__init__() - self.device = TurtleDevice() - self.queue = Queue() + self._device = TurtleDevice() + self._device.open() + self._queue = Queue() + + def send(self, task: list[str]) -> None: + self._queue.put(task) + + def _parse_args(self, args: list[str]) -> Either[tuple, str]: + action_name, action_args = args[0], args[1:] + try: + descr = select(self._device.action_descriptors(), ('name', action_name)) + except DeviceError as e: + return Right(f'Device error: {e}') + if descr is None: + return Right(f'Unknown action: "{action_name}"') + return Left(reduce(lambda acc, a: acc + (int(a),), action_args, (action_name,))) + + def run(self): # ! NoReturn + while (True): + task = self._queue.get(block=True, timeout=None) + if task[0] == 'stop': + self._device.close() + return + args = self._parse_args(task) + if args.is_right(): + print(f'*** {args.get_right()}') + else: + self._device.execute(*args.get_left()) class NoBlockingTurtleShell(cmd.Cmd): @@ -20,16 +53,23 @@ class NoBlockingTurtleShell(cmd.Cmd): file = None def __init__(self, turtle_thread: TurtleDeviceThread): - pass # TODO(Homework 4) + super(NoBlockingTurtleShell, self).__init__() + self._turtle_thread = turtle_thread - def do_execute(self, arg): - pass # TODO(Homework 4) + def do_execute(self, arg: str): + 'Execute a turtle command: EXECUTE COMMAND ARG1 ARG2 ...' + self._turtle_thread.send(arg.split(' ')) def do_exit(self, arg): - pass # TODO(Homework 4) + 'close the turtle window, and exit: EXIT' + print('Waiting for the turtle to finish jobs...') + self._turtle_thread.send(['stop']) + return True if __name__ == '__main__': turtle_thread = TurtleDeviceThread() - # TODO(Homework 4: Correct start thread) - NoBlockingTurtleShell(turtle_thread).cmdloop() \ No newline at end of file + shell = NoBlockingTurtleShell(turtle_thread) + cmd_thread = threading.Thread(target=shell.cmdloop) + cmd_thread.start() + turtle_thread.run() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index fe31085..499b267 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "controls" -version = "0.0.2" +version = "0.0.3" license = {file = "LICENSE"} description = "Example SCADA System" authors = [{name = "teldufalsari et al (cockroaches)"}]