vviora
718e9700ca
modified: equipment/turtle_device.py modified: noblocking_turtle_shell.py modified: turtle_shell.py
81 lines
2.8 KiB
Python
81 lines
2.8 KiB
Python
from turtle import Turtle
|
|
from typing import Optional, Collection, Any
|
|
from controls.device import *
|
|
import inspect
|
|
|
|
|
|
class TurtleDevice(SynchronyDevice):
|
|
def open(self):
|
|
self.turtle = Turtle()
|
|
self.descriptors = dict(traits=dict(), actions=dict())
|
|
super().open()
|
|
|
|
def close(self):
|
|
self.turtle.reset()
|
|
super().close()
|
|
|
|
def trait_descriptors(self) -> Collection[TraitDescriptor]:
|
|
return self.get_descriptors()["traits"]
|
|
|
|
def action_descriptors(self) -> Collection[ActionDescriptor]:
|
|
return self.get_descriptors()["actions"]
|
|
|
|
def __getitem__(self, trait_name: str) -> Optional[Any]:
|
|
"""Return logical state of trait `trait_name`."""
|
|
return self.descriptors[trait_name]
|
|
|
|
def read_descriptors(self, arg):
|
|
self.descriptors = self.get_descriptors()
|
|
return self.descriptors
|
|
|
|
def read(self, trait_name: str) -> Any:
|
|
"""Read physical state of trait `trait_name` from device."""
|
|
self.read_descriptors()
|
|
return self.descriptors[trait_name]
|
|
|
|
def write(self, trait_name: str, value: Any) -> bool:
|
|
"""Turtle traits are not writable"""
|
|
# trait_desc = self.trait_descriptors.get(trait_name)
|
|
# if trait_desc.writable:
|
|
# return getattr(self.turtle, trait_name)(value)
|
|
super().write()
|
|
|
|
def execute(self, action_name: str, *args, **kwargs):
|
|
"""Execute action `action_name`, using `args` and `kwargs` as action argument."""
|
|
action = self.get_descriptors()["actions"].get(action_name)
|
|
if action:
|
|
getattr(self.turtle, action_name)(*args, **kwargs)
|
|
return
|
|
else:
|
|
raise DeviceError("No such action: `{}`".format(action_name))
|
|
|
|
def invalidate(self, trait_name: str):
|
|
"""Invalidate logical state of trait `trait_name`"""
|
|
if self.trait_descriptors.get(trait_name):
|
|
self.trait_descriptors[trait_name] = None
|
|
raise DeviceError()
|
|
|
|
def get_descriptors(self):
|
|
descriptors = dict(actions=dict(), traits=dict())
|
|
|
|
for m_name, member in inspect.getmembers(Turtle):
|
|
if m_name.startswith("_"):
|
|
continue
|
|
if m_name.lower() != m_name:
|
|
continue
|
|
doc = inspect.getdoc(member)
|
|
if doc is None:
|
|
continue
|
|
|
|
if not inspect.isfunction(member):
|
|
descriptors["traits"][m_name] = TraitDescriptor(
|
|
m_name, doc, readable=True, writable=False
|
|
)
|
|
else:
|
|
sig = inspect.signature(member)
|
|
params_dict = dict(sig.parameters)
|
|
descriptors["actions"][m_name] = ActionDescriptor(
|
|
m_name, arguments=params_dict, info=doc
|
|
)
|
|
return descriptors
|