modified: equipment/turtle_device.py

modified:   noblocking_turtle_shell.py
This commit is contained in:
vviora 2023-12-19 15:41:52 +03:00
parent 7742a74729
commit 8a621ebbe3
2 changed files with 82 additions and 48 deletions

View File

@ -1,56 +1,73 @@
import time
from turtle import Turtle
from typing import Any, Collection, Optional
from controls.device import ActionDescriptor, SynchronyDevice, TraitDescriptor
from equipment.device import SynchronyDevice
from equipment.device import Device
import inspect
from typing import Optional, Collection, Any
class TurtleDevice(SynchronyDevice):
def open(self, name, **kwargs):
self.turtle = Turtle(kwargs)
def open(self):
super().open()
def close(self):
del self.turtle
self.turtle = Turtle()
def close (self):
super().close()
del self.turtle
@property
def trait_descriptors(self):
traits = [attribute for attribute in vars(self.turtle).items()
if (attribute[0].startswith('_') == False)]
self.traits = [TraitDescriptor(*tr) for tr in traits]
return self.traits
l = list()
for i in inspect.getmembers(TurtleDevice):
if not (i[0].startswith("__") or i[0].startswith("_")) and not inspect.isfunction(i[1]):
l.append(i[0])
return l
@property
def action_descriptors(self):
actions = [(attribute, getattr(t, attribute)) for attribute in dir(t)
if (attribute.startswith('_') == False)]
self.actions = [TraitDescriptor(*ac) for ac in actions]
return self.actions
def read(self, trait_name):
collection = self.traits
return ([collection[i].info for i in range(len(collection))
if collection[i].name == trait_name])
def write(self, trait_name: str, value: Any) -> bool:
collection = self.traits
for i in range(len(collection)):
if collection[i].name == trait_name:
collection[i].info=value
l = list()
#print(inspect.getmembers(TurtleDevice))
#for i in dir(TurtleDevice):
# # print(type(getattr(TurtleDevice(), i))=='method')
# # if not i.startswith("__") and type(getattr(TurtleDevice(), i)) == types.MethodType:
# if not i.startswith("__") and inspect.isroutine(inspect.getattr_static(TurtleDevice, i)):
# sig = inspect.signature(getattr(TurtleDevice, i))
# l.append(i + " " + str(sig))
#return l
for i in inspect.getmembers(TurtleDevice):
# print(type(getattr(TurtleDevice, i)))
# print(type(getattr(TurtleDevice(), i))=='method')
if not (i[0].startswith("__") or i[0].startswith("_")) and inspect.isfunction(i[1]):
sig = inspect.signature(getattr(TurtleDevice, i[0]))
l.append(i[0]+str(sig))
return l
def execute(self, action_name: str, *args, **kwargs):
self.turtle.action_name(args, *kwargs)
"""Execute action `action_name`, using `args` and `kwargs` as action argument."""
if self.state == 1:
f = getattr(self.turtle, action_name)
f(int(*args), **kwargs)
else:
print("It's not opened")
self.open()
#time.sleep(30)
f = getattr(self.turtle, action_name)
f(int(*args), **kwargs)
def read(self, trait_name: str):
"""Read physical state of trait `trait_name` from device."""
return getattr(self.turtle, trait_name)
def write(self, trait_name: str, value: Any) -> bool:
"""Pass `value` to trait `trait_name` of device."""
setattr(self.turtle, trait_name, value)
def invalidate(self, trait_name: str):
return super().invalidate(trait_name)
def __getitem__(self, trait_name: str) -> Any | None:
collection = self.traits
return ([collection[i] for i in range(len(collection))
if collection[i].name == trait_name])
"""Invalidate logical state of trait `trait_name`"""
if self.read(trait_name) == self.__getitem__(trait_name):
print("Everything's okey")
else:
print(f"Something went wrong with {trait_name}.")
def __getitem__(self, trait_name: str) -> Optional[Any]:
"""Return logical state of trait `trait_name`."""
return getattr(self, trait_name)

View File

@ -2,6 +2,8 @@ import cmd
import threading
from queue import Empty, Queue
from equipment.turtle_device import TurtleDevice
import argparse
import time
class TurtleDeviceThread(threading.Thread):
@ -10,6 +12,7 @@ class TurtleDeviceThread(threading.Thread):
super().__init__()
self.device = TurtleDevice()
self.queue = Queue()
self.event = Event()
def run(self):
while True:
@ -21,7 +24,21 @@ class TurtleDeviceThread(threading.Thread):
if (item == 'exit'):
break
self.device.execute(item[0], item[1:])
self.queue.task_done()
self.queue.task_done()
def __message_processing__(self):
while True:
new_message = self.queue.get()
#print(type(new_message))
#print(new_message)
#print("Name and args of action are {}".format(new_message))
self.device.execute(*new_message)
#time.sleep(30)
self.queue.task_done()
if self.event.is_set():
break
def __message_thread__(self):
thread = Thread(target = self.__message_processing__, daemon=True)
thread.start()
class NoBlockingTurtleShell(cmd.Cmd):
intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n'
@ -30,13 +47,13 @@ class NoBlockingTurtleShell(cmd.Cmd):
def __init__(self, turtle_thread: TurtleDeviceThread):
super(NoBlockingTurtleShell, self).__init__()
self.turtle_thread = TurtleDeviceThread()
turtle_thread.__message_thread__()
def do_execute(self, arg):
self.turtle_thread.queue.put(arg)
turtle_thread.queue.put(parse(arg))
def do_exit(self, arg):
self.turtle_thread.queue.put('exit')
turtle_thread.event.set()
if __name__ == '__main__':