from turtle import Turtle from typing import Any, Collection, Optional from types import FunctionType from inspect import getfullargspec from controls.device import SynchronyDevice from controls.device import ActionDescriptor, TraitDescriptor from controls.device import DeviceLifecycleState, DeviceError class TurtleDevice(SynchronyDevice): def __init__(self) -> None: super().__init__() self._device: Optional[Turtle] = None self._logical_traits: dict[str, Optional[Any]] = {} self._actions: list[str] = [] def open(self) -> None: """Opens a new device and initializes all logical traits with `None`""" if self.state == DeviceLifecycleState.OPEN: return super().open() self._device = Turtle() self._logical_traits = {k.name: None for k in self.trait_descriptors()} self._actions = [a.name for a in self.action_descriptors()] def close(self) -> None: """Closes the device and invalidates all logial traits""" if self.state == DeviceLifecycleState.CLOSE: return self._logical_traits = {k: None for k in self._logical_traits.keys()} self._device = None super().close() def action_descriptors(self) -> Collection[ActionDescriptor]: def get_args(f: FunctionType) -> dict[str, type]: spec = getfullargspec(f) return {arg: spec.annotations.get(arg, object) for arg in spec.args} actions = (dict(filter(lambda i: not i[0].startswith('_'), filter(lambda i: isinstance(i[1], FunctionType), self._device.__dict__.items())))) # lisp :3 return [ActionDescriptor(name=action[0],arguments=get_args(action[1])) for action in actions.items()] def trait_descriptors(self) -> Collection[TraitDescriptor]: traits = dict(filter(lambda i: not i[0].startswith('_'), filter(lambda i: not isinstance(i[1], FunctionType), self._device.__dict__.items()))) return [TraitDescriptor(name=trait) for trait in traits.keys()] def read(self, trait_name: str) -> Any: if self.state != DeviceLifecycleState.OPEN: raise DeviceError("Device is not opened") self._logical_traits[trait_name] = self._device.__dict__[trait_name] return self._device.__dict__[trait_name] def write(self, trait_name: str, value: Any) -> bool: if self.state != DeviceLifecycleState.OPEN: raise DeviceError("Device is not opened") if trait_name not in self._logical_traits.keys(): raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}") return super().write(trait_name, value) def execute(self, action_name: str, *args, **kwargs): if self.state != DeviceLifecycleState.OPEN: raise DeviceError("Device is not opened") if action_name not in self._actions: raise KeyError(f"\"{action_name}\" is not an action of {self.__class__}") return self._device.__dict__[action_name](args, kwargs) def invalidate(self, trait_name: str): if trait_name in self._logical_traits: self._logical_traits[trait_name] = None else: raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}") def __getitem__(self, trait_name: str) -> Optional[Any]: if trait_name in self._logical_traits: return self._logical_traits[trait_name] else: raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}")