from turtle import Turtle, bye from typing import Any, Collection, Optional from types import FunctionType, MethodType from inspect import getfullargspec from controls.device import SynchronyDevice from controls.device import ActionDescriptor, TraitDescriptor from controls.device import DeviceLifecycleState, DeviceError class TurtleDevice(SynchronyDevice): def __init__(self) -> None: super().__init__() self._device: Optional[Turtle] = None self._logical_traits: dict[str, Optional[Any]] = {} self._actions: list[str] = [] def open(self) -> None: """Opens a new device and initializes all logical traits with `None`""" if self.state == DeviceLifecycleState.OPEN: return super().open() self._device = Turtle() self._logical_traits = {k.name: None for k in self.trait_descriptors()} self._actions = [a.name for a in self.action_descriptors()] def close(self) -> None: """Closes the device and invalidates all logial traits""" if self.state == DeviceLifecycleState.CLOSE: return self._logical_traits = {k: None for k in self._logical_traits.keys()} bye() super().close() def action_descriptors(self) -> Collection[ActionDescriptor]: if self.state != DeviceLifecycleState.OPEN: raise DeviceError("Device is not opened") def get_args(name: str) -> dict[str, type]: spec = getfullargspec(getattr(self._device, name)) return {arg: spec.annotations.get(arg, object) for arg in spec.args} actions = (filter(lambda name: not name.startswith("_"), filter(lambda name: isinstance(getattr(self._device, name), MethodType), dir(self._device)))) # lisp :3 return [ActionDescriptor(name=action,arguments=get_args(action)) for action in actions] def trait_descriptors(self) -> Collection[TraitDescriptor]: if self.state != DeviceLifecycleState.OPEN: raise DeviceError("Device is not opened") traits = dict(filter(lambda i: not i[0].startswith('_'), filter(lambda i: not isinstance(i[1], FunctionType), self._device.__dict__.items()))) return [TraitDescriptor(name=trait) for trait in traits.keys()] def read(self, trait_name: str) -> Any: if self.state != DeviceLifecycleState.OPEN: raise DeviceError("Device is not opened") self._logical_traits[trait_name] = self._device.__dict__[trait_name] return self._device.__dict__[trait_name] def write(self, trait_name: str, value: Any) -> bool: if self.state != DeviceLifecycleState.OPEN: raise DeviceError("Device is not opened") if trait_name not in self._logical_traits.keys(): raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}") return super().write(trait_name, value) def execute(self, action_name: str, *args, **kwargs): if self.state != DeviceLifecycleState.OPEN: raise DeviceError("Device is not opened") if action_name not in self._actions: raise KeyError(f"\"{action_name}\" is not an action of {self.__class__}") return getattr(self._device, action_name)(*args, **kwargs) def invalidate(self, trait_name: str): if trait_name in self._logical_traits: self._logical_traits[trait_name] = None else: raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}") def __getitem__(self, trait_name: str) -> Optional[Any]: if trait_name in self._logical_traits: return self._logical_traits[trait_name] else: raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}")