Compare commits

..

2 Commits

Author SHA1 Message Date
1a6e43dfdd modified: noblocking_turtle_shell.py
modified:   turtle_shell.py
2023-12-20 19:18:28 +03:00
718e9700ca modified: controls/device.py
modified:   equipment/turtle_device.py
	modified:   noblocking_turtle_shell.py
	modified:   turtle_shell.py
2023-12-20 19:17:55 +03:00
5 changed files with 129 additions and 113 deletions

View File

@ -1,15 +1,16 @@
from dataclasses import dataclass
from typing import Optional, Collection, Any
from abc import ABC, abstractmethod
from enum import Enum, auto
from abc import ABC, abstractmethod, abstractproperty
from enum import Enum
class DeviceLifecycleState(Enum):
INIT = "init"
OPEN = "open"
CLOSE = "close"
INIT = 0
OPEN = 1
CLOSE = 2
class DevaceError(Exception):
class DeviceError(Exception):
pass
@ -37,7 +38,6 @@ class ActionDescriptor:
class Device(ABC):
# TODO(Homework #3)
_state = DeviceLifecycleState.INIT
@property
@ -47,13 +47,11 @@ class Device(ABC):
def close(self):
self._state = DeviceLifecycleState.CLOSE
@property
@abstractmethod
@abstractproperty
def trait_descriptors(self) -> Collection[TraitDescriptor]:
pass
@property
@abstractmethod
@abstractproperty
def action_descriptors(self) -> Collection[ActionDescriptor]:
pass
@ -71,7 +69,7 @@ class SynchronyDevice(Device):
@abstractmethod
def execute(self, action_name: str, *args, **kwargs):
"""Execute action `action_name`, using `args` and `kwargs` as action argument."""
pass
raise DeviceError
@abstractmethod
def read(self, trait_name: str) -> Any:
@ -87,6 +85,3 @@ class SynchronyDevice(Device):
def invalidate(self, trait_name: str):
"""Invalidate logical state of trait `trait_name`"""
pass
def close(self):
return super().close()

View File

@ -1,73 +1,80 @@
import time
from turtle import Turtle
from equipment.device import SynchronyDevice
from equipment.device import Device
import inspect
from typing import Optional, Collection, Any
from controls.device import *
import inspect
class TurtleDevice(SynchronyDevice):
def open(self):
super().open()
self.turtle = Turtle()
self.descriptors = dict(traits=dict(), actions=dict())
super().open()
def close (self):
def close(self):
self.turtle.reset()
super().close()
del self.turtle
@property
def trait_descriptors(self):
l = list()
for i in inspect.getmembers(TurtleDevice):
if not (i[0].startswith("__") or i[0].startswith("_")) and not inspect.isfunction(i[1]):
l.append(i[0])
return l
@property
def action_descriptors(self):
l = list()
#print(inspect.getmembers(TurtleDevice))
#for i in dir(TurtleDevice):
# # print(type(getattr(TurtleDevice(), i))=='method')
# # if not i.startswith("__") and type(getattr(TurtleDevice(), i)) == types.MethodType:
# if not i.startswith("__") and inspect.isroutine(inspect.getattr_static(TurtleDevice, i)):
# sig = inspect.signature(getattr(TurtleDevice, i))
# l.append(i + " " + str(sig))
#return l
for i in inspect.getmembers(TurtleDevice):
# print(type(getattr(TurtleDevice, i)))
# print(type(getattr(TurtleDevice(), i))=='method')
if not (i[0].startswith("__") or i[0].startswith("_")) and inspect.isfunction(i[1]):
sig = inspect.signature(getattr(TurtleDevice, i[0]))
l.append(i[0]+str(sig))
return l
def trait_descriptors(self) -> Collection[TraitDescriptor]:
return self.get_descriptors()["traits"]
def execute(self, action_name: str, *args, **kwargs):
"""Execute action `action_name`, using `args` and `kwargs` as action argument."""
if self.state == 1:
f = getattr(self.turtle, action_name)
f(int(*args), **kwargs)
else:
print("It's not opened")
self.open()
#time.sleep(30)
f = getattr(self.turtle, action_name)
f(int(*args), **kwargs)
def read(self, trait_name: str):
"""Read physical state of trait `trait_name` from device."""
return getattr(self.turtle, trait_name)
def write(self, trait_name: str, value: Any) -> bool:
"""Pass `value` to trait `trait_name` of device."""
setattr(self.turtle, trait_name, value)
def invalidate(self, trait_name: str):
"""Invalidate logical state of trait `trait_name`"""
if self.read(trait_name) == self.__getitem__(trait_name):
print("Everything's okey")
else:
print(f"Something went wrong with {trait_name}.")
def action_descriptors(self) -> Collection[ActionDescriptor]:
return self.get_descriptors()["actions"]
def __getitem__(self, trait_name: str) -> Optional[Any]:
"""Return logical state of trait `trait_name`."""
return getattr(self, trait_name)
return self.descriptors[trait_name]
def read_descriptors(self, arg):
self.descriptors = self.get_descriptors()
return self.descriptors
def read(self, trait_name: str) -> Any:
"""Read physical state of trait `trait_name` from device."""
self.read_descriptors()
return self.descriptors[trait_name]
def write(self, trait_name: str, value: Any) -> bool:
"""Turtle traits are not writable"""
# trait_desc = self.trait_descriptors.get(trait_name)
# if trait_desc.writable:
# return getattr(self.turtle, trait_name)(value)
super().write()
def execute(self, action_name: str, *args, **kwargs):
"""Execute action `action_name`, using `args` and `kwargs` as action argument."""
action = self.get_descriptors()["actions"].get(action_name)
if action:
getattr(self.turtle, action_name)(*args, **kwargs)
return
else:
raise DeviceError("No such action: `{}`".format(action_name))
def invalidate(self, trait_name: str):
"""Invalidate logical state of trait `trait_name`"""
if self.trait_descriptors.get(trait_name):
self.trait_descriptors[trait_name] = None
raise DeviceError()
def get_descriptors(self):
descriptors = dict(actions=dict(), traits=dict())
for m_name, member in inspect.getmembers(Turtle):
if m_name.startswith("_"):
continue
if m_name.lower() != m_name:
continue
doc = inspect.getdoc(member)
if doc is None:
continue
if not inspect.isfunction(member):
descriptors["traits"][m_name] = TraitDescriptor(
m_name, doc, readable=True, writable=False
)
else:
sig = inspect.signature(member)
params_dict = dict(sig.parameters)
descriptors["actions"][m_name] = ActionDescriptor(
m_name, arguments=params_dict, info=doc
)
return descriptors

View File

@ -1,62 +1,76 @@
import cmd
import threading
from queue import Empty, Queue
from queue import Queue
from turtle import forward
from equipment.turtle_device import TurtleDevice
import argparse
import time
class TurtleDeviceThread(threading.Thread):
# TODO(Homework 4)
def __init__(self):
super().__init__()
self.device = TurtleDevice()
self.queue = Queue()
self.event = Event()
self.queue = Queue[tuple]()
self.device.open()
self.device.execute("speed", 1)
def run(self):
while True:
try:
item = self.queue.get()
except self.queue.Empty:
continue
else:
if (item == 'exit'):
break
self.device.execute(item[0], item[1:])
action, args, kwargs = self.queue.get()
if action == "exit":
self.queue.task_done()
def __message_processing__(self):
while True:
new_message = self.queue.get()
#print(type(new_message))
#print(new_message)
#print("Name and args of action are {}".format(new_message))
self.device.execute(*new_message)
#time.sleep(30)
self.queue.task_done()
if self.event.is_set():
break
def __message_thread__(self):
thread = Thread(target = self.__message_processing__, daemon=True)
thread.start()
self.device.execute(action, *args, **kwargs)
self.queue.task_done()
def add_task(self, action, *args, **kwargs):
self.queue.put((action, args, kwargs))
class NoBlockingTurtleShell(cmd.Cmd):
intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n'
prompt = '(turtle) '
intro = "Welcome to the turtle shell. Type help or ? to list commands.\n"
prompt = "(turtle) "
file = None
def __init__(self, turtle_thread: TurtleDeviceThread):
super(NoBlockingTurtleShell, self).__init__()
turtle_thread.__message_thread__()
super().__init__()
self.turtle_thread = turtle_thread
@property
def turtle_device(self):
return self.turtle_thread.device
def do_execute(self, arg):
turtle_thread.queue.put(parse(arg))
def do_exit(self, arg):
turtle_thread.event.set()
command_and_args = arg.split()
if len(command_and_args) == 1:
self.turtle_thread.add_task(command)
return
command = command_and_args[0]
args = tuple(int(c) if c.isdecimal() else c for c in command_and_args[1:])
self.turtle_thread.add_task(command, *args)
if __name__ == '__main__':
def do_forward(self, arg):
'Move the turtle forward by the specified distance: FORWARD 10'
self.do_execute("forward 100")
def do_right(self, arg):
'Turn turtle right by given number of degrees: RIGHT 20'
self.do_execute("right 100")
def do_left(self, arg):
'Turn turtle left by given number of degrees: LEFT 90'
self.do_execute("left 100")
def do_bye(self, arg):
'Stop recording, close the turtle window, and exit: BYE'
print('Thank you for using Turtle')
self.turtle_thread.add_task("exit")
return True
if __name__ == "__main__":
turtle_thread = TurtleDeviceThread()
turtle_thread.start()
NoBlockingTurtleShell(turtle_thread).cmdloop()
turtle_thread.daemon = True
thread_shell = threading.Thread(target=NoBlockingTurtleShell(turtle_thread).cmdloop)
thread_shell.start()
turtle_thread.run()
thread_shell.join()